RocketMQ 4.x 入门教程之九
大纲
- RocketMQ 4.x 入门教程之一、RocketMQ 4.x 入门教程之二、RocketMQ 4.x 入门教程之三
- RocketMQ 4.x 入门教程之四、RocketMQ 4.x 入门教程之五、RocketMQ 4.x 入门教程之六
- RocketMQ 4.x 入门教程之七、RocketMQ 4.x 入门教程之八、RocketMQ 4.x 入门教程之九
前言
学习资源
版本说明
本文所使用的各软件版本如下表所示:
| 组件 | 版本 | 说明 |
|---|---|---|
| JDK | 11 | Java、SpringBoot、SpringCloud 项目案例使用的 JDK 版本 |
| RocketMQ Server | 4.9.0 | RocketMQ 服务端(包括 NameServer 和 Broker),运行时依赖 JDK 1.8+ |
| RocketMQ Client | 4.8.0 | RocketMQ 客户端(SDK),要求跟 RocketMQ Server 的版本严格匹配 |
RocketMQ 的应用
引入依赖
本文提供的 Java 代码,均基于以下版本的 RocketMQ Client SDK 实现,要求部署的 RocketMQ Server 版本是 4.9.0。
1 | <dependencies> |
批量消息
批量发送消息
消息发送大小
在 RocketMQ 中,单条消息的最大默认大小为 4MB。不过,这里有几种不同类型消息的具体限制,需要留意一下:
| 消息类型 | 最大的大小限制 | 补充说明 |
|---|---|---|
| 普通消息、顺序消息 | 4 MB | 这是最通用的限制,无论是开源版本还是云厂商服务都遵循此默认值。 |
| 事务消息、定时 / 延时消息 | 64 KB | 这类消息的大小限制要严格得多,设计上就要求其体量更轻量。 |
| 消息自定义属性 | 16 KB | 无论消息本身多大,其携带的自定义属性(properties)总和不能超过此限制。 |

生产者通过 send() 方法发送的 Message,并不会被直接序列化后发送到网络,而是会基于该 Message 生成一个字符串再发送出去。这个字符串由四部分组成:Topic、消息 Body、消息日志(占用 20 字节),以及一组用于描述消息的键值对属性(Key-Value)。这些属性中包含了如生产者地址、生产时间、目标 QueueId 等信息。最终写入 Broker 的消息单元中的数据,全部来源于这些属性。
批量发送限制
生产者进行消息发送时可以一次发送多条消息,这可以大大提升生产者的发送效率。不过需要注意以下几点:
- 批量发送的消息必须具有相同的 Topic
- 批量发送的消息必须具有相同的刷盘策略
- 批量发送的消息不能是延时消息与事务消息
批量发送大小
在默认情况下,批量发送消息的总大小不能超过 4MB。如果想超出该值,有两种解决方案:
方案一:
- 将批量消息进行拆分,拆分为若干不大于 4MB 的消息集合分多次批量发送
方案二:在 Broker 端与 Producer 端修改属性
- Broker 端需要修改其加载的配置文件中的
maxMessageSize属性 - Producer 端需要在发送消息之前,设置 Producer 的
maxMessageSize属性 - 特别注意,Broker 端和 Producer 端都需要更改
maxMessageSize属性的值,缺一不可
- Broker 端需要修改其加载的配置文件中的
特别注意
尽管技术上可以修改批量发送消息的总大小,但非常不推荐超过 1 MB。因为过大的消息会成为系统的性能瓶颈,导致网络带宽紧张、Broker 内存和磁盘压力增大,从而拖垮整个系统的吞吐能力。
批量消费消息
批量消费相关的属性
在 Consumer 的 MessageListenerConcurrently 监听接口中,consumeMessage() 方法的第一个参数是消息列表,但默认情况下每次只能消费一条消息。
1 | // 注册消息监听器 |
若希望一次消费多条消息,可以通过修改 Consumer 的 consumeMessageBatchMaxSize 属性来指定一次消费的最大消息数量。不过,该属性的值默认不能超过 32,因为默认情况下消费者每次最多可以拉取 32 条消息。若要修改一次拉取的最大消息数量,则可以通过修改 Consumer 的 pullBatchSize 属性来实现。
1 | // 指定消费者每次从 Broker 拉取的最大消息数量,默认值是 32 |
批量消费存在的问题
Consumer 的 pullBatchSize 与 consumeMessageBatchMaxSize 属性是否设置得越大越好?答案:当然不是。
pullBatchSize值越大,Consumer 每次拉取消息所需的时间就越长,且在网络上传输时出现问题的可能性也越高。如果在拉取过程中出现问题,则该批次的所有消息都需要全部重新拉取。consumeMessageBatchMaxSize值越大,Consumer 的消息并发消费能力就越低,因为这一批次的消息只会被一个线程处理。而且,这批消息会共享相同的消费结果:只要其中有一条消息处理异常,整批消息都需要全部重新消费(可能会出现重复消费问题)。
批量消息的代码案例
在本案例中,将演示 Provider 批量发送消息和 Consumer 批量消费消息。这里批量发送消息的需求是:不修改默认的最大批量发送大小(4MB),同时要防止待发送的批量消息超出该 4MB 的限制。
消息列表分割器
1 | import org.apache.rocketmq.common.message.Message; |
批量发送消息的生产者
1 | import org.apache.rocketmq.client.producer.DefaultMQProducer; |
- 程序运行输出的结果如下:
1 | SendResult [sendStatus=SEND_OK, msgId=ACAF0701668A512DDF179494D7B40000,ACAF0701668A512DDF179494D7B40001,ACAF0701668A512DDF179494D7B40002,ACAF0701668A512DDF179494D7B40003,ACAF0701668A512DDF179494D7B40004,ACAF0701668A512DDF179494D7B40005,ACAF0701668A512DDF179494D7B40006,ACAF0701668A512DDF179494D7B40007,ACAF0701668A512DDF179494D7B40008,ACAF0701668A512DDF179494D7B40009,ACAF0701668A512DDF179494D7B4000A,ACAF0701668A512DDF179494D7B4000B,ACAF0701668A512DDF179494D7B4000C,ACAF0701668A512DDF179494D7B4000D,ACAF0701668A512DDF179494D7B5000E,ACAF0701668A512DDF179494D7B5000F,ACAF0701668A512DDF179494D7B50010,ACAF0701668A512DDF179494D7B50011,ACAF0701668A512DDF179494D7B50012,ACAF0701668A512DDF179494D7B50013, offsetMsgId=C0A8027F00002A9F0000000000087BA1,C0A8027F00002A9F0000000000087C6D,C0A8027F00002A9F0000000000087D39,C0A8027F00002A9F0000000000087E05,C0A8027F00002A9F0000000000087ED1,C0A8027F00002A9F0000000000087F9D,C0A8027F00002A9F0000000000088069,C0A8027F00002A9F0000000000088135,C0A8027F00002A9F0000000000088201,C0A8027F00002A9F00000000000882CD,C0A8027F00002A9F0000000000088399,C0A8027F00002A9F0000000000088466,C0A8027F00002A9F0000000000088533,C0A8027F00002A9F0000000000088600,C0A8027F00002A9F00000000000886CD,C0A8027F00002A9F000000000008879A,C0A8027F00002A9F0000000000088867,C0A8027F00002A9F0000000000088934,C0A8027F00002A9F0000000000088A01,C0A8027F00002A9F0000000000088ACE, messageQueue=MessageQueue [topic=BatchTopic, brokerName=centos7, queueId=1], queueOffset=0] |
批量消费消息的消费者
1 | import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; |
- 程序运行输出的结果如下:
1 | Batch consume size: 20 |
过滤消息
消息者在进行消息订阅时,除了可以指定要订阅的 Topic 外,还可以对该 Topic 中的消息根据特定条件进行过滤,从而订阅比 Topic 更细粒度的消息类型。针对指定 Topic 的消息过滤,一共有两种方式:Tag 过滤和 SQL 过滤。
基于 Tag 过滤消息
通过 Consumer 的 subscribe() 方法指定要订阅消息的 Tag。如果要订阅多个 Tag 的消息,Tag 之间使用或运算符(双竖线 ||)进行分割。
1 | DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg"); |
基于 SQL 过滤消息
SQL 过滤是一种通过特定表达式对事先埋入到消息中的用户属性进行筛选过滤的方式。通过 SQL 过滤,可以实现对消息的复杂过滤。特别注意,只有使用 Push 消费方式的消费者才能使用 SQL 过滤功能。在 SQL 过滤表达式中,支持多种常量类型与运算符。
支持的常量类型:
- 数值:比如:123、3.1415
- 字符:必须用单引号包裹起来,比如:
'abc' - 布尔:TRUE 或 FALSE
- NULL:特殊的常量,表示空
支持的运算符有:
- 数值比较:>、>=、<、<=、BETWEEN、=
- 字符比较:=、<>、IN
- 逻辑运算 :AND、OR、NOT
- NULL 判断:IS NULL 或者 IS NOT NULL
在默认情况下,Broker 没有开启消息的 SQL 过滤功能,需要在 Broker 加载的配置文件中添加如下属性,以开启该功能:
1 | enablePropertyFilter = true |
在启动 Broker 时,需要指定这个修改过的配置文件。例如,对于单机版 Broker 的启动,其修改的配置文件是 conf/broker.conf,启动时使用如下命令:
1 | nohup sh bin/mqbroker -n 127.0.0.1:9876 -c conf/broker.conf & |
过滤消息的代码案例
Tag 过滤消息
- 定义消息生产者
1 | import org.apache.rocketmq.client.producer.DefaultMQProducer; |
- 程序运行输出的结果如下:
1 | SendResult [sendStatus=SEND_OK, msgId=ACAF0701A68E512DDF1798E5DFB50000, offsetMsgId=C0A8027F00002A9F000000000008DB7D, messageQueue=MessageQueue [topic=MyTopic, brokerName=centos7, queueId=0], queueOffset=16] |
- 消息消费者
1 | import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; |
- 程序运行输出的结果如下:
1 | MessageExt [brokerName=centos7, queueId=0, storeSize=189, queueOffset=16, sysFlag=0, bornTimestamp=1777538001846, bornHost=/192.168.2.140:35054, storeTimestamp=1777538001860, storeHost=/192.168.2.127:10911, msgId=C0A8027F00002A9F000000000008DB7D, commitLogOffset=580477, bodyCRC=654967907, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='MyTopic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=19, CONSUME_START_TIME=1777538037489, UNIQ_KEY=ACAF0701A68E512DDF1798E5DFB50000, CLUSTER=DefaultCluster, WAIT=true, TAGS=MyTagA}, body=[72, 105, 44, 48], transactionId='null'}] |
SQL 过滤消息
- 消息生产者
1 | import org.apache.rocketmq.client.producer.DefaultMQProducer; |
- 程序运行输出的结果如下:
1 | SendResult [sendStatus=SEND_OK, msgId=ACAF0701A911512DDF1798E71E980000, offsetMsgId=C0A8027F00002A9F000000000008E2DF, messageQueue=MessageQueue [topic=MyTopic, brokerName=centos7, queueId=2], queueOffset=21] |
- 消息消费者
1 | import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; |
- 程序运行输出的结果如下:
1 | MessageExt [brokerName=broker-a, queueId=2, storeSize=183, queueOffset=22, sysFlag=0, bornTimestamp=1777538083510, bornHost=/192.168.2.140:35744, storeTimestamp=1777538083511, storeHost=/192.168.2.127:10911, msgId=C0A8027F00002A9F000000000008E5BB, commitLogOffset=583099, bodyCRC=543670394, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='MyTopic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=24, CONSUME_START_TIME=1777538774123, UNIQ_KEY=ACAF0701A911512DDF1798E71EB60004, CLUSTER=DefaultCluster, WAIT=true, age=4}, body=[72, 105, 44, 52], transactionId='null'}] |
消息发送重试机制
Producer 对发送失败的消息进行重新发送的机制,称为消息发送重试机制,也称为消息重投机制。对于消息重投,需要注意以下几点:
- 生产者在发送消息时,若采用同步或异步发送方式,发送失败默认会重试 2 次,但单向发送方式(Oneway)是没有失败重试机制的。
- 普通消息支持跨 Broker / Queue 的发送重试,而顺序消息的发送重试仅限于同一队列,不具备跨队列的容错重试能力,因此其容错能力较弱。
- 消息重投机制可以保证消息尽可能发送成功、不丢失,但可能会造成消息重复。消息重复在 RocketMQ 中是无法避免的问题。
- 消息重复在一般情况下不会发生,当出现消息量大、网络抖动,消息重复就会成为大概率事件。
- Producer 主动重发消息、Consumer 负载变化(发生 Rebalance,不会导致消息重复,但可能出现重复消费)也会导致重复消息。
- 消息重复是无法避免的,但可以避免消息重复消费。
- 避免消息重复消费的解决方案是,为消息添加唯一标识(例如消息 Key),使消费者对消息进行消费判断来避免重复消费。
- 消息发送重试有三种策略可以选择:同步发送失败策略、异步发送失败策略、消息刷盘失败策略。
同步发送失败策略
对于同步发送消息,如果消息发送失败,默认会重试发送 2 次。但在重试发送时是不会选择上次发送失败的 Broker,而是选择其它 Broker。当然,若只有一个 Broker 其也只能发送到该 Broker,但其会尽量发送到该 Broker 上的其它 Queue(分区)。
1 | // 创建一个 Producer,参数为 Producer Group 名称 |
同时,Broker 还具备故障隔离机制,使 Producer 优先选择那些未发生过发送失败的 Broker 作为目标 Broker。该机制能够有效避免将消息发送到存在问题的 Broker,从而提升消息发送效率,降低发送耗时。当消息发送重试次数超过设定上限后,系统会抛出异常,此时由 Producer 自身负责保证消息不丢失。另外,当 Producer 出现 RemotingException、MQClientException 或 MQBrokerException 异常时,Producer 也会自动重试发送消息。
思考:让我们自己实现 Broker 的故障隔离功能,该如何实现呢?
(1) 方案一
- 在 Producer 中,可以维护一个线程安全的 Map 集合,key 为 Broker 实例,value 为其最近一次消息发送失败的时间戳。同时,Producer 还维护一个 Set 集合,用于存放当前认为可用的 Broker(即长时间未发生发送异常的 Broker)。在选择目标 Broker 时,从该 Set 集合中进行选择。当某个 Broker 发生发送失败时,将其从 Set 集合中移除,并放入到 Map 中。此外,可以定义一个定时任务,定期扫描该 Map 集合,对于已经超过一定冷却时间且未再次发生异常的 Broker,将其从 Map 中移除,并重新加入到 Set 集合中,从而实现 Broker 的自动恢复。
(2) 方案二
- 在 Producer 中,可以借助 Redis 的 ZSET 实现 Broker 故障隔离。使用
mq:fault:brokers作为 key,member 为 Broker,score 为 Broker 的恢复时间(当前时间 + 冷却时间);消息发送失败时执行ZADD mq:fault:brokers <recoverTime> <brokerName>将 Broker 加入隔离列表,选择可用 Broker 时先通过ZRANGEBYSCORE mq:fault:brokers -inf now获取已过冷却期的 Broker,再执行ZREM mq:fault:brokers <brokerName>(或批量删除)清理已过冷却期的 Broker,然后再通过ZRANGEBYSCORE mq:fault:brokers (now +inf获取仍在冷却期的 Broker 并在本地选择可用 Broker 时过滤掉,从而实现 Broker 的故障隔离与恢复机制。
- 在 Producer 中,可以借助 Redis 的 ZSET 实现 Broker 故障隔离。使用
(3) 方案三
- 为 Producer 中的每个 Broker 实例添加一个标识字段,例如一个
AtomicLong属性。只要某个 Broker 上发生过发送异常,就将该属性值增一。选择目标 Broker 时,优先选择该属性值最小的 Broker;若多个 Broker 的属性值相同,则采用轮询方式进行选择。
- 为 Producer 中的每个 Broker 实例添加一个标识字段,例如一个
异步发送失败策略
对于异步发送消息,如果消息发送失败,默认会重试发送 2 次。但在异步重试时不会选择其他 Broker,仅在同一个 Broker 上做重试,所以该策略无法保证消息不丢失。
1 | // 创建一个 Producer,参数为 Producer Group 名称 |
消息刷盘失败策略
当出现消息刷盘超时(Master 或 Slave)或 Slave 不可用时(即 Slave 在做数据同步时向 Master 返回的状态不是 SEND_OK),默认情况下不会将该消息尝试发送到其他 Broker。但对于重要消息,可以通过在 Broker 的配置文件中将 retryAnotherBrokerWhenNotStoreOK 属性设置为 true 来开启此功能。
1 | retryAnotherBrokerWhenNotStoreOK=true |
消息消费重试机制
顺序消息的消费重试
对于顺序消息,当 Consumer 消费消息失败后,为了保证消息的顺序性,RocketMQ 会在原 Topic 的原 Queue 中自动进行无限次消费重试(默认),直到消费成功为止。消费重试默认间隔时间为 1000 毫秒。消费重试期间应用会出现消息消费被阻塞的情况,这与普通消息超过最大消费重试次数后发往 %RETRY% 重试队列的机制不同。
1 | // 定义一个 Push 消费者(使用 Push 消费方式),参数为 Consumer Group(消费者组)的名称 |
由于对顺序消息的重试是无休止的,不间断的,直至消费成功为止;所以,对于顺序消息的消费,务必要保证应用能够及时监控并处理消费失败的情况,避免消费被永久性阻塞。特别注意,顺序消息的发送重试机制受到严格限制(不支持跨 Broker / Queue 重试),而消费端则提供了严格的失败重试机制以保证消费顺序性。
无序消息的消费重试
对于无序消息(包括普通消息、延时消息、事务消息),当 Consumer 消费消息失败时,可以通过设置返回状态(ConsumeConcurrentlyStatus.RECONSUME_LATER)达到消息重试的效果。特别注意,无序消息的重试只对集群消费方式生效,广播消费方式不提供失败重试特性。即对于广播消费,消息消费失败后,失败的消息不再重试消费,而是直接继续消费后续消息。
消费重试次数与间隔
对于无序消息 + 集群消费模式下的重试消费,每条消息默认最多重试消费 16 次,但每次重试的间隔时间是不同的,会逐渐变长。消费重试的时间间隔与延迟消息的延时等级十分相似,除了没有延时等级的前两个时间间隔外,其他的时间间隔都是相同的。每次消费重试的间隔时间如下表:
| 重试次数 | 与上次重试的间隔时间 |
|---|---|
| 1 | 10 秒 |
| 2 | 30 秒 |
| 3 | 1 分钟 |
| 4 | 2 分钟 |
| 5 | 3 分钟 |
| 6 | 4 分钟 |
| 7 | 5 分钟 |
| 8 | 6 分钟 |
| 9 | 7 分钟 |
| 10 | 8 分钟 |
| 11 | 9 分钟 |
| 12 | 10 分钟 |
| 13 | 20 分钟 |
| 14 | 30 分钟 |
| 15 | 1 小时 |
| 16 | 2 小时 |
若一条消息在一直消费失败的前提下,将会在第一次消费失败后的第 4 小时 46 分后进行第 16 次重试。若仍然消费失败(超过最大消费重试次数),则该消息将被投递到死信队列。可以通过代码修改最大消费重试次数:
1 | // 定义一个 Push 消费者(使用 Push 消费方式),参数为 Consumer Group(消费者组)的名称 |
对于修改过的消费重试次数,将按照以下策略执行消费重试:
- 若修改值小于 16,则按照指定时间间隔(如上表所示)进行重试
- 若修改值大于 16,则超过 16 次的重试时间间隔均为 2 小时
对于 Consumer Group(消费者组),若仅修改了一个 Consumer 的消费重试次数,则会应用到该消费者组中的所有其它 Consumer 实例。若出现多个 Consumer 均修改了消费重试次数的情况,则会采用覆盖方式生效,即最后被修改的值会覆盖前面设置的值。
重试队列的实现原理
对于需要重试消费的消息,并不是 Consumer 在等待了指定时长后再次从 Broker 拉取原来的消息进行消费,而是将这些需要重试消费的消息放入到了一个特殊 Topic 的队列中,然后再进行消费的。这个特殊的队列就是重试队列。
- 当出现需要进行重试消费的消息时,Broker 会自动为每个消费组创建一个 Topic 名称为
%RETRY%{consumerGroup}的重试队列。 - 消费失败的消息会被投递到该重试队列中,并按照一定的延迟策略再次投递和消费,从而实现消息的重试机制。
- 只有出现需要进行重试消费的消息时,Broker 才会为对应的消费者组创建重试队列。这个重试队列是针对消费者组的,而不是针对每个 Topic 创建的(因为一个 Topic 的消息可以被多个消费者组进行消费,所以会为这些消费者组各创建一个重试队列)。

特别注意
在重试队列中,Broker 对重试消息的处理是通过延迟消息机制实现的。当消息消费失败后,消息会被重新发送到对应的 %RETRY%{consumerGroup} 重试主题(即重试队列),同时设置延迟等级。在底层实现中,Broker 会先将该消息存入内部的 SCHEDULE_TOPIC_XXXX 调度主题(即延迟队列),待延迟时间到达后,再将消息投递回重试主题 %RETRY%{consumerGroup} 的对应队列供消费者再次消费。
消费重试的配置方式
在集群消费模式下,若消息消费失败后希望触发消费重试,需要在消息监听器接口的实现中明确采用以下三种方式之一进行配置。
- 方式一:返回
ConsumeConcurrentlyStatus.RECONSUME_LATER或者ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT - 方式二:返回 Null
- 方式三:抛出异常
1 | // 定义一个 Push 消费者(使用 Push 消费方式),参数为 Consumer Group(消费者组)的名称 |
在某些场景下,开发者可能需要主动决定是否重试消费。例如,当消息处理失败时,可以结合业务逻辑判断是否重新投递消息。具体实现方式:将消息发送到开发者自己创建的重试 Topic,由专门的消费者处理。
1 | if (reconsumeTimes >= 3) { |
消费不重试配置方式
在集群消费模式下,若消费失败后不希望触发重试,只需在捕获异常后返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS(即与消费成功时相同的返回状态),即可避免消费重试。
1 | // 定义一个 Push 消费者(使用 Push 消费方式),参数为 Consumer Group(消费者组)的名称 |
死信队列
什么是死信队列
当一条消息首次消费失败时,消息队列会自动触发消费重试。当重试次数达到上限后消费仍失败,表明消费者无法正常处理该消息。此时,消息队列不会立即将其丢弃,而是发送到该消费者对应的特殊队列中。这个队列称为死信队列(Dead-Letter Queue,DLQ),其中的消息称为死信消息(Dead-Letter Message,DLM)。死信队列专门用于处理无法被正常消费的消息。
死信产生的原因
当消息的消费重试次数超过 maxReconsumeTimes(默认值是 16)后仍无法被消费,RocketMQ 会将其标记为死信,并存储到对应的死信队列中。消息消费失败的常见原因包括:
- 消费者持续抛出异常
- 消息体格式错误导致消息解析失败
- 业务系统依赖的服务不可用(如数据库连接超时)
死信队列的特征
死信队列具有如下特征:
- 死信队列中的消息不会再被消费者正常消费,即 DLQ 对于消费者是不可见的
- 死信消息的存储有效期与正常消息相同,均为 3 天(即 commitlog 文件的过期时间),3 天后会被自动删除
- 死信队列就是一个特殊的 Topic,名称为
%DLQ%{consumerGroup},即每个消费者组对应一个死信队列 - 一个死信队列对应一个 Consumer Group(消费组),而不是对应单个 Consumer 实例;只有当消费组产生死信消息后,Broker 才会为其创建对应的死信队列
- 消费死信队列与消费普通 Topic 的方式完全相同,只需将 Topic 参数设置为死信队列的名称即可
死信队列的使用注意事项:
| 注意事项 | 说明 |
|---|---|
| 仅支持集群消费模式 | 死信队列机制只在集群消费模式下生效,因为广播消费模式下不会自动进行消费重试 |
| 有效期 | 死信消息与正常消息有效期相同,默认保留 48 小时(即 3 天),超时自动删除 |
| 幂等处理 | 死信消息可能被多次投递,消费端需做好幂等设计 |
死信消息的处理
实际上,当一条消息进入死信队列时,通常意味着系统中某些地方存在问题,导致消费者无法正常消费该消息(例如代码中存在 Bug、依赖服务故障、数据格式异常等)。因此,死信消息通常需要开发人员介入进行特殊处理。最关键的一步是排查可疑因素、修复代码中可能存在的 Bug,然后再将原有的死信消息重新投递到原 Topic 进行消费。
死信消息的处理流程:
- 自动进入死信队列:当消息的消费重试次数达到
maxReconsumeTimes(默认值是 16)时,RocketMQ 会自动将其投递到死信队列。 - 手动放入死信队列:在消费者代码中,显式判断消费重试次数,将消息发送到死信队列。
- 自动进入死信队列:当消息的消费重试次数达到
死信消息的处理策略:
- 监控与告警:通过 RocketMQ 控制台或 Prometheus 监控死信队列的消息堆积量,并设置阈值进行告警。
- 人工干预:将死信消息导出至日志系统(比如 ELK),分析消费失败原因(例如数据库字段缺失、接口变更等)。
- 重新投递:使用脚本或管理工具将死信消息重新投递到原 Topic 进行重新消费,但需要确保产生死信消息的问题已解决。
- 补偿机制:编写专用消费者处理死信队列,执行补偿操作(如数据修复、人工审核等)。
将死信消息重新投递到原 Topic 的意义:
- 问题已经修复:开发人员排查并解决了代码中的 Bug 后,原消费者已经能够正常处理这类消息了。此时将死信消息重新投递回原 Topic,让它走正常的消费流程,是最自然的方式。
- 保持业务完整性:原 Topic 可能被多个消费者订阅(例如用于不同业务场景),直接消费死信队列无法触达这些消费者。
- 复用成熟机制:重新投递后,消息可以再次享受 RocketMQ 提供的消费重试、负载均衡、顺序消费等能力。
为什么不建议直接消费死信消息,而是将其重新投递到原 Topic?
- 消息进入死信队列通常是因为代码 Bug、依赖服务故障、数据格式异常等根本性问题。直接消费死信消息,同样的异常会再次抛出。
- 消费者处理死信队列消息时,如果执行的还是同一套业务逻辑(复用同一 Consumer 的话);问题不解决,消费必然再次失败。
- 死信队列的设计初衷是 "隔离" 无法处理的消息,而不是继续用相同的方式去处理它们。
1 | import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; |
死信队列的最佳实践
幂等性设计:避免重复消费
- 消息消费重试可能会导致重复消费,需要保障消费幂等性
- 比如:使用业务唯一标识,在消息中设置业务 ID(如订单号),通过数据库唯一索引或 Redis 去重。
合理配置最大重试次数与间隔
- 高频业务:降低最大重试次数(如 3 ~ 5 次),避免资源浪费
- 低频高可靠性业务:保留默认最多重试消费 16 次,并结合指数退避算法(比如
coolDown = base * 2^failCount)减少负载
死信队列的清理与归档
- 定期清理:对已确认无法修复消费的死信消息,通过定时任务清理死信队列(死信消息默认只保留 3 天)
- 归档存储:将死信消息持久化到日志系统(比如 ELK),供后续审计
RocketMQ 与 Kafka 的对比
| 特性 | RocketMQ | Kafka |
|---|---|---|
| 自动发送重试 | 默认支持,可配置重试次数和间隔 | 支持(通过 retries、retry.backoff.ms 等参数配置) |
| 自动消费重试 | 支持渐进式消费重试,可配置最大重试次数 | 不支持,需手动实现重试逻辑 |
| 死信队列 | 内置 DLQ,支持自动 / 手动重投 | 无原生支持,需自定义逻辑 |
| 适用场景 | 企业级高可靠性业务(如支付、订单) | 流式处理、日志聚合 |
