RocketMQ 4.x 入门教程之四

大纲

前言

学习资源

版本说明

本文所使用的各软件版本如下表所示:

组件版本说明
JDK11Java、SpringBoot、SpringCloud 项目案例使用的 JDK 版本
RocketMQ Server4.9.0RocketMQ 服务端(包括 NameServer 和 Broker),运行时依赖 JDK 1.8+

RocketMQ 的工作原理

消息的消费

  • 消费者从 Broker 中获取消息的方式有两种:Pull(拉取)和 Push(推送)。
  • 消费者组对于消息的消费模式又分为两种:Broadcasting(广播消费)和 Clustering(集群消费)。

Pull(拉取)消息的时间间隔

由于 Pull(拉取)消息的时间间隔是由用户指定的,所以在设置该间隔时需要注意平稳:间隔太短,空请求比例会增加;间隔太长,消息的实时性太差。

消息的获取方式

拉取式消费
  • Consumer 采用主动拉取(Pull)方式从 Broker 获取消息,消费的主动权掌握在 Consumer 端;一旦拉取到一批消息,就会触发消费流程。
  • 不过,这种消费方式的实时性相对较弱,当 Broker 中产生新消息时,Consumer 无法立即感知并进行消费。
  • 由于拉取消息的时间间隔由用户自行设置,因此需要在实时性与资源消耗之间做好权衡:间隔过短会导致空请求增多,间隔过长则会降低消息的实时性。
推送式消费
  • Broker 在接收到消息后会向 Consumer 发送 “推送” 通知,但并不会真正将消息数据直接推送给客户端,而是触发客户端主动拉取消息,因此整体实时性较高。
  • 这种消费方式本质上属于发布 - 订阅模式:Consumer 会向其关联的 Queue 注册监听器,当有新消息到达时,会触发回调函数执行;而回调函数内部通常仍是由 Consumer 主动从 Queue 中拉取(Pull)消息
  • 这一机制依赖于 Consumer 与 Broker 之间建立的长连接(TCP),而长连接的维持会消耗一定的系统资源。

RocketMQ 的 Push 模式本质上仍然是 “拉取(Pull)+ API 封装”,只是由客户端代替用户自动完成拉取调度。因此,客户端并不能像 Pull 模式那样完全手动控制消息的拉取速度,而是通过一些参数和机制来进行间接调节拉取速度。常见的控制参数包括:

  • 控制消费并发度(最直接)

    • consumeThreadMin / consumeThreadMax
    • 并发越高 → 消费越快 → 客户端会更积极拉取
    • 并发越低 → 消费变慢 → 拉取速度也会被拖慢
  • 控制单次拉取数量

    • pullBatchSize(每次从 Broker 拉取多少条消息)
    • 数量越大 → 单次吞吐高,但可能会堆积消息
    • 数量越小 → 更平滑,但吞吐降低
  • 流控机制(非常关键)

    • RocketMQ 内置了 “消费堆积感知” 机制,例如:
      • pullThresholdForQueue
      • pullThresholdSizeForQueue
    • 当本地缓存(ProcessQueue)积压过多消息时:
      • 客户端会自动暂停拉取消息
消息获取方式对比
  • Pull 方式:需要应用自行实现对关联 Queue 的遍历与拉取,实时性相对较差;但优点是应用可以更灵活地控制消息的拉取速度。
  • Push 方式:对关联 Queue 的遍历已由系统封装完成,实时性较高;但相应地会占用更多的系统资源(比如:TCP 长连接)。

特别注意

在 RocketMQ 的 Push 模式下,客户端不能直接控制消息的拉取速度,但可以通过 "消费能力 + 参数配置 + 流控机制" 间接控制拉取速度。

消息的消费模式

广播消费

在广播消费模式(Broadcasting)下,同一个 Consumer Group(消费者组)内的每个 Consumer 实例都会独立消费同一个 Topic 的全部消息;也就是说,每一条消息都会被该 Consumer Group 中的所有 Consumer 实例各自消费一次。

集群消费

在集群消费模式(Clustering)下,同一个 Consumer Group(消费者组)内的多个 Consumer 实例会分摊消费同一个 Topic 下多个 Queue 中的消息;每条消息在同一个 Consumer Group 内只会被其中一个 Consumer 实例消费。

消息进度的保存方式

RocketMQ 在不同的消费模式下,消费进度(offset)的保存方式是不同的。

  • 广播消费模式:

    • 消费进度(offset)保存在 Consumer 端。
    • 由于在广播消费模式下,同一 Consumer Group(消费者组)中的每个 Consumer 实例都会消费同一个 Topic 的全部消息,但各自的消费进度(offset)互不相同,因此需要由每个 Consumer 实例独立维护自己的消费进度(offset)。
  • 集群消费模式:

    • 消费进度(offset)保存在 Broker 端。
    • 在集群消费模式下,Consumer Group(消费者组)中的所有 Consumer 实例共同消费同一个 Topic 的全部消息,且每条消息只会被其中一个 Consumer 实例消费。
    • 由于消费进度(offset)会参与负载均衡过程,需要在各个 Consumer 实例之间共享,因此统一由 Broker 进行维护和管理。
  • 下面展示了 Broker 中存储的各个 Topic 下不同 Queue 的消费进度(offset)情况

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# cat ~/store/config/consumerOffset.json
{
"offsetTable": {
"TopicTest@please_rename_unique_group_name_4": {
"0": 60,
"1": 59,
"2": 104,
"3": 75
},
"TopicOrder@order_consumer_group": {
"0": 1280,
"1": 1325,
"2": 1198,
"3": 1402
},
"TopicTrade@trade_consumer_group": {
"0": 5600,
"1": 5588,
"2": 5623,
"3": 5599
}
}
}
消费进度的提交方式

在 RocketMQ 中,消费进度(offfset)有以下两种提交方式,默认采用异步提交方式(可能会导致消息重复消费问题)

  • 同步提交(Sync Commit)

    • Consumer 实例在消费完一批消息后,将消费进度(offset)提交给 Broker
    • 需要等待 Broker 返回成功 ACK 后,才会继续拉取并消费下一批消息
    • 在等待 ACK 响应的期间,Consumer 实例处于阻塞状态
  • 异步提交(Async Commit)

    • Consumer 实例在消费完一批消息后,将消费进度(offset)提交给 Broker
    • 不需要等待 Broker 返回 ACK,即可继续拉取并消费下一批消息
    • 吞吐量更高,但可能带来一定程度的重复消费问题

Rebalance 机制

在 RocketMQ 中,Rebalance(再均衡)机制讨论的前提是:客户端采用集群消费模式

特别注意

Rebalance 机制是以 Consumer Group(消费者组)为粒度触发和执行的,而不是单个 Consumer 实例。

Rebalance 的概述

Rebalance(再均衡)是指:将一个 Topic 下的多个 Queue,在同一 Consumer Group(消费者组)中的多个 Consumer 实例之间重新分配的过程。Rebalance 机制的核心目的是提升消息的并行消费能力。例如:一个 Topic 下有 5 个 Queue,在只有 1 个 Consumer 实例时,该 Consumer 实例需要负责消费全部 5 个 Queue 的消息;当新增一个 Consumer 实例后,这 5 个 Queue 会在两个 Consumer 实例之间重新分配,例如一个分配 2 个 Queue,另一个分配 3 个 Queue,从而提升整体的并行消费能力。

Rebalance 的限制

在 RocketMQ 的集群消费模式下,由于一个 Queue 在同一时刻只能分配给一个 Consumer 实例,因此当某个 Consumer Group(消费者组)中的 Consumer 实例数量多于 Queue 数量时,部分 Consumer 实例将无法分配到任何 Queue。

Rebalance 的危害

Rebalance 在提升消费并行能力的同时,也会带来一些问题:

  • 消费暂停:在某个 Consumer Group(消费者组)中,当只有一个 Consumer 实例时,其负责消费所有 Queue;当新增 Consumer 实例后会触发 Rebalance 机制。此时原有的 Consumer 实例需要暂停部分 Queue 的消费,等待这些 Queue 重新分配给新的 Consumer 实例后,相关 Queue 才会恢复消费。

  • 重复消费:当 Consumer 实例开始消费新分配到的 Queue 时,需要从之前 Consumer 实例提交的消费进度(offset)继续消费。但默认情况下,消费进度(offset)是异步提交的,这可能导致 Broker 中记录的消费进度(offset)与实际消费进度不一致,这段差值对应的消息就可能会被重复消费。

  • 消费突刺:由于 Rebalance 机制可能会引发重复消费,或者在 Rebalance 的执行过程中因消费暂停导致消息积压,当 Rebalance 执行结束后,Consumer 实例可能在短时间内需要处理大量消息,从而出现瞬时的消费压力(即消费突刺)。

Rebalance 产生的原因
  • Rebalance 机制产生的原因主要有两个:
    • 一是 Consumer Group(消费者组)所订阅的 Topic 的 Queue 数量发生变化,比如:
      • Broker 扩容或缩容
      • Broker 升级或运维操作
      • Broker 与 NameServer 之间发生网络异常
      • Topic 下的 Queue 扩容或缩容
    • 二是 Consumer Group(消费者组)中 Consumer 实例的数量发生变化,比如:
      • Consumer 升级或运维操作
      • Consumer Group(消费者组)扩容或缩容
      • Consumer 与 NameServer 之间发生网络异常
Rebalance 的执行过程

在 Broker 中维护着多个 Map 集合(如下所示),这些集合中动态保存着当前 Topic 的 Queue 信息以及 Consumer Group(消费者组)中 Consumer 实例的信息。一旦检测到当前 Topic 的 Queue 数量发生变化,或者 Consumer Group(消费者组)中的 Consumer 实例数量发生变化,Broker 会通知该 Consumer Group(消费者组)中的所有 Consumer 实例触发 Rebalance 执行。Consumer 实例在接收到通知后,会根据既定的 Queue 分配策略自行计算并获取应分配的 Queue,即由 Consumer 端自主完成 Rebalance 的执行过程。

Broker 中与 Rebalance 机制相关的核心 Map 集合

  • TopicConfigManager

    • Key:Topic 名称
    • Value:TopicConfig
    • 作用:维护 Topic 下所有 Queue 的相关信息(Queue 数量等)
    • 关系:当 Queue 数量发生变化时,会触发 Rebalance 执行
  • ConsumerManager

    • Key:Consumer Group Id
    • Value:ConsumerGroupInfo
    • 作用::维护 Consumer Group 中所有 Consumer 实例的信息(客户端列表、连接等)
    • 关系:当 Consumer 实例数量发生变化时,会触发 Rebalance 执行
  • ConsumerOffsetManager

    • Key:Topic 与 Consumer Group 的组合(topic@group
    • Value:一个内层 Map(Map<QueueId, Offset>
      • 内层 Map Key:QueueId
      • 内层 Map Value:该 Queue 的消费进度(offset)
    • 作用:记录每个 Consumer Group 在各个 Queue 上的消费进度(offset)
    • 关系:Rebalance 执行完成后,新分配的 Consumer 实例需要基于该消费进度(offset)继续消费
与 Kafka 的对比
  • 在 Kafka 中,一旦触发 Rebalance 条件,Broker 会通过 Group Coordinator 来协调完成 Rebalance。Coordinator 是运行在 Broker 上的一个组件,它会在 Consumer Group(消费者组)中选举出一个 Group Leader。随后由该 Group Leader 根据当前组内的成员情况,负责完成 Partition 的重新分配,并将分配结果提交给 Coordinator,再由 Coordinator 将分配结果同步给该 Consumer Group 中的所有 Consumer 实例。

  • 相比之下,在 RocketMQ 中,Rebalance 机制则有所不同。RocketMQ 并不存在 Group Leader 的概念,Queue 的分配是由每个 Consumer 实例根据相同的分配策略自行计算完成的,即 Rebalance 是由各个 Consumer 实例独立完成的,而不是由某一个 Leader 统一负责。

Queue 分配策略

在一个 Topic 中,每个 Queue 在同一个 Consumer Group(消费者组)内只能被一个 Consumer 实例独占消费,但一个 Consumer 实例可以同时消费多个 Queue。至于 Queue 与 Consumer 实例之间的分配关系,是通过特定的分配策略来确定的,即决定每个 Queue 分配给哪个 Consumer 实例进行消费。常见的 Queue 分配策略有四种,这些分配策略通常在创建 Consumer 实例时通过构造函数或相关配置进行指定。

平均分配策略

平均分配策略是指基于 avg = queueCount / consumerCount 的结果进行分配的:如果能够整除,则按照顺序为每个 Consumer 实例分配 avg 个 Queue;如果不能整除,则将剩余的 Queue 按照 Consumer 实例的顺序依次分配。其核心思想是先计算出每个 Consumer 实例理论上应分配的 Queue 数量,然后再按顺序将对应数量的 Queue 逐个分配给各个 Consumer 实例,从而尽可能保证分配的均衡性。

环形平均分配策略

环形平均分配策略是指在由多个 Queue 构成的 “环形结构” 中,按照 Consumer 实例的顺序,依次逐个进行分配 Queue。该算法不需要事先计算每个 Consumer 实例应分配的 Queue 数量,而是采用轮询(Round-Robin)的方式,将 Queue 一个接一个地分配给各个 Consumer 实例,从而实现相对均衡的分配效果。

一致性 Hash 分配策略

一致性 Hash 分配策略是指将 Consumer 实例的哈希值作为节点映射到哈希环上,同时将 Queue 的哈希值也映射到该环上,然后按照顺时针方向查找距离该 Queue 最近的 Consumer 实例,该 Consumer 实例即为该 Queue 的分配对象。该算法的优点是当 Consumer 实例数量发生变化时,分配结果的变动较小,但其缺点是容易出现分配不均衡的问题。

同机房分配策略

同机房分配策略是指根据 Queue 所在机房与 Consumer 实例所在机房进行匹配,优先筛选出与当前 Consumer 实例处于同一机房的 Queue,然后再对这些同机房的 Queue 采用平均分配策略或者环形平均分配策略进行分配;如果不存在同机房的 Queue,则退化为对全部 Queue 按照平均分配策略或环形平均分配策略进行分配,从而在优先保证就近消费的同时兼顾负载均衡。

分配策略的对比
  • 平均分配策略与环形平均分配策略的分配效率较高,而一致性 Hash 分配策略的效率相对较低。
  • 这是因为一致性 Hash 算法的实现更复杂,同时其分配结果也更容易出现 Queue 分配不均衡的问题。
  • 不过,一致性 Hash 分配策略的优势在于,当 Consumer Group(消费者组)发生扩容或缩容时,能够显著减少 Queue 与 Consumer 实例之间的重新分配范围,从而降低 Rebalance(再均衡)带来的影响。
  • 因此,一致性 Hash 分配策略更适用于 Consumer 实例数量变化较为频繁的场景

采用平均分配策略时,在 Consumer Group(消费者组)发生扩容前后,Queue 的分配情况如下:

采用一致性 Hash 分配策略时,在 Consumer Group(消费者组)发生缩容前后,Queue 的分配情况如下:

至少一次原则

  • RocketMQ 遵循 “至少一次(At-Least-Once)消费” 原则,即每条消息至少会被成功消费一次。
  • 所谓 “成功消费”,是指 Consumer 实例在完成消息处理后,会向消费进度记录器(offset store)提交消费进度(offset)。当该消费进度(offset)被成功持久化后,就认为这条消息已经消费成功。
  • 需要特别注意的是,由于 “至少一次” 的语义,在异常情况下(例如提交消费进度(offset)失败或消费者处理消息超时),同一条消息可能会被重复消费。

什么是消费进度记录器(Offset Store)

  • 消费进度记录器是用于记录 Consumer 实例已经消费到哪个位置(offset)的组件,用来标识消息消费进度,避免重复消费或实现断点续消费。
  • 在广播消费模式(Broadcasting)下,每个 Consumer 实例都会独立消费同一个 Topic 的全部消息,即每条消息会被所有 Consumer 实例各自消费一次,彼此之间互不影响。消费进度(offset)由 Consumer 实例本地维护,通常存储在本地文件或内存中,因此可以认为 Consumer 实例本身就是消费进度记录器。
  • 在集群消费模式(Clustering)下,同一个 Consumer Group(消费者组)内的多个 Consumer 实例共同分摊消费同一个 Topic 下的多个 Queue 中的消息,需要对消费进度(offset)进行统一管理和协调。此时,消费进度(offset)由 Broker 负责集中存储和维护,因此可以认为 Broker 是消费进度记录器。