大纲
前言
学习资源
版本说明
本文所有案例代码使用的各软件版本如下表所示:
组件 | 版本 | 说明 |
---|
RabbitMQ Server | 3.8.26 | |
RabbitMQ Client | 5.10.0 | |
Erlang | 24.2 | |
Java | 11 | |
RabbitMQ 死信队列
概念介绍
死信队列(Dead Letter Queue - DLQ)是在 RabbitMQ 中处理无法被消费者正常消费的消息的队列。顾名思义,死信就是无法被消费的消息。当消息无法被成功消费时,它们会被路由到指定的死信队列,以便后续的处理和分析。一般来说,生产者将消息投递到 Queue 里了,消费者从 Queue 取出消息进行消费,但某些时候由于特定的原因导致 Queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有了死信就应该有死信队列。举个例子:用户在商城下单成功后,在指定时间未支付时自动取消订单。还有比如说:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列,当消息消费发生异常时,将消息投入死信队列中.
死信的来源
- 消费者拒绝:消费者拒绝了消息,比如使用
channel.basicReject()
或者 channel.basicNack()
,并且没有重新入队或设置 requeue
参数为 false
。 - 消息过期:消息在队列中存放超过设定的 TTL(生存时间)后,会被视为死信。
- 队列撑满:当队列达到最大长度,新的消息无法被添加,会被转发到死信队列。
死信队列的核心特性
- 消息过期:消息在队列中存在超过指定的 TTL(生存时间)后,会被认为是 “死信”,然后转发到死信队列。
- 重复消费失败:如果消费者在一定次数内无法成功消费消息,这些消息也会被转发到死信队列。
- 路由失败:当消息无法找到对应的队列,例如目标队列已不存在时,消息会被发送到死信队列。
- 独立性:死信队列可以与主队列分开管理,便于对异常消息进行独立分析。
死信队列的使用场景
- 异常处理:用于捕获消费过程中发生错误的消息,帮助开发人员进行故障排查。
- 数据审计:保存那些无法被处理的消息,便于进行数据审计和回溯。
- 再处理机制:允许对死信队列中的消息进行重试或人工干预,确保业务逻辑的可靠性。
- 快速响应:避免主队列阻塞,提升系统的整体响应速度。
案例代码
本节将演示如何使用 RabbiMQ 的死信队列,整体的代码架构如下图所示:
![]()
代码下载
本节所需的完整案例代码,可以直接从 GitHub 下载对应章节 rabbitmq-lesson-11
。
1 2 3 4 5
| <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.10.0</version> </dependency>
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;
public class RabbitMQUtils {
public static ConnectionFactory connectionFactory;
static { connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.2.127"); connectionFactory.setPort(5672); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); }
public static Channel createChannel() throws Exception { Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
return channel; }
}
|
- 生产者的代码(往普通交换机发送消息,并指定消息的 RoutingKey)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
| import com.clay.rabbitmq.utils.RabbitMQUtils; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.MessageProperties;
import java.nio.charset.StandardCharsets;
public class MQProducer {
public static final String NORMAL_EXCHANGE_NAME = "normal_exchange";
public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtils.createChannel();
channel.exchangeDeclare(NORMAL_EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true, false, null);
AMQP.BasicProperties properties = new AMQP.BasicProperties() .builder() .expiration("10000") .deliveryMode(MessageProperties.PERSISTENT_TEXT_PLAIN.getDeliveryMode()) .build();
for (int i = 0; i < 5; i++) { String message = "info" + i; System.out.println("发送消息:" + message);
channel.basicPublish(NORMAL_EXCHANGE_NAME, "zhangsan", properties, message.getBytes(StandardCharsets.UTF_8)); }
channel.close(); }
}
|
- 消费者一的代码(绑定死信交换机和死信队列,并且在声明普通队列时,指定死信交换机和死信队列的 RoutingKey)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140
| import com.clay.rabbitmq.utils.RabbitMQUtils; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback;
import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; import java.util.Scanner;
public class MQConsumer01 {
public static final String DEAD_EXCHANGE_NAME = "dead_exchange";
public static final String DEAD_QUEUE_NAME = "dead-queue";
public static final String NORMAL_EXCHANGE_NAME = "normal_exchange";
public static final String NORMAL_QUEUE_NAME = "normal-queue";
public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtils.createChannel();
channel.exchangeDeclare(DEAD_EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true, false, null);
channel.queueDeclare(DEAD_QUEUE_NAME, true, false, false, null);
channel.queueBind(DEAD_QUEUE_NAME, DEAD_EXCHANGE_NAME, "lisi");
channel.exchangeDeclare(NORMAL_EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true, false, null);
Map<String, Object> normalArguments = new HashMap<>(); normalArguments.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME); normalArguments.put("x-dead-letter-routing-key", "lisi");
channel.queueDeclare(NORMAL_QUEUE_NAME, true, false, false, normalArguments);
channel.queueBind(NORMAL_QUEUE_NAME, NORMAL_EXCHANGE_NAME, "zhangsan");
DeliverCallback deliverCallback = (consumerTag, message) -> { String msg = new String(message.getBody(), StandardCharsets.UTF_8);
try { Thread.sleep(100); System.out.println("Consume from normal queue, RoutingKey : " + message.getEnvelope().getRoutingKey() + ", Message : " + msg); } catch (Exception e) { e.printStackTrace(); }
channel.basicAck(message.getEnvelope().getDeliveryTag(), false); };
CancelCallback cancelCallback = (consumerTag) -> { System.out.println("Failed to consume message : " + consumerTag); };
System.out.println("消费者一等待接收消息...");
boolean autoAck = false;
channel.basicConsume(NORMAL_QUEUE_NAME, autoAck, deliverCallback, cancelCallback);
System.out.println("按回车键退出程序:"); new Scanner(System.in).nextLine();
channel.close(); }
}
|
- 消费者二的代码(绑定死信交换机和死信队列,并且从死信队列中消费消息)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93
| import com.clay.rabbitmq.utils.RabbitMQUtils; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback;
import java.nio.charset.StandardCharsets; import java.util.Scanner;
public class MQConsumer02 {
public static final String DEAD_EXCHANGE_NAME = "dead_exchange";
public static final String DEAD_QUEUE_NAME = "dead-queue";
public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtils.createChannel();
channel.exchangeDeclare(DEAD_EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true, false, null);
channel.queueDeclare(DEAD_QUEUE_NAME, true, false, false, null);
channel.queueBind(DEAD_QUEUE_NAME, DEAD_EXCHANGE_NAME, "lisi");
DeliverCallback deliverCallback = (consumerTag, message) -> { String msg = new String(message.getBody(), StandardCharsets.UTF_8);
try { Thread.sleep(100); System.out.println("Consume from dead queue, RoutingKey : " + message.getEnvelope().getRoutingKey() + ", Message : " + msg); } catch (Exception e) { e.printStackTrace(); }
channel.basicAck(message.getEnvelope().getDeliveryTag(), false); };
CancelCallback cancelCallback = (consumerTag) -> { System.out.println("Failed to consume message : " + consumerTag); };
System.out.println("消费者二等待接收消息...");
boolean autoAck = false;
channel.basicConsume(DEAD_QUEUE_NAME, autoAck, deliverCallback, cancelCallback);
System.out.println("按回车键退出程序:"); new Scanner(System.in).nextLine();
channel.close(); }
}
|
代码测试
- (1) 首先启动消费者一应用,让 RabbitMQ 正常创建在 Java 代码里声明的交换机和队列。
![]()
![]()
![]()
![]()
- (4) 启动消费者二应用,在消费者二的控制台中,可以看到从死信队列消费到的消息
1 2 3 4 5 6 7
| 消费者二等待接收消息... 按回车键退出程序: Consume from dead queue, RoutingKey : lisi, Message : info0 Consume from dead queue, RoutingKey : lisi, Message : info1 Consume from dead queue, RoutingKey : lisi, Message : info2 Consume from dead queue, RoutingKey : lisi, Message : info3 Consume from dead queue, RoutingKey : lisi, Message : info4
|
- (5) 最终,死信队列中的消息会全部被消费者二消费掉
![]()
RabbitMQ 队列长度
概念介绍
在使用死信队列时,死信的来源有以下几种:
- 消费者拒绝:消费者拒绝了消息,比如使用
channel.basicReject()
或者 channel.basicNack()
,并且没有重新入队或设置 requeue
参数为 false
。 - 消息过期:消息在队列中存放超过设定的 TTL(生存时间)后,会被视为死信。
- 队列撑满:当队列达到最大长度,新的消息无法被添加,会被转发到死信队列。
当队列达到最大长度时(队列被撑满了),为了避免消息丢失,需要将新的消息转发到死信队列中。这就需要在生产者 / 消费者声明队列的时候,设置队列的最大长度。
案例代码
在生产者 / 消费者声明队列的时候,设置队列的最大长度。示例代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| Map<String, Object> arguments = new HashMap<>();
arguments.put("x-max-length", 1000);
channel.queueDeclare(QUEUE_NAME, true, false, false, arguments);
|
RabbitMQ 中的 TTL
概念介绍
TTL(Time To Live)是 RabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间(单位是毫秒)。换句话说,如果一条消息设置了 TTL 属性或者进入了设置了 TTL 属性的队列,那么这条消息如果在设置的 TTL 时间内没有被消费,则会成为 “死信”;如果该队列绑定了死信队列,那么这条消息就会被转发到死信队列中。如果同时配置了队列的 TTL 和消息的 TTL,那么较小的那个 TTL 值将会生效。
案例代码
消息的 TTL
在生产者发送消息时,可以设置消息的 TTL 属性。示例代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| AMQP.BasicProperties properties = new AMQP.BasicProperties() .builder() .expiration("10000") .build();
channel.basicPublish(EXCHANGE_NAME, "info", properties, message.getBytes(StandardCharsets.UTF_8));
|
队列的 TTL
在生产者 / 消费者声明队列时,可以设置队列的 TTL 属性。示例代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| Map<String, Object> arguments = new HashMap<>();
arguments.put("x-message-ttl", 5000);
channel.queueDeclare(QUEUE_NAME, true, false, false, arguments);
|
使用注意事项
- 当设置了消息或者队列的 TTL 属性,即使消息过期了,也不一定会被立刻丢弃(如果配置了死信队列,消息会被转发到死信队列)。因为消息是否过期是在即将投递给消费者之前判定的,如果队列有严重的消息积压情况,则已过期的消息可能还会存活较长一段时间。
- 如果不设置 TTL,表示消息永远不会过期;如果将 TTL 设置为 0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃(如果配置了死信队列,消息会被转发到死信队列)。
延迟队列的实现
在 RabbitMQ 中,可以利用 TTL 特性和死信队列来实现延迟队列。思考一下,延时队列不就是想要消息延迟多久被处理吗?TTL 刚好能够让消息在延迟多久之后成为 "死信",另一方面,成为 "死信" 的消息都会被投递到死信队列中,这样只需要消费者一直消费死信队列里的消息就可以了,因为里面的消息都是希望被立即处理的消息。值得一提的是,无论是设置消息的 TTL 属性,还是设置队列的 TTL 属性(x-message-ttl
),RabbitMQ 的 TTL 判定是惰性的,不是主动的。死信机制依赖于 "消息被投递或检查时",才会判定是否需要丢进死信队列。如果需要更实时的 TTL 处理来实现延迟队列,可以考虑使用延迟队列插件。
底层原理分析
TTL 过期 ≠ 立即转发到死信队列
在 RabbitMQ 中,消息或者队列的 TTL 机制的行为如下:
- RabbitMQ 不会实时扫描队列中哪些消息过期了。
在 RabbitMQ 中,消息什么时候会被判断为 “过期”?
死信队列何时会被触发?
消息只有在准备投递且被发现过期的时候,才会被 RabbitMQ 判定为过期,然后转发到死信队列中。RabbitMQ 中一条消息被转发到死信队列的常见情况包括:
- 消费者拒绝:消费者拒绝了消息,比如使用
channel.basicReject()
或者 channel.basicNack()
,并且没有重新入队或设置 requeue
参数为 false
。 - 消息过期:消息在队列中存放超过设定的 TTL(生存时间)后,会被视为死信。
- 队列撑满:当队列达到最大长度,新的消息无法被添加,会被转发到死信队列。
举个例子说明
假设有一个队列 Q1
设置了死信交换机 DLX1
,消息的 TTL 设置为 10 秒。
- 当生产者发送了 10 万条消息,但消费者的消费速度很慢或者根本没有消费。
- 消息会在队列中排队等待,即使前面的消息已经 “过期”,RabbitMQ 也不会主动去清理过期消息。
- 等到 RabbitMQ 尝试投递这条消息时,发现消息的 TTL 已经过期了,才会将其投递给死信交换机
DLX1
。
总结
无论是设置消息的 TTL 属性,还是设置队列的 TTL 属性(x-message-ttl
),RabbitMQ 的 TTL 判定是惰性的,不是主动的。死信机制依赖于 "消息被投递或检查时",才会判定是否需要丢进死信队列。
注意
如果设置的是队列的 TTL 属性(x-message-ttl
),而不是逐条设置消息的 TTL 属性,那 RabbitMQ 是否就能在消息到期后立刻将其丢进死信队列呢?答案是:不会,RabbitMQ 的 TTL 判定仍然是惰性的!因为当 x-message-ttl
属性设置在队列上,RabbitMQ 只是自动为每一条新进来的消息设置相同的 TTL 值,本质上还是 "消息 TTL"。真正意义上的 "自动过期清理" 机制只有一个:使用延迟队列插件。