RocketMQ 4.x 入门教程之八

大纲

前言

学习资源

版本说明

本文所使用的各软件版本如下表所示:

组件版本说明
JDK11Java、SpringBoot、SpringCloud 项目案例使用的 JDK 版本
RocketMQ Server4.9.0RocketMQ 服务端(包括 NameServer 和 Broker),运行时依赖 JDK 1.8+
RocketMQ Client4.8.0RocketMQ 客户端(SDK),要求跟 RocketMQ Server 的版本严格匹配

RocketMQ 的应用

引入依赖

本文提供的 Java 代码,均基于以下版本的 RocketMQ Client SDK 实现,要求部署的 RocketMQ Server 版本是 4.9.0

1
2
3
4
5
6
7
8
<dependencies>
<!-- RocketMQ 客户端(SDK),对应 RocketMQ Server 4.9.0 版本 -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.8.0</version>
</dependency>
</dependencies>

延迟消息

延迟消息的核心概念

  • 延迟消息的核心概念

    • 延迟消息是指:当消息写入到 Broker 后,只有在指定的时长之后才可被消费处理的消息。
    • 使用 RocketMQ 的延时消息可以实现定时任务的功能,而无需额外使用定时器。
    • 典型应用场景包括:电商交易中超时未支付则关闭订单,以及 12306 平台订票超时未支付则取消订票等场景。
  • 延迟消息的使用场景

    • 在电商平台中,订单创建成功后会发送一条延迟消息,该消息将在 30 分钟后投递给后台业务系统(Consumer)。后台业务系统收到消息后,会判断对应订单是否已完成支付:如果未完成支付,则取消订单,并将商品重新放回库存;如果已完成支付,则忽略该消息。
    • 在 12306 平台中,车票预订成功后也会发送一条延迟消息,该消息将在 45 分钟后投递给后台业务系统(Consumer)。后台业务系统收到消息后,会判断对应订单是否已完成支付:如果未完成支付,则取消预订,并将车票重新放回票池;如果已完成支付,则忽略该消息。

延迟消息的延迟等级

RocketMQ 的延时消息不支持任意时长的延迟,而是通过特定的延迟等级来指定。延迟等级(最多 18 个)定义在 RocketMQ 服务端的 MessageStoreConfig 类中的如下变量中:

例如,若指定的延迟等级为 3(延迟等级从 1 开始计数),则表示延迟时长为 10 秒。当然,如果需要自定义延迟等级,可以在 Broker 加载的配置文件中新增相应的配置,而配置文件位于 RocketMQ 安装目录下的 conf 目录中,单机版的配置文件名是 broker.conf。例如,下面增加了一个延迟时长为 1 天的等级 1d

1
messageDelayLevel = 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 1d

延迟等级的配置规则如下:

规则项说明
单位支持s(秒)、m(分)、h(小时)、d(天)
顺序要求必须按时间升序排列
数量限制默认最多支持 18 个级别(支持用户配置多于 18 个延迟等级)
配置后需要重启 Broker 才能生效

延迟等级使用的注意事项:

  • 事务消息不支持设置延迟等级
  • 如果 Broker 采用集群部署,需要确保所有 Broker 的延迟等级配置保持一致
  • 所有 Broker 修改完延迟等级后,延迟等级需要在所有 Broker 都重启后才能生效
  • 延迟等级的默认范围是:1 ≤ level ≤ 18(支持用户配置多于 18 个延迟等级),不支持精确到毫秒级的延迟等级
  • 增加或者删除延迟等级后,代码中发送消息时设置的延迟等级数值需要对应新的延迟等级顺序位置(从 1 开始计数)
  • 延迟消息不保证严格时序(网络抖动可能会导致存在微小的时间偏差)
  • 延迟消息消费失败并重试投递时,原延迟等级会失效,消息将按照默认的重试退避策略重新投递,而不会再次遵循原延时等级设置

延迟等级使用的最佳实践:

  • 优先使用预设延迟等级,避免频繁修改 Broker 配置
  • 高并发场景建议分散不同的延迟等级,防止大量延时消息在同一时刻同时投递,造成瞬间流量尖峰,压垮下游消费者

延迟消息的实现原理

工作流程

在 RocketMQ 中,延迟消息的工作流程图如下:

底层原理

RocketMQ 不同版本中延迟消息实现

从 RocketMQ 5.0 开始,延时消息的实现机制有了重大重构,引入了时间轮(TimerWheel)和 TimerLog 等组件,支持任意时长延迟,不再受固定延迟等级限制,其存储模型和调度方式与 4.x 版本也有很大不同,详细说明请看 这里

构建延时索引

Producer 将消息发送到 Broker 后,Broker 首先会将其写入到 CommitLog 文件中(第一次写入),然后根据消息中的 Topic 信息和 Queue 信息,将其分发到目标 Topic 的相应 ConsumeQueue。不过,在分发到 ConsumeQueue 之前,系统会先判断消息中是否带有延时等级;若没有,则直接正常分发;如果有就走延迟队列,则执行下面的流程:

  • (1) 不会修改 CommitLog 文件中的消息 Topic;而是在构建 ConsumeQueue 时,将消息路由到系统内部的 SCHEDULE_TOPIC_XXXX

  • (2) 根据延时等级,在 ConsumeQueue 目录中 SCHEDULE_TOPIC_XXXX 主题下创建出相应的 queueId 目录与 ConsumeQueue 文件(如果没有这些目录与文件的话)

    • 延迟等级 delayLevel 与 queueId 的对应关系为:queueId = delayLevel - 1
    • 需要注意,在创建 queueId 目录时,并不是一次性地将所有延迟等级对应的目录全部创建完毕,而是用到哪个延迟等级才创建哪个目录(懒创建机制)
  • (3) 修改消息索引单元内容;消息索引单元中的 Message Tag HashCode 部分原本存放的是消息的 Tag 的 Hash 值,现在将其用于存储消息的投递时间

    • 投递时间 = 消息存储时间 + 延时等级时间
    • 投递时间是指:延迟消息到达指定延时时长后,被恢复为原 Topic 并重新写入 CommitLog 文件的时间戳
    • 消息存储时间是指:消息被 Broker 接收并第一次写入 CommitLog 文件时的时间戳(即消息发送到 Broker 的时间)
    • 在 ConsumeQueue 文件中,单个消息索引单元的组成结构如下:
      • Commit Log Offset:记录消息在 CommitLog 文件中的位置
      • Size:记录消息的大小
      • Message Tag HashCode:记录消息的 Tag 的哈希值,用于消息过滤。注意,对于延迟消息,该字段实际存储的是消息的投递时间
  • (4) 将消息索引单元写入到 SCHEDULE_TOPIC_XXXX 主题下相应的 ConsumeQueue 文件中


在 RocketMQ 的 Broker 中,延迟消息的 ConsumeQueue 存储目录结构如下:

SCHEDULE_TOPIC_XXXX 目录中各个延时等级 Queue 中的消息是如何排序的?

SCHEDULE_TOPIC_XXXX 目录中各延迟等级 Queue 中的消息,是按照投递时间(即消息存储时间 + 延迟时长)进行排序的。在同一个 Broker 中,相同延迟等级的所有延时消息,都会被写入 consumequeue 目录下 SCHEDULE_TOPIC_XXXX 中对应的同一个 Queue 里。由于同一个 Queue 中的消息具有相同的延迟等级,因此它们的投递时间仅取决于消息存储时间(即消息发送到 Broker 的时间)。简而言之,在同一个延迟等级的 Queue 中,消息按投递时间排序,而投递时间由消息存储时间决定,因此本质上是按消息发送到 Broker 的时间先后进行排序

调度到期消息
  • Broker 内部有一个延迟消息服务类 ScheduleMessageService,它负责消费 SCHEDULE_TOPIC_XXXX 主题中的消息,并根据每条延迟消息的投递时间,将延迟消息投递到目标 Topic 中。
  • 在投递之前,ScheduleMessageService 会从 CommitLog 中重新读出原来写入的消息,恢复原始 Topic 和 queueId,并将其原来的延迟等级设置为 0,使该消息变为一条不延迟的普通消息,然后再将其投递到目标 Topic 中。
  • ScheduleMessageService 在 Broker 启动时,会创建并启动一个定时器 Timer,用于执行相应的定时任务。系统会根据延迟等级的个数,创建对应数量的 TimerTask,每个 TimerTask 负责处理一个特定延迟等级的消息消费与投递。
  • 每个 TimerTask 会不断检测对应 Queue 中的第一条消息是否到期。由于同一个 Queue 中的消息是按照投递时间排序的,因此如果第一条消息未到期,则后续所有消息也不会到期;只有当第一条消息到期时,才会将其投递到目标 Topic,然后继续检查下一条消息是否到期。
转投普通消息
  • 延迟消息服务类 ScheduleMessageService 会将到期的延迟消息再次写入 CommitLog,并重新生成新的消息索引条目(ConsumeQueue),最终分发到目标 Topic 对应的 Queue 中。
  • 这一过程本质上就是一次普通消息的发送流程,唯一的区别在于:这次消息的 “Producer” 不再是业务生产者,而是延迟消息服务类 ScheduleMessageService 自身。
整体流程

在 RocketMQ 中,延迟消息整体的工作流程图如下:

  • (1) Producer 将消息发送到 Broker 后,Broker 首先将消息写入 CommitLog 文件(第一次写入,包含原始 Topic 和 delayLevel 属性)

  • (2) Broker 根据消息是否携带延迟等级(delayLevel)进行分支处理:

    • 若无延迟等级:
      • 直接为目标 Topic 构建 ConsumeQueue,消费者可立即消费
    • 若有延迟等级:
      • 进入延迟消息处理流程
  • (3) 延迟消息处理流程(关键):

    • 不会修改 CommitLog 文件中的消息 Topic;
    • 而是在构建 ConsumeQueue 时,将消息路由到系统内部的 SCHEDULE_TOPIC_XXXX 主题
  • (4) 根据延迟等级确定 queueId,并写入延迟队列:

    • queueId 与 delayLevel 的关系为:queueId = delayLevel - 1
    • SCHEDULE_TOPIC_XXXX 主题下:
      • 按需创建对应的 queueId 目录与 ConsumeQueue 文件(如果没有这些目录与文件的话)
      • 在创建 queueId 目录时,并不是一次性地将所有延迟等级对应的目录全部创建完毕,而是用到哪个延迟等级才创建哪个目录
  • (5) 构建延迟队列的索引单元(ConsumeQueue):

    • 每个索引单元的组成结构如下:
      • CommitLog Offset:消息在 CommitLog 文件中的位置
      • Size:消息大小
      • TagsCode:记录消息的 Tag 的哈希值,用于消息过滤。注意,对于延迟消息,这个字段记录的是消息的投递时间
    • 投递时间计算方式:
      • 投递时间 = 消息存储时间 + 延迟时长
      • 消息存储时间:消息被 Broker 接收并第一次写入 CommitLog 文件时的时间戳(即消息发送到 Broker 的时间)
      • 投递时间:延迟消息到达指定延时时长后,被恢复为原 Topic 并重新写入 CommitLog 文件的时间戳
  • (6) 将索引单元写入 SCHEDULE_TOPIC_XXXX 主题对应的 ConsumeQueue 文件中(完成延迟队列的构建)

  • (7) ScheduleMessageService 定时调度处理:

    • Broker 内部的延迟消息服务 ScheduleMessageService 会按 delayLevel 维度进行调度
    • 定时扫描 SCHEDULE_TOPIC_XXXX 主题的各个 ConsumeQueue 文件
    • 判断消息的投递时间是否小于等于当前时间
  • (8) 延迟消息到期后的处理:

    • 若未到期:继续等待
    • 若已到期:
      • 根据索引单元读取 CommitLog 文件中的原始消息
      • 恢复原始 Topic 和 queueId,并移除 delayLevel 属性
      • 重新构造为一条新的 “普通消息”
  • (9) 将新消息重新写入 CommitLog 文件(第二次写入)

  • (10) Broker 为该新消息构建目标 Topic 的 ConsumeQueue

  • (11) 消费者从目标 Topic 拉取消息并进行正常消费

为什么延迟消息的底层实现,需要写两次 CommitLog 文件?

由于 CommitLog 文件是只追加(Append-Only)写入的文件,消息一旦写入,其内容(如 Topic 名称)就无法被修改或覆盖。为了实现 "延迟后投递消息",消息的 Topic 需要从系统内部的 SCHEDULE_TOPIC_XXXX 恢复为业务方的 Topic,这在技术上只能通过生成一条全新的消息,并再次执行 CommitLog 文件的写入操作来完成。

延迟消息的代码案例

生产者的代码
  • 生产者发送延迟消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
* 生产者发送延迟消息
*/
public class DelayProducer {

public static void main(String[] arsgs) throws Exception {
// 创建一个生产者,参数为 Producer Group(生产者组)的名称
DefaultMQProducer producer = new DefaultMQProducer("pg");
// 指定 NameServer 的地址
producer.setNamesrvAddr("192.168.2.127:9876");
// 指定自动新创建的 Topic 的 Queue 数量,默认为 4
producer.setDefaultTopicQueueNums(4);
// 设置当发送失败时重试发送的次数,默认为 2 次
producer.setRetryTimesWhenSendFailed(3);
// 设置发送超时时间,默认 3 秒
producer.setSendMsgTimeout(5000);

// 启动生产者
producer.start();

// 生产并发送 10 条消息
for (int i = 0; i < 10; i++) {
byte[] body = ("Hi, " + i).getBytes();
// 定义消息,包括设置 Topic、Tag、消息内容
Message msg = new Message("MyTopic", "MyTag", body);
// 为消息指定 Key
msg.setKeys("key-" + i);
// 指定消息延迟等级为 4(即消息延迟 30 秒才会被消费)
msg.setDelayTimeLevel(4);
// 同步发送消息
SendResult sendResult = producer.send(msg);
// 输出消息被发送的时间
System.out.print(new SimpleDateFormat("HH::mm:ss").format(new Date()));
System.out.println(", " + sendResult);
}

// 关闭生产者
producer.shutdown();
}

}
  • 程序运行输出的结果如下:
1
2
3
4
21:05:16, SendResult [sendStatus=SEND_OK, msgId=ACAF07010A4B512DDF177564BF8E0000, offsetMsgId=C0A8027F00002A9F0000000000075EF4, messageQueue=MessageQueue [topic=MyTopic, brokerName=centos7, queueId=1], queueOffset=60]
21:05:16, SendResult [sendStatus=SEND_OK, msgId=ACAF07010A4B512DDF177564BFA00001, offsetMsgId=C0A8027F00002A9F0000000000075FEE, messageQueue=MessageQueue [topic=MyTopic, brokerName=centos7, queueId=2], queueOffset=61]
21:05:16, SendResult [sendStatus=SEND_OK, msgId=ACAF07010A4B512DDF177564BFA40002, offsetMsgId=C0A8027F00002A9F00000000000760E8, messageQueue=MessageQueue [topic=MyTopic, brokerName=centos7, queueId=3], queueOffset=62]
......
消费者的代码
  • 消费者消费消息(使用普通消息的消费者即可)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
* 消费者消费消息(直接使用普通消息的消费者即可)
*/
public class OtherConsumer {

public static void main(String[] args) throws Exception {
// 定义一个 Push 消费者(使用 Push 消费方式),参数为 Consumer Group(消费者组)的名称
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg");

// 指定 NameServer 的地址
consumer.setNamesrvAddr("192.168.2.127:9876");

// 指定消费线程的最小数量,默认是 20 个消费线程
consumer.setConsumeThreadMin(2);

// 指定消费线程的最大数量,默认是 20 个消费线程
consumer.setConsumeThreadMax(4);

// 指定从哪个位置开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);

// 指定消费的 Topic 与 Tag
consumer.subscribe("MyTopic", "*");

// 指定消费模式,默认为 "集群消费模式"
consumer.setMessageModel(MessageModel.CLUSTERING);

// 注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {

// 一旦 Broker 中有了其订阅的消息就会触发该方法的执行,其返回值为当前 Consumer 消费的状态
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
// 遍历消息
for (MessageExt msg : msgs) {
// 输出消息被消费的时间
System.out.print(new SimpleDateFormat("HH::mm:ss").format(new Date()));
System.out.println(", " + msg);
}
// 返回消费状态:消费成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});

// 启动消费者
consumer.start();

// 等待消费者消费完
TimeUnit.SECONDS.sleep(120);

// 关闭消费者
consumer.shutdown();
}

}
  • 程序运行输出的结果如下(从消息被真实消费的时间来看,消息是延迟了 30s 后才被消费者接收到的):
1
2
3
4
21:05:46, MessageExt [brokerName=centos7, queueId=0, storeSize=238, queueOffset=15, sysFlag=0, bornTimestamp=1776942336954, bornHost=/192.168.2.140:39708, storeTimestamp=1776942366955, storeHost=/192.168.2.127:10911, msgId=C0A8027F00002A9F0000000000076F3A, commitLogOffset=487226, bodyCRC=1960927797, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='MyTopic', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=MyTopic, MAX_OFFSET=16, KEYS=key-7, CONSUME_START_TIME=1776942366959, UNIQ_KEY=ACAF07010A4B512DDF177564BFBA0007, CLUSTER=DefaultCluster, WAIT=true, DELAY=4, TAGS=MyTag, REAL_QID=0}, body=[72, 105, 44, 32, 55], transactionId='null'}]
21:05:46, MessageExt [brokerName=centos7, queueId=1, storeSize=238, queueOffset=17, sysFlag=0, bornTimestamp=1776942336958, bornHost=/192.168.2.140:39708, storeTimestamp=1776942366959, storeHost=/192.168.2.127:10911, msgId=C0A8027F00002A9F0000000000077028, commitLogOffset=487464, bodyCRC=1683914660, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='MyTopic', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=MyTopic, MAX_OFFSET=18, KEYS=key-8, CONSUME_START_TIME=1776942366963, UNIQ_KEY=ACAF07010A4B512DDF177564BFBE0008, CLUSTER=DefaultCluster, WAIT=true, DELAY=4, TAGS=MyTag, REAL_QID=1}, body=[72, 105, 44, 32, 56], transactionId='null'}]
21:05:46, MessageExt [brokerName=centos7, queueId=2, storeSize=238, queueOffset=18, sysFlag=0, bornTimestamp=1776942336961, bornHost=/192.168.2.140:39708, storeTimestamp=1776942366963, storeHost=/192.168.2.127:10911, msgId=C0A8027F00002A9F0000000000077116, commitLogOffset=487702, bodyCRC=324620082, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='MyTopic', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=MyTopic, MAX_OFFSET=19, KEYS=key-9, CONSUME_START_TIME=1776942366966, UNIQ_KEY=ACAF07010A4B512DDF177564BFC10009, CLUSTER=DefaultCluster, WAIT=true, DELAY=4, TAGS=MyTag, REAL_QID=2}, body=[72, 105, 44, 32, 57], transactionId='null'}]
......
代码测试结论
  • 最终,在 Broker 中 ConsumeQueue 的存储目录结构如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
~/store/consumequeue/
├── MyTopic
│   ├── 0
│   │   └── 00000000000000000000
│   ├── 1
│   │   └── 00000000000000000000
│   ├── 2
│   │   └── 00000000000000000000
│   └── 3
│   └── 00000000000000000000
├── SCHEDULE_TOPIC_XXXX
│   └── 3
│   └── 00000000000000000000

提示

在 RocketMQ 中,发送延迟消息时,生产者可以通过 msg.setDelayTimeLevel(4) 设置延迟等级;而消费者消费延迟消息的代码与消费普通消息完全一样,无需任何特殊处理,因为延迟消息到达消费者时已经变为普通消息。

事务消息

银行转账问题引入

这里的一个需求场景是:工商银行用户 A 向建设银行用户 B 转账 1 万元,可通过同步消息处理,流程如下:

  • (1) 工商银行系统发送一条同步消息 M(内容为给用户 B 增加 1 万元)至消息中间件(Broker)。
  • (2) Broker 成功接收消息 M 后,向工商银行系统返回成功确认(ACK)。
  • (3) 工商银行系统收到 ACK 后,从用户 A 的账户中扣款 1 万元。
  • (4) 建设银行系统从 Broker 中获取到消息 M。
  • (5) 建设银行系统消费消息 M,往用户 B 的账户中增加 1 万元。

特别注意,上面的执行流程是有问题的:如果第 3 步中的扣款操作失败,但消息已经成功发送到了 Broker。对于 MQ 来 说,只要消息写入成功,那么这个消息就可以被消费。此时工商银行系统中 A 用户没有扣款 1 万元,而建设银行系统中用户 B 增加了 1 万元,出现了数据不一致问题。

银行转账问题解决

上面银行转账问题的解决思路是:让第 1、2、3 步具备原子性,即要么全部成功,要么全部失败。也就是说,消息发送成功后,必须保证扣款成功;如果扣款失败,则需要回滚已成功发送的消息。该思路就是使用事务消息。因此,这里需要采用分布式事务的解决方案。

特别注意

RocketMQ 事务消息的标准术语是:发送半消息 -> 执行本地事务 -> 提交或回滚半消息

  • (1) 事务管理器(TM)向事务协调器(TC)发起指令,开启全局事务。
  • (2) 工行系统向 TC 发送一条包含 “给 B 增款 1 万元” 的业务消息 M。此时消息 M 只是普通业务数据,还没变成半消息(prepareHalf)。
  • (3) TC 将消息 M 以半消息(prepareHalf)的形式预提交到 Broker。此时消息 M 处于半消息状态,建行系统还看不到 Broker 中的消息 M(即对消费者不可见)。
  • (4) Broker 将半消息预提交的执行结果上报给 TC。
  • (5) 若半消息预提交失败,TC 向 TM 发送预提交失败的响应,全局事务结束;若半消息预提交成功,TC 调用工行系统的回调操作,完成工行用户 A 的预扣款 1 万元操作。
  • (6) 工行系统向 TC 发送预扣款执行结果(即本地事务的执行状态)。
  • (7) TC 收到预扣款执行结果后,将结果发送给 TM。
    • 预扣款执行结果存在三种可能性:
      1
      2
      3
      4
      5
      6
      // 描述本地事务执行状态
      public enum LocalTransactionState {
      COMMIT_MESSAGE, // 本地事务执行成功
      ROLLBACK_MESSAGE, // 本地事务执行失败
      UNKNOW, // 不确定,表示需要进行状态回查以确定本地事务的执行结果
      }
  • (8) TM 根据上报结果向 TC 发出不同的确认指令:
    • 若预扣款成功(本地事务状态为 COMMIT_MESSAGE),则 TM 向 TC 发送 Global Commit 指令。
    • 若预扣款失败(本地事务状态为 ROLLBACK_MESSAGE),则 TM 向 TC 发送 Global Rollback 指令。
    • 若事务状态未知(本地事务状态为 UNKNOWN),则触发工行系统的本地事务状态回查操作。
      • 回查操作将结果(COMMIT_MESSAGEROLLBACK_MESSAGE)上报给 TC;
      • TC 将结果发送给 TM;
      • TM 再向 TC 发送最终确认指令(Global Commit 或 Global Rollback)。
  • (9) TC 接收到指令后向 Broker 与工行系统发出确认指令:
    • 若 TC 收到 Global Commit 指令,则向 Broker 与工行系统发送 Branch Commit 指令:此时 Broker 中的消息 M 才可被建行系统看到,工行用户 A 中的扣款操作才真正被确认。
    • 若 TC 收到 Global Rollback 指令,则向 Broker 与工行系统发送 Branch Rollback 指令:此时 Broker 中的消息 M 将被撤销,工行用户 A 中的扣款操作将被回滚。

  • 以上事务消息方案的目的:确保消息投递与扣款操作处于同一个事务中,要么全部成功,要么全部失败(任一失败则全部回滚)。
  • 以上事务消息方案并非典型的 XA 模式,区别在于:
    • XA 模式:
      • 将消息发送和扣款作为两个同步的分支事务,通过两阶段提交(2PC)保证全局的强一致性(ACID)。
      • 该模式下,所有参与者在最终决策前都需要同步等待协调器的指令,并通过全局锁和两阶段同步阻塞来实现强一致性,因此性能较低,会短时锁定资源。
      • 该模式下,锁从第一阶段开始持有,直到第二阶段结束才释放,所以可以说两个阶段都是锁定资源的。因为跨多个参与者的网络通信、日志写入、协调者决策都可能耗时,加上可能的重试或故障等待,锁持有的时间比单机本地事务长很多。
    • 事务消息方案:
      • 则将消息预提交与本地扣款通过回调机制同步执行,但允许中间状态的存在:如果本地扣款失败,可以回滚半消息;如果成功,则消息变为可见。
      • 整个流程依赖 “半消息 + 本地事务 + 事务状态回查” 来保证最终一致性,而非两阶段同步阻塞,因此性能较好,资源锁定时间短。

事务消息的核心概念

分布式事务

通俗地说,分布式事务是指一次操作由若干分支操作组成,这些分支操作分属不同应用,分布在不同服务器上。分布式事务需要保证这些分支操作要么全部成功,要么全部失败。与普通事务一样,分布式事务的目的也是保证操作结果的一致性。

事务消息

RocketMQ 提供了类似 X/Open XA 的分布式事务功能,通过事务消息可以实现分布式事务的最终一致性。XA 是一种分布式事务解决方案,也是一种分布式事务处理模式。

半事务消息

半事务消息是指暂不能被投递的消息。发送方已成功将消息发送到 Broker,但 Broker 尚未收到最终确认指令,此时该消息被标记为 “暂不能投递” 状态,即不能被消费者看到。处于这种状态的消息即为半事务消息。

本地事务状态

本地事务状态是 Producer 回调操作执行后的结果。该状态有以下三种类型,会发送给 TC,然后 TC 再将其发送给 TM。最后,TM 根据 TC 发送过来的本地事务状态决定全局事务的确认指令(Global Commit 或 Global Rollback)。

1
2
3
4
5
6
// 描述本地事务执行状态
public enum LocalTransactionState {
COMMIT_MESSAGE, // 本地事务执行成功
ROLLBACK_MESSAGE, // 本地事务执行失败
UNKNOW, // 不确定,表示需要进行状态回查以确定本地事务的执行结果
}
事务状态回查

事务状态回查(也叫消息回差),即查询本地事务的执行状态。在上面的案例中,就是重新到数据库中查看预扣款操作是否执行成功。

  • 事务状态回查不同于重新执行回调操作。

    • 回调操作用于执行预扣款操作。
    • 事务状态回查则是查看预扣款操作已经执行的结果。
  • 触发事务状态回查的最常见原因有两个:

    • (1) 回调操作返回 UNKNOWN
    • (2) TC 没有接收到 TM 的最终全局事务确认指令。

事务状态回查配置

在 RocketMQ 中,关于事务状态回查机制,有三个常见的属性可以设置,均在 Broker 加载的配置文件中进行配置。例如,用于描述本地事务执行状态的枚举如下:

  • COMMIT_MESSAGE:本地事务执行成功
  • ROLLBACK_MESSAGE:本地事务执行失败
  • UNKNOW:不确定,表示需要进行回查以确定本地事务的执行结果

RocketMQ 中三个常见事务属性的配置如下:

  • transactionTimeout=20:指定 TM 应在 20 秒内将最终确认状态发送给 TC,否则会引发事务状态回查。默认值为 60 秒。
  • transactionCheckMax=5:指定最多回查 5 次,超过此次数后将丢弃消息并记录错误日志。默认值为 15 次。
  • transactionCheckInterval=10:指定多次事务状态回查之间的时间间隔为 10 秒。默认值为 60 秒。

事务消息的代码案例

提示

本节代码案例的业务背景是:工商银行用户 A 向建设银行用户 B 转账 1 万元,通过事务消息进行处理,完整的解决思路和处理流程可以看 这里,核心执行流程如下图所示。

  • 事务监听器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

/**
* 事务监听器(以工商银行转账为例)
*/
public class ICBCTransactionListener implements TransactionListener {

/**
* 回调操作方法 <br>
* 半消息预提交成功后,就会触发该方法的执行,用于完成本地事务(分支事务)的执行
*/
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
System.out.println("半消息预提交成功" + msg);

// 假设接收到 TAGA 消息就表示扣款成功,可以提交半消息;TAGB 消息就表示扣款失败,需要回滚半消息;TAGC 消息就表示扣款结果不清楚,需要执行状态回查操作
if (StringUtils.equals("TAGA", msg.getTags())) {
return LocalTransactionState.COMMIT_MESSAGE;
} else if (StringUtils.equals("TAGB", msg.getTags())) {
return LocalTransactionState.ROLLBACK_MESSAGE;
} else if (StringUtils.equals("TAGC", msg.getTags())) {
return LocalTransactionState.UNKNOW;
}

return LocalTransactionState.UNKNOW;
}

/**
* 状态回查方法 <br>
* 触发状态回查(查询本地事务的执行状态)的最常见原因有两个:
* (1) 回调操作返回 UNKNWON
* (2) TC 没有接收到 TM 的最终全局事务确认指令
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
System.out.println("执行状态回查:" + msg.getTags());
return LocalTransactionState.COMMIT_MESSAGE;
}

}
  • 事务消息的生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
* 事务消息的生产者
*/
public class TransactionMsgProducer {

public static void main(String[] args) throws Exception {
// 定义一个线程池
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});

// 定义一个事务消息的生产者
TransactionMQProducer producer = new TransactionMQProducer("pg");

// 为生产者指定 NameServer 的地址
producer.setNamesrvAddr("192.168.2.127:9876");

// 为生产者指定一个线程池
producer.setExecutorService(executorService);

// 为生产者添加事务监听器
producer.setTransactionListener(new ICBCTransactionListener());

// 启动生产者
producer.start();

// 生产者发送 3 条事务消息
String[] tags = {"TAGA", "TAGB", "TAGC"};
for (int i = 0; i < 3; i++) {
byte[] body = ("Hi," + i).getBytes();
// 封装事务消息
Message msg = new Message("TxTopic", tags[i], body);
// 发送事务消息,第二个参数用于指定在执行本地事务时要使用的业务参数
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.println("事务消息发送结果为:" + sendResult.getSendStatus());
}
}

}
  • 程序运行输出的结果如下(在 ICBCTransactionListener 事务监听器中,如果处理的是 TAGC 消息,会触发状态回查操作的执行):
1
2
3
4
5
6
7
事务消息发送结果为:SEND_OK
消息预提交成功:Message{topic='TxTopic', flag=0, properties={TRAN_MSG=true, UNIQ_KEY=ACAF07017E49512DDF178EEB20940000, WAIT=true, PGROUP=pg, TAGS=TAGA}, body=[72, 105, 44, 48], transactionId='ACAF07017E49512DDF178EEB20940000'}
事务消息发送结果为:SEND_OK
消息预提交成功:Message{topic='TxTopic', flag=0, properties={TRAN_MSG=true, UNIQ_KEY=ACAF07017E49512DDF178EEB20E20001, WAIT=true, PGROUP=pg, TAGS=TAGB}, body=[72, 105, 44, 49], transactionId='ACAF07017E49512DDF178EEB20E20001'}
事务消息发送结果为:SEND_OK
消息预提交成功:Message{topic='TxTopic', flag=0, properties={TRAN_MSG=true, UNIQ_KEY=ACAF07017E49512DDF178EEB20E70002, WAIT=true, PGROUP=pg, TAGS=TAGC}, body=[72, 105, 44, 50], transactionId='ACAF07017E49512DDF178EEB20E70002'}
执行状态回查:TAGC
  • 消息的消费者(使用普通消息的消费者即可)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

import java.util.List;
import java.util.concurrent.TimeUnit;

/**
* 消息消费者(直接使用普通消息的消费者即可)
*/
public class SomeMsgConsumer {

public static void main(String[] args) throws Exception {
// 定义一个 Push 消费者(使用 Push 消费方式),参数为 Consumer Group(消费者组)的名称
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg");

// 指定 NameServer 的地址
consumer.setNamesrvAddr("192.168.2.127:9876");

// 指定从哪个位置开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

// 指定消费的 Topic 与 Tag
consumer.subscribe("TxTopic", "*");

// 指定消费模式,默认为 "集群消费模式"
consumer.setMessageModel(MessageModel.CLUSTERING);

// 注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
// 一旦 Broker 中有了其订阅的消息就会触发该方法的执行,其返回值为当前 Consumer 消费的状态
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
// 遍历消息
for (MessageExt msg : msgs) {
System.out.println(msg);
}
// 返回消费状态:消费成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});

// 启动消费者
consumer.start();

// 等待消费者消费完
TimeUnit.SECONDS.sleep(120);

// 关闭消费者
consumer.shutdown();
}

}
  • 程序运行输出的结果如下(这里不会接收到 TAGB 消息,因为在 ICBCTransactionListener 事务监听器中,TAGB 消息被 Broker 回滚了,也就是被撤销投递):
1
2
MessageExt [brokerName=centos7, queueId=1, storeSize=267, queueOffset=0, sysFlag=8, bornTimestamp=1777370574055, bornHost=/192.168.2.140:52296, storeTimestamp=1777370637662, storeHost=/192.168.2.127:10911, msgId=C0A8027F00002A9F0000000000077827, commitLogOffset=489511, bodyCRC=1225024847, reconsumeTimes=0, preparedTransactionOffset=489227, toString()=Message{topic='TxTopic', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=TxTopic, TRANSACTION_CHECK_TIMES=1, MAX_OFFSET=1, TRAN_MSG=true, CONSUME_START_TIME=1777370743758, UNIQ_KEY=ACAF07017E49512DDF178EEB20E70002, CLUSTER=DefaultCluster, WAIT=true, PGROUP=pg, TAGS=TAGC, REAL_QID=1}, body=[72, 105, 44, 50], transactionId='ACAF07017E49512DDF178EEB20E70002'}]
MessageExt [brokerName=centos7, queueId=3, storeSize=241, queueOffset=0, sysFlag=8, bornTimestamp=1777370573973, bornHost=/192.168.2.140:52296, storeTimestamp=1777370574053, storeHost=/192.168.2.127:10911, msgId=C0A8027F00002A9F0000000000077408, commitLogOffset=488456, bodyCRC=654967907, reconsumeTimes=0, preparedTransactionOffset=487940, toString()=Message{topic='TxTopic', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=TxTopic, MAX_OFFSET=1, TRAN_MSG=true, CONSUME_START_TIME=1777370744757, UNIQ_KEY=ACAF07017E49512DDF178EEB20940000, CLUSTER=DefaultCluster, WAIT=true, PGROUP=pg, TAGS=TAGA, REAL_QID=3}, body=[72, 105, 44, 48], transactionId='ACAF07017E49512DDF178EEB20940000'}]

分布式事务的补充内容

XA 协议的介绍
  • XA(Unix Transaction)是一种两阶段提交(2PC)分布式事务解决方案,也是一种两阶段提交(2PC)分布式事务处理模式,它基于 XA 协议实现。
  • XA 协议由 Tuxedo(Transaction for Unix has been Extended for Distributed Operation,分布式操作扩展之后的 Unix 事务系统)首先提出,并交给 X/Open 组织,作为资源管理器与事务管理器的接口标准。
XA 模式三剑客

在 XA 模式中,有三个重要组件:TC、TM、RM。

  • TC(Transaction Coordinator):

    • 事务协调者。维护全局事务和分支事务的状态,驱动全局事务的提交或回滚。
    • 在 RocketMQ 中,Broker 充当着 TC。
  • TM(Transaction Manager):

    • 事务管理器。定义全局事务的范围:开始全局事务、提交或回滚全局事务,它实际上是全局事务的发起者。
    • 在 RocketMQ 中,事务消息的 Producer 充当着 TM。
  • RM(Resource Manager):

    • 资源管理器。管理分支事务处理的资源,与 TC 交互以注册分支事务和报告分支事务的状态,并驱动分支事务的提交或回滚。
    • 在 RocketMQ 中,事务消息的 Producer 及 Broker 均充当 RM。
XA 模式的架构

XA 模式是一个典型的 2PC(两阶段提交)分布式事务解决方案,其执行流程如下:

  • (1) TM 向 TC 发起指令,开启一个全局事务。
  • (2) 根据业务要求,各个 RM 逐个主动向 TC 注册分支事务。
  • (3) 分支事务注册成功后,各 RM 自行开始执行本地事务(即预执行)。
  • (4) 各个 RM 将预执行结果上报给 TC。该结果可能是成功,也可能是失败。
  • (5) TC 在接收到所有已注册 RM 上报的预执行结果后(实际由 TM 判断是否全部完成,TC 仅汇总已收到的结果),将汇总结果发送给 TM。
  • (6) TM 根据汇总结果向 TC 发出确认指令:
    • 若所有预执行结果均为成功响应,TM 则向 TC 发送 Global Commit 指令。
    • 若任意一个预执行结果存在失败响应,或等待超时,TM 则向 TC 发送 Global Rollback 指令。
  • (7) TC 在接收到确认指令后,再次向所有已注册的分支事务对应的 RM 发送相应的确认指令(提交或回滚)。

特别注意

事务消息方案并不是一种典型的 XA 模式。区别在于:XA 模式中,多个分支事务通过两阶段同步阻塞的方式实现强一致性;而事务消息方案中,消息预提交与预扣款操作之间通过回调机制同步执行,但整体依赖状态回查机制实现最终一致性,而非两阶段的全局锁等待。RocketMQ 的事务消息不支持延时消息。另外,对于事务消息要做好幂等性检查,因为事务消息可能不止一次被消费(因为存在回滚后再提交的情况)。