RocketMQ 4.x 入门教程之四
大纲
- RocketMQ 4.x 入门教程之一、RocketMQ 4.x 入门教程之二、RocketMQ 4.x 入门教程之三
- RocketMQ 4.x 入门教程之四、RocketMQ 4.x 入门教程之五、RocketMQ 4.x 入门教程之六
- RocketMQ 4.x 入门教程之七、RocketMQ 4.x 入门教程之八、RocketMQ 4.x 入门教程之九
前言
学习资源
版本说明
本文所使用的各软件版本如下表所示:
| 组件 | 版本 | 说明 |
|---|---|---|
| JDK | 11 | Java、SpringBoot、SpringCloud 项目案例使用的 JDK 版本 |
| RocketMQ Server | 4.9.0 | RocketMQ 服务端(包括 NameServer 和 Broker),运行时依赖 JDK 1.8+ |
RocketMQ 的工作原理
消息的消费
- 消费者从 Broker 中获取消息的方式有两种:Pull(拉取)和 Push(推送)。
- 消费者组对于消息的消费模式又分为两种:Broadcasting(广播消费)和 Clustering(集群消费)。
Pull(拉取)消息的时间间隔
由于 Pull(拉取)消息的时间间隔是由用户指定的,所以在设置该间隔时需要注意平稳:间隔太短,空请求比例会增加;间隔太长,消息的实时性太差。
消息的获取方式
拉取式消费
- Consumer 采用主动拉取(Pull)方式从 Broker 获取消息,消费的主动权掌握在 Consumer 端;一旦拉取到一批消息,就会触发消费流程。
- 不过,这种消费方式的实时性相对较弱,当 Broker 中产生新消息时,Consumer 无法立即感知并进行消费。
- 由于拉取消息的时间间隔由用户自行设置,因此需要在实时性与资源消耗之间做好权衡:间隔过短会导致空请求增多,间隔过长则会降低消息的实时性。
推送式消费
- Broker 并不会真正向 Consumer 发送 “推送通知” 或者直接下发消息,而是由客户端内部持续发起 Pull 请求,从效果上呈现为 “类似推送”,因此整体实时性较高。
- 这种消费方式在使用体验上类似发布 - 订阅模式:Consumer 注册
MessageListener后由框架触发回调,但回调的本质是消息被客户端拉取到本地后再异步分发执行,而非 Broker 主动推送触发。 - 该机制的本质是 “长连接 + 长轮询 Pull”,依赖于 Consumer 与 Broker 之间建立的长连接(TCP),并在此连接上持续进行拉取请求(长轮询),因此会消耗一定的连接和线程资源(但通常可控)。
特别注意
RocketMQ 本质上采用的是 Pull 模型,但通过 "长连接 + 长轮询 Pull" 的方式实现 Push 模型,从而实现更高效的消息获取。Consumer 通过 TCP 长连接不断向 Broker 发起 Pull 请求,当 Broker 发现暂时没有消息时不会立即返回,而是将请求挂起一段时间(长轮询机制),一旦有新消息到达则立即返回给 Consumer,从而既减少了频繁空轮询带来的性能开销,又提升了消息的实时性,使整体效果接近真正的 Push 模型。简而言之,RocketMQ 的 Push 模型是 Pull 内核 + Push 封装,本质上还是基于 Pull 模型。
RocketMQ 的 Push 消费方式本质上仍然是 “拉取(Pull)+ 封装”,只是由客户端代替用户自动完成拉取调度。因此,客户端并不能像 Pull 模式那样完全手动控制消息的拉取速度,而是通过一些参数和机制来进行间接调节拉取速度。常见的控制参数包括:
(1) 控制消费并发度(最直接的手段)
- 参数:
consumeThreadMin,表示最小消费线程数,默认值是 20-
consumeThreadMax,表示最大消费线程数,默认值是 20
- 本质:控制消费线程池大小
- 影响:
- 并发越高 → 消费能力越强 → 本地堆积减少 → 拉取可以更快进行
- 并发越低 → 消费变慢 → 本地堆积增加 → 反向抑制拉取速度(形成背压)
- 参数:
(2) 控制单次拉取的消息数量
- 参数:
pullBatchSize,默认值是 32 - 本质:控制每次从 Broker 批量拉取的消息条数
- 影响:
- 数量越大 → 单次网络吞吐更高,但可能导致本地短时间堆积
- 数量越小 → 拉取更平滑、延迟更低,但整体吞吐下降
- 参数:
(3) 控制批量消费的消息数量
- 参数:
consumeMessageBatchMaxSize,默认值是 1 - 本质:控制每次批量消费的最大消息数量,其值不允许超过
pullBatchSize - 影响:
- 数量越大 → 消息的并发消费能力就越低,因为这一批次的消息只会被一个线程处理,并且只要其中有一条消息处理异常,整批消息都需要全部重新消费
- 数量越小 → 整体的消费吞吐下降
- 参数:
(4) 流控机制(核心:消费端背压)
- RocketMQ 内置了基于本地缓存的流控机制:
pullThresholdForQueue(按消息条数限制),默认值是 1000。表示当本地队列缓存的消息数量超过此阀值时,Consumer 将暂停从 Broker 拉取新消息,以避免内存压力过大pullThresholdSizeForQueue(按消息大小限制),默认值是 100MB。表示当本地缓存消息的总大小超过此阈值时,Consumer 也会触发流控,暂停拉取新消息
- 工作机制:
- 消息会先进入本地缓存(
ProcessQueue) - 当
ProcessQueue中的消息堆积超过阈值时: - 客户端会延迟或暂停拉取请求(而不是完全停止)
- 等待消费线程处理一部分消息后,再恢复拉取
- 消息会先进入本地缓存(
- RocketMQ 内置了基于本地缓存的流控机制:
消息获取方式对比
- Pull 方式:需要应用自行实现对关联 Queue 的遍历与拉取,实时性相对较差;但优点是应用可以更灵活地控制消息的拉取速度。
- Push 方式:对关联 Queue 的遍历已由系统封装完成,实时性较高;但相应地会占用更多的系统资源(比如:TCP 长连接)。
特别注意
在 RocketMQ 的 Push 消费方式下,客户端不能直接控制消息的拉取速度,但可以通过 "消费能力 + 参数配置 + 流控机制" 间接控制拉取速度。
消息的消费模式
广播消费
在广播消费模式(Broadcasting)下,同一个 Consumer Group(消费者组)内的每个 Consumer 实例都会独立消费同一个 Topic 的全部消息;也就是说,每一条消息都会被该 Consumer Group 中的所有 Consumer 实例各自消费一次。

集群消费
在集群消费模式(Clustering)下,同一个 Consumer Group(消费者组)内的多个 Consumer 实例会共同分摊消费同一个 Topic 下多个 Queue 中的消息;每条消息在同一个 Consumer Group 内只会被其中一个 Consumer 实例消费。

Rebalance 机制
在 RocketMQ 中,Rebalance(再均衡)机制讨论的前提是:客户端采用集群消费模式。
特别注意
Rebalance 机制是以 Consumer Group(消费者组)为粒度触发和执行的,而不是单个 Consumer 实例。
Rebalance 的概述
Rebalance(再均衡)是指:将一个 Topic 下的多个 Queue,在同一个 Consumer Group(消费者组)中的多个 Consumer 实例之间重新分配的过程。Rebalance 机制的核心目的是提升消息的并行消费能力。例如:一个 Topic 下有 5 个 Queue,在只有 1 个 Consumer 实例时,该 Consumer 实例需要负责消费全部 5 个 Queue 的消息;当 Consumer Group(消费者组)新增一个 Consumer 实例后,这 5 个 Queue 会在两个 Consumer 实例之间重新分配,例如一个分配 2 个 Queue,另一个分配 3 个 Queue,从而提升整体的并行消费能力。

Rebalance 的限制
在 RocketMQ 的集群消费模式下,由于一个 Queue 在同一时刻只能分配给一个 Consumer 实例,因此当某个 Consumer Group(消费者组)中的 Consumer 实例数量多于 Queue 数量时,部分 Consumer 实例将无法分配到任何 Queue。
Rebalance 的危害
Rebalance 在提升消费并行能力的同时,也会带来一些问题:
消费暂停:在某个 Consumer Group(消费者组)中,当只有一个 Consumer 实例时,其负责消费所有 Queue;当新增 Consumer 实例后会触发 Rebalance 机制。此时原有的 Consumer 实例需要暂停部分 Queue 的消费,等待这些 Queue 重新分配给新的 Consumer 实例后,相关 Queue 才会恢复消费。
重复消费:当 Consumer 实例开始消费新分配到的 Queue 时,需要从之前 Consumer 实例提交的消费进度(offset)继续消费。但默认情况下,消费进度(offset)是异步提交的,这可能导致 Broker 中记录的消费进度(offset)与实际消费进度不一致,这段差值对应的消息就可能会被重复消费。
消费突刺:由于 Rebalance 机制可能会引发重复消费,或者在 Rebalance 的执行过程中因消费暂停导致消息积压,当 Rebalance 执行结束后,Consumer 实例可能在短时间内需要处理大量消息,从而出现瞬时的消费压力(即消费突刺)。
Rebalance 触发的原因
- Rebalance 机制触发的原因主要有两个:
- 一是 Consumer Group(消费者组)所订阅的 Topic 的 Queue 数量发生了变化,可能原因:
- Broker 扩容或缩容
- Broker 升级或运维操作
- Broker 与 NameServer 之间发生网络异常
- Topic 下的 Queue 扩容或缩容
- 二是 Consumer Group(消费者组)中 Consumer 实例的数量发生了变化,可能原因:
- Consumer 升级或运维操作
- Consumer Group(消费者组)扩容或缩容
- Consumer 与 NameServer 之间发生网络异常
- 一是 Consumer Group(消费者组)所订阅的 Topic 的 Queue 数量发生了变化,可能原因:
Rebalance 的执行过程
在 Broker 中维护着多个 Map 集合(如下所示),这些集合中动态保存着当前 Topic 的 Queue 信息以及 Consumer Group(消费者组)中 Consumer 实例的信息。一旦检测到当前 Topic 的 Queue 数量发生变化,或者 Consumer Group(消费者组)中的 Consumer 实例数量发生变化,Broker 会通知该 Consumer Group(消费者组)中的所有 Consumer 实例触发 Rebalance 执行。Consumer 实例在接收到通知后,会根据既定的 Queue 分配策略自行计算并获取应分配的 Queue,即由 Consumer 端自主完成 Rebalance 的执行过程。
Broker 中与 Rebalance 机制相关的核心 Map 集合
TopicConfigManager
- Key:Topic 名称
- Value:TopicConfig
- 作用:维护 Topic 下所有 Queue 的相关信息(Queue 数量等)
- 关系:当 Queue 数量发生变化时,会触发 Rebalance 执行
ConsumerManager
- Key:Consumer Group Id
- Value:ConsumerGroupInfo
- 作用::维护 Consumer Group 中所有 Consumer 实例的信息(客户端列表、连接等)
- 关系:当 Consumer 实例数量发生变化时,会触发 Rebalance 执行
ConsumerOffsetManager
- Key:Topic 与 Consumer Group 的组合(
topic@group) - Value:一个内层 Map(
Map<QueueId, Offset>)- 内层 Map Key:QueueId
- 内层 Map Value:该 Queue 的消费进度(offset)
- 作用:记录每个 Consumer Group 在各个 Queue 上的消费进度(offset)
- 关系:Rebalance 执行完成后,新分配的 Consumer 实例需要基于该消费进度(offset)继续消费
- Key:Topic 与 Consumer Group 的组合(
与 Kafka 的对比
在 Kafka 中,一旦触发 Rebalance 条件,Broker 会通过 Group Coordinator 来协调完成 Rebalance。Coordinator 是运行在 Broker 上的一个组件,它会在 Consumer Group(消费者组)中选举出一个 Group Leader。随后由该 Group Leader 根据当前组内的成员情况,负责完成 Partition 的重新分配,并将分配结果提交给 Coordinator,再由 Coordinator 将分配结果同步给该 Consumer Group 中的所有 Consumer 实例。
相比之下,在 RocketMQ 中,Rebalance 机制则有所不同。RocketMQ 并不存在 Group Leader 的概念,Queue 的分配是由每个 Consumer 实例根据相同的分配策略自行计算完成的,即 Rebalance 是由各个 Consumer 实例独立完成的,而不是由某一个 Leader 统一负责。
Queue 分配策略
在一个 Topic 中,每个 Queue 在同一个 Consumer Group(消费者组)内只能被一个 Consumer 实例独占消费,但一个 Consumer 实例可以同时消费多个 Queue。至于 Queue 与 Consumer 实例之间的分配关系,是通过特定的分配策略来确定的,即决定每个 Queue 分配给哪个 Consumer 实例进行消费。常见的 Queue 分配策略有四种,这些分配策略通常在创建 Consumer 实例时通过构造函数或相关配置进行指定。
平均分配策略
平均分配策略是指基于 avg = queueCount / consumerCount 的结果进行分配的:如果能够整除,则按照顺序为每个 Consumer 实例分配 avg 个 Queue;如果不能整除,则将剩余的 Queue 按照 Consumer 实例的顺序依次分配。其核心思想是先计算出每个 Consumer 实例理论上应分配的 Queue 数量,然后再按顺序将对应数量的 Queue 逐个分配给各个 Consumer 实例,从而尽可能保证分配的均衡性。

环形平均分配策略
环形平均分配策略是指在由多个 Queue 构成的 “环形结构” 中,按照 Consumer 实例的顺序,依次逐个进行分配 Queue。该算法不需要事先计算每个 Consumer 实例应分配的 Queue 数量,而是采用轮询(Round-Robin)的方式,将 Queue 一个接一个地分配给各个 Consumer 实例,从而实现相对均衡的分配效果。

一致性 Hash 分配策略
一致性 Hash 分配策略是指将 Consumer 实例的哈希值作为节点映射到哈希环上,同时将 Queue 的哈希值也映射到该环上,然后按照顺时针方向查找距离该 Queue 最近的 Consumer 实例,该 Consumer 实例即为该 Queue 的分配对象。该算法的优点是当 Consumer 实例数量发生变化时,分配结果的变动较小,但其缺点是容易出现分配不均衡的问题。

同机房分配策略
同机房分配策略是指根据 Queue 所在机房与 Consumer 实例所在机房进行匹配,优先筛选出与当前 Consumer 实例处于同一机房的 Queue,然后再对这些同机房的 Queue 采用平均分配策略或者环形平均分配策略进行分配;如果不存在同机房的 Queue,则退化为对全部 Queue 按照平均分配策略或环形平均分配策略进行分配,从而在优先保证就近消费的同时兼顾负载均衡。

分配策略的对比
- 平均分配策略与环形平均分配策略的分配效率较高,而一致性 Hash 分配策略的效率相对较低。
- 这是因为一致性 Hash 算法的实现更复杂,同时其分配结果也更容易出现 Queue 分配不均衡的问题。
- 不过,一致性 Hash 分配策略的优势在于,当 Consumer Group(消费者组)发生扩容或缩容时,能够显著减少 Queue 与 Consumer 实例之间的重新分配范围,从而降低 Rebalance(再均衡)带来的影响。
- 因此,一致性 Hash 分配策略更适用于 Consumer 实例数量变化较为频繁的场景。
采用平均分配策略时,在 Consumer Group(消费者组)发生扩容前后,Queue 的分配情况如下:

采用一致性 Hash 分配策略时,在 Consumer Group(消费者组)发生缩容前后,Queue 的分配情况如下:

至少一次原则
- RocketMQ 遵循 “至少一次(At-Least-Once)消费” 原则,即每条消息至少会被成功消费一次。
- 所谓 “成功消费”,是指 Consumer 实例在完成消息处理后,会向消费进度记录器(offset store)提交消费进度(offset)。当该消费进度(offset)被成功持久化后,就认为这条消息已经消费成功。
- 需要特别注意的是,由于 “至少一次” 的语义,在异常情况下(例如提交消费进度(offset)失败或消费者处理消息超时),同一条消息可能会被重复消费。
什么是消费进度记录器(Offset Store)
- 消费进度记录器是用于记录 Consumer 实例已经消费到哪个位置(offset)的组件,用来标识消息消费进度,避免重复消费或实现断点续消费。
- 在广播消费模式(Broadcasting)下,每个 Consumer 实例都会独立消费同一个 Topic 的全部消息,即每条消息会被所有 Consumer 实例各自消费一次,彼此之间互不影响。消费进度(offset)由 Consumer 实例本地维护,通常存储在本地文件或内存中,因此可以认为 Consumer 实例本身就是消费进度记录器。
- 在集群消费模式(Clustering)下,同一个 Consumer Group(消费者组)内的多个 Consumer 实例共同分摊消费同一个 Topic 下的多个 Queue 中的消息,需要对消费进度(offset)进行统一管理和协调。此时,消费进度(offset)由 Broker 负责集中存储和维护,因此可以认为 Broker 是消费进度记录器。
