RocketMQ 4.x 入门教程之七

大纲

前言

学习资源

版本说明

本文所使用的各软件版本如下表所示:

组件版本说明
JDK11Java、SpringBoot、SpringCloud 项目案例使用的 JDK 版本
RocketMQ Server4.9.0RocketMQ 服务端(包括 NameServer 和 Broker),运行时依赖 JDK 1.8+
RocketMQ Client4.8.0RocketMQ 客户端(SDK),要求跟 RocketMQ Server 的版本严格匹配

RocketMQ 的应用

引入依赖

本文提供的 Java 代码,均基于以下版本的 RocketMQ Client 实现,要求部署的 RocketMQ Server 版本是 4.9.0

1
2
3
4
5
6
7
8
<dependencies>
<!-- RocketMQ 客户端(SDK),对应 RocketMQ Server 4.9.0 版本 -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.8.0</version>
</dependency>
</dependencies>

普通消息

消息发送的方式

Producer 对于消息的发送方式有多种选择,不同的发送方式会产生不同的系统效果。

同步发送消息

同步发送消息是指:Producer 发出一条消息后,必须等到 MQ 返回 ACK 确认才能发送下一条消息;该方式消息可靠性最高,但发送效率较低。

异步发送消息

异步发送消息是指:Producer 发出一条消息后无需等待 MQ 返回 ACK 确认,即可直接发送下一条消息;该方式可以保障一定的消息可靠性,同时也提升了发送效率。

单向发送消息

单向发送消息是指:Producer 仅负责发送消息(从调用线程的角度看是同步执行的,并不会在后台另起线程异步执行),不等待、也不处理 MQ 返回的 ACK 确认(MQ 本身也不会返回 ACK);该方式发送效率最高,但消息可靠性较差。

发送方式的区别
发送方式方法调用是否等待 Broker 响应(ACK)可靠性典型场景
单向 (One-way)sendOneway()不阻塞等待可能丢失消息日志收集、监控数据上报
异步 (Async)send() + SendCallback不阻塞等待,通过回调获取消息发送结果不丢失消息视频转码、实时数据处理
同步 (Sync)send()阻塞等待直到 Broker 返回 ACK 不丢失消息订单交易、重要通知

消息发送代码案例

  • 在 RocketMQ 中,消息发送的状态有以下几种:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
* 消息发送的状态
*/
public enum SendStatus {

/**
* 发送成功
*/
SEND_OK,

/**
* 刷盘超时
* 当 Broker 设置的刷盘策略为同步刷盘时才可能出现这种异常状态,异步刷盘不会出现
*/
FLUSH_DISK_TIMEOUT,

/**
* Slave 同步超时
* 当 Broker 集群设置的 Master-Slave 的复制方式为同步复制时才可能出现这种异常状态,异步复制不会出现
*/
FLUSH_SLAVE_TIMEOUT,

/**
* 没有可用的 Slave
* 当 Broker 集群设置为 Master-Slave 的复制方式为同步复制时才可能出现这种异常状态,异步复制不会出现
*/
SLAVE_NOT_AVAILABLE
}
同步发送消息
  • 生产者同步发送普通消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

/**
* 生产者同步发送普通消息
*/
public class SyncProducer {

public static void main(String[] arsgs) throws Exception {
// 创建一个生产者,参数为 Producer Group(生产者组)的名称
DefaultMQProducer producer = new DefaultMQProducer("pg");
// 指定 NameServer 的地址
producer.setNamesrvAddr("192.168.2.127:9876");
// 指定自动新创建的 Topic 的 Queue 数量,默认为 4
producer.setDefaultTopicQueueNums(4);
// 设置当发送失败时重试发送的次数,默认为 2 次
producer.setRetryTimesWhenSendFailed(3);
// 设置发送超时时间,默认 3 秒
producer.setSendMsgTimeout(5000);

// 启动生产者
producer.start();

// 生产并发送 10 条消息
for (int i = 0; i < 10; i++) {
byte[] body = ("Hi, " + i).getBytes();
// 定义消息,包括设置 Topic、Tag、消息内容
Message msg = new Message("MyTopic", "MyTag", body);
// 为消息指定 Key
msg.setKeys("key-" + i);
// 同步发送消息
SendResult sendResult = producer.send(msg);
System.out.println(sendResult);
}

// 关闭生产者
producer.shutdown();
}

}
  • 程序运行输出的结果如下:
1
2
3
4
5
6
7
8
9
10
SendResult [sendStatus=SEND_OK, msgId=ACAF070152A9512DDF176F0403AC0000, offsetMsgId=C0A8027F00002A9F000000000006EC94, messageQueue=MessageQueue [topic=MyTopic, brokerName=centos7, queueId=1], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=ACAF070152A9512DDF176F0403D60001, offsetMsgId=C0A8027F00002A9F000000000006ED5C, messageQueue=MessageQueue [topic=MyTopic, brokerName=centos7, queueId=2], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=ACAF070152A9512DDF176F0403DB0002, offsetMsgId=C0A8027F00002A9F000000000006EE24, messageQueue=MessageQueue [topic=MyTopic, brokerName=centos7, queueId=3], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=ACAF070152A9512DDF176F0403E20003, offsetMsgId=C0A8027F00002A9F000000000006EEEC, messageQueue=MessageQueue [topic=MyTopic, brokerName=centos7, queueId=0], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=ACAF070152A9512DDF176F0403E50004, offsetMsgId=C0A8027F00002A9F000000000006EFB4, messageQueue=MessageQueue [topic=MyTopic, brokerName=centos7, queueId=1], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=ACAF070152A9512DDF176F0403E90005, offsetMsgId=C0A8027F00002A9F000000000006F07C, messageQueue=MessageQueue [topic=MyTopic, brokerName=centos7, queueId=2], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=ACAF070152A9512DDF176F0403EC0006, offsetMsgId=C0A8027F00002A9F000000000006F144, messageQueue=MessageQueue [topic=MyTopic, brokerName=centos7, queueId=3], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=ACAF070152A9512DDF176F0403F00007, offsetMsgId=C0A8027F00002A9F000000000006F20C, messageQueue=MessageQueue [topic=MyTopic, brokerName=centos7, queueId=0], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=ACAF070152A9512DDF176F0403F30008, offsetMsgId=C0A8027F00002A9F000000000006F2D4, messageQueue=MessageQueue [topic=MyTopic, brokerName=centos7, queueId=1], queueOffset=2]
SendResult [sendStatus=SEND_OK, msgId=ACAF070152A9512DDF176F0403F70009, offsetMsgId=C0A8027F00002A9F000000000006F39C, messageQueue=MessageQueue [topic=MyTopic, brokerName=centos7, queueId=2], queueOffset=2]
异步发送消息
  • 生产者异步发送普通消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
* 生产者异步发送普通消息
*/
public class AsyncProducer {

public static void main(String[] arsgs) throws Exception {
// 创建一个生产者,参数为 Producer Group(生产者组)的名称
DefaultMQProducer producer = new DefaultMQProducer("pg");
// 指定 NameServer 的地址
producer.setNamesrvAddr("192.168.2.127:9876");
// 指定自动新创建的 Topic 的 Queue 数量,默认为 4
producer.setDefaultTopicQueueNums(4);
// 设置当异步发送失败时重试发送的次数
producer.setRetryTimesWhenSendAsyncFailed(0);

// 启动生产者
producer.start();

// 消息数量
final int size = 10;

// 消息发送计数器
CountDownLatch countDownLatch = new CountDownLatch(size);

// 生产并发送 10 条消息
for (int i = 0; i < size; i++) {
byte[] body = ("Hi, " + i).getBytes();
try {
// 定义消息,包括设置 Topic、Tag、消息内容
Message msg = new Message("MyTopic", "MyTag", body);
// 为消息指定 Key
msg.setKeys("key-" + i);
// 异步发送消息,指定回调接口
producer.send(msg, new SendCallback() {

// 当生产者接收到 MQ 发送来的 ACK 后,就会触发该回调方法的执行
@Override
public void onSuccess(SendResult sendResult) {
countDownLatch.countDown();
System.out.println(sendResult);
}

@Override
public void onException(Throwable e) {
countDownLatch.countDown();
e.printStackTrace();
}
});
} catch (Exception e) {
e.printStackTrace();
}
}

// 由于采用的是异步发送,所以如果这里不等待消息发送完成,就会将生产者提前关闭掉,导致消息发送失败
countDownLatch.await(5, TimeUnit.SECONDS);

// 关闭生产者
producer.shutdown();
}

}
  • 程序运行输出的结果如下:
1
2
3
4
5
6
7
8
9
10
SendResult [sendStatus=SEND_OK, msgId=ACAF070182E6512DDF176F3461A30001, offsetMsgId=C0A8027F00002A9F00000000000715FC, messageQueue=MessageQueue [topic=MyTopic, brokerName=centos7, queueId=1], queueOffset=16]
SendResult [sendStatus=SEND_OK, msgId=ACAF070182E6512DDF176F3461A60003, offsetMsgId=C0A8027F00002A9F00000000000713A4, messageQueue=MessageQueue [topic=MyTopic, brokerName=centos7, queueId=3], queueOffset=11]
SendResult [sendStatus=SEND_OK, msgId=ACAF070182E6512DDF176F3461AA0004, offsetMsgId=C0A8027F00002A9F00000000000716C4, messageQueue=MessageQueue [topic=MyTopic, brokerName=centos7, queueId=0], queueOffset=12]
SendResult [sendStatus=SEND_OK, msgId=ACAF070182E6512DDF176F3461A30000, offsetMsgId=C0A8027F00002A9F0000000000071534, messageQueue=MessageQueue [topic=MyTopic, brokerName=centos7, queueId=2], queueOffset=11]
SendResult [sendStatus=SEND_OK, msgId=ACAF070182E6512DDF176F3461A40002, offsetMsgId=C0A8027F00002A9F000000000007146C, messageQueue=MessageQueue [topic=MyTopic, brokerName=centos7, queueId=3], queueOffset=12]
SendResult [sendStatus=SEND_OK, msgId=ACAF070182E6512DDF176F3461AF0005, offsetMsgId=C0A8027F00002A9F000000000007178C, messageQueue=MessageQueue [topic=MyTopic, brokerName=centos7, queueId=3], queueOffset=13]
SendResult [sendStatus=SEND_OK, msgId=ACAF070182E6512DDF176F3461B60006, offsetMsgId=C0A8027F00002A9F0000000000071854, messageQueue=MessageQueue [topic=MyTopic, brokerName=centos7, queueId=2], queueOffset=12]
SendResult [sendStatus=SEND_OK, msgId=ACAF070182E6512DDF176F3461BA0007, offsetMsgId=C0A8027F00002A9F000000000007191C, messageQueue=MessageQueue [topic=MyTopic, brokerName=centos7, queueId=1], queueOffset=17]
SendResult [sendStatus=SEND_OK, msgId=ACAF070182E6512DDF176F3461BE0008, offsetMsgId=C0A8027F00002A9F00000000000719E4, messageQueue=MessageQueue [topic=MyTopic, brokerName=centos7, queueId=2], queueOffset=13]
SendResult [sendStatus=SEND_OK, msgId=ACAF070182E6512DDF176F3461C10009, offsetMsgId=C0A8027F00002A9F0000000000071AAC, messageQueue=MessageQueue [topic=MyTopic, brokerName=centos7, queueId=1], queueOffset=18]
单向发送消息
  • 生产者单向发送普通消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

/**
* 生产者单向发送普通消息
*/
public class OnewayProducer {

public static void main(String[] arsgs) throws Exception {
// 创建一个生产者,参数为 Producer Group(生产者组)的名称
DefaultMQProducer producer = new DefaultMQProducer("pg");
// 指定 NameServer 的地址
producer.setNamesrvAddr("192.168.2.127:9876");
// 指定自动新创建的 Topic 的 Queue 数量,默认为 4
producer.setDefaultTopicQueueNums(4);

// 启动生产者
producer.start();

// 生产并发送 10 条消息
for (int i = 0; i < 10; i++) {
byte[] body = ("Hi, " + i).getBytes();
// 定义消息,包括设置 Topic、Tag、消息内容
Message msg = new Message("MyTopic", "MyTag", body);
// 为消息指定 Key
msg.setKeys("key-" + i);
// 单向发送消息(从调用线程的角度看是同步执行的,并不会在后台另起线程异步执行)
producer.sendOneway(msg);
}

// 关闭生产者
producer.shutdown();
}

}

消息消费代码案例

消费位置说明

ConsumeFromWhere 枚举中常用且未废弃的常量

常量值含义适用场景
CONSUME_FROM_FIRST_OFFSET从最早位置开始消费
消费者组首次启动时,从队列的最前位置开始,会消费队列中所有已存在(未被删除)的历史消息。
需要全量数据的场景。例如数据迁移、重新构建状态、故障复盘分析,或确保新接入的消费者能处理所有历史消息。
CONSUME_FROM_LAST_OFFSET从最新位置开始消费
消费者组首次启动时,预期行为是从队列的最后位置开始,只消费此后新产生的消息。但此行为强依赖 RocketMQ 版本:
5.x 版本:符合预期,始终从最新消息开始,不消费历史消息。
4.x 及更早版本:存在特殊逻辑,若队列中有存在保留时间少于等于 3 天的历史消息,会自动回溯消费,实际从 3 天内的最早位置(第一条消息)开始消费;仅当所有历史消息均超过 3 天或队列为空时,才会从最新位置开始消费。
大多数常规业务场景。新上线的消费者只关心未来的实时数据,避免处理大量无关的历史堆积消息,实现快速启动。
版本建议:若需严格保证 “不消费历史消息”,推荐升级至 5.x 版本,或在 4.x 版本中通过手动设置消费位置实现。
CONSUME_FROM_TIMESTAMP从指定时间点开始消费
消费者组首次启动时,从用户指定的一个具体时间戳开始消费,消费该时间点之后到达的消息。
回溯消费。需要从某个特定时间点(如昨天上午 10 点)开始处理消息,而不是处理全部历史或只处理未来的消息。

特别注意

ConsumeFromWhere 的配置仅在消费者组首次启动且查询不到任何消费进度时生效。一旦该消费者组在 Broker 端或者 Consumer 端上保留了消费进度(offset),此后无论 ConsumeFromWhere 设置为何值,都将从上一次记录的消费位置继续消费,以确保消息不丢失也不重复消费

CONSUME_FROM_FIRST_OFFSET 的作用

CONSUME_FROM_FIRST_OFFSET 在 RocketMQ 中表示:对于一个全新的消费者组(即没有已记录的任何消费进度),当它第一次启动时,会从队列中最早的消息开始消费;这个参数只在消费者组第一次启动时起作用,后续该消费者组再次启动时,会遵循已记录的消费进度(offset)继续消费,此设置不再生效。下表展示了在不同场景下,设置为 CONSUME_FROM_FIRST_OFFSET 的具体表现:

场景消费行为说明
消费者组首次启动从队列中最早的消息开始消费消费位置会被初始化为 MinOffset(最小消息位点),即历史所有消息都会被消费一次。
已有消费进度(offset)的消费者组再次启动从上一次提交的消费位置继续消费 Broker 端已保存了该消费者组的消费进度,新的启动会基于该进度继续,与 CONSUME_FROM_FIRST_OFFSET 设置无关。
消费进度(offset)已过期(被删除)从服务端当前可查的最早消息开始消费如果保存的消费位置因为消息过期被删除,服务端会自动将其重置为 MinOffset

假设一个 Topic 的某个队列中已经存有 Msg1、Msg2、Msg3 三条消息,一个全新的消费者组配置了 CONSUME_FROM_FIRST_OFFSET,则:

启动时机消费者表现
消费者组第一次启动从 Msg1 开始,按顺序消费 Msg1、Msg2、Msg3。
消费者组消费完 Msg2 后,程序重启从 Broker 端记录的消费进度(Msg3)开始,只消费 Msg3。

CONSUME_FROM_FIRST_OFFSET 确保了新接入的消费者组可以访问到 Topic 内的全量历史数据,但请根据实际业务需求选择,避免不必要的全量消费。

CONSUME_FROM_LAST_OFFSET 在 RocketMQ 不同版本中的行为差异

RocketMQ 版本CONSUME_FROM_LAST_OFFSET 行为
3.x / 4.x消费者组首次启动时:如果有最近 3 天内的消息(默认消息保留时间为 3 天),则会从服务器中当前仍然保存的最早消息开始消费;如果所有消息的保留时间都超过 3 天或队列为空,则从最新消息开始消费
5.x消费者组首次启动时:始终从最新消息开始消费,不受消息年龄影响

简而言之,RocketMQ 3.x / 4.x 版本的 SDK 对于消费者组的首次启动,CONSUME_FROM_LAST_OFFSET 的行为取决于队列中消息的年龄。

队列中消息的情况实际消费起点原因
有消息且消息存在时间 < 3 天从 3 天内的第一条消息开始消费触发 “回溯消费” 逻辑,默认会消费近 3 天的历史消息
有消息但消息存在时间 ≥ 3 天从最新消息开始消费消息超过 3 天,被视为 “已过期”,不再进行回溯消费
队列为空从最新位置开始消费无消息可消费,等待新消息
消费消息案例
  • 消费者消费普通消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

import java.util.List;
import java.util.concurrent.TimeUnit;

/**
* 消费者消费普通消息
*/
public class MessageConsumer {

public static void main(String[] args) throws Exception {
// 如果需要使用 Pull 消费者(使用 Pull 消费方式),可以使用 DefaultLitePullConsumer(如下所示)
// DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("cg");

// 定义一个 Push 消费者(使用 Push 消费方式),参数为 Consumer Group(消费者组)的名称
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg");

// 指定 NameServer 的地址
consumer.setNamesrvAddr("192.168.2.127:9876");

// 指定消费线程的最小数量,默认是 20 个消费线程
consumer.setConsumeThreadMin(2);

// 指定消费线程的最大数量,默认是 20 个消费线程
consumer.setConsumeThreadMax(4);

// 指定从哪个位置开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);

// 指定消费的 Topic 与 Tag
consumer.subscribe("MyTopic", "*");

// 指定消费模式,默认为 "集群消费模式"
consumer.setMessageModel(MessageModel.CLUSTERING);

// 注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {

// 一旦 Broker 中有了其订阅的消息就会触发该方法的执行,其返回值为当前 Consumer 消费的状态
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
// 遍历消息
for (MessageExt msg : msgs) {
System.out.println(msg);
}
// 返回消费状态:消费成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});

// 启动消费者
consumer.start();

// 等待消费者消费完
TimeUnit.SECONDS.sleep(15);

// 关闭消费者
consumer.shutdown();
}
}
  • 程序运行输出的结果如下:
1
2
3
4
MessageExt [brokerName=centos7, queueId=3, storeSize=200, queueOffset=23, sysFlag=0, bornTimestamp=1776849315067, bornHost=/192.168.2.140:57354, storeTimestamp=1776849315074, storeHost=/192.168.2.127:10911, msgId=C0A8027F00002A9F00000000000732E4, commitLogOffset=471780, bodyCRC=1787164566, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='MyTopic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=26, KEYS=key-0, CONSUME_START_TIME=1776849333630, UNIQ_KEY=ACAF07012CF8512DDF176FD958FA0000, CLUSTER=DefaultCluster, WAIT=true, TAGS=MyTag}, body=[72, 105, 44, 32, 48], transactionId='null'}]
MessageExt [brokerName=centos7, queueId=1, storeSize=200, queueOffset=25, sysFlag=0, bornTimestamp=1776849315082, bornHost=/192.168.2.140:57354, storeTimestamp=1776849315083, storeHost=/192.168.2.127:10911, msgId=C0A8027F00002A9F0000000000073474, commitLogOffset=472180, bodyCRC=76256954, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='MyTopic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=27, KEYS=key-2, CONSUME_START_TIME=1776849333630, UNIQ_KEY=ACAF07012CF8512DDF176FD9590A0002, CLUSTER=DefaultCluster, WAIT=true, TAGS=MyTag}, body=[72, 105, 44, 32, 50], transactionId='null'}]
MessageExt [brokerName=centos7, queueId=2, storeSize=200, queueOffset=22, sysFlag=0, bornTimestamp=1776849315100, bornHost=/192.168.2.140:57354, storeTimestamp=1776849315100, storeHost=/192.168.2.127:10911, msgId=C0A8027F00002A9F000000000007385C, commitLogOffset=473180, bodyCRC=1960927797, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='MyTopic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=23, KEYS=key-7, CONSUME_START_TIME=1776849333632, UNIQ_KEY=ACAF07012CF8512DDF176FD9591C0007, CLUSTER=DefaultCluster, WAIT=true, TAGS=MyTag}, body=[72, 105, 44, 32, 55], transactionId='null'}]
......

顺序消息

在 RocketMQ 中,提供了延迟消息和事务消息等机制,但并没有单独的 “顺序消息类型” 这一内置消息概念。所谓顺序消息,本质上是开发者通过 Producer 端的 MessageQueueSelector 接口进行 Queue 选择策略设计,并结合 Consumer 端的顺序消费能力(比如:每个线程独立消费一个 Queue)来共同实现的。因此,在 RocketMQ 中,顺序消息并不是一种独立的消息类型,而是一种基于消息发送路由策略与消费顺序控制共同实现的业务语义。

顺序消息的核心概念

  • 顺序消息是指严格按照消息发送的先后顺序进行消费的消息(FIFO,先进先出)。
  • 默认情况下,生产者会以 Round Robin(轮询)的方式将消息轮询发送到多个 Queue(分区)中;而消费者在消费时也会从多个 Queue(分区)拉取消息。在这种情况下,消息的发送与消费都无法保证全局顺序。
  • 如果将相关消息只发送到同一个 Queue(分区),并且消费时保证该 Queue(分区)由单一消费者顺序消费,则可以保证消息的有序性。

为什么需要顺序消息

  • 例如,有一个 Topic:ORDER_STATUS(订单状态),其下包含 4 个 Queue(队列)。该 Topic 中的消息用于描述订单在不同阶段的状态变化,例如:未支付、已支付、发货中、发货成功、发货失败。
  • 对于同一个订单,其状态变化具有严格的时序关系。以订单 T0000001 为例,生产者按照时间顺序会依次发送如下消息:
  • 订单 T0000001:未支付 → 订单 T0000001:已支付 → 订单 T0000001:发货中 → 订单 T0000001:发货失败
  • 如果这些消息在消费时顺序被打乱,例如先消费到 “发货中” 再消费到 “已支付”,就会导致业务状态异常。因此,在这种场景下需要使用顺序消息,以保证同一订单的状态变更能够按照发送顺序被正确消费。

当消息发送到 MQ 之后,如果 Queue 的选择采用轮询策略,消息在 MQ 中的存储情况可能如下:

这种情况下,往往是希望 Consumer 消费消息的顺序和 Producer 发送消息的顺序是一致的,然而上述 MQ 的投递和消费方式,无法保证顺序是正确的。对于顺序异常的消息,Consumer 即使设置有一定的状态容错,也不能完全处理好这么多种随机出现的组合情况。

基于上述情况,可以设计这样的方案:对于相同订单号的消息,通过一定的路由策略(例如基于订单号取模),将其发送到同一个 Queue 中;在消费端,再通过顺序消费策略(例如一个线程独立处理一个 Queue),从而确保同一个订单的消息按照发送顺序被依次处理,保证消费的顺序性。

有序性的分类

根据有序范围的不同,RocketMQ 可以严格地保证两种的消息有序性:全局有序与分区有序。

全局有序

在 RocketMQ 中,当 Topic 只配置一个 Queue(分区),并且在顺序消费模式下,由单个 Consumer 实例以单线程方式顺序消费时,可以保证该 Queue 内的所有消息严格按照发送顺序被消费。这种情况下,由于整个 Topic 只有一个 Queue,其顺序性等价于全局顺序,因此通常称为全局有序。

在创建 Topic 时可以指定 Queue 的数量,常见有以下三种方式:

  • (1) 在代码中创建 Producer 时,通过配置参数指定自动创建 Topic 的 Queue 数量(默认是 4 个 Queue),比如 producer.setDefaultTopicQueueNums(1)
  • (2) 在 RocketMQ 可视化控制台中,手动创建 Topic 时指定 Queue 数量
  • (3) 使用 mqadmin 命令手动创建 Topic 时指定 Queue 数量

特别注意,producer.setDefaultTopicQueueNums(1) 并不是一个强制生效的配置

  • 它只在 Topic 不存在且 Broker 允许自动创建 Topic 的情况下才可能生效,用于在自动创建 Topic 时作为建议的 Queue 数量
  • 如果 Topic 已经存在,或者 Broker 禁止自动创建 Topic,那么该配置将不会生效

在 RocketMQ 中,Topic 的 Queue 数量来源具有明确的优先级,而优先级从高到低依次是:

  • (1) mqadmin / 可视化控制台手动创建 Topic
  • (2) Broker 中已存在的 Topic 配置
  • (3) producer.setDefaultTopicQueueNums(1),仅仅是建议自动创建 Topic 时的默认 Queue 数量,不是强制约束,更不是运行时保证

特别注意

RocketMQ 默认不保证全局顺序,所谓全局有序是 "单 Queue 单 Consumer 顺序消费" 这一极端约束下的特例

分区有序

在 RocketMQ 中,当存在多个 Queue(分区)参与消息发送与消费时,只能保证在顺序消费模式下,每个 Queue(分区)内部的消息按照发送顺序被消费,而无法保证不同 Queue 之间的全局顺序。这种仅保证单个 Queue 内有序的特性称为分区有序。

  • 如何实现 Queue 的选择呢?

    • 在代码中定义 Producer 时,可以指定消息队列选择器(MessageQueueSelector),该选择器需要用户自行实现 MessageQueueSelector 接口,用于决定将消息发送到哪个 Queue。
    • 在设计 Queue 选择算法时,通常需要使用一个 “选择 key”(selectKey),该 key 可以是消息的业务标识,例如订单 ID 或消息 key。但无论使用哪种 key,都必须保证其具有业务唯一性。
  • 一种常见的实现方式:

    • 将 “选择 key”(或其 Hash 值)与 Topic 所包含的 Queue 数量进行取模运算,得到的结果即为目标 Queue 的序号,从而实现将消息路由到固定 Queue 中。
  • 但取模算法存在一个问题:

    • 不同的 “选择 key” 可能会得到相同的取模结果,从而导致不同业务的消息被分配到同一个 Queue 中。此时,同一个 Consumer 可能会拉取到多个不同 “选择 key” 的消息。
  • 为了解决取模算法存在的问题:

    • 一般会在消费端会根据消息中的 “选择 key” 进行再次判断,如果该消息属于当前 Consumer 负责处理的业务范围,则正常消费;否则直接跳过或忽略。
    • 这就要求 “选择 key” 必须能够随消息一起传递给 Consumer,因此通常推荐使用消息 key 作为 “选择 key”。
  • 进一步来看,这种方式是否会引入新的问题?

    • 比如:不属于当前 Consumer 的消息被拉取后,是否会影响正确的 Consumer 消费消息呢?
    • 实际上,在 RocketMQ 的集群消费模式(Clustering)下,同一个 Consumer Group(消费者组)内,一个 Queue 在同一时刻只会被一个 Consumer 消费,因此不会出现多个 Consumer 同时消费同一个 Queue 的情况。如果不同 “选择 key” 的消息被分配到同一个 Queue,这些消息仍然属于同一个 Consumer Group 内的单个 Consumer 处理。
    • 另外,不同 Consumer Group(消费者组)之间的消费是相互隔离的,因此即使不同 Consumer Group 处理不同业务逻辑,也不会产生相互影响。

分区有序的代码案例

  • 生产者自定义 Queue(分区)选择器,根据取模算法选择对应的 Queue,从而保证同一个订单 ID 的消息在单个 Queue 内有序(分区有序)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;

import java.util.List;

/**
* 生产者自定义 Queue 选择器(为了实现消息的有序发送和消费)
*/
public class OrdderMsgProducer {

public static void main(String[] args) throws Exception {
// 创建一个生产者,参数为 Producer Group(生产者组)的名称
DefaultMQProducer producer = new DefaultMQProducer("pg");
// 指定 NameServer 的地址
producer.setNamesrvAddr("192.168.2.127:9876");
// 指定自动新创建的 Topic 的 Queue 数量,默认为 4
producer.setDefaultTopicQueueNums(4);
// 设置当发送失败时重试发送的次数,默认为 2 次
producer.setRetryTimesWhenSendFailed(3);
// 设置发送超时时间,默认 3 秒
producer.setSendMsgTimeout(5000);

// 启动生产者
producer.start();

// 生产并发送 10 条消息
for (int i = 0; i < 10; i++) {
// 订单 ID
Integer orderId = i + 1;
// 消息内容
byte[] body = ("Hi," + i).getBytes();
// 定义消息,包括设置 Topic、Tag、消息内容
Message msg = new Message("TopicA", "TagA", body);
// 为消息指定 Key
msg.setKeys(orderId.toString());
// 同步发送消息,并自定义 Queue 选择器
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
// 使用消息 key 作为 "选择 key"
// String keys = msg.getKeys();
// Integer orderId = Integer.valueOf(keys);

// 或者使用 arg 参数作为 "选择 key"
Integer orderId = (Integer) arg;

// 根据取模算法选择对应的 Queue(分区),从而保证同一个订单 ID 的消息在单个 Queue 内有序(分区有序)
int index = orderId % mqs.size();
return mqs.get(index);
}
}, orderId);
// 打印发送结果
System.out.println(sendResult);
}

// 关闭生产者
producer.shutdown();
}

}