RocketMQ 4.x 入门教程之九

大纲

前言

学习资源

版本说明

本文所使用的各软件版本如下表所示:

组件版本说明
JDK11Java、SpringBoot、SpringCloud 项目案例使用的 JDK 版本
RocketMQ Server4.9.0RocketMQ 服务端(包括 NameServer 和 Broker),运行时依赖 JDK 1.8+
RocketMQ Client4.8.0RocketMQ 客户端(SDK),要求跟 RocketMQ Server 的版本严格匹配

RocketMQ 的应用

引入依赖

本文提供的 Java 代码,均基于以下版本的 RocketMQ Client SDK 实现,要求部署的 RocketMQ Server 版本是 4.9.0

1
2
3
4
5
6
7
8
<dependencies>
<!-- RocketMQ 客户端(SDK),对应 RocketMQ Server 4.9.0 版本 -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.8.0</version>
</dependency>
</dependencies>

批量消息

批量发送消息

消息发送大小

在 RocketMQ 中,单条消息的最大默认大小为 4MB。不过,这里有几种不同类型消息的具体限制,需要留意一下:

消息类型最大的大小限制补充说明
普通消息、顺序消息 4 MB 这是最通用的限制,无论是开源版本还是云厂商服务都遵循此默认值。
事务消息、定时 / 延时消息 64 KB 这类消息的大小限制要严格得多,设计上就要求其体量更轻量。
消息自定义属性 16 KB 无论消息本身多大,其携带的自定义属性(properties)总和不能超过此限制。

生产者通过 send() 方法发送的 Message,并不会被直接序列化后发送到网络,而是会基于该 Message 生成一个字符串再发送出去。这个字符串由四部分组成:Topic、消息 Body、消息日志(占用 20 字节),以及一组用于描述消息的键值对属性(Key-Value)。这些属性中包含了如生产者地址、生产时间、目标 QueueId 等信息。最终写入 Broker 的消息单元中的数据,全部来源于这些属性。

批量发送限制

生产者进行消息发送时可以一次发送多条消息,这可以大大提升生产者的发送效率。不过需要注意以下几点:

  • 批量发送的消息必须具有相同的 Topic
  • 批量发送的消息必须具有相同的刷盘策略
  • 批量发送的消息不能是延时消息与事务消息
批量发送大小

在默认情况下,批量发送消息的总大小不能超过 4MB。如果想超出该值,有两种解决方案:

  • 方案一:

    • 将批量消息进行拆分,拆分为若干不大于 4MB 的消息集合分多次批量发送
  • 方案二:在 Broker 端与 Producer 端修改属性

    • Broker 端需要修改其加载的配置文件中的 maxMessageSize 属性
    • Producer 端需要在发送消息之前,设置 Producer 的 maxMessageSize 属性
    • 特别注意,Broker 端和 Producer 端都需要更改 maxMessageSize 属性的值,缺一不可

特别注意

尽管技术上可以修改批量发送消息的总大小,但非常不推荐超过 1 MB。因为过大的消息会成为系统的性能瓶颈,导致网络带宽紧张、Broker 内存和磁盘压力增大,从而拖垮整个系统的吞吐能力。

批量消费消息

批量消费相关的属性

在 Consumer 的 MessageListenerConcurrently 监听接口中,consumeMessage() 方法的第一个参数是消息列表,但默认情况下每次只能消费一条消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {

// 一旦 Broker 中有了其订阅的消息就会触发该方法的执行,其返回值为当前 Consumer 消费的状态
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
// 遍历消息
for (MessageExt msg : msgs) {
System.out.println(msg);
}
// 返回消费状态:消费成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}

});

若希望一次消费多条消息,可以通过修改 Consumer 的 consumeMessageBatchMaxSize 属性来指定一次消费的最大消息数量。不过,该属性的值默认不能超过 32,因为默认情况下消费者每次最多可以拉取 32 条消息。若要修改一次拉取的最大消息数量,则可以通过修改 Consumer 的 pullBatchSize 属性来实现。

1
2
3
4
5
// 指定消费者每次从 Broker 拉取的最大消息数量,默认值是 32
consumer.setPullBatchSize(32);

// 指定消费者每次消费的最大消息数量,默认值是 1,不能超过 pullBatchSize 属性的值
consumer.setConsumeMessageBatchMaxSize(5);
批量消费存在的问题

Consumer 的 pullBatchSizeconsumeMessageBatchMaxSize 属性是否设置得越大越好?答案:当然不是。

  • pullBatchSize 值越大,Consumer 每次拉取消息所需的时间就越长,且在网络上传输时出现问题的可能性也越高。如果在拉取过程中出现问题,则该批次的所有消息都需要全部重新拉取。
  • consumeMessageBatchMaxSize 值越大,Consumer 的消息并发消费能力就越低,因为这一批次的消息只会被一个线程处理。而且,这批消息会共享相同的消费结果:只要其中有一条消息处理异常,整批消息都需要全部重新消费(可能会出现重复消费问题)。

批量消息的代码案例

在本案例中,将演示 Provider 批量发送消息和 Consumer 批量消费消息。这里批量发送消息的需求是:不修改默认的最大批量发送大小(4MB),同时要防止待发送的批量消息超出该 4MB 的限制。

消息列表分割器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
import org.apache.rocketmq.common.message.Message;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;

/**
* 消息列表分割器(非线程安全),其只会处理每条消息的大小不超过 sizeLimit 的情况
* 若存在某条消息,其本身大小大于 sizeLimit,这个分割器无法处理,其直接将这条消息构成一个子列表返回,并没有再进行分割
*/
public class MessageListSplitter implements Iterator<List<Message>> {

// 指定每条消息的极限大小
private final int sizeLimit;

// 存放所有要发送的消息
private final List<Message> messages;

// 要进行批量发送的消息起始索引
private int currIndex;

public MessageListSplitter(List<Message> messages, int sizeLimit) {
if (sizeLimit <= 0) {
throw new IllegalArgumentException("sizeLimit must be > 0");
}

this.messages = messages;
this.sizeLimit = sizeLimit;
}

@Override
public boolean hasNext() {
// 判断当前开始遍历的消息索引要小于消息总数
return currIndex < messages.size();
}

@Override
public List<Message> next() {
if (!hasNext()) {
throw new NoSuchElementException();
}

// 当前遍历的消息的索引
int nextIndex = currIndex;

// 记录当前要发送的这一小批次消息列表的大小
int totalSize = 0;

for (; nextIndex < messages.size(); nextIndex++) {
// 获取当前遍历的消息
Message message = messages.get(nextIndex);

// 统计当前遍历的消息的大小,消息结构:Topic | Boby | Log | Properties,其中 Log 固定 20 个字节
int msgSize = message.getTopic().getBytes(StandardCharsets.UTF_8).length;

byte[] body = message.getBody();
int bodySize = body == null ? 0 : body.length;
msgSize += bodySize;

Map<String, String> properties = message.getProperties();
if (properties != null) {
for (Map.Entry<String, String> entry : properties.entrySet()) {
String key = entry.getKey();
String value = entry.getValue();
if (key != null) {
msgSize += key.getBytes(StandardCharsets.UTF_8).length;
}
if (value != null) {
msgSize += value.getBytes(StandardCharsets.UTF_8).length;
}
}
}
msgSize += 20;

// 判断当前消息本身是否大于 4MB
if (msgSize > sizeLimit) {
// 打印日志信息
System.out.printf("Single message size exceeded limit, topic=%s, size=%d%n", message.getTopic(), msgSize);
// 如果当前批次还没有任何消息,强行塞一条消息
if (nextIndex == currIndex) {
nextIndex++;
}
break;
}

if (msgSize + totalSize > sizeLimit) {
break;
} else {
totalSize += msgSize;
}
}

// 获取当前消息列表的子集合[currIndex, nextIndex)
List<Message> subList = new ArrayList<>(messages.subList(currIndex, nextIndex));

// 下次遍历的起始索引
currIndex = nextIndex;

return subList;
}

}
批量发送消息的生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import java.util.ArrayList;
import java.util.List;

/**
* 批量发送的生产者
*/
public class BatchProducer {

public static void main(String[] args) throws Exception {
// 创建一个生产者,参数为 Producer Group(生产者组)的名称
DefaultMQProducer producer = new DefaultMQProducer("pg");

// 指定 NameServer 的地址
producer.setNamesrvAddr("192.168.2.127:9876");

// 指定要发送的消息的最大大小,默认是 4MB
// 注意:仅修改该属性是不行的,还需要同时修改 Broker 加载的配置文件中的 maxMessageSize 属性
producer.setMaxMessageSize(4 * 1024 * 1024);

// 启动生产者
producer.start();

// 定义要发送的消息集合
List<Message> messages = new ArrayList<>();
for (int i = 0; i < 20; i++) {
byte[] body = ("Hi," + i).getBytes();
Message msg = new Message("BatchTopic", "BatchTag", body);
messages.add(msg);
}

// 指定每条消息的极限大小(比如 4MB)
final int sizeLimit = 4 * 1024 * 1024;

// 定义消息列表分割器,将消息列表分割为多个不超出 4MB 大小的小列表
MessageListSplitter splitter = new MessageListSplitter(messages, sizeLimit);
while (splitter.hasNext()) {
try {
// 获取经过分割的消息列表
List<Message> listItem = splitter.next();
// 批量发送消息
SendResult result = producer.send(listItem);
// 打印发送结果
System.out.println(result);
} catch (Exception e) {
e.printStackTrace();
}
}

// 关闭生产者
producer.shutdown();
}

}
  • 程序运行输出的结果如下:
1
SendResult [sendStatus=SEND_OK, msgId=ACAF0701668A512DDF179494D7B40000,ACAF0701668A512DDF179494D7B40001,ACAF0701668A512DDF179494D7B40002,ACAF0701668A512DDF179494D7B40003,ACAF0701668A512DDF179494D7B40004,ACAF0701668A512DDF179494D7B40005,ACAF0701668A512DDF179494D7B40006,ACAF0701668A512DDF179494D7B40007,ACAF0701668A512DDF179494D7B40008,ACAF0701668A512DDF179494D7B40009,ACAF0701668A512DDF179494D7B4000A,ACAF0701668A512DDF179494D7B4000B,ACAF0701668A512DDF179494D7B4000C,ACAF0701668A512DDF179494D7B4000D,ACAF0701668A512DDF179494D7B5000E,ACAF0701668A512DDF179494D7B5000F,ACAF0701668A512DDF179494D7B50010,ACAF0701668A512DDF179494D7B50011,ACAF0701668A512DDF179494D7B50012,ACAF0701668A512DDF179494D7B50013, offsetMsgId=C0A8027F00002A9F0000000000087BA1,C0A8027F00002A9F0000000000087C6D,C0A8027F00002A9F0000000000087D39,C0A8027F00002A9F0000000000087E05,C0A8027F00002A9F0000000000087ED1,C0A8027F00002A9F0000000000087F9D,C0A8027F00002A9F0000000000088069,C0A8027F00002A9F0000000000088135,C0A8027F00002A9F0000000000088201,C0A8027F00002A9F00000000000882CD,C0A8027F00002A9F0000000000088399,C0A8027F00002A9F0000000000088466,C0A8027F00002A9F0000000000088533,C0A8027F00002A9F0000000000088600,C0A8027F00002A9F00000000000886CD,C0A8027F00002A9F000000000008879A,C0A8027F00002A9F0000000000088867,C0A8027F00002A9F0000000000088934,C0A8027F00002A9F0000000000088A01,C0A8027F00002A9F0000000000088ACE, messageQueue=MessageQueue [topic=BatchTopic, brokerName=centos7, queueId=1], queueOffset=0]
批量消费消息的消费者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

import java.util.List;
import java.util.concurrent.TimeUnit;

/**
* 批量消费的消费者
*/
public class BatchConsumer {

public static void main(String[] args) throws Exception {
// 定义一个 Push 消费者(使用 Push 消费方式),参数为 Consumer Group(消费者组)的名称
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg");

// 指定 NameServer 的地址
consumer.setNamesrvAddr("192.168.2.127:9876");

// 指定从哪个位置开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);

// 指定消费的 Topic 与 Tag
consumer.subscribe("BatchTopic", "*");

// 指定消费模式,默认为 "集群消费模式"
consumer.setMessageModel(MessageModel.CLUSTERING);

// 指定消费者每次从 Broker 拉取的最大消息数量,默认值是 32
consumer.setPullBatchSize(40);

// 指定每次消费的最大消息数量,默认值是 1,不能超过 pullBatchSize 的值
consumer.setConsumeMessageBatchMaxSize(20);

// 注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {

// 一旦 Broker 中有了其订阅的消息就会触发该方法的执行,其返回值为当前 Consumer 消费的状态
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
// 打印每次消费的消息数量
System.out.println("Batch consume size: " + msgs.size());

// 遍历消息
for (MessageExt msg : msgs) {
System.out.println(msg);
}

// 批量消费成功时的返回结果
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

// 批量消费失败时的返回结果
// return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}

});

// 启动消费者
consumer.start();

// 等待消费者消费完
TimeUnit.SECONDS.sleep(120);

// 关闭消费者
consumer.shutdown();
}

}
  • 程序运行输出的结果如下:
1
2
3
4
5
Batch consume size: 20
MessageExt [brokerName=centos7, queueId=0, storeSize=204, queueOffset=100, sysFlag=0, bornTimestamp=1777465644222, bornHost=/192.168.2.140:45818, storeTimestamp=1777465644229, storeHost=/192.168.2.127:10911, msgId=C0A8027F00002A9F0000000000088B9B, commitLogOffset=560027, bodyCRC=654967907, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='BatchTopic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=101, CONSUME_START_TIME=1777465644243, UNIQ_KEY=ACAF07016AE0512DDF179495C7A70000, CLUSTER=DefaultCluster, WAIT=true, TAGS=BatchTag}, body=[72, 105, 44, 48], transactionId='null'}]
MessageExt [brokerName=centos7, queueId=0, storeSize=204, queueOffset=101, sysFlag=0, bornTimestamp=1777465644222, bornHost=/192.168.2.140:45818, storeTimestamp=1777465644229, storeHost=/192.168.2.127:10911, msgId=C0A8027F00002A9F0000000000088C67, commitLogOffset=560231, bodyCRC=1343042805, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='BatchTopic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=120, CONSUME_START_TIME=1777465644248, UNIQ_KEY=ACAF07016AE0512DDF179495C7A70001, CLUSTER=DefaultCluster, WAIT=true, TAGS=BatchTag}, body=[72, 105, 44, 49], transactionId='null'}]
MessageExt [brokerName=centos7, queueId=0, storeSize=204, queueOffset=102, sysFlag=0, bornTimestamp=1777465644222, bornHost=/192.168.2.140:45818, storeTimestamp=1777465644229, storeHost=/192.168.2.127:10911, msgId=C0A8027F00002A9F0000000000088D33, commitLogOffset=560435, bodyCRC=1225024847, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='BatchTopic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=120, CONSUME_START_TIME=1777465644248, UNIQ_KEY=ACAF07016AE0512DDF179495C7A70002, CLUSTER=DefaultCluster, WAIT=true, TAGS=BatchTag}, body=[72, 105, 44, 50], transactionId='null'}]
......

过滤消息

消息者在进行消息订阅时,除了可以指定要订阅的 Topic 外,还可以对该 Topic 中的消息根据特定条件进行过滤,从而订阅比 Topic 更细粒度的消息类型。针对指定 Topic 的消息过滤,一共有两种方式:Tag 过滤和 SQL 过滤。

基于 Tag 过滤消息

通过 Consumer 的 subscribe() 方法指定要订阅消息的 Tag。如果要订阅多个 Tag 的消息,Tag 之间使用或运算符(双竖线 ||)进行分割。

1
2
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg");
consumer.subscribe("UserTopic", "TagA || TagB || TagC");

基于 SQL 过滤消息

SQL 过滤是一种通过特定表达式对事先埋入到消息中的用户属性进行筛选过滤的方式。通过 SQL 过滤,可以实现对消息的复杂过滤。特别注意,只有使用 Push 消费方式的消费者才能使用 SQL 过滤功能。在 SQL 过滤表达式中,支持多种常量类型与运算符。

  • 支持的常量类型:

    • 数值:比如:123、3.1415
    • 字符:必须用单引号包裹起来,比如:'abc'
    • 布尔:TRUE 或 FALSE
    • NULL:特殊的常量,表示空
  • 支持的运算符有:

    • 数值比较:>、>=、<、<=、BETWEEN、=
    • 字符比较:=、<>、IN
    • 逻辑运算 :AND、OR、NOT
    • NULL 判断:IS NULL 或者 IS NOT NULL

在默认情况下,Broker 没有开启消息的 SQL 过滤功能,需要在 Broker 加载的配置文件中添加如下属性,以开启该功能:

1
enablePropertyFilter = true

在启动 Broker 时,需要指定这个修改过的配置文件。例如,对于单机版 Broker 的启动,其修改的配置文件是 conf/broker.conf,启动时使用如下命令:

1
nohup sh bin/mqbroker -n 127.0.0.1:9876 -c conf/broker.conf &

过滤消息的代码案例

Tag 过滤消息
  • 定义消息生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

/**
* 消息生产者
*/
public class MsgProducer {

public static void main(String[] args) throws Exception {
// 创建一个生产者,参数为 Producer Group(生产者组)的名称
DefaultMQProducer producer = new DefaultMQProducer("pg");

// 指定 NameServer 的地址
producer.setNamesrvAddr("127.0.0.1:9876");

// 启动生产者
producer.start();

String[] tags = {"MyTagA", "MyTagB", "MyTagC"};

// 发送 10 条消息
for (int i = 0; i < 10; i++) {
// 定义消息
byte[] body = ("Hi," + i).getBytes();
String tag = tags[i % tags.length];
Message msg = new Message("MyTopic", tag, body);
// 发送消息
SendResult sendResult = producer.send(msg);
System.out.println(sendResult);
}

// 关闭生产者
producer.shutdown();
}

}
  • 程序运行输出的结果如下:
1
2
3
4
5
6
7
8
9
10
SendResult [sendStatus=SEND_OK, msgId=ACAF0701A68E512DDF1798E5DFB50000, offsetMsgId=C0A8027F00002A9F000000000008DB7D, messageQueue=MessageQueue [topic=MyTopic, brokerName=centos7, queueId=0], queueOffset=16]
SendResult [sendStatus=SEND_OK, msgId=ACAF0701A68E512DDF1798E5E0070001, offsetMsgId=C0A8027F00002A9F000000000008DC3A, messageQueue=MessageQueue [topic=MyTopic, brokerName=centos7, queueId=1], queueOffset=18]
SendResult [sendStatus=SEND_OK, msgId=ACAF0701A68E512DDF1798E5E00C0002, offsetMsgId=C0A8027F00002A9F000000000008DCF7, messageQueue=MessageQueue [topic=MyTopic, brokerName=centos7, queueId=2], queueOffset=19]
SendResult [sendStatus=SEND_OK, msgId=ACAF0701A68E512DDF1798E5E0150003, offsetMsgId=C0A8027F00002A9F000000000008DDB4, messageQueue=MessageQueue [topic=MyTopic, brokerName=centos7, queueId=3], queueOffset=17]
SendResult [sendStatus=SEND_OK, msgId=ACAF0701A68E512DDF1798E5E01A0004, offsetMsgId=C0A8027F00002A9F000000000008DE71, messageQueue=MessageQueue [topic=MyTopic, brokerName=centos7, queueId=0], queueOffset=17]
SendResult [sendStatus=SEND_OK, msgId=ACAF0701A68E512DDF1798E5E01D0005, offsetMsgId=C0A8027F00002A9F000000000008DF2E, messageQueue=MessageQueue [topic=MyTopic, brokerName=centos7, queueId=1], queueOffset=19]
SendResult [sendStatus=SEND_OK, msgId=ACAF0701A68E512DDF1798E5E0210006, offsetMsgId=C0A8027F00002A9F000000000008DFEB, messageQueue=MessageQueue [topic=MyTopic, brokerName=centos7, queueId=2], queueOffset=20]
SendResult [sendStatus=SEND_OK, msgId=ACAF0701A68E512DDF1798E5E0250007, offsetMsgId=C0A8027F00002A9F000000000008E0A8, messageQueue=MessageQueue [topic=MyTopic, brokerName=centos7, queueId=3], queueOffset=18]
SendResult [sendStatus=SEND_OK, msgId=ACAF0701A68E512DDF1798E5E02C0008, offsetMsgId=C0A8027F00002A9F000000000008E165, messageQueue=MessageQueue [topic=MyTopic, brokerName=centos7, queueId=0], queueOffset=18]
SendResult [sendStatus=SEND_OK, msgId=ACAF0701A68E512DDF1798E5E0300009, offsetMsgId=C0A8027F00002A9F000000000008E222, messageQueue=MessageQueue [topic=MyTopic, brokerName=centos7, queueId=1], queueOffset=20]
  • 消息消费者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
* 使用 Tag 过滤的消息消费者
*/
public class FilterByTagConsumer {

public static void main(String[] args) throws Exception {
// 定义一个 Push 消费者(使用 Push 消费方式),参数为 Consumer Group(消费者组)的名称
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("pg");

// 指定 NameServer 的地址
consumer.setNamesrvAddr("192.168.2.127:9876");

// 指定从哪个位置开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

// 指定消费的 Topic 与 Tag
consumer.subscribe("MyTopic", "MyTagA || MyTagB");

// 注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {

@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
// 遍历消息
for (MessageExt me : msgs) {
System.out.println(me);
}

// 消费成功时的返回结果
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}

});

// 启动消费者
consumer.start();

// 等待消费者消费完
TimeUnit.SECONDS.sleep(120);

// 关闭消费者
consumer.shutdown();
}

}
  • 程序运行输出的结果如下:
1
2
3
4
5
6
7
MessageExt [brokerName=centos7, queueId=0, storeSize=189, queueOffset=16, sysFlag=0, bornTimestamp=1777538001846, bornHost=/192.168.2.140:35054, storeTimestamp=1777538001860, storeHost=/192.168.2.127:10911, msgId=C0A8027F00002A9F000000000008DB7D, commitLogOffset=580477, bodyCRC=654967907, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='MyTopic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=19, CONSUME_START_TIME=1777538037489, UNIQ_KEY=ACAF0701A68E512DDF1798E5DFB50000, CLUSTER=DefaultCluster, WAIT=true, TAGS=MyTagA}, body=[72, 105, 44, 48], transactionId='null'}]
MessageExt [brokerName=centos7, queueId=2, storeSize=189, queueOffset=20, sysFlag=0, bornTimestamp=1777538001953, bornHost=/192.168.2.140:35054, storeTimestamp=1777538001955, storeHost=/192.168.2.127:10911, msgId=C0A8027F00002A9F000000000008DFEB, commitLogOffset=581611, bodyCRC=1315545430, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='MyTopic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=21, CONSUME_START_TIME=1777538037489, UNIQ_KEY=ACAF0701A68E512DDF1798E5E0210006, CLUSTER=DefaultCluster, WAIT=true, TAGS=MyTagA}, body=[72, 105, 44, 54], transactionId='null'}]
MessageExt [brokerName=centos7, queueId=1, storeSize=189, queueOffset=18, sysFlag=0, bornTimestamp=1777538001927, bornHost=/192.168.2.140:35054, storeTimestamp=1777538001929, storeHost=/192.168.2.127:10911, msgId=C0A8027F00002A9F000000000008DC3A, commitLogOffset=580666, bodyCRC=1343042805, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='MyTopic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=21, CONSUME_START_TIME=1777538037489, UNIQ_KEY=ACAF0701A68E512DDF1798E5E0070001, CLUSTER=DefaultCluster, WAIT=true, TAGS=MyTagB}, body=[72, 105, 44, 49], transactionId='null'}]
MessageExt [brokerName=centos7, queueId=0, storeSize=189, queueOffset=17, sysFlag=0, bornTimestamp=1777538001946, bornHost=/192.168.2.140:35054, storeTimestamp=1777538001947, storeHost=/192.168.2.127:10911, msgId=C0A8027F00002A9F000000000008DE71, commitLogOffset=581233, bodyCRC=543670394, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='MyTopic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=19, CONSUME_START_TIME=1777538037489, UNIQ_KEY=ACAF0701A68E512DDF1798E5E01A0004, CLUSTER=DefaultCluster, WAIT=true, TAGS=MyTagB}, body=[72, 105, 44, 52], transactionId='null'}]
MessageExt [brokerName=centos7, queueId=1, storeSize=189, queueOffset=20, sysFlag=0, bornTimestamp=1777538001968, bornHost=/192.168.2.140:35054, storeTimestamp=1777538001969, storeHost=/192.168.2.127:10911, msgId=C0A8027F00002A9F000000000008E222, commitLogOffset=582178, bodyCRC=1591131335, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='MyTopic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=21, CONSUME_START_TIME=1777538037489, UNIQ_KEY=ACAF0701A68E512DDF1798E5E0300009, CLUSTER=DefaultCluster, WAIT=true, TAGS=MyTagA}, body=[72, 105, 44, 57], transactionId='null'}]
MessageExt [brokerName=centos7, queueId=3, storeSize=189, queueOffset=18, sysFlag=0, bornTimestamp=1777538001958, bornHost=/192.168.2.140:35054, storeTimestamp=1777538001960, storeHost=/192.168.2.127:10911, msgId=C0A8027F00002A9F000000000008E0A8, commitLogOffset=581800, bodyCRC=963547584, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='MyTopic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=19, CONSUME_START_TIME=1777538037489, UNIQ_KEY=ACAF0701A68E512DDF1798E5E0250007, CLUSTER=DefaultCluster, WAIT=true, TAGS=MyTagB}, body=[72, 105, 44, 55], transactionId='null'}]
MessageExt [brokerName=centos7, queueId=3, storeSize=189, queueOffset=17, sysFlag=0, bornTimestamp=1777538001941, bornHost=/192.168.2.140:35054, storeTimestamp=1777538001942, storeHost=/192.168.2.127:10911, msgId=C0A8027F00002A9F000000000008DDB4, commitLogOffset=581044, bodyCRC=1040405977, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='MyTopic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=19, CONSUME_START_TIME=1777538037489, UNIQ_KEY=ACAF0701A68E512DDF1798E5E0150003, CLUSTER=DefaultCluster, WAIT=true, TAGS=MyTagA}, body=[72, 105, 44, 51], transactionId='null'}]
SQL 过滤消息
  • 消息生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

/**
* 消息生产者
*/
public class MsgProducer {

public static void main(String[] args) throws Exception {
// 创建一个生产者,参数为 Producer Group(生产者组)的名称
DefaultMQProducer producer = new DefaultMQProducer("pg");

// 指定 NameServer 的地址
producer.setNamesrvAddr("127.0.0.1:9876");

// 启动生产者
producer.start();

// 发送 10 条消息
for (int i = 0; i < 10; i++) {
// 定义消息(不带 Tag)
byte[] body = ("Hi," + i).getBytes();
Message msg = new Message("MyTopic", body);

// 新增用户自定义属性
msg.putUserProperty("age", String.valueOf(i));

// 发送消息
SendResult sendResult = producer.send(msg);
System.out.println(sendResult);
}

// 关闭生产者
producer.shutdown();
}

}
  • 程序运行输出的结果如下:
1
2
3
4
5
6
7
8
9
10
SendResult [sendStatus=SEND_OK, msgId=ACAF0701A911512DDF1798E71E980000, offsetMsgId=C0A8027F00002A9F000000000008E2DF, messageQueue=MessageQueue [topic=MyTopic, brokerName=centos7, queueId=2], queueOffset=21]
SendResult [sendStatus=SEND_OK, msgId=ACAF0701A911512DDF1798E71EA70001, offsetMsgId=C0A8027F00002A9F000000000008E396, messageQueue=MessageQueue [topic=MyTopic, brokerName=centos7, queueId=3], queueOffset=19]
SendResult [sendStatus=SEND_OK, msgId=ACAF0701A911512DDF1798E71EAA0002, offsetMsgId=C0A8027F00002A9F000000000008E44D, messageQueue=MessageQueue [topic=MyTopic, brokerName=centos7, queueId=0], queueOffset=19]
SendResult [sendStatus=SEND_OK, msgId=ACAF0701A911512DDF1798E71EAE0003, offsetMsgId=C0A8027F00002A9F000000000008E504, messageQueue=MessageQueue [topic=MyTopic, brokerName=centos7, queueId=1], queueOffset=21]
SendResult [sendStatus=SEND_OK, msgId=ACAF0701A911512DDF1798E71EB60004, offsetMsgId=C0A8027F00002A9F000000000008E5BB, messageQueue=MessageQueue [topic=MyTopic, brokerName=centos7, queueId=2], queueOffset=22]
SendResult [sendStatus=SEND_OK, msgId=ACAF0701A911512DDF1798E71EBA0005, offsetMsgId=C0A8027F00002A9F000000000008E672, messageQueue=MessageQueue [topic=MyTopic, brokerName=centos7, queueId=3], queueOffset=20]
SendResult [sendStatus=SEND_OK, msgId=ACAF0701A911512DDF1798E71EC00006, offsetMsgId=C0A8027F00002A9F000000000008E729, messageQueue=MessageQueue [topic=MyTopic, brokerName=centos7, queueId=0], queueOffset=20]
SendResult [sendStatus=SEND_OK, msgId=ACAF0701A911512DDF1798E71EC40007, offsetMsgId=C0A8027F00002A9F000000000008E7E0, messageQueue=MessageQueue [topic=MyTopic, brokerName=centos7, queueId=1], queueOffset=22]
SendResult [sendStatus=SEND_OK, msgId=ACAF0701A911512DDF1798E71EC80008, offsetMsgId=C0A8027F00002A9F000000000008E897, messageQueue=MessageQueue [topic=MyTopic, brokerName=centos7, queueId=2], queueOffset=23]
SendResult [sendStatus=SEND_OK, msgId=ACAF0701A911512DDF1798E71ECC0009, offsetMsgId=C0A8027F00002A9F000000000008E94E, messageQueue=MessageQueue [topic=MyTopic, brokerName=centos7, queueId=3], queueOffset=21]
  • 消息消费者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
* 使用 SQL 过滤的消息消费者
*/
public class FilterBySQLConsumer {

public static void main(String[] args) throws Exception {
// 定义一个 Push 消费者(使用 Push 消费方式),参数为 Consumer Group(消费者组)的名称
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("pg");

// 指定 NameServer 的地址
consumer.setNamesrvAddr("192.168.2.127:9876");

// 指定从哪个位置开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

// 指定消费的 Topic 与 SQL 表达式
consumer.subscribe("MyTopic", MessageSelector.bySql("age between 0 and 5"));

// 注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {

@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
// 遍历消息
for (MessageExt me : msgs) {
System.out.println(me);
}

// 消费成功时的返回结果
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});

// 启动消费者
consumer.start();

// 等待消费者消费完
TimeUnit.SECONDS.sleep(120);

// 关闭消费者
consumer.shutdown();
}

}
  • 程序运行输出的结果如下:
1
2
3
4
5
6
MessageExt [brokerName=broker-a, queueId=2, storeSize=183, queueOffset=22, sysFlag=0, bornTimestamp=1777538083510, bornHost=/192.168.2.140:35744, storeTimestamp=1777538083511, storeHost=/192.168.2.127:10911, msgId=C0A8027F00002A9F000000000008E5BB, commitLogOffset=583099, bodyCRC=543670394, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='MyTopic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=24, CONSUME_START_TIME=1777538774123, UNIQ_KEY=ACAF0701A911512DDF1798E71EB60004, CLUSTER=DefaultCluster, WAIT=true, age=4}, body=[72, 105, 44, 52], transactionId='null'}]
MessageExt [brokerName=broker-a, queueId=2, storeSize=183, queueOffset=21, sysFlag=0, bornTimestamp=1777538083480, bornHost=/192.168.2.140:35744, storeTimestamp=1777538083490, storeHost=/192.168.2.127:10911, msgId=C0A8027F00002A9F000000000008E2DF, commitLogOffset=582367, bodyCRC=654967907, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='MyTopic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=24, CONSUME_START_TIME=1777538774123, UNIQ_KEY=ACAF0701A911512DDF1798E71E980000, CLUSTER=DefaultCluster, WAIT=true, age=0}, body=[72, 105, 44, 48], transactionId='null'}]
MessageExt [brokerName=broker-a, queueId=0, storeSize=183, queueOffset=19, sysFlag=0, bornTimestamp=1777538083498, bornHost=/192.168.2.140:35744, storeTimestamp=1777538083499, storeHost=/192.168.2.127:10911, msgId=C0A8027F00002A9F000000000008E44D, commitLogOffset=582733, bodyCRC=1225024847, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='MyTopic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=21, CONSUME_START_TIME=1777538774123, UNIQ_KEY=ACAF0701A911512DDF1798E71EAA0002, CLUSTER=DefaultCluster, WAIT=true, age=2}, body=[72, 105, 44, 50], transactionId='null'}]
MessageExt [brokerName=broker-a, queueId=1, storeSize=183, queueOffset=21, sysFlag=0, bornTimestamp=1777538083502, bornHost=/192.168.2.140:35744, storeTimestamp=1777538083508, storeHost=/192.168.2.127:10911, msgId=C0A8027F00002A9F000000000008E504, commitLogOffset=582916, bodyCRC=1040405977, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='MyTopic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=23, CONSUME_START_TIME=1777538774123, UNIQ_KEY=ACAF0701A911512DDF1798E71EAE0003, CLUSTER=DefaultCluster, WAIT=true, age=3}, body=[72, 105, 44, 51], transactionId='null'}]
MessageExt [brokerName=broker-a, queueId=3, storeSize=183, queueOffset=20, sysFlag=0, bornTimestamp=1777538083514, bornHost=/192.168.2.140:35744, storeTimestamp=1777538083517, storeHost=/192.168.2.127:10911, msgId=C0A8027F00002A9F000000000008E672, commitLogOffset=583282, bodyCRC=1465970924, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='MyTopic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=22, CONSUME_START_TIME=1777538774123, UNIQ_KEY=ACAF0701A911512DDF1798E71EBA0005, CLUSTER=DefaultCluster, WAIT=true, age=5}, body=[72, 105, 44, 53], transactionId='null'}]
MessageExt [brokerName=broker-a, queueId=3, storeSize=183, queueOffset=19, sysFlag=0, bornTimestamp=1777538083495, bornHost=/192.168.2.140:35744, storeTimestamp=1777538083495, storeHost=/192.168.2.127:10911, msgId=C0A8027F00002A9F000000000008E396, commitLogOffset=582550, bodyCRC=1343042805, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='MyTopic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=22, CONSUME_START_TIME=1777538774123, UNIQ_KEY=ACAF0701A911512DDF1798E71EA70001, CLUSTER=DefaultCluster, WAIT=true, age=1}, body=[72, 105, 44, 49], transactionId='null'}]

消息发送重试机制

Producer 对发送失败的消息进行重新发送的机制,称为消息发送重试机制,也称为消息重投机制。对于消息重投,需要注意以下几点:

  • 生产者在发送消息时,若采用同步或异步发送方式,发送失败默认会重试 2 次,但单向发送方式(Oneway)是没有失败重试机制的。
  • 普通消息支持跨 Broker / Queue 的发送重试,而顺序消息的发送重试仅限于同一队列,不具备跨队列的容错重试能力,因此其容错能力较弱。
  • 消息重投机制可以保证消息尽可能发送成功、不丢失,但可能会造成消息重复。消息重复在 RocketMQ 中是无法避免的问题。
  • 消息重复在一般情况下不会发生,当出现消息量大、网络抖动,消息重复就会成为大概率事件。
  • Producer 主动重发消息、Consumer 负载变化(发生 Rebalance,不会导致消息重复,但可能出现重复消费)也会导致重复消息。
  • 消息重复是无法避免的,但可以避免消息重复消费。
  • 避免消息重复消费的解决方案是,为消息添加唯一标识(例如消息 Key),使消费者对消息进行消费判断来避免重复消费。
  • 消息发送重试有三种策略可以选择:同步发送失败策略、异步发送失败策略、消息刷盘失败策略。

同步发送失败策略

对于同步发送消息,如果消息发送失败,默认会重试发送 2 次。但在重试发送时是不会选择上次发送失败的 Broker,而是选择其它 Broker。当然,若只有一个 Broker 其也只能发送到该 Broker,但其会尽量发送到该 Broker 上的其它 Queue(分区)。

1
2
3
4
5
6
7
8
9
10
11
// 创建一个 Producer,参数为 Producer Group 名称
DefaultMQProducer producer = new DefaultMQProducer("pg");

// 指定 NameServer 地址
producer.setNamesrvAddr("127.0.0.1:9876");

// 设置同步发送失败时重试发送的次数,默认为 2 次
producer.setRetryTimesWhenSendFailed(3);

// 设置发送超时时限为 5s,默认 3s
producer.setSendMsgTimeout(5000);

同时,Broker 还具备故障隔离机制,使 Producer 优先选择那些未发生过发送失败的 Broker 作为目标 Broker。该机制能够有效避免将消息发送到存在问题的 Broker,从而提升消息发送效率,降低发送耗时。当消息发送重试次数超过设定上限后,系统会抛出异常,此时由 Producer 自身负责保证消息不丢失。另外,当 Producer 出现 RemotingExceptionMQClientExceptionMQBrokerException 异常时,Producer 也会自动重试发送消息。


思考:让我们自己实现 Broker 的故障隔离功能,该如何实现呢?

  • (1) 方案一

    • 在 Producer 中,可以维护一个线程安全的 Map 集合,key 为 Broker 实例,value 为其最近一次消息发送失败的时间戳。同时,Producer 还维护一个 Set 集合,用于存放当前认为可用的 Broker(即长时间未发生发送异常的 Broker)。在选择目标 Broker 时,从该 Set 集合中进行选择。当某个 Broker 发生发送失败时,将其从 Set 集合中移除,并放入到 Map 中。此外,可以定义一个定时任务,定期扫描该 Map 集合,对于已经超过一定冷却时间且未再次发生异常的 Broker,将其从 Map 中移除,并重新加入到 Set 集合中,从而实现 Broker 的自动恢复。
  • (2) 方案二

    • 在 Producer 中,可以借助 Redis 的 ZSET 实现 Broker 故障隔离。使用 mq:fault:brokers 作为 key,member 为 Broker,score 为 Broker 的恢复时间(当前时间 + 冷却时间);消息发送失败时执行 ZADD mq:fault:brokers <recoverTime> <brokerName> 将 Broker 加入隔离列表,选择可用 Broker 时先通过 ZRANGEBYSCORE mq:fault:brokers -inf now 获取已过冷却期的 Broker,再执行 ZREM mq:fault:brokers <brokerName>(或批量删除)清理已过冷却期的 Broker,然后再通过 ZRANGEBYSCORE mq:fault:brokers (now +inf 获取仍在冷却期的 Broker 并在本地选择可用 Broker 时过滤掉,从而实现 Broker 的故障隔离与恢复机制。
  • (3) 方案三

    • 为 Producer 中的每个 Broker 实例添加一个标识字段,例如一个 AtomicLong 属性。只要某个 Broker 上发生过发送异常,就将该属性值增一。选择目标 Broker 时,优先选择该属性值最小的 Broker;若多个 Broker 的属性值相同,则采用轮询方式进行选择。

异步发送失败策略

对于异步发送消息,如果消息发送失败,默认会重试发送 2 次。但在异步重试时不会选择其他 Broker,仅在同一个 Broker 上做重试,所以该策略无法保证消息不丢失。

1
2
3
4
5
6
7
8
// 创建一个 Producer,参数为 Producer Group 名称
DefaultMQProducer producer = new DefaultMQProducer("pg");

// 指定 NameServer 地址
producer.setNamesrvAddr("127.0.0.1:9876");

// 设置异步发送失败时重试发送的次数,默认为 2 次
producer.setRetryTimesWhenSendAsyncFailed(3);

消息刷盘失败策略

当出现消息刷盘超时(Master 或 Slave)或 Slave 不可用时(即 Slave 在做数据同步时向 Master 返回的状态不是 SEND_OK),默认情况下不会将该消息尝试发送到其他 Broker。但对于重要消息,可以通过在 Broker 的配置文件中将 retryAnotherBrokerWhenNotStoreOK 属性设置为 true 来开启此功能。

1
retryAnotherBrokerWhenNotStoreOK=true

消息消费重试机制

顺序消息的消费重试

对于顺序消息,当 Consumer 消费消息失败后,为了保证消息的顺序性,RocketMQ 会在原 Topic 的原 Queue 中自动进行无限次消费重试(默认),直到消费成功为止。消费重试默认间隔时间为 1000 毫秒。消费重试期间应用会出现消息消费被阻塞的情况,这与普通消息超过最大消费重试次数后发往 %RETRY% 重试队列的机制不同。

1
2
3
4
5
// 定义一个 Push 消费者(使用 Push 消费方式),参数为 Consumer Group(消费者组)的名称
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg");

// 指定顺序消息消费失败的消费重试时间间隔,单位毫秒,默认为1000,其取值范围为 [10, 30000]
consumer.setSuspendCurrentQueueTimeMillis(100);

由于对顺序消息的重试是无休止的,不间断的,直至消费成功为止;所以,对于顺序消息的消费,务必要保证应用能够及时监控并处理消费失败的情况,避免消费被永久性阻塞。特别注意,顺序消息的发送重试机制受到严格限制(不支持跨 Broker / Queue 重试),而消费端则提供了严格的失败重试机制以保证消费顺序性。

无序消息的消费重试

对于无序消息(包括普通消息、延时消息、事务消息),当 Consumer 消费消息失败时,可以通过设置返回状态(ConsumeConcurrentlyStatus.RECONSUME_LATER)达到消息重试的效果。特别注意,无序消息的重试只对集群消费方式生效,广播消费方式不提供失败重试特性。即对于广播消费,消息消费失败后,失败的消息不再重试消费,而是直接继续消费后续消息。

消费重试次数与间隔

对于无序消息 + 集群消费模式下的重试消费,每条消息默认最多重试消费 16 次,但每次重试的间隔时间是不同的,会逐渐变长。消费重试的时间间隔与延迟消息的延时等级十分相似,除了没有延时等级的前两个时间间隔外,其他的时间间隔都是相同的。每次消费重试的间隔时间如下表:

重试次数与上次重试的间隔时间
110 秒
230 秒
31 分钟
42 分钟
53 分钟
64 分钟
75 分钟
86 分钟
97 分钟
108 分钟
119 分钟
1210 分钟
1320 分钟
1430 分钟
151 小时
162 小时

若一条消息在一直消费失败的前提下,将会在第一次消费失败后的第 4 小时 46 分后进行第 16 次重试。若仍然消费失败(超过最大消费重试次数),则该消息将被投递到死信队列。可以通过代码修改最大消费重试次数:

1
2
3
4
5
6
7
8
// 定义一个 Push 消费者(使用 Push 消费方式),参数为 Consumer Group(消费者组)的名称
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg");

// 指定消费模式,默认为 "集群消费模式"
consumer.setMessageModel(MessageModel.CLUSTERING);

// 指定最大消费重试次数,默认 16 次
consumer.setMaxReconsumeTimes(10);

对于修改过的消费重试次数,将按照以下策略执行消费重试:

  • 若修改值小于 16,则按照指定时间间隔(如上表所示)进行重试
  • 若修改值大于 16,则超过 16 次的重试时间间隔均为 2 小时

对于 Consumer Group(消费者组),若仅修改了一个 Consumer 的消费重试次数,则会应用到该消费者组中的所有其它 Consumer 实例。若出现多个 Consumer 均修改了消费重试次数的情况,则会采用覆盖方式生效,即最后被修改的值会覆盖前面设置的值。

重试队列的实现原理

对于需要重试消费的消息,并不是 Consumer 在等待了指定时长后再次从 Broker 拉取原来的消息进行消费,而是将这些需要重试消费的消息放入到了一个特殊 Topic 的队列中,然后再进行消费的。这个特殊的队列就是重试队列。

  • 当出现需要进行重试消费的消息时,Broker 会自动为每个消费组创建一个 Topic 名称为 %RETRY%{consumerGroup} 的重试队列。
  • 消费失败的消息会被投递到该重试队列中,并按照一定的延迟策略再次投递和消费,从而实现消息的重试机制。
  • 只有出现需要进行重试消费的消息时,Broker 才会为对应的消费者组创建重试队列。这个重试队列是针对消费者组的,而不是针对每个 Topic 创建的(因为一个 Topic 的消息可以被多个消费者组进行消费,所以会为这些消费者组各创建一个重试队列)。

特别注意

在重试队列中,Broker 对重试消息的处理是通过延迟消息机制实现的。当消息消费失败后,消息会被重新发送到对应的 %RETRY%{consumerGroup} 重试主题(即重试队列),同时设置延迟等级。在底层实现中,Broker 会先将该消息存入内部的 SCHEDULE_TOPIC_XXXX 调度主题(即延迟队列),待延迟时间到达后,再将消息投递回重试主题 %RETRY%{consumerGroup} 的对应队列供消费者再次消费。

消费重试的配置方式

在集群消费模式下,若消息消费失败后希望触发消费重试,需要在消息监听器接口的实现中明确采用以下三种方式之一进行配置。

  • 方式一:返回 ConsumeConcurrentlyStatus.RECONSUME_LATER 或者 ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT
  • 方式二:返回 Null
  • 方式三:抛出异常
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// 定义一个 Push 消费者(使用 Push 消费方式),参数为 Consumer Group(消费者组)的名称
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg");

// 指定消费模式,默认为 "集群消费模式"
consumer.setMessageModel(MessageModel.CLUSTERING);

// 指定最大消费重试次数,默认 16 次
consumer.setMaxReconsumeTimes(10);

// 注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {

@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
// ......(处理消息)
}catch(Exception e) {
// 以下三种情况都可以触发消费重试
// return null;
// throw new RuntimeException("消费异常");
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}

// 返回消费状态:消费成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});

在某些场景下,开发者可能需要主动决定是否重试消费。例如,当消息处理失败时,可以结合业务逻辑判断是否重新投递消息。具体实现方式:将消息发送到开发者自己创建的重试 Topic,由专门的消费者处理。

1
2
3
4
5
if (reconsumeTimes >= 3) {
// 消费重试超过指定的次数后,手动将消息发送到自定义的重试 Topic
producer.send(msg, "MyRetryTopic");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}

消费不重试配置方式

在集群消费模式下,若消费失败后不希望触发重试,只需在捕获异常后返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS(即与消费成功时相同的返回状态),即可避免消费重试。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 定义一个 Push 消费者(使用 Push 消费方式),参数为 Consumer Group(消费者组)的名称
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg");

// 指定消费模式,默认为 "集群消费模式"
consumer.setMessageModel(MessageModel.CLUSTERING);

// 注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {

@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
// ......(处理消息)
}catch(Exception e) {
// 返回消费状态:消费成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}

// 返回消费状态:消费成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});

死信队列

什么是死信队列

当一条消息首次消费失败时,消息队列会自动触发消费重试。当重试次数达到上限后消费仍失败,表明消费者无法正常处理该消息。此时,消息队列不会立即将其丢弃,而是发送到该消费者对应的特殊队列中。这个队列称为死信队列(Dead-Letter Queue,DLQ),其中的消息称为死信消息(Dead-Letter Message,DLM)。死信队列专门用于处理无法被正常消费的消息。

死信产生的原因

当消息的消费重试次数超过 maxReconsumeTimes(默认值是 16)后仍无法被消费,RocketMQ 会将其标记为死信,并存储到对应的死信队列中。消息消费失败的常见原因包括:

  • 消费者持续抛出异常
  • 消息体格式错误导致消息解析失败
  • 业务系统依赖的服务不可用(如数据库连接超时)

死信队列的特征

死信队列具有如下特征:

  • 死信队列中的消息不会再被消费者正常消费,即 DLQ 对于消费者是不可见的
  • 死信消息的存储有效期与正常消息相同,均为 3 天(即 commitlog 文件的过期时间),3 天后会被自动删除
  • 死信队列就是一个特殊的 Topic,名称为 %DLQ%{consumerGroup},即每个消费者组对应一个死信队列
  • 一个死信队列对应一个 Consumer Group(消费组),而不是对应单个 Consumer 实例;只有当消费组产生死信消息后,Broker 才会为其创建对应的死信队列
  • 消费死信队列与消费普通 Topic 的方式完全相同,只需将 Topic 参数设置为死信队列的名称即可

死信队列的使用注意事项:

注意事项说明
仅支持集群消费模式死信队列机制只在集群消费模式下生效,因为广播消费模式下不会自动进行消费重试
有效期死信消息与正常消息有效期相同,默认保留 48 小时(即 3 天),超时自动删除
幂等处理死信消息可能被多次投递,消费端需做好幂等设计

死信消息的处理

实际上,当一条消息进入死信队列时,通常意味着系统中某些地方存在问题,导致消费者无法正常消费该消息(例如代码中存在 Bug、依赖服务故障、数据格式异常等)。因此,死信消息通常需要开发人员介入进行特殊处理。最关键的一步是排查可疑因素、修复代码中可能存在的 Bug,然后再将原有的死信消息重新投递到原 Topic 进行消费。

  • 死信消息的处理流程:

    • 自动进入死信队列:当消息的消费重试次数达到 maxReconsumeTimes(默认值是 16)时,RocketMQ 会自动将其投递到死信队列。
    • 手动放入死信队列:在消费者代码中,显式判断消费重试次数,将消息发送到死信队列。
  • 死信消息的处理策略:

    • 监控与告警:通过 RocketMQ 控制台或 Prometheus 监控死信队列的消息堆积量,并设置阈值进行告警。
    • 人工干预:将死信消息导出至日志系统(比如 ELK),分析消费失败原因(例如数据库字段缺失、接口变更等)。
    • 重新投递:使用脚本或管理工具将死信消息重新投递到原 Topic 进行重新消费,但需要确保产生死信消息的问题已解决。
    • 补偿机制:编写专用消费者处理死信队列,执行补偿操作(如数据修复、人工审核等)。
  • 将死信消息重新投递到原 Topic 的意义:

    • 问题已经修复:开发人员排查并解决了代码中的 Bug 后,原消费者已经能够正常处理这类消息了。此时将死信消息重新投递回原 Topic,让它走正常的消费流程,是最自然的方式。
    • 保持业务完整性:原 Topic 可能被多个消费者订阅(例如用于不同业务场景),直接消费死信队列无法触达这些消费者。
    • 复用成熟机制:重新投递后,消息可以再次享受 RocketMQ 提供的消费重试、负载均衡、顺序消费等能力。

为什么不建议直接消费死信消息,而是将其重新投递到原 Topic?

  • 消息进入死信队列通常是因为代码 Bug、依赖服务故障、数据格式异常等根本性问题。直接消费死信消息,同样的异常会再次抛出。
  • 消费者处理死信队列消息时,如果执行的还是同一套业务逻辑(复用同一 Consumer 的话);问题不解决,消费必然再次失败。
  • 死信队列的设计初衷是 "隔离" 无法处理的消息,而不是继续用相同的方式去处理它们。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* 死信队列消费者
*
* 功能说明:
* 1. 消费死信队列中的消息
* 2. 处理完死信消息后,将消息重新投递到原始 Topic
*/
public class DeadLetterQueueConsumer {

// NameServer 地址
private static final String NAMESRV_ADDR = "127.0.0.1:9876";

// 消费死信队列的消费者组
private static final String CONSUMER_GROUP = "DLQ_Consumer_Group";

// 重新投递消息的生产者组
private static final String PRODUCER_GROUP = "Resend_Producer_Group";

// 死信队列的名称
private static final String DLQ_TOPIC = "%DLQ%OriginalConsumerGroup";

// 消费者实例
private DefaultMQPushConsumer consumer;

// 生产者实例
private DefaultMQProducer producer;

// 运行状态标志
private AtomicBoolean running = new AtomicBoolean(true);

/**
* 初始化消费者和生产者
*/
public void init() throws MQClientException {
// 1. 初始化生产者(用于重新投递消息)
initProducer();

// 2. 初始化消费者(用于消费死信队列)
initConsumer();
}

/**
* 初始化生产者
*/
private void initProducer() throws MQClientException {
// 创建生产者
producer = new DefaultMQProducer(PRODUCER_GROUP);

// 设置 NameServer 地址
producer.setNamesrvAddr(NAMESRV_ADDR);

// 启动生产者
producer.start();
System.out.println("生产者已启动,Group: " + PRODUCER_GROUP);
}

/**
* 初始化消费者
*/
private void initConsumer() throws MQClientException {
// 创建消费者
consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);

// 设置 NameServer 地址
consumer.setNamesrvAddr(NAMESRV_ADDR);

// 订阅死信队列
consumer.subscribe(DLQ_TOPIC, "*");

// 指定消费模式,默认为 "集群消费模式"
consumer.setMessageModel(MessageModel.CLUSTERING);

// 设置消费起始位置:从第一个偏移量开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

// 注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
try {
// 处理单条死信消息
handleDeadLetterMessage(msg);
} catch (Exception e) {
// 打印错误日志
System.err.println("处理死信消息失败: " + e.getMessage());
e.printStackTrace();
// 返回 RECONSUME_LATER 触发消费重试
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}

// 返回消费状态:消费成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});

// 启动消费者
consumer.start();
System.out.println("死信队列消费者已启动,Topic: " + DLQ_TOPIC);
}

/**
* 处理单条死信消息
*/
private void handleDeadLetterMessage(MessageExt msg) throws Exception {
// 1. 解析死信消息内容
String messageBody = new String(msg.getBody(), "UTF-8");
String originalTopic = msg.getProperty("ORIGIN_TOPIC");
String originalTags = msg.getTags();
String originalKeys = msg.getKeys();
int reconsumeTimes = msg.getReconsumeTimes();

System.out.println("========================================");
System.out.println("收到死信消息:");
System.out.println(" 消息ID: " + msg.getMsgId());
System.out.println(" 原始Topic: " + originalTopic);
System.out.println(" 原始Tags: " + originalTags);
System.out.println(" 原始Keys: " + originalKeys);
System.out.println(" 已重试次数: " + reconsumeTimes);
System.out.println(" 消息内容: " + messageBody);
System.out.println(" 存储时间: " + msg.getStoreTimestamp());
System.out.println("========================================");

// 2. 业务处理:记录死信消息到数据库或日志文件
saveDeadLetterToLog(msg);

// 3. 发送告警通知(可选)
sendAlertNotification(msg);

// 4. 排查问题并修复后,重新投递到原始 Topic
// 注意:实际业务中,这一步通常由人工触发或在问题确认修复后执行
// 此处示例为自动重新投递,实际使用时可注释掉
resendDeadLetterMessage(msg);
}

/**
* 重新投递死信消息到原始 Topic
*/
public void resendDeadLetterMessage(MessageExt deadLetterMsg) throws Exception {
// 获取原始 Topic
String originalTopic = deadLetterMsg.getProperty("ORIGIN_TOPIC");

// 校验原始 Topic 是否存在
if (originalTopic == null || originalTopic.isEmpty()) {
System.err.println("原始 Topic 为空,无法重新投递,消息ID: " + deadLetterMsg.getMsgId());
return;
}

// 创建新消息,使用原始 Topic
Message newMsg = new Message();
newMsg.setTopic(originalTopic);
newMsg.setTags(deadLetterMsg.getTags());
newMsg.setKeys(deadLetterMsg.getKeys());
newMsg.setBody(deadLetterMsg.getBody());

// 设置原始消息ID等属性(便于追踪)
newMsg.putUserProperty("ORIGIN_MSG_ID", deadLetterMsg.getMsgId());
newMsg.putUserProperty("RECONSUME_TIMES", String.valueOf(deadLetterMsg.getReconsumeTimes()));
newMsg.putUserProperty("FROM_DLQ", "true");
newMsg.putUserProperty("RESEND_TIME", String.valueOf(System.currentTimeMillis()));

// 重新发送消息(同步发送)
producer.send(newMsg);

System.out.println("死信消息已重新投递到原 Topic:");
System.out.println(" 目标Topic: " + originalTopic);
System.out.println(" 原始消息ID: " + deadLetterMsg.getMsgId());
System.out.println(" 新消息ID: " + newMsg.getMsgId());
}

/**
* 记录死信消息到日志或数据库
*/
private void saveDeadLetterToLog(MessageExt msg) {
// 实际项目中可将死信消息写入数据库,便于后续分析和处理
System.out.println("[日志记录] 死信消息已记录,消息ID: " + msg.getMsgId());
}

/**
* 发送告警通知
*/
private void sendAlertNotification(MessageExt msg) {
// 实际项目中可发送邮件、短信、钉钉机器人等告警
System.out.println("[告警] 检测到死信消息,请及时处理!消息ID: " + msg.getMsgId());
}

/**
* 关闭消费者和生产者,释放资源
*/
public void shutdown() {
running.set(false);

if (consumer != null) {
consumer.shutdown();
System.out.println("消费者已关闭");
}

if (producer != null) {
producer.shutdown();
System.out.println("生产者已关闭");
}
}

/**
* 主方法(用于独立运行)
*/
public static void main(String[] args) {
final DeadLetterQueueConsumer dlqConsumer = new DeadLetterQueueConsumer();

// 添加 JVM 关闭钩子,确保资源正确释放
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
System.out.println("JVM 正在关闭,释放资源...");
dlqConsumer.shutdown();
}));

try {
// 初始化并启动消费者和生产者
dlqConsumer.init();
System.out.println("服务已启动,等待死信消息...");

// 保持主线程运行
Thread.sleep(Long.MAX_VALUE);
} catch (MQClientException e) {
System.err.println("初始化失败: " + e.getMessage());
e.printStackTrace();
dlqConsumer.shutdown();
} catch (InterruptedException e) {
System.err.println("主线程被中断");
Thread.currentThread().interrupt();
dlqConsumer.shutdown();
}
}
}

死信队列的最佳实践

  • 幂等性设计:避免重复消费

    • 消息消费重试可能会导致重复消费,需要保障消费幂等性
    • 比如:使用业务唯一标识,在消息中设置业务 ID(如订单号),通过数据库唯一索引或 Redis 去重。
  • 合理配置最大重试次数与间隔

    • 高频业务:降低最大重试次数(如 3 ~ 5 次),避免资源浪费
    • 低频高可靠性业务:保留默认最多重试消费 16 次,并结合指数退避算法(比如 coolDown = base * 2^failCount)减少负载
  • 死信队列的清理与归档

    • 定期清理:对已确认无法修复消费的死信消息,通过定时任务清理死信队列(死信消息默认只保留 3 天)
    • 归档存储:将死信消息持久化到日志系统(比如 ELK),供后续审计
  • RocketMQ 与 Kafka 的对比

特性 RocketMQKafka
自动发送重试默认支持,可配置重试次数和间隔支持(通过 retriesretry.backoff.ms 等参数配置)
自动消费重试支持渐进式消费重试,可配置最大重试次数不支持,需手动实现重试逻辑
死信队列内置 DLQ,支持自动 / 手动重投无原生支持,需自定义逻辑
适用场景企业级高可靠性业务(如支付、订单)流式处理、日志聚合