RocketMQ消息为什么会被重复消费?
发布时间 - 2025-07-12 00:00:00 点击率:次在使用rocketmq时,消息可能会被重复消费。让我们从全局视角探讨消息发送和消费的过程。rocketmq-dashboard是一个非常实用的图形化界面工具。
首先,我们在RocketMQ-Dashboard上创建一个topic,每个topic下有4个队列。
每个topic是一类消息的集合,topic下再细分queue是为了提高消息消费的并发度。
「当producer发送topic消息时,应该发送到topic下的哪个queue呢?」
producer会采用轮询策略来发送消息。
「那么consumer应该消费哪个queue下的消息呢?」
当只有一个消费者时,它会消费所有queue中的消息。
「如果有多个消费者呢?」
只需根据各种负载均衡策略将队列分配给消费者即可,如下图展示了两种负载均衡方式。
你问我这两种负载策略是如何实现的?去查看源码吧,我就不详细分析了。
「如果消费者数量超过队列的数量会发生什么?」
多余的消费者将不会消费任何队列。
「为什么一个consumer只能消费一个queue呢?」
多个消费者消费一个queue肯定会有并发问题,所以需要加锁,这样还不如将topic下的队列数量设置得更多一些。
「我在运行过程中可以设置topic下queue的数量吗?」
当然可以。不仅可以重新设置queue的数量,还可以实时增减consumer,以应对不同流量的场景。
「那这样说当queue或者consumer的数量发生变化的时候,需要重新执行负载均衡吧?」
是的,大家一般把这个过程称为重平衡。
下面我们来分享一下详细的细节。
消息发送流程主要有三种方式:单向发送(只发送,不管结果)、同步发送和异步发送。
消息消
费流程基于推还是拉?消息消费的模式有两种方式:
拉取:Consumer不断从Broker拉取。 推送:Broker向Consumer推送。
这两种方式都有各自的缺点:
拉取:拉取的间隔不好确定,间隔太短没消息时会造成带宽浪费,间隔太长又会造成消息不能及时被消费。 推送:「推送和速率难以适配消费速率」,推的太快,消费者消费不过来怎么办?推的太慢消息不能及时被消费。
「看起来拉取和推送难以抉择。」
然后就有大佬把拉取模式改了一下,即不会造成带宽浪费,也能基于消费的速率来决定拉取的频率!
「你猜怎么改的?」
其实很简单,Consumer发送拉取请求到Broker端,如果Broker有数据则返回,Consumer端再次拉取。如果Broker端没有数据,不立即返回,而是等待一段时间(例如5s)。
如果在等待的这段时间,有要拉取的消息,则将消息返回,Consumer端再次拉取。如果等待超时,也会直接返回,不会将这个请求一直hold住,Consumer端再次拉取。「对了,这种策略就叫做长轮询。」
「RocketMQ中有拉和推两种消费方式,但是推是基于长轮询做的。」
具体消费流程如下图所示。
「拉取到消息后是怎么处理的呢?」
PullRequest类的成员变量如下图所示。
当拉取到消息后,消息会被放入msgTreeMap,其中key为消息的offset,value为消息实体。
「另外还有一个重要的属性dropped,和重平衡相关,重平衡的时候会造成消息的重复消费,具体机制不分析了,看专栏吧。」
msgCount(未消费消息总数)和msgSize(未消费消息大小)是和流控相关的。
「什么是流控呢?」
就是流量控制,当消费者消费的比较慢时,减缓拉取的速度。如下图所示。
当从阻塞队列中获取PullRequest时,并不会直接发起网络请求,而是先看看是否触发流控的规则,比如未消费的消息总数超过一定值,未消费的消息大小超过一定值等。
接着就是收到响应,处理消息,并将PullRequest再次放入阻塞队列。
「是不是落了一个步骤?就是Consumer告诉Broker这部分消息我消费了?」
嗯嗯,你是不是以为提交offset的过程是同步的?其实并不是,「是异步的。」
Consumer怎么提交offset?
当consumer消费完消息只是将offset存在本地,通过定时任务将offset提交到broker,另外broker收到提交offset的请求后,也仅仅是将offset存在map中,通过定时任务持久化到文件中。
「这样就会造成消息的重复消费。」
Consumer消费完消息并不是实时同步到Broker的,而是将offset先保存在本地map中,通过定时任务持久化上去。这就导致消息被消费了,但是此时消费者宕机了导致offset没提交,下次没提交offset的这部分消息会被再次消费。
即使offset被提交到了Broker,在还没来得及持久化的时候Broker宕机了,当重启的时候Broker会读取consumerOffset.json中保存的offset信息,这就会导致没持久化offset的这部分消息会被再次消费。
# node.js
# 工具
# 为什么
# json
# 成员变量
# map
# 并发
# 异步
# rocketmq
# 负载均衡
# 两种
# 如下图
# 这部
# 所示
# 多个
# 大佬
# 定值
# 我就
# 就会
相关栏目:
【
网站优化151355 】
【
网络推广146373 】
【
网络技术251813 】
【
AI营销90571 】
相关推荐:
网站制作软件有哪些,制图软件有哪些?
Laravel怎么实现模型属性转换Casting_Laravel自动将JSON字段转为数组【技巧】
JS中使用new Date(str)创建时间对象不兼容firefox和ie的解决方法(两种)
Laravel如何使用Spatie Media Library_Laravel图片上传管理与缩略图生成【步骤】
如何解决hover在ie6中的兼容性问题
网站优化排名时,需要考虑哪些问题呢?
googleplay官方入口在哪里_Google Play官方商店快速入口指南
Laravel怎么使用Collection集合方法_Laravel数组操作高级函数pluck与map【手册】
浅述节点的创建及常见功能的实现
Swift中循环语句中的转移语句 break 和 continue
如何快速打造个性化非模板自助建站?
HTML透明颜色代码怎么让下拉菜单透明_下拉菜单透明背景指南【技巧】
如何获取PHP WAP自助建站系统源码?
Laravel如何使用Service Container和依赖注入?(代码示例)
如何确认建站备案号应放置的具体位置?
Laravel如何实现API资源集合?(Resource Collection教程)
Laravel如何实现邮箱地址验证功能_Laravel邮件验证流程与配置
Linux安全能力提升路径_长期防护思维说明【指导】
Laravel如何实现多表关联模型定义_Laravel多对多关系及中间表数据存取【方法】
如何快速搭建高效WAP手机网站?
Laravel如何使用Blade模板引擎?(完整语法和示例)
Android中Textview和图片同行显示(文字超出用省略号,图片自动靠右边)
佛山网站制作系统,佛山企业变更地址网上办理步骤?
Python自动化办公教程_ExcelWordPDF批量处理案例
Android Socket接口实现即时通讯实例代码
Laravel storage目录权限问题_Laravel文件写入权限设置
java ZXing生成二维码及条码实例分享
Laravel如何正确地在控制器和模型之间分配逻辑_Laravel代码职责分离与架构建议
制作公司内部网站有哪些,内网如何建网站?
Laravel如何配置.env文件管理环境变量_Laravel环境变量使用与安全管理
如何快速搭建虚拟主机网站?新手必看指南
微信小程序 wx.uploadFile无法上传解决办法
如何用PHP快速搭建CMS系统?
如何在橙子建站上传落地页?操作指南详解
如何在阿里云通过域名搭建网站?
如何基于PHP生成高效IDC网络公司建站源码?
Win11搜索栏无法输入_解决Win11开始菜单搜索没反应问题【技巧】
HTML透明颜色代码在Angular里怎么设置_Angular透明颜色使用指南【详解】
Laravel中间件如何使用_Laravel自定义中间件实现权限控制
Laravel路由怎么定义_Laravel核心路由系统完全入门指南
微信小程序 HTTPS报错整理常见问题及解决方案
制作无缝贴图网站有哪些,3dmax无缝贴图怎么调?
JS中页面与页面之间超链接跳转中文乱码问题的解决办法
中山网站制作网页,中山新生登记系统登记流程?
Laravel如何监控和管理失败的队列任务_Laravel失败任务处理与监控
油猴 教程,油猴搜脚本为什么会网页无法显示?
Win10如何卸载预装Edge扩展_Win10卸载Edge扩展教程【方法】
Android仿QQ列表左滑删除操作
如何破解联通资金短缺导致的基站建设难题?
焦点电影公司作品,电影焦点结局是什么?

