RocketMQ 4.x 入门教程之六

大纲

前言

学习资源

版本说明

本文所使用的各软件版本如下表所示:

组件版本说明
JDK11Java、SpringBoot、SpringCloud 项目案例使用的 JDK 版本
RocketMQ Server4.9.0RocketMQ 服务端(包括 NameServer 和 Broker),运行时依赖 JDK 1.8+

RocketMQ 的工作原理

消息的消费

消费幂等实现

什么是消费幂等
  • 当出现消费者对某条消息重复消费的情况时,重复消费的结果与消费一次的结果是相同的,并且多次消费同一条消息并未对业务系统产生任何负面影响,那么这个消费过程就是消费幂等的。
  • 在互联网应用中,尤其在网络不稳定的情况下,消息很有可能会出现重复发送或重复消费。如果重复的消息可能会影响业务处理,那么就应该对消息做幂等处理。

幂等是指什么

若某操作执行多次与执行一次对系统产生的影响是相同的,则称该操作是幂等的。

消息重复的场景

什么情况下可能会出现消息被重复消费呢?最常见的有以下三种情况:

发送时消息重复

当一条消息已经成功发送到 Broker 并完成持久化后,如果此时发生网络闪断,导致 Broker 对 Producer 的应答(ACK)未能成功返回,Producer 可能会误判为发送失败并进行重试发送。这样就可能在 Broker 中出现两条内容相同的消息(在某些情况下 Message ID 也可能相同),从而导致后续 Consumer 对该消息进行重复消费。

消费时消息重复

消息已经投递到 Consumer 并完成业务处理,但在 Consumer 向 Broker 返回消费成功应答(ACK)时发生网络闪断,导致 Broker 未能收到该消费确认。为了保证 “至少消费一次” 的语义,Broker 在网络恢复后会再次投递这条已处理过的消息。此时,Consumer 可能会再次收到一条内容相同(某些情况下 Message ID 也相同)的消息,从而产生重复消费。

Rebalance 时消息重复

当 Consumer Group(消费者组)中的 Consumer 数量发生变化,或其订阅的 Topic 的 Queue 数量发生变化时,会触发 RocketMQ 的 Rebalance(再均衡)机制。此过程中,Queue 的分配关系会发生变化,部分 Queue 会从原 Consumer 转移(分配)到新的 Consumer。由于在 Rebalance 触发之前,Consumer 的消费进度(offset)可能尚未及时提交到 Broker,这样新分配到 Queue 的 Consumer 就可能会从较早的位置开始重新消费消息。因此,Consumer 可能会再次收到之前已经处理过的消息,从而产生重复消费。

通用的幂等解决方案
两核心要素

在幂等解决方案的设计中,涉及到两项要素:幂等标识与唯一性处理。只要充分利用好这两要素,就可以设计出优秀的幂等解决方案。

  • 幂等标识:是生产者和消费者两者中的既定协议,通常指具备唯⼀业务标识的字符串。例如,订单号、支付流水号,一般由 Producer 随着消息一同发送过去的。
  • 唯一性处理:服务端通过采用⼀定的算法策略,保证同⼀个业务逻辑不会被重复执行成功多次。例如,对同一笔订单的多次支付操作,只会成功一次。
解决方案设计

对于常见的系统,幂等性操作的一种通用解决方案如下:

  • (1) 先通过缓存进行快速去重:如果缓存中已存在该幂等标识,则说明请求已处理,直接返回;若未命中,则进入下一步。
  • (2) 通过数据库唯一索引进行幂等控制(写而不是查):在执行业务之前,直接尝试将幂等标识作为唯一键写入数据库(比如,写入幂等表或业务表主键)。若写入失败(唯一键冲突),说明是重复请求,直接返回;若写入成功,则进入下一步。
  • (3) 在同一数据库事务中完成业务处理:在幂等标识成功写入数据库后,执行业务逻辑,并将业务结果持久化到数据库中,必须确保幂等标识与业务结果的写入是在同一个数据库事务中原子提交,即要么全部成功,要么全部失败。
  • (4) 事务提交后再写缓存:事务提交成功后,将幂等标识写入缓存,用于后续请求的快速幂等判断,减轻数据库压力。

特别注意

  • 第 1 步虽然已经通过缓存判断了一次是否为重复操作,但这并不意味着可以省略第 2 步。因为缓存中的数据通常是有过期时间的,一旦缓存失效,就会发生 "缓存穿透",请求会直接到达数据库。
  • 因此,即使请求能够进入第 2 步,也不能简单认为其一定不是重复操作。此时仍需要通过数据库(基于唯一索引的幂等标识)再次进行校验,以确保在缓存失效或异常情况下,依然能够正确识别并拦截重复请求。
  • 在上述幂方案设计中,幂等性保证是以数据库唯一约束为准,缓存只是性能优化,因此必须先将幂等标识写入数据库,然后再写入缓存。
解决方案举例

这里以支付场景为例,介绍如何通过幂等性操作,避免重复支付。

  • (1) 先通过缓存进行快速去重:

    • 当支付请求到达时,先在缓存(如 Redis)中查询以 “支付流水号” 为 Key 的记录。若存在,则说明该请求很可能已处理过,直接返回;若不存在,则执行下一步
  • (2) 通过数据库唯一约束进行幂等控制(写而不是查):

    • 在执行业务之前,直接尝试将 “支付流水号” 作为唯一键写入数据库(比如,写入幂等表或支付表主键)
    • 若写入成功,则说明是首次请求,继续执行下一步
    • 若写入失败(唯一键冲突),则说明是重复请求,直接返回
  • (3) 在同一数据库事务中完成业务处理:

    • 在写入幂等标识的同一个数据库事务中,完成以下操作:
      • 执行支付业务(如扣款、订单状态更新等)
      • 将支付结果及相关数据写入到数据库
    • 最后,确保幂等标识与业务结果的写入是在同一个数据库事务中原子提交,即要么全部成功,要么全部失败
  • (4) 事务提交后再写缓存:

    • 在数据库事务提交成功之后,再将 “支付流水号” 写入缓存,并设置过期时间,比如 SET key value EX seconds
    • 用于后续请求的快速幂等判断,减轻数据库压力

提示

实现支付幂等的本质是:用数据库唯一约束 "抢占执行权",用事务保证结果一致性,用缓存提升性能,而不是依赖缓存保证正确性。

消费幂等的实战方案

消费幂等的核心在于为每条消息设置一个全局唯一且稳定的标识。由于 Message ID 在某些情况下可能出现重复或不具备业务语义,因此不建议将其作为幂等判断的唯一依据。更可靠的做法是使用业务唯一标识作为幂等判断的关键依据,而该标识通常可以通过消息的 Key 来设置或传递,从而保证消费端能够正确识别并处理重复消息。下面以支付场景为例,介绍 RocketMQ 如何实现消费幂等。

  • 生产者发送消息时,可以将消息的 Key 设置为支付流水号,作为幂等处理的依据:
1
2
3
Message message = new Message();
message.setKey("PAYMENT_NO_1000000");
SendResult sendResult = producer.send(message);
  • 消费者接收到消息时,可以根据消息的 Key(即支付流水号)来实现消费幂等:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
consumer.registerMessageListener(new MessageListenerConcurrently() {

@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
String key = msg.getKeys();

// 根据业务唯一标识 Key 做幂等处理
// TODO: 幂等校验逻辑
}

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});

特别注意

RocketMQ 在合理配置(如同步刷盘、Broker 主从复制等)下,可以实现较高可靠性的消息投递,尽量避免消息丢失;但其消费语义属于至少一次(At-Least-Once)投递,无法保证消息绝对不重复。因此,业务层需要自行实现幂等处理或去重机制,以应对重复消息带来的影响。

消息堆积与消费延迟

核心概念

在消息处理流程中,如果 Consumer 的消费速度跟不上 Producer 的生产速度,MQ 中未处理的消息会不断增加(即进入的消息多于被消费的消息),这部分尚未被消费的消息通常称为堆积消息。消息堆积会进一步导致消费延迟。以下场景需要重点关注消息堆积和消费延迟问题:

  • 业务系统上下游处理能力不匹配,导致消息持续堆积,且无法自行恢复。
  • 业务系统对消息消费的实时性要求较高,即使是短时间的堆积,也可能造成不可接受的消费延迟。
产生原因分析

Consumer 使用长轮询 Pull 模式消费消息时,分为以下两个阶段:消息拉取和消息消费。

消息拉取
  • Consumer 通过从服务端批量拉取消息,并将拉取到的消息缓存到本地缓冲队列中。对于这种拉取式消费模式,在内网环境下通常具备较高吞吐能力,因此一般不会成为消息堆积的主要瓶颈。
  • 在较低规格的主机(例如 4C/8G)上,如果采用单线程、单分区(Queue)消费模型,其吞吐量通常可以达到数万 TPS。如果增加分区数量并引入多线程并行消费,则整体吞吐能力可以进一步提升到数十万 TPS 级别。
消息消费
  • Consumer 会将本地缓存队列中的消息提交到消费线程池中,由业务消费逻辑进行处理,并在处理完成后返回执行结果。这一阶段才是真正的消息消费过程。
  • 此时,Consumer 的整体消费能力主要取决于单条消息的处理耗时以及消费并发度(线程数)。如果由于业务逻辑复杂或外部依赖较慢等原因,导致单条消息处理时间较长,那么整体吞吐量必然下降。
  • 在这种情况下,本地缓存队列中的消息会逐渐累积,当队列大小达到上限时,Consumer 会触发流控机制,暂停从服务端继续拉取消息,以避免本地内存溢出和系统过载。
结论说明
  • 消息堆积的主要瓶颈通常在于客户端的消费能力,而消费能力主要由消费耗时和消费并发度(线程数)共同决定。其中,消费耗时的影响优先级高于消费并发度(线程数)。
  • 也就是说,应首先确保单条消息的处理耗时在合理范围内,避免由于业务逻辑复杂或外部依赖调用慢导致消费变慢。在此基础上,再通过提升消费并发度(如增加线程数、提高并行度等)来进一步提升整体吞吐能力。
消费耗时

影响消息处理时长的主要因素在于业务代码逻辑本身。在具体实现中,可能影响消息处理耗时的代码通常可以分为两类:

  • CPU 内部计算型代码:主要指在本地 CPU 中完成的计算逻辑
    • 例如:简单的条件判断、对象转换、字段计算、加解密运算(如 AES/SHA)、以及一般的循环遍历和数据处理等(不涉及外部资源访问)
  • 外部 I/O 操作型代码:主要指依赖外部系统的操作
    • 例如:数据库读写(MySQL)、远程 RPC 调用(HTTP/gRPC)、缓存访问(Redis)、消息系统交互(MQ)、以及文件系统读写等,这类操作通常受网络或磁盘延迟影响较大

一般情况下,如果不存在复杂的递归或大规模循环计算,CPU 计算的耗时相对较低,在整体处理时延中通常可以忽略不计。相比之下,外部 I/O 操作由于涉及网络或磁盘交互,延迟不确定性更高,往往成为影响消息处理时长的主要瓶颈所在。

提示

外部 I/O 操作型代码主要包括对外部系统的访问,例如:读写远程数据库(如 MySQL 的读写操作)、访问分布式缓存系统(如 Redis 的读写操作)、以及调用下游服务(如通过 Dubbo 进行 RPC 远程调用,或通过 Spring Cloud 发起 HTTP 接口请求等)。这些操作通常依赖网络或外部系统资源,因此耗时具有较强的不确定性。 在实际设计与排查过程中,需要提前梳理下游系统调用链路,并明确每个 I/O 操作的预期耗时范围,从而判断消费逻辑中的 I/O 开销是否合理。通常情况下,消息堆积的根本原因并不在消费逻辑本身,而是由于下游系统出现异常或性能瓶颈,导致整体消费耗时显著增加。 这里的 "系统异常" 不仅仅指类似 500 这样的显式错误,还可能包括更隐蔽的问题,例如网络带宽不足、连接延迟升高等。此外,当数据库管理系统(DBMS)达到容量或性能上限时,也会显著增加 I/O 响应时间,从而进一步放大消费延迟,最终引发消息堆积。

消费并发度
  • 一般情况下,消费者端的消费并发度由单节点线程数节点数量共同决定
    • 对于普通消息、延时消息及事务消息,其整体并发能力可近似表示为:单节点线程数 × 节点数量
      • 单节点线程数:单个 Consumer 所包含的消费线程数量
      • 节点数量:Consumer Group(消费者组)中所包含的 Consumer 数量
    • 对于顺序消息,其整体并发能力可近似表示为:Topic 的 Queue(分区)数量
      • 顺序消息可以分为全局顺序消息和分区顺序消息,其简单介绍如下
  • 但在实际优化过程中,通常应优先提升单节点的线程数,以充分利用单机硬件资源(如 CPU、内存等)。当单机资源达到瓶颈或无法继续有效提升时,再通过横向扩展节点数量的方式来提高整体消费并发度,从而扩展系统的消费能力。

  • 全局顺序消息

    • 全局顺序消息是指:该类型消息的 Topic 仅包含一个 Queue(分区),从而可以保证该 Topic 下的所有消息在消费端按发送顺序依次被消费。
    • 为了保证这种全局顺序性,在同一时刻,Consumer Group(消费者组)中只能由一个 Consumer 的一个线程进行消费处理。因此,全局顺序消息的消费并发度为 1。
  • 分区顺序消息

    • 分区顺序消息是指:该类型消息的 Topic 包含多个 Queue(分区)。它只能保证单个 Queue(分区)内的消息按顺序消费,但无法保证整个 Topic 范围内所有 Qeueue(分区)内的消息按顺序消费。
    • 为了实现这种分区内的顺序性,在 Consumer Group(消费者组)中,同一时刻每个 Queue(分区)只能由一个 Consumer 的一个线程进行消费处理。也就是说,不同 Queue(分区)之间可以并行消费,但同一 Queue(分区)内必须串行消费。
    • 因此,在理想情况下(每个 Queue 都被均衡投递消息),消费并发度等于 Topic 的 Queue(分区)数量,即最多可以有多个 Consumer 的多个线程同时处理不同的 Queue(分区)。
单机消费线程数估算

在单机环境中,消费线程池的线程数设置需要谨慎,不能盲目设置过大。过多的消费线程不仅无法提升性能,反而会带来频繁的线程上下文切换开销,降低整体效率。在理想情况下,单节点的最优消费线程数可以通过此公式进行估算:$N = C \cdot \frac {T_1 + T_2}{T_1}$

  • C:CPU 核心数
  • T₁:CPU 内部计算耗时
  • T₂:外部 I/O 操作耗时

最优消费线程数的推导公式: $N = C \cdot \frac {T_1 + T_2}{T_1} = C + C \cdot \frac {T_2}{T_1}$

  • 当 I/O 耗时(T₂)越高时,可以通过增加消费线程数来 “掩盖 I/O 操作的等待时间”
  • 当任务偏 CPU 密集型(T₂ 较小)时,消费线程数应接近 CPU 核心数(C)

需要注意的是,该计算结果只是理想状态下的理论值。在实际生产环境中,不建议直接套用,而应采用以下方式进行调优:

  • 先设置一个略低于理论值的消费线程数
  • 通过压测观察系统吞吐、系统延迟、CPU 使用率等性能指标
  • 再逐步增加消费线程数,找到当前硬件环境下的最优点(性能拐点)

这样可以避免过度配置线程所带来的线程上下文切换开销,并获得更稳定的系统性能。

如何避免消息堆积和消费延迟

为了避免在业务运行过程中出现非预期的消息堆积和消费延迟问题,需要在系统设计阶段对整体业务逻辑进行充分梳理。其中最关键的两点是:明确消息消费耗时以及合理设置消费并发度(线程数)

  • 明确消息的消费耗时

    • 通过压测获取单条消息的消费耗时,并对耗时较高的代码逻辑进行重点分析。
    • 需要重点关注以下方面:
      • 消费逻辑的计算复杂度是否过高,是否存在异常的循环或递归等问题。
      • 消费逻辑中的 I/O 操作是否必要,能否通过本地缓存等方式进行优化或规避。
      • 是否存在耗时较高的操作可以进行异步化处理;若可以,需评估异步化后是否会引入业务一致性或逻辑问题。
  • 合理设置消费并发度(线程数)

    • 消费并发度(线程数)的确定通常分为两步:
      • 逐步调大单个 Consumer 节点的消费线程数,并观察 CPU 负载、延迟、吞吐量等系统指标,确定单节点的最优消费线程数及对应的最大消息吞吐能力。
      • 根据上下游链路的流量峰值,计算所需 Consumer 节点的数量:节点数 = 流量峰值 / 单节点的最大消息吞吐量