RabbitMQ 入门教程之六

大纲

前言

学习资源

版本说明

本文所有案例代码使用的各软件版本如下表所示:

组件版本说明
RabbitMQ Server3.8.26
RabbitMQ Client5.10.0
Erlang24.2
Java11

RabbitMQ 死信队列

概念介绍

死信队列(Dead Letter Queue - DLQ)是在 RabbitMQ 中处理无法被消费者正常消费的消息的队列。顾名思义,死信就是无法被消费的消息。当消息无法被成功消费时,它们会被路由到指定的死信队列,以便后续的处理和分析。一般来说,生产者将消息投递到 Queue 里了,消费者从 Queue 取出消息进行消费,但某些时候由于特定的原因导致 Queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有了死信就应该有死信队列。举个例子:用户在商城下单成功后,在指定时间未支付时自动取消订单。还有比如说:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列,当消息消费发生异常时,将消息投入死信队列中.

  • 死信的来源

    • 消费者拒绝:消费者拒绝了消息,比如使用 channel.basicReject() 或者 channel.basicNack(),并且没有重新入队或设置 requeue 参数为 false
    • 消息过期:消息在队列中存放超过设定的 TTL(生存时间)后,会被视为死信。
    • 队列撑满:当队列达到最大长度,新的消息无法被添加,会被转发到死信队列。
  • 死信队列的核心特性

    • 消息过期:消息在队列中存在超过指定的 TTL(生存时间)后,会被认为是 “死信”,然后转发到死信队列。
    • 重复消费失败:如果消费者在一定次数内无法成功消费消息,这些消息也会被转发到死信队列。
    • 路由失败:当消息无法找到对应的队列,例如目标队列已不存在时,消息会被发送到死信队列。
    • 独立性:死信队列可以与主队列分开管理,便于对异常消息进行独立分析。
  • 死信队列的使用场景

    • 异常处理:用于捕获消费过程中发生错误的消息,帮助开发人员进行故障排查。
    • 数据审计:保存那些无法被处理的消息,便于进行数据审计和回溯。
    • 再处理机制:允许对死信队列中的消息进行重试或人工干预,确保业务逻辑的可靠性。
    • 快速响应:避免主队列阻塞,提升系统的整体响应速度。

案例代码

本节将演示如何使用 RabbiMQ 的死信队列,整体的代码架构如下图所示:

代码下载

本节所需的完整案例代码,可以直接从 GitHub 下载对应章节 rabbitmq-lesson-11

  • Maven 依赖
1
2
3
4
5
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.10.0</version>
</dependency>
  • 工具类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class RabbitMQUtils {

public static ConnectionFactory connectionFactory;

static {
// 创建连接工厂
connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.2.127");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
}

/**
* 创建信道
*/
public static Channel createChannel() throws Exception {
// 创建连接
Connection connection = connectionFactory.newConnection();

// 创建信道
Channel channel = connection.createChannel();

return channel;
}

}
  • 生产者的代码(往普通交换机发送消息,并指定消息的 RoutingKey)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import com.clay.rabbitmq.utils.RabbitMQUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;

import java.nio.charset.StandardCharsets;

public class MQProducer {

// 普通交换机的名称
public static final String NORMAL_EXCHANGE_NAME = "normal_exchange";

public static void main(String[] args) throws Exception {
// 创建信道
Channel channel = RabbitMQUtils.createChannel();

// 声明普通交换机
// 参数说明:
// exchange – 交换机的名称
// type – 交换机类型
// durable – 如果需要声明一个持久交换机,则为 true(交换机将在服务器重启后继续存在)
// autoDelete – 如果需要声明 autoDelete 交换机,则为 true(当最后一个绑定队列解除绑定后,自动删除该交换机)
// arguments – 交换的其他属性(构造参数)
channel.exchangeDeclare(NORMAL_EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true, false, null);

// 设置消息属性
AMQP.BasicProperties properties = new AMQP.BasicProperties()
.builder()
// 消息的过期时间(TTL),单位是毫秒
.expiration("10000")
// 消息的持久化
.deliveryMode(MessageProperties.PERSISTENT_TEXT_PLAIN.getDeliveryMode())
.build();

// 发送消息
for (int i = 0; i < 5; i++) {
// 消息内容
String message = "info" + i;
System.out.println("发送消息:" + message);

// 发送消息
// 参数说明:
// exchange – 要将消息发布到的交换机,空字符串表示默认交换机
// routingKey – 路由 Key
// props – 消息的其他属性,比如:使用 MessageProperties.PERSISTENT_TEXT_PLAIN 属性确保消息持久化,配合持久化队列可避免服务器重启导致消息丢失
// body – 消息内容
channel.basicPublish(NORMAL_EXCHANGE_NAME, "zhangsan", properties, message.getBytes(StandardCharsets.UTF_8));
}

// 关闭信道
channel.close();
}

}
  • 消费者一的代码(绑定死信交换机和死信队列,并且在声明普通队列时,指定死信交换机和死信队列的 RoutingKey)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
import com.clay.rabbitmq.utils.RabbitMQUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Scanner;

public class MQConsumer01 {

// 死信交换机的名称
public static final String DEAD_EXCHANGE_NAME = "dead_exchange";

// 死信队列的名称
public static final String DEAD_QUEUE_NAME = "dead-queue";

// 普通交换机的名称
public static final String NORMAL_EXCHANGE_NAME = "normal_exchange";

// 普通队列的名称
public static final String NORMAL_QUEUE_NAME = "normal-queue";

public static void main(String[] args) throws Exception {
// 创建信道
Channel channel = RabbitMQUtils.createChannel();

///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

// 声明死信交换机
// 参数说明:
// exchange – 交换机的名称
// type – 交换机类型
// durable – 如果需要声明一个持久交换机,则为 true(交换机将在服务器重启后继续存在)
// autoDelete – 如果需要声明 autoDelete 交换机,则为 true(当最后一个绑定队列解除绑定后,自动删除该交换机)
// arguments – 交换的其他属性(构造参数)
channel.exchangeDeclare(DEAD_EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true, false, null);

// 声明死信队列
// 参数说明:
// queue – 队列的名称
// durable – 如果需要声明一个持久队列,则为 true(队列将在服务器重启后继续存在)
// exclusive – 如果需要声明一个独占队列(仅限于此连接使用,连接关闭后队列自动删除),则为 true。
// autoDelete – 如果需要声明 autoDelete 队列,则为 true(服务器将在最后一个消费者断开连接以后,自动删除该队列)
// arguments – 队列的其他属性(构造参数)
// 特别注意:如果确定队列已存在,消费者可以不声明队列。但是,强烈建议无论生产者还是消费者,都应该声明队列,确保参数可控
channel.queueDeclare(DEAD_QUEUE_NAME, true, false, false, null);

// 绑定死信交换机和死信队列
// 参数说明:
// queue – 队列的名称
// exchange – 交换机的名称
// routingKey – 用于绑定的 RoutingKey
channel.queueBind(DEAD_QUEUE_NAME, DEAD_EXCHANGE_NAME, "lisi");

///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

// 声明普通交换机
// 参数说明:
// exchange – 交换机的名称
// type – 交换机类型
// durable – 如果需要声明一个持久交换机,则为 true(交换机将在服务器重启后继续存在)
// autoDelete – 如果需要声明 autoDelete 交换机,则为 true(当最后一个绑定队列解除绑定后,自动删除该交换机)
// arguments – 交换的其他属性(构造参数)
channel.exchangeDeclare(NORMAL_EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true, false, null);

// 普通队列绑定死信队列的配置信息
Map<String, Object> normalArguments = new HashMap<>();
// 普通队列设置死信交换机,参数 key 是固定值
normalArguments.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);
// 普通队列设置死信队列的 RoutingKey,参数 key 是固定值
normalArguments.put("x-dead-letter-routing-key", "lisi");

// 声明普通队列
// 参数说明:
// queue – 队列的名称
// durable – 如果需要声明一个持久队列,则为 true(队列将在服务器重启后继续存在)
// exclusive – 如果需要声明一个独占队列(仅限于此连接使用,连接关闭后队列自动删除),则为 true。
// autoDelete – 如果需要声明 autoDelete 队列,则为 true(服务器将在最后一个消费者断开连接以后,自动删除该队列)
// arguments – 队列的其他属性(构造参数)
// 特别注意:如果确定队列已存在,消费者可以不声明队列。但是,强烈建议无论生产者还是消费者,都应该声明队列,确保参数可控
channel.queueDeclare(NORMAL_QUEUE_NAME, true, false, false, normalArguments);

// 绑定普通交换机和普通队列
// 参数说明:
// queue – 队列的名称
// exchange – 交换机的名称
// routingKey – 用于绑定的 RoutingKey
channel.queueBind(NORMAL_QUEUE_NAME, NORMAL_EXCHANGE_NAME, "zhangsan");

///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

// 消费消息时的回调
DeliverCallback deliverCallback = (consumerTag, message) -> {
String msg = new String(message.getBody(), StandardCharsets.UTF_8);

// 模拟消息的耗时处理
try {
Thread.sleep(100);
System.out.println("Consume from normal queue, RoutingKey : " + message.getEnvelope().getRoutingKey() + ", Message : " + msg);
} catch (Exception e) {
e.printStackTrace();
}

// 手动确认消息
// 参数说明:
// deliveryTag – 消息的标记
// multiple – true 表示确认所有消息,包括提供的送达标签为止的所有消息;false 仅确认提供的投放标记
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
};

// 取消消费时的回调(比如,在消费的时候队列已被删除掉)
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("Failed to consume message : " + consumerTag);
};

System.out.println("消费者一等待接收消息...");

// 关闭自动确认机制
boolean autoAck = false;

// 消费消息
// 参数说明:
// queue – 队列的名称
// autoAck – 如果需要服务器在消息投递后自动确认消息,则为 true;如果需要客户端手动确认消息,则为 false
// deliverCallback – 消费消息时的回调
// cancelCallback – 取消消费时的回调
channel.basicConsume(NORMAL_QUEUE_NAME, autoAck, deliverCallback, cancelCallback);

// 让消费者持续运行
System.out.println("按回车键退出程序:");
new Scanner(System.in).nextLine();

// 关闭信道
channel.close();
}

}
  • 消费者二的代码(绑定死信交换机和死信队列,并且从死信队列中消费消息)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
import com.clay.rabbitmq.utils.RabbitMQUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.nio.charset.StandardCharsets;
import java.util.Scanner;

public class MQConsumer02 {

// 死信交换机的名称
public static final String DEAD_EXCHANGE_NAME = "dead_exchange";

// 死信队列的名称
public static final String DEAD_QUEUE_NAME = "dead-queue";

public static void main(String[] args) throws Exception {
// 创建信道
Channel channel = RabbitMQUtils.createChannel();

// 声明死信交换机
// 参数说明:
// exchange – 交换机的名称
// type – 交换机类型
// durable – 如果需要声明一个持久交换机,则为 true(交换机将在服务器重启后继续存在)
// autoDelete – 如果需要声明 autoDelete 交换机,则为 true(当最后一个绑定队列解除绑定后,自动删除该交换机)
// arguments – 交换的其他属性(构造参数)
channel.exchangeDeclare(DEAD_EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true, false, null);

// 声明死信队列
// 参数说明:
// queue – 队列的名称
// durable – 如果需要声明一个持久队列,则为 true(队列将在服务器重启后继续存在)
// exclusive – 如果需要声明一个独占队列(仅限于此连接使用,连接关闭后队列自动删除),则为 true。
// autoDelete – 如果需要声明 autoDelete 队列,则为 true(服务器将在最后一个消费者断开连接以后,自动删除该队列)
// arguments – 队列的其他属性(构造参数)
// 特别注意:如果确定队列已存在,消费者可以不声明队列。但是,强烈建议无论生产者还是消费者,都应该声明队列,确保参数可控
channel.queueDeclare(DEAD_QUEUE_NAME, true, false, false, null);

// 绑定死信交换机和死信队列
// 参数说明:
// queue – 队列的名称
// exchange – 交换机的名称
// routingKey – 用于绑定的 RoutingKey
channel.queueBind(DEAD_QUEUE_NAME, DEAD_EXCHANGE_NAME, "lisi");

// 消费消息时的回调
DeliverCallback deliverCallback = (consumerTag, message) -> {
String msg = new String(message.getBody(), StandardCharsets.UTF_8);

// 模拟消息的耗时处理
try {
Thread.sleep(100);
System.out.println("Consume from dead queue, RoutingKey : " + message.getEnvelope().getRoutingKey() + ", Message : " + msg);
} catch (Exception e) {
e.printStackTrace();
}

// 手动确认消息
// 参数说明:
// deliveryTag – 消息的标记
// multiple – true 表示确认所有消息,包括提供的送达标签为止的所有消息;false 仅确认提供的投放标记
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
};

// 取消消费时的回调(比如,在消费的时候队列已被删除掉)
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("Failed to consume message : " + consumerTag);
};

System.out.println("消费者二等待接收消息...");

// 关闭自动确认机制
boolean autoAck = false;

// 消费消息
// 参数说明:
// queue – 队列的名称
// autoAck – 如果需要服务器在消息投递后自动确认消息,则为 true;如果需要客户端手动确认消息,则为 false
// deliverCallback – 消费消息时的回调
// cancelCallback – 取消消费时的回调
channel.basicConsume(DEAD_QUEUE_NAME, autoAck, deliverCallback, cancelCallback);

// 让消费者持续运行
System.out.println("按回车键退出程序:");
new Scanner(System.in).nextLine();

// 关闭信道
channel.close();
}

}

代码测试

  • (1) 首先启动消费者一应用,让 RabbitMQ 正常创建在 Java 代码里声明的交换机和队列。

  • (2) 关闭消费者一应用,模拟消费者不能正常处理消息。

  • (3) 启动生产者,发送消息到普通队列中(normal-queue)。由于消息设置了 TTL 时间,一旦消息在指定时间内没被处理,消息就会被自动投递到死信队列(dead-queue)中。

  • (4) 启动消费者二应用,在消费者二的控制台中,可以看到从死信队列消费到的消息
1
2
3
4
5
6
7
消费者二等待接收消息...
按回车键退出程序:
Consume from dead queue, RoutingKey : lisi, Message : info0
Consume from dead queue, RoutingKey : lisi, Message : info1
Consume from dead queue, RoutingKey : lisi, Message : info2
Consume from dead queue, RoutingKey : lisi, Message : info3
Consume from dead queue, RoutingKey : lisi, Message : info4
  • (5) 最终,死信队列中的消息会全部被消费者二消费掉

RabbitMQ 队列长度

概念介绍

在使用死信队列时,死信的来源有以下几种:

  • 消费者拒绝:消费者拒绝了消息,比如使用 channel.basicReject() 或者 channel.basicNack(),并且没有重新入队或设置 requeue 参数为 false
  • 消息过期:消息在队列中存放超过设定的 TTL(生存时间)后,会被视为死信。
  • 队列撑满:当队列达到最大长度,新的消息无法被添加,会被转发到死信队列。

当队列达到最大长度时(队列被撑满了),为了避免消息丢失,需要将新的消息转发到死信队列中。这就需要在生产者 / 消费者声明队列的时候,设置队列的最大长度。

案例代码

在生产者 / 消费者声明队列的时候,设置队列的最大长度。示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 队列的配置
Map<String, Object> arguments = new HashMap<>();
// 队列的最大长度
arguments.put("x-max-length", 1000);

// 声明队列
// 参数说明:
// queue – 队列的名称
// durable – 如果需要声明一个持久队列,则为 true(队列将在服务器重启后继续存在)
// exclusive – 如果需要声明一个独占队列(仅限于此连接使用,连接关闭后队列自动删除),则为 true。
// autoDelete – 如果需要声明 autoDelete 队列,则为 true(服务器将在最后一个消费者断开连接以后,自动删除该队列)
// arguments – 队列的其他属性(构造参数)
// 特别注意:如果确定队列已存在,消费者可以不声明队列。但是,强烈建议无论生产者还是消费者,都应该声明队列,确保参数可控
channel.queueDeclare(QUEUE_NAME, true, false, false, arguments);

RabbitMQ 中的 TTL

概念介绍

TTL(Time To Live)是 RabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间(单位是毫秒)。换句话说,如果一条消息设置了 TTL 属性或者进入了设置了 TTL 属性的队列,那么这条消息如果在设置的 TTL 时间内没有被消费,则会成为 “死信”;如果该队列绑定了死信队列,那么这条消息就会被转发到死信队列中。如果同时配置了队列的 TTL 和消息的 TTL,那么较小的那个 TTL 值将会生效。

案例代码

消息的 TTL

在生产者发送消息时,可以设置消息的 TTL 属性。示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 消息的配置
AMQP.BasicProperties properties = new AMQP.BasicProperties()
.builder()
// 消息的过期时间(TTL),单位是毫秒
.expiration("10000")
.build();

// 发送消息
// 参数说明:
// exchange – 要将消息发布到的交换机,空字符串表示默认交换机
// routingKey – 路由 Key
// props – 消息的其他属性,比如:使用 MessageProperties.PERSISTENT_TEXT_PLAIN 属性确保消息持久化,配合持久化队列可避免服务器重启导致消息丢失
// body – 消息内容
channel.basicPublish(EXCHANGE_NAME, "info", properties, message.getBytes(StandardCharsets.UTF_8));

队列的 TTL

在生产者 / 消费者声明队列时,可以设置队列的 TTL 属性。示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 队列的配置
Map<String, Object> arguments = new HashMap<>();
// 队列的过期时间(TTL),单位是毫秒
arguments.put("x-message-ttl", 5000);

// 声明队列
// 参数说明:
// queue – 队列的名称
// durable – 如果需要声明一个持久队列,则为 true(队列将在服务器重启后继续存在)
// exclusive – 如果需要声明一个独占队列(仅限于此连接使用,连接关闭后队列自动删除),则为 true。
// autoDelete – 如果需要声明 autoDelete 队列,则为 true(服务器将在最后一个消费者断开连接以后,自动删除该队列)
// arguments – 队列的其他属性(构造参数)
// 特别注意:如果确定队列已存在,消费者可以不声明队列。但是,强烈建议无论生产者还是消费者,都应该声明队列,确保参数可控
channel.queueDeclare(QUEUE_NAME, true, false, false, arguments);

使用注意事项

  • 当设置了消息或者队列的 TTL 属性,即使消息过期了,也不一定会被立刻丢弃(如果配置了死信队列,消息会被转发到死信队列)。因为消息是否过期是在即将投递给消费者之前判定的,如果队列有严重的消息积压情况,则已过期的消息可能还会存活较长一段时间。
  • 如果不设置 TTL,表示消息永远不会过期;如果将 TTL 设置为 0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃(如果配置了死信队列,消息会被转发到死信队列)

延迟队列的实现

在 RabbitMQ 中,可以利用 TTL 特性和死信队列来实现延迟队列。思考一下,延时队列不就是想要消息延迟多久被处理吗?TTL 刚好能够让消息在延迟多久之后成为 "死信",另一方面,成为 "死信" 的消息都会被投递到死信队列中,这样只需要消费者一直消费死信队列里的消息就可以了,因为里面的消息都是希望被立即处理的消息。值得一提的是,无论是设置消息的 TTL 属性,还是设置队列的 TTL 属性(x-message-ttl),RabbitMQ 的 TTL 判定是惰性的,不是主动的。死信机制依赖于 "消息被投递或检查时",才会判定是否需要丢进死信队列。如果需要更实时的 TTL 处理来实现延迟队列,可以考虑使用延迟队列插件。

底层原理分析

TTL 过期 ≠ 立即转发到死信队列

在 RabbitMQ 中,消息或者队列的 TTL 机制的行为如下:

  • RabbitMQ 不会实时扫描队列中哪些消息过期了。

在 RabbitMQ 中,消息什么时候会被判断为 “过期”?

  • 只有在以下两种场景中,RabbitMQ 才会检查消息是否过期:

    • 消息准备投递给消费者时
    • 有新的消息到达队列,触发了队列的前端检查
  • 这意味着:

    • 如果一个消息在队列中排在很后面(前面有大量积压),它即使已经过期,也不会立刻被丢弃或转发到死信队列。
    • 只有当 RabbitMQ 尝试 “处理” 这条消息时,才会检查消息的 TTL 是否过期。

死信队列何时会被触发?

消息只有在准备投递且被发现过期的时候,才会被 RabbitMQ 判定为过期,然后转发到死信队列中。RabbitMQ 中一条消息被转发到死信队列的常见情况包括:

  • 消费者拒绝:消费者拒绝了消息,比如使用 channel.basicReject() 或者 channel.basicNack(),并且没有重新入队或设置 requeue 参数为 false
  • 消息过期:消息在队列中存放超过设定的 TTL(生存时间)后,会被视为死信。
  • 队列撑满:当队列达到最大长度,新的消息无法被添加,会被转发到死信队列。

举个例子说明

假设有一个队列 Q1 设置了死信交换机 DLX1,消息的 TTL 设置为 10 秒。

  • 当生产者发送了 10 万条消息,但消费者的消费速度很慢或者根本没有消费。
  • 消息会在队列中排队等待,即使前面的消息已经 “过期”,RabbitMQ 也不会主动去清理过期消息。
  • 等到 RabbitMQ 尝试投递这条消息时,发现消息的 TTL 已经过期了,才会将其投递给死信交换机 DLX1

总结

无论是设置消息的 TTL 属性,还是设置队列的 TTL 属性(x-message-ttl),RabbitMQ 的 TTL 判定是惰性的,不是主动的。死信机制依赖于 "消息被投递或检查时",才会判定是否需要丢进死信队列。

注意

如果设置的是队列的 TTL 属性(x-message-ttl),而不是逐条设置消息的 TTL 属性,那 RabbitMQ 是否就能在消息到期后立刻将其丢进死信队列呢?答案是:不会,RabbitMQ 的 TTL 判定仍然是惰性的!因为当 x-message-ttl 属性设置在队列上,RabbitMQ 只是自动为每一条新进来的消息设置相同的 TTL 值,本质上还是 "消息 TTL"。真正意义上的 "自动过期清理" 机制只有一个:使用延迟队列插件。