RocketMQ 4.x 入门教程之八

大纲

前言

学习资源

版本说明

本文所使用的各软件版本如下表所示:

组件版本说明
JDK11Java、SpringBoot、SpringCloud 项目案例使用的 JDK 版本
RocketMQ Server4.9.0RocketMQ 服务端(包括 NameServer 和 Broker),运行时依赖 JDK 1.8+
RocketMQ Client4.8.0RocketMQ 客户端(SDK),要求跟 RocketMQ Server 的版本严格匹配

RocketMQ 的应用

引入依赖

本文提供的 Java 代码,均基于以下版本的 RocketMQ Client 实现,要求部署的 RocketMQ Server 版本是 4.9.0

1
2
3
4
5
6
7
8
<dependencies>
<!-- RocketMQ 客户端(SDK),对应 RocketMQ Server 4.9.0 版本 -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.8.0</version>
</dependency>
</dependencies>

延迟消息

延迟消息的核心概念

  • 延迟消息的核心概念

    • 延迟消息是指:当消息写入到 Broker 后,只有在指定的时长之后才可被消费处理的消息。
    • 使用 RocketMQ 的延时消息可以实现定时任务的功能,而无需额外使用定时器。
    • 典型应用场景包括:电商交易中超时未支付则关闭订单,以及 12306 平台订票超时未支付则取消订票等场景。
  • 延迟消息的使用场景

    • 在电商平台中,订单创建成功后会发送一条延迟消息,该消息将在 30 分钟后投递给后台业务系统(Consumer)。后台业务系统收到消息后,会判断对应订单是否已完成支付:如果未完成支付,则取消订单,并将商品重新放回库存;如果已完成支付,则忽略该消息。
    • 在 12306 平台中,车票预订成功后也会发送一条延迟消息,该消息将在 45 分钟后投递给后台业务系统(Consumer)。后台业务系统收到消息后,会判断对应订单是否已完成支付:如果未完成支付,则取消预订,并将车票重新放回票池;如果已完成支付,则忽略该消息。

延迟消息的延迟等级

RocketMQ 的延时消息不支持任意时长的延迟,而是通过特定的延迟等级来指定。延迟等级(最多 18 个)定义在 RocketMQ 服务端的 MessageStoreConfig 类中的如下变量中:

例如,若指定的延迟等级为 3(延迟等级从 1 开始计数),则表示延迟时长为 10 秒。当然,如果需要自定义延迟等级,可以在 Broker 加载的配置文件中新增相应的配置,而配置文件位于 RocketMQ 安装目录下的 conf 目录中,单机版的配置文件名是 broker.conf。例如,下面增加了一个延迟时长为 1 天的等级 1d

1
messageDelayLevel = 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 1d

延迟等级的配置规则如下:

规则项说明
单位支持s(秒)、m(分)、h(小时)、d(天)
顺序要求必须按时间升序排列
数量限制默认最多支持 18 个级别(支持用户配置多于 18 个延迟等级)
配置后需要重启 Broker 才能生效

延迟等级使用的注意事项:

  • 事务消息不支持设置延迟等级
  • 如果 Broker 采用集群部署,需要确保所有 Broker 的延迟等级配置保持一致
  • 所有 Broker 修改完延迟等级后,延迟等级需要在所有 Broker 都重启后才能生效
  • 延迟等级的默认范围是:1 ≤ level ≤ 18(支持用户配置多于 18 个延迟等级),不支持精确到毫秒级的延迟等级
  • 增加或者删除延迟等级后,代码中发送消息时设置的延迟等级数值需要对应新的延迟等级顺序位置(从 1 开始计数)
  • 延迟消息不保证严格时序(网络抖动可能会导致存在微小的时间偏差)
  • 延迟消息消费失败并重试投递时,原延迟等级会失效,消息将按照默认的重试退避策略重新投递,而不会再次遵循原延时等级设置

延迟等级使用的最佳实践:

  • 优先使用预设延迟等级,避免频繁修改 Broker 配置
  • 高并发场景建议分散不同的延迟等级,防止大量延时消息在同一时刻同时投递,造成瞬间流量尖峰,压垮下游消费者

延迟消息的实现原理

工作流程

在 RocketMQ 中,延迟消息的工作流程图如下:

底层原理

RocketMQ 不同版本中延迟消息实现

从 RocketMQ 5.0 开始,延时消息的实现机制有了重大重构,引入了时间轮(TimerWheel)和 TimerLog 等组件,支持任意时长延迟,不再受固定延迟等级限制,其存储模型和调度方式与 4.x 版本也有很大不同,详细说明请看 这里

构建延时索引

Producer 将消息发送到 Broker 后,Broker 首先会将其写入到 CommitLog 文件中(第一次写入),然后根据消息中的 Topic 信息和 Queue 信息,将其分发到目标 Topic 的相应 ConsumeQueue。不过,在分发到 ConsumeQueue 之前,系统会先判断消息中是否带有延时等级;若没有,则直接正常分发;如果有就走延迟队列,则执行下面的流程:

  • (1) 不会修改 CommitLog 文件中的消息 Topic;而是在构建 ConsumeQueue 时,将消息路由到系统内部的 SCHEDULE_TOPIC_XXXX

  • (2) 根据延时等级,在 ConsumeQueue 目录中 SCHEDULE_TOPIC_XXXX 主题下创建出相应的 queueId 目录与 ConsumeQueue 文件(如果没有这些目录与文件的话)

    • 延迟等级 delayLevel 与 queueId 的对应关系为:queueId = delayLevel - 1
    • 需要注意,在创建 queueId 目录时,并不是一次性地将所有延迟等级对应的目录全部创建完毕,而是用到哪个延迟等级才创建哪个目录(懒创建机制)
  • (3) 修改消息索引单元内容;消息索引单元中的 Message Tag HashCode 部分原本存放的是消息的 Tag 的 Hash 值,现在将其用于存储消息的投递时间

    • 投递时间 = 消息存储时间 + 延时等级时间
    • 投递时间是指:延迟消息到达指定延时时长后,被恢复为原 Topic 并重新写入 CommitLog 文件的时间戳
    • 消息存储时间是指:消息被 Broker 接收并第一次写入 CommitLog 文件时的时间戳(即消息发送到 Broker 的时间)
    • 在 ConsumeQueue 文件中,单个消息索引单元的组成结构如下:
      • Commit Log Offset:记录消息在 CommitLog 文件中的位置
      • Size:记录消息的大小
      • Message Tag HashCode:记录消息的 Tag 的哈希值,用于消息过滤。注意,对于延迟消息,该字段实际存储的是消息的投递时间
  • (4) 将消息索引单元写入到 SCHEDULE_TOPIC_XXXX 主题下相应的 ConsumeQueue 文件中


在 RocketMQ 的 Broker 中,延迟消息的 ConsumeQueue 存储目录结构如下:

SCHEDULE_TOPIC_XXXX 目录中各个延时等级 Queue 中的消息是如何排序的?

SCHEDULE_TOPIC_XXXX 目录中各延迟等级 Queue 中的消息,是按照投递时间(即消息存储时间 + 延迟时长)进行排序的。在同一个 Broker 中,相同延迟等级的所有延时消息,都会被写入 consumequeue 目录下 SCHEDULE_TOPIC_XXXX 中对应的同一个 Queue 里。由于同一个 Queue 中的消息具有相同的延迟等级,因此它们的投递时间仅取决于消息存储时间(即消息发送到 Broker 的时间)。简而言之,在同一个延迟等级的 Queue 中,消息按投递时间排序,而投递时间由消息存储时间决定,因此本质上是按消息发送到 Broker 的时间先后进行排序

调度到期消息
  • Broker 内部有一个延迟消息服务类 ScheduleMessageService,它负责消费 SCHEDULE_TOPIC_XXXX 主题中的消息,并根据每条延迟消息的投递时间,将延迟消息投递到目标 Topic 中。
  • 在投递之前,ScheduleMessageService 会从 CommitLog 中重新读出原来写入的消息,恢复原始 Topic 和 queueId,并将其原来的延迟等级设置为 0,使该消息变为一条不延迟的普通消息,然后再将其投递到目标 Topic 中。
  • ScheduleMessageService 在 Broker 启动时,会创建并启动一个定时器 Timer,用于执行相应的定时任务。系统会根据延迟等级的个数,创建对应数量的 TimerTask,每个 TimerTask 负责处理一个特定延迟等级的消息消费与投递。
  • 每个 TimerTask 会不断检测对应 Queue 中的第一条消息是否到期。由于同一个 Queue 中的消息是按照投递时间排序的,因此如果第一条消息未到期,则后续所有消息也不会到期;只有当第一条消息到期时,才会将其投递到目标 Topic,然后继续检查下一条消息是否到期。
转投普通消息
  • 延迟消息服务类 ScheduleMessageService 会将到期的延迟消息再次写入 CommitLog,并重新生成新的消息索引条目(ConsumeQueue),最终分发到目标 Topic 对应的 Queue 中。
  • 这一过程本质上就是一次普通消息的发送流程,唯一的区别在于:这次消息的 “Producer” 不再是业务生产者,而是延迟消息服务类 ScheduleMessageService 自身。
整体流程

在 RocketMQ 中,延迟消息整体的工作流程图如下:

  • (1) Producer 将消息发送到 Broker 后,Broker 首先将消息写入 CommitLog 文件(第一次写入,包含原始 Topic 和 delayLevel 属性)

  • (2) Broker 根据消息是否携带延迟等级(delayLevel)进行分支处理:

    • 若无延迟等级:
      • 直接为目标 Topic 构建 ConsumeQueue,消费者可立即消费
    • 若有延迟等级:
      • 进入延迟消息处理流程
  • (3) 延迟消息处理流程(关键):

    • 不会修改 CommitLog 文件中的消息 Topic;
    • 而是在构建 ConsumeQueue 时,将消息路由到系统内部的 SCHEDULE_TOPIC_XXXX 主题
  • (4) 根据延迟等级确定 queueId,并写入延迟队列:

    • queueId 与 delayLevel 的关系为:queueId = delayLevel - 1
    • SCHEDULE_TOPIC_XXXX 主题下:
      • 按需创建对应的 queueId 目录与 ConsumeQueue 文件(如果没有这些目录与文件的话)
      • 在创建 queueId 目录时,并不是一次性地将所有延迟等级对应的目录全部创建完毕,而是用到哪个延迟等级才创建哪个目录
  • (5) 构建延迟队列的索引单元(ConsumeQueue):

    • 每个索引单元的组成结构如下:
      • CommitLog Offset:消息在 CommitLog 文件中的位置
      • Size:消息大小
      • TagsCode:记录消息的 Tag 的哈希值,用于消息过滤。注意,对于延迟消息,这个字段记录的是消息的投递时间
    • 投递时间计算方式:
      • 投递时间 = 消息存储时间 + 延迟时长
      • 消息存储时间:消息被 Broker 接收并第一次写入 CommitLog 文件时的时间戳(即消息发送到 Broker 的时间)
      • 投递时间:延迟消息到达指定延时时长后,被恢复为原 Topic 并重新写入 CommitLog 文件的时间戳
  • (6) 将索引单元写入 SCHEDULE_TOPIC_XXXX 主题对应的 ConsumeQueue 文件中(完成延迟队列的构建)

  • (7) ScheduleMessageService 定时调度处理:

    • Broker 内部的延迟消息服务 ScheduleMessageService 会按 delayLevel 维度进行调度
    • 定时扫描 SCHEDULE_TOPIC_XXXX 主题的各个 ConsumeQueue 文件
    • 判断消息的投递时间是否小于等于当前时间
  • (8) 延迟消息到期后的处理:

    • 若未到期:继续等待
    • 若已到期:
      • 根据索引单元读取 CommitLog 文件中的原始消息
      • 恢复原始 Topic 和 queueId,并移除 delayLevel 属性
      • 重新构造为一条新的 “普通消息”
  • (9) 将新消息重新写入 CommitLog 文件(第二次写入)

  • (10) Broker 为该新消息构建目标 Topic 的 ConsumeQueue

  • (11) 消费者从目标 Topic 拉取消息并进行正常消费

为什么延迟消息的底层实现,需要写两次 CommitLog 文件?

由于 CommitLog 文件是只追加(Append-Only)写入的文件,消息一旦写入,其内容(如 Topic 名称)就无法被修改或覆盖。为了实现 "延迟后投递消息",消息的 Topic 需要从系统内部的 SCHEDULE_TOPIC_XXXX 恢复为业务方的 Topic,这在技术上只能通过生成一条全新的消息,并再次执行 CommitLog 文件的写入操作来完成。

延迟消息的代码案例

生产者的代码
  • 生产者发送延迟消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
* 生产者发送延迟消息
*/
public class DelayProducer {

public static void main(String[] arsgs) throws Exception {
// 创建一个生产者,参数为 Producer Group(生产者组)的名称
DefaultMQProducer producer = new DefaultMQProducer("pg");
// 指定 NameServer 的地址
producer.setNamesrvAddr("192.168.2.127:9876");
// 指定自动新创建的 Topic 的 Queue 数量,默认为 4
producer.setDefaultTopicQueueNums(4);
// 设置当发送失败时重试发送的次数,默认为 2 次
producer.setRetryTimesWhenSendFailed(3);
// 设置发送超时时间,默认 3 秒
producer.setSendMsgTimeout(5000);

// 启动生产者
producer.start();

// 生产并发送 10 条消息
for (int i = 0; i < 10; i++) {
byte[] body = ("Hi, " + i).getBytes();
// 定义消息,包括设置 Topic、Tag、消息内容
Message msg = new Message("MyTopic", "MyTag", body);
// 为消息指定 Key
msg.setKeys("key-" + i);
// 指定消息延迟等级为 4(即消息延迟 30 秒才会被消费)
msg.setDelayTimeLevel(4);
// 同步发送消息
SendResult sendResult = producer.send(msg);
// 输出消息被发送的时间
System.out.print(new SimpleDateFormat("HH::mm:ss").format(new Date()));
System.out.println(", " + sendResult);
}

// 关闭生产者
producer.shutdown();
}

}
  • 程序运行输出的结果如下:
1
2
3
4
21:05:16, SendResult [sendStatus=SEND_OK, msgId=ACAF07010A4B512DDF177564BF8E0000, offsetMsgId=C0A8027F00002A9F0000000000075EF4, messageQueue=MessageQueue [topic=MyTopic, brokerName=centos7, queueId=1], queueOffset=60]
21:05:16, SendResult [sendStatus=SEND_OK, msgId=ACAF07010A4B512DDF177564BFA00001, offsetMsgId=C0A8027F00002A9F0000000000075FEE, messageQueue=MessageQueue [topic=MyTopic, brokerName=centos7, queueId=2], queueOffset=61]
21:05:16, SendResult [sendStatus=SEND_OK, msgId=ACAF07010A4B512DDF177564BFA40002, offsetMsgId=C0A8027F00002A9F00000000000760E8, messageQueue=MessageQueue [topic=MyTopic, brokerName=centos7, queueId=3], queueOffset=62]
......
消费者的代码
  • 消费者消费消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
* 消费者消费消息
*/
public class OtherConsumer {

public static void main(String[] args) throws Exception {
// 定义一个 Push 消费者(使用 Push 消费方式),参数为 Consumer Group(消费者组)的名称
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg");

// 指定 NameServer 的地址
consumer.setNamesrvAddr("192.168.2.127:9876");

// 指定消费线程的最小数量,默认是 20 个消费线程
consumer.setConsumeThreadMin(2);

// 指定消费线程的最大数量,默认是 20 个消费线程
consumer.setConsumeThreadMax(4);

// 指定从哪个位置开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);

// 指定消费的 Topic 与 Tag
consumer.subscribe("MyTopic", "*");

// 指定消费模式,默认为 "集群消费模式"
consumer.setMessageModel(MessageModel.CLUSTERING);

// 注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {

// 一旦 Broker 中有了其订阅的消息就会触发该方法的执行,其返回值为当前 Consumer 消费的状态
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
// 遍历消息
for (MessageExt msg : msgs) {
// 输出消息被消费的时间
System.out.print(new SimpleDateFormat("HH::mm:ss").format(new Date()));
System.out.println(", " + msg);
}
// 返回消费状态:消费成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});

// 启动消费者
consumer.start();

// 等待消费者消费完
TimeUnit.SECONDS.sleep(120);

// 关闭消费者
consumer.shutdown();
}

}
  • 程序运行输出的结果如下(从消息被真实消费的时间来看,消息是延迟了 30s 后才被消费者接收到的):
1
2
3
4
21:05:46, MessageExt [brokerName=centos7, queueId=0, storeSize=238, queueOffset=15, sysFlag=0, bornTimestamp=1776942336954, bornHost=/192.168.2.140:39708, storeTimestamp=1776942366955, storeHost=/192.168.2.127:10911, msgId=C0A8027F00002A9F0000000000076F3A, commitLogOffset=487226, bodyCRC=1960927797, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='MyTopic', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=MyTopic, MAX_OFFSET=16, KEYS=key-7, CONSUME_START_TIME=1776942366959, UNIQ_KEY=ACAF07010A4B512DDF177564BFBA0007, CLUSTER=DefaultCluster, WAIT=true, DELAY=4, TAGS=MyTag, REAL_QID=0}, body=[72, 105, 44, 32, 55], transactionId='null'}]
21:05:46, MessageExt [brokerName=centos7, queueId=1, storeSize=238, queueOffset=17, sysFlag=0, bornTimestamp=1776942336958, bornHost=/192.168.2.140:39708, storeTimestamp=1776942366959, storeHost=/192.168.2.127:10911, msgId=C0A8027F00002A9F0000000000077028, commitLogOffset=487464, bodyCRC=1683914660, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='MyTopic', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=MyTopic, MAX_OFFSET=18, KEYS=key-8, CONSUME_START_TIME=1776942366963, UNIQ_KEY=ACAF07010A4B512DDF177564BFBE0008, CLUSTER=DefaultCluster, WAIT=true, DELAY=4, TAGS=MyTag, REAL_QID=1}, body=[72, 105, 44, 32, 56], transactionId='null'}]
21:05:46, MessageExt [brokerName=centos7, queueId=2, storeSize=238, queueOffset=18, sysFlag=0, bornTimestamp=1776942336961, bornHost=/192.168.2.140:39708, storeTimestamp=1776942366963, storeHost=/192.168.2.127:10911, msgId=C0A8027F00002A9F0000000000077116, commitLogOffset=487702, bodyCRC=324620082, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='MyTopic', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=MyTopic, MAX_OFFSET=19, KEYS=key-9, CONSUME_START_TIME=1776942366966, UNIQ_KEY=ACAF07010A4B512DDF177564BFC10009, CLUSTER=DefaultCluster, WAIT=true, DELAY=4, TAGS=MyTag, REAL_QID=2}, body=[72, 105, 44, 32, 57], transactionId='null'}]
......
代码测试结论
  • 最终,在 Broker 中 ConsumeQueue 的存储目录结构如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
~/store/consumequeue/
├── MyTopic
│   ├── 0
│   │   └── 00000000000000000000
│   ├── 1
│   │   └── 00000000000000000000
│   ├── 2
│   │   └── 00000000000000000000
│   └── 3
│   └── 00000000000000000000
├── SCHEDULE_TOPIC_XXXX
│   └── 3
│   └── 00000000000000000000

提示

在 RocketMQ 中,发送延迟消息时,生产者可以通过 msg.setDelayTimeLevel(4) 设置延迟等级;而消费者消费延迟消息的代码与消费普通消息完全一样,无需任何特殊处理,因为延迟消息到达消费者时已经变为普通消息。