RocketMQ 4.x 入门教程之五
大纲
- RocketMQ 4.x 入门教程之一、RocketMQ 4.x 入门教程之二、RocketMQ 4.x 入门教程之三
- RocketMQ 4.x 入门教程之四、RocketMQ 4.x 入门教程之五、RocketMQ 4.x 入门教程之六
- RocketMQ 4.x 入门教程之七、RocketMQ 4.x 入门教程之八、RocketMQ 4.x 入门教程之九
前言
学习资源
版本说明
本文所使用的各软件版本如下表所示:
| 组件 | 版本 | 说明 |
|---|---|---|
| JDK | 11 | Java、SpringBoot、SpringCloud 项目案例使用的 JDK 版本 |
| RocketMQ Server | 4.9.0 | RocketMQ 服务端(包括 NameServer 和 Broker),运行时依赖 JDK 1.8+ |
RocketMQ 的工作原理
消息的消费
订阅关系的一致性
订阅关系的一致性指的是:在一个 Consumer Group(消费者组)中,所有 Consumer 实例订阅的 Topic、Tag 以及消息处理逻辑必须完全一致;否则会导致消息消费逻辑混乱,甚至可能造成消息丢失。
正确的订阅关系
多个 Consumer Group(消费者组)同时订阅了多个 Topic,并且每个 Consumer Group 内的多个 Consumer 实例的订阅关系保持一致(Topic + Tag 语义一致)。

错误的订阅关系
一个 Consumer Group(消费者组)同时订阅了一个或多个 Topic,并且该 Consumer Group 里的多个 Consumer 实例的订阅关系并没有保持一致(Topic + Tag 语义一致)。

订阅了不同的 Topic
在此案例中,错误的地方在于:同一个 Consumer Group(消费者组)内的两个 Consumer 实例订阅了不同的 Topic。
- Consumer 实例 1-1:(订阅的 Topic 为
jodie_test_A,Tag 为所有的消息)
1 | Properties properties = new Properties(); |
- Consumer 实例 1-2:(订阅的 Topic 为
jodie_test_B,Tag 为所有的消息)
1 | Properties properties = new Properties(); |
订阅了不同的 Tag
在此案例中,错误的地方在于:同一个 Consumer Group(消费者组)内的两个 Consumer 实例订阅了相同 Tipic 的不同 Tag。
- Consumer 实例 2-1:(订阅的 Topic 为
jodie_test_A,Tag 为tagA的消息)
1 | Properties properties = new Properties(); |
- Consumer 实例 2-2:(订阅的 Topic 为
jodie_test_A,Tag 为所有的消息)
1 | Properties properties = new Properties(); |
订阅了不同数量的 Topic
在此案例中,错误的地方在于:同一个 Consumer Group(消费者组)中的两个 Consumer 实例订阅了不同数量的 Topic。
- Consumer 实例 3-1:(订阅了 2 个 Topic)
1 | Properties properties = new Properties(); |
- Consumer 实例 3-2:(订阅了 1 个 Topic)
1 | Properties properties = new Properties(); |
消费进度(offset)
消费进度(offset)是用来记录每个 Queue 在不同 Consumer Group(消费者组)中的消费进度。
offset 的存储方式
RocketMQ 在不同的消费模式下,Queue 的消费进度(offset)的存储方式是不同的。根据消费进度记录器的不同,可以分为两种存储方式:本地存储方式和远程存储方式。
- 广播消费(Broadcasting)模式:消费进度(offset)存储在 Consumer 端
- 集群消费(Clustering)模式:消费进度(offset)存储在 Broker 端
本地存储方式
在 RocketMQ 的广播消费(Broadcasting)模式下,消费进度(offset)采用本地存储方式
- 同一个 Consumer Group(消费者组)内的每个 Consumer 实例都会独立消费同一个 Topic 的全部消息
- 每一条消息都会被该 Consumer Group 中的所有 Consumer 实例各自消费一次
- 各个消费者之间的消费进度(offset)互不影响,也不存在共享或交集,所以由每个 Consumer 实例独立维护自己的消费进度(offset)
在 RocketMQ 的广播消费(Broadcasting)模式下,消费进度(offset)存储在 Consumer 端,并以 JSON 形式持久化到 Consumer 实例一侧的磁盘文件中
- 默认的存储路径为:
~/.rocketmq_offsets/${clientId}/${group}/Offsets.json ${clientId}表示当前 Consumer 实例的 ID(默认为ip@DEFAULT)${group}表示 Consumer Group 的名称
- 默认的存储路径为:
远程存储方式
在 RocketMQ 的集群消费(Clustering)模式下,消费进度(offset)采用远程存储方式
- 同一个 Consumer Group(消费者组)内的多个 Consumer 实例会共同分摊消费同一个 Topic 下多个 Queue 中的消息
- 每条消息在同一个 Consumer Group 内只会被其中一个 Consumer 实例消费
- 每个 Queue 的消费进度(offset)在 Consumer Group 内共享维护,从而保证负载均衡消费语义,所以统一由 Broker 管理消费进度(offset)
在 RocketMQ 的集群消费(Clustering)模式下,消费进度(offset)存储在 Broker 端,并以 JSON 形式持久化到 Broker 一侧的磁盘文件中
- 默认的存储路径为
~/store/config/consumerOffset.json - Broker 启动时会加载该 JSON 文件,并构建为一个双层 Map 结构(
ConsumerOffsetManager)进行内存维护- 外层 Key:Topic 与 Consumer Group 的组合(
topic@group) - 外层 Value:一个内层 Map(
Map<QueueId, Offset>)- 内层 Map Key:QueueId
- 内层 Map Value:该 Queue 的消费进度(offset)
- 外层 Key:Topic 与 Consumer Group 的组合(
- 默认的存储路径为
在 RocketMQ 的集群消费(Clustering)模式下,Broker 中存储的各个 Topic 下不同 Queue 的消费进度(offset)情况如下:
1 | # cat ~/store/config/consumerOffset.json |
为什么在集群消费模式下,消费进度(offset)要存储在 Broker 端
在 RocketMQ 的集群消费模式下,消费进度(offset)统一存储在 Broker 端,主要是为了支持 Rebalance(再均衡)机制。当发生 Rebalance 时,Consumer Group 中新分配到 Queue 的 Consumer 实例,会从 Broker 获取对应的消费进度(由内部的双层 Map 结构 ConsumerOffsetManager 维护),从而能够从上次消费的位置继续消费。这样可以保证在多个 Consumer 实例之间实现一致的消费语义,并确保在 Rebalance 之后仍能进行连续、不中断的消息消费。
offset 的提交方式
在 RocketMQ 中,消费进度(offfset)有以下两种提交方式,默认采用异步提交方式(可能会导致消息重复消费问题)。
同步提交(Sync Commit)
- Consumer 实例在消费完一批消息后,将消费进度(offset)提交给 Broker
- 需要等待 Broker 返回成功 ACK 后,才会继续拉取并消费下一批消息
- 在等待 ACK 响应的期间,Consumer 实例处于阻塞状态
异步提交(Async Commit)
- Consumer 实例在消费完一批消息后,将消费进度(offset)提交给 Broker
- 不需要等待 Broker 返回 ACK,即可继续拉取并消费下一批消息
- 吞吐量更高,但可能带来一定程度的重复消费问题
offset 的核心作用
这里以消费进度(offset)的远程存储方式(集群消费模式)举例,介绍消费进度(offset)的核心作用。
- 消费者首次消费时,起始位置由用户通过
consumer.setConsumeFromWhere()方法指定,该方法基于枚举类型ConsumeFromWhere设置常见的三种起始位置,如下:CONSUME_FROM_FIRST_OFFSET:从当前队列(Queue)的第一条消息开始消费(最早);CONSUME_FROM_LAST_OFFSET:从当前队列(Queue)的最后一条消息开始消费(最新);CONSUME_FROM_TIMESTAMP:从指定时间戳对应位置的消息开始消费(该时间戳需通过额外配置进行指定,比如consumer.setConsumeTimestamp("20210701080000"))。
- 在 Consumer 启动后,会按照设定的起始位置开始消费第一条消息。随着消息的不断消费,Consumer 会在每次消费完一批消息后,将当前消费进度(offset)提交给 Broker。
- Broker 在接收到消费进度(offset)后,会将其更新到内存中的双层 Map(
ConsumerOffsetManager)以及持久化文件consumerOffset.json中。随后,Broker 会向该 Consumer 返回 ACK 响应。 - 该 ACK 响应中包含三项关键数据:当前消费队列的最小 offset(
minOffset)、最大 offset(maxOffset),以及下一次消费的起始 offset(nextBeginOffset)。
集群消费模式下的消费闭环
消费者 → 按当前 offset 拉取消息 → 处理消息 → 提交新 offset → 再从新 offset 继续拉取。
重试队列的概述
- 当出现需要进行重试消费的消息时,Broker 会自动为每个消费组创建一个 Topic 名称为
%RETRY%{consumerGroup}的重试队列。 - 消费失败的消息会被投递到该重试队列中,并按照一定的延迟策略再次投递和消费,从而实现消息的重试机制。
- 只有出现需要进行重试消费的消息时,Broker 才会为对应的消费者组创建重试队列。这个重试队列是针对消费者组的,而不是针对每个 Topic 创建的(由于一个 Topic 的消息可以被多个消费者组进行消费,所以会为这些消费者组各创建一个重试队列)。

消息的清理机制
思考问题
在 RocketMQ 中,消息被消费后会被立即删除吗?答案是:不会,RocketMQ 的消息生命周期由 commitlog 文件控制,而不是由 "是否被消费" 决定。
在 RocketMQ 中,消息是按顺序存储在 commitlog 文件中的,且消息大小不固定。因此,消息清理不可能以单条消息为单位进行,而是以 commitlog 文件为单位进行删除。如果按消息粒度清理,不仅实现复杂,还会严重影响性能。
需要注意的是,每个 commitlog 文件都具有过期时间(默认是 72 小时,即 3 天)。除了用户手动删除 commitlog 文件外,在以下情况下也会自动触发 commitlog 文件的清理(无论 commitlog 文件中的消息是否已经被消费):
commitlog 文件过期 + 到达清理时间点
- 当 commitlog 文件超过过期时间,且到达每日清理时间(默认是凌晨 4 点)时,自动删除过期的 commitlog 文件。
commitlog 文件过期 + 磁盘占用率达到过期清理警戒线(默认 75%)
- 当 commitlog 文件超过过期时间,且磁盘占用率达到过期清理警戒线(默认 75%)后,即使未到每日清理时间点,都会立即删除已过期的 commitlog 文件。
磁盘占用率达到清理警戒线(默认 85%)
- 无论 commitlog 文件是否过期,只要磁盘占用率达到清理警戒线(默认 85%)后,即使未到每日清理时间点,都会按照既定策略开始删除文件(通常从最老的 commitlog 文件开始),以释放磁盘空间。
磁盘占用率达到危险警戒线(默认 90%)
- 无论 commitlog 文件是否过期,只要磁盘占用率达到危险警戒线(默认 90%)后,Broker 将拒绝新的消息写入,以保护系统稳定性。
特别注意
- 对于 RocketMQ 系统而言,删除一个约 1GB 大小的 commitlog 文件属于高 I/O 开销操作,在删除过程中会对磁盘造成较大压力,可能导致系统性能明显下降。因此,其默认清理时间设置在凌晨 4 点(业务访问量最低时段)。基于这一机制,应尽量保证磁盘空间有足够冗余,避免系统在其他时间点自动触发 commitlog 文件的删除操作。
- 官方建议 RocketMQ 所运行的 Linux 文件系统采用 ext4。相较于 ext3 文件系统,ext4 在大文件删除场景下具有更好的性能表现,能够更高效地完成文件删除与空间回收操作,从而减少磁盘 I/O 性能抖动并提升系统稳定性。
