RabbitMQ 入门教程之九

大纲

前言

学习资源

RabbitMQ 临时队列

概念介绍

RabbitMQ 的‌临时队列‌(通常称为 ‌Exclusive Queue‌ 或 ‌Auto-Delete Queue‌)是一种特殊类型的队列,其设计目的是在短时间内使用,生命周期与消费者或应用程序的连接状态紧密相关。

  • 临时队列的核心特性

    • 自动删除(Auto-Delete)‌
      • 当队列的最后一个消费者断开连接,或 RabbitMQ 服务重启时,队列会被自动删除。
      • 适用于临时任务(如一次性请求),避免队列长期占用资源。
    • 排他性(Exclusive)‌
      • 队列仅允许创建它的连接(Connection)使用,其他连接无法访问。
      • 若创建队列的连接关闭,队列会立即被删除。
      • 适用于单次会话场景(例如 RPC 请求的响应队列)。
    • ‌动态命名(可选项)‌
      • 临时队列通常由 RabbitMQ 生成唯一名称(如 amq.gen-xxxx),无需手动命名,可以避免队列名称冲突。
  • 临时队列的使用场景

    • ‌RPC 模式(请求 - 响应)‌
      • 客户端发送请求时,创建一个临时队列作为响应队列,服务端处理完请求后将结果发送到此队列。任务完成后队列自动删除。‌
    • 临时订阅(Pub/Sub)‌
      • 在发布 / 订阅模式中,消费者动态创建临时队列绑定到交换机(交换机类型是 Fanout),实现按需接收消息,断开连接后自动删除队列。
    • ‌测试和调试‌
      • 在开发环境中,临时队列可用于快速测试消息流,避免残留测试数据。
  • 临时队列的‌注意事项‌

    • ‌数据持久性‌:临时队列不持久化消息,若需要保留数据,需使用持久化队列(durable=True)。
    • ‌连接稳定性‌:若连接意外断开,队列会立即删除,可能导致消息丢失。
    • ‌适用性‌:适用于短生命周期任务,不适合需长期存储或共享的队列。

使用案例

  • 临时队列的创建方式
1
2
// 创建临时队列,由 RabbitMQ 自动生成一个随机的队列名称
String queueName = channel.queueDeclare().getQueue();
  • 临时队列创建出来之后的样子

RabbitMQ 惰性队列

背景介绍

默认情况下,当生产者将消息发送到 RabbitMQ 时,队列会尽可能将消息存储在内存中,以便更快速地将消息推送给消费者。即使是被标记为 “持久化” 的消息,在写入磁盘的同时,也会在内存中保留一份副本。然而,当 RabbitMQ 内存使用率过高,系统需要释放内存资源时,就会将部分消息从内存中 “换页” 到磁盘中。这个过程不仅耗时较长,还可能阻塞队列的正常操作,从而导致 RabbitMQ 无法及时接收新的消息。尽管 RabbitMQ 的开发者持续优化了内存管理和换页相关的算法,但在消息量特别大的场景下,效果仍然有限,性能问题依然突出。为了解决这个问题,可以使用 RabbitMQ 的惰性队列。

概念介绍

从 RabbitMQ 3.6.0 版本开始,引入了惰性队列(Lazy Queue)的概念。惰性队列的设计目标是:尽可能将消息存储在磁盘中,仅在消费者真正需要消费消息时,才将消息从磁盘加载到内存中处理。这种机制的一个重要意义在于,它能够支持更长的队列长度,即可以容纳更多的消息存储,而不会像默认的内存队列那样因内存占用过高而导致性能下降甚至崩溃。在实际使用中,当消费者由于各种原因(例如消费者下线、宕机、由于维护而关闭等)长时间无法消费消息,导致消息大量堆积时,惰性队列就显得尤为重要。它能够有效减轻内存压力,保证系统在面对大规模消息积压时依然能够保持稳定运行。

  • 惰性队列的核心特性

    • 消息尽可能写入磁盘,只有在消费时才加载进内存,以减少内存压力。
    • 设计目的是为了支持消息堆积较多的业务场景。
  • 惰性队列的使用场景

    • 消息堆积多:消费者下线 / 处理慢,导致消息大量堆积。
    • 实时性要求低:可以接受消费延迟,如:日志收集、邮件通知、定时任务等。
    • 内存资源紧张:惰性队列有效减轻内存占用,降低 OOM 风险。
    • 相比普通队列(消息优先保存在内存),惰性队列更加适合高吞吐、消息堆积量大但实时性要求不高的场景。
  • 惰性队列的注意事项

    • 惰性队列虽然可以减少内存占用,但会增加磁盘 I/O 负担,导致消息的消费速度减慢。
    • 惰性队列不支持在已有队列上 “修改模式”,只能重新声明队列,或者通过 Policy 策略设置队列模式。
    • 惰性队列不适合对实时性要求非常高的系统。

使用说明

RabbitMQ 的队列支持两种模式:default(默认模式)和 lazy(惰性模式)。在默认情况下,队列处于 default 模式。在 3.6.0 版本之前,RabbitMQ 并不支持惰性队列,因此无需进行任何配置更改。从 3.6.0 版本开始,RabbitMQ 支持通过以下两种方式将队列设置为 lazy 模式:

  • (1) 在声明队列时通过参数设置:例如使用 channel.queueDeclare() 方法时,指定相应的参数。
  • (2) 通过 Policy 策略设置:通过命令行或管理界面为符合条件的队列统一应用惰性队列策略,此方式适合批量或动态更改已有队列的模式。

特别注意

  • 如果同时使用了声明队列参数和指定 Policy 策略这两种方式来将队列设置为 lazy 模式,那么 Policy 策略设置方式的优先级更高,也就是会覆盖声明队列时的参数配置。
  • 如果希望通过修改队列声明参数的方式来将已有队列切换为 lazy 模式,那么必须先删除原有队列,再重新声明一个新的惰性队列,因为 RabbitMQ 不支持直接修改已存在队列的模式。

使用案例

在声明队列的时候,可以通过 x-queue-mode 参数来设置队列的模式,可选值为 defaultlazy。在下面示例中,演示了一个惰性队列的声明细节:

1
2
3
4
5
6
// 队列属性
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-queue-mode", "lazy");

// 声明队列
channel.queueDeclare("myqueue", true, false, false, args);

RabbitMQ 优先级队列

背景介绍

在我们的系统中,有一个订单催付的场景:当客户在天猫下单后,淘宝会将订单信息及时推送给我们。如果用户在设定的时间内仍未付款,我们就会向其发送一条短信提醒。这看起来是一个非常简单的功能,对吧?但实际上,对于我们来说,天猫商家中也存在 “大客户” 和 “小客户” 之分。例如像苹果、小米这样的头部品牌,每年都能为我们带来可观的收益,自然他们的订单理应获得更高的处理优先级。过去,我们的后端系统采用 Redis 实现定时轮询,用 List 模拟一个简易的消息队列。但大家都知道,Redis 本身并不支持优先级处理机制,只能按照先进先出的顺序进行处理。因此,随着订单量的不断增长,这种方式逐渐暴露出性能和灵活性上的瓶颈。为此,我们对系统进行了优化与重构,采用了 RabbitMQ 来替代 Redis,实现更强大的消息调度能力。通过引入 RabbitMQ 的优先级队列功能,我们可以根据客户类型为订单设置不同的优先级:若判断为大客户订单,则赋予较高的优先级,确保优先处理;否则则采用默认的普通优先级。

概念介绍

RabbitMQ 的优先级队列(Priority Queue)是一种可以根据消息优先级来决定投递顺序的队列,优先级高的消息会被优先消费,从而实现更灵活的消息调度策略。

  • 优先级队列的核心特性

    • 基于消息的优先级排序:每条消息都可以携带一个 priority 属性(0 ~ 255 之间的整数),数值越大,优先级越高。
    • 队列维度配置优先级上限:优先级队列在创建时,需要设置最大优先级(如 x-max-priority = 10),RabbitMQ 会根据这个值来决定是否启用优先级功能以及如何排序。
    • 低优先级不会被饿死:RabbitMQ 内部仍是尽量公平消费,即使有高优先级消息存在,低优先级消息最终也会被消费。
  • 优先级队列的应用场景

    • 异常报警信息优先投递。
    • 多级任务处理系统(紧急任务优先执行)。
    • 用户消息分类处理(VIP 用户优先处理)。
  • 优先级队列的注意事项

    • 并不是所有的队列默认支持优先级,要明确声明。
    • 优先级排序不是绝对精确,而是尽可能按优先级调度,受限于 RabbitMQ 的内部实现与队列状态。
    • 不建议滥用高优先级,避免导致低优先级的消息长期积压。

使用说明

在 RabbitMQ 中,只有同时满足以下三个条件(缺一不可),优先级队列才能真正发挥作用,实现 “重要消息优先处理” 的目标。

  • 队列声明为优先级队列

    • 在创建队列时,需显式设置队列支持优先级功能,例如通过设置参数 x-max-priority 来指定队列的最大优先级值。
  • 为消息设置优先级属性

    • 生产者在发送消息时,需要在消息属性中指定优先级(如 priority = 5),以便队列在接收到消息后,能够根据优先级对消息进行排序处理。
  • 消费者延后启动或消费

    • 为了让队列有时间接收并排序不同优先级的消息,消费者需要在所有消息发送完毕之后再开始消费,否则可能会出现先发送的低优先级消息被立即消费,导致高优先级消息晚到却无法优先消费的情况。

在 RabbitMQ 中,使用优先级队列需要执行以下两个步骤:

  • (1) 在生产者 / 消费者声明队列时,设置队列的最大优先级,范围在 0 ~ 255 之间的整数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 设置队列属性
Map<String, Object> queueArguments = new HashMap<>();
// 队列的最大优先级,范围在 0 ~ 255 之间的整数
queueArguments.put("x-max-priority", 10);

// 声明队列
// 参数说明:
// queue – 队列的名称
// durable – 如果需要声明一个持久队列,则为 true(队列将在服务器重启后继续存在)
// exclusive – 如果需要声明一个独占队列(仅限于此连接使用,连接关闭后队列自动删除),则为 true。
// autoDelete – 如果需要声明 autoDelete 队列,则为 true(服务器将在最后一个消费者断开连接以后,自动删除该队列)
// arguments – 队列的其他属性(构造参数)
// 特别注意:如果确定队列已存在,消费者可以不声明队列。但是,强烈建议无论生产者还是消费者,都应该声明队列,确保参数可控
channel.queueDeclare(QUEUE_NAME, true, false, false, queueArguments);
  • (2) 在生产者发送消息时,设置消息的优先级(不能超过队列的最大优先级)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 设置消息属性
AMQP.BasicProperties msgProperties = new AMQP.BasicProperties()
.builder()
// 消息的持久化(可选)
.deliveryMode(MessageProperties.PERSISTENT_TEXT_PLAIN.getDeliveryMode())
// 消息的优先级(不能超过队列的最大优先级)
.priority(5)
.build();

// 发送消息
// 参数说明:
// exchange – 要将消息发布到的交换机,空字符串表示默认交换机
// routingKey – 路由 Key
// props – 消息的其他属性,比如:使用 MessageProperties.PERSISTENT_TEXT_PLAIN 属性确保消息持久化,配合持久化队列可避免服务器重启导致消息丢失
// body – 消息内容
channel.basicPublish(EXCHANGE_NAME, "", msgProperties, msg.getBytes(StandardCharsets.UTF_8));

在 RabbitMQ 中,优先级队列创建好的样子如下图所示

使用案例

版本说明

本案例使用的各软件版本如下表所示:

组件版本说明
RabbitMQ Server3.8.26
RabbitMQ Client5.10.0
Erlang24.2
Java11

案例代码

  • Maven 依赖
1
2
3
4
5
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.10.0</version>
</dependency>
  • 生产者的代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

import java.nio.charset.StandardCharsets;

public class MQProducer {

// 交换机的名称
public static final String EXCHANGE_NAME = "priority.exchange";

public static void main(String[] args) throws Exception {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.2.127");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");

// 使用 try-with-resources 自动关闭连接和通道,确保资源释放
try (
// 创建连接
Connection connection = factory.newConnection();
// 创建信道
Channel channel = connection.createChannel()
) {
// 声明交换机
// 参数说明:
// exchange – 交换机的名称
// type – 交换机类型
// durable – 如果需要声明一个持久交换机,则为 true(交换机将在服务器重启后继续存在)
// autoDelete – 如果需要声明 autoDelete 交换机,则为 true(当最后一个绑定队列解除绑定后,自动删除该交换机)
// arguments – 交换的其他属性(构造参数)
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, true, false, null);

// 循环发送消息
for (int i = 0; i < 10; i++) {
String msg = "info" + i;
int msgPriority = i == 5 ? 8 : 0;

// 设置消息属性
AMQP.BasicProperties msgProperties = new AMQP.BasicProperties()
.builder()
// 消息的持久化
.deliveryMode(MessageProperties.PERSISTENT_TEXT_PLAIN.getDeliveryMode())
// 消息的优先级(不能超过队列的最大优先级)
.priority(msgPriority)
.build();

// 发送消息
// 参数说明:
// exchange – 要将消息发布到的交换机,空字符串表示默认交换机
// routingKey – 路由 Key
// props – 消息的其他属性,比如:使用 MessageProperties.PERSISTENT_TEXT_PLAIN 属性确保消息持久化,配合持久化队列可避免服务器重启导致消息丢失
// body – 消息内容
channel.basicPublish(EXCHANGE_NAME, "", msgProperties, msg.getBytes(StandardCharsets.UTF_8));
}
}
}

}
  • 消费者的代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Scanner;

public class MQConsumer {

// 交换机的名称
public static final String EXCHANGE_NAME = "priority.exchange";

// 队列的名称
public static final String QUEUE_NAME = "priority.queue";

public static void main(String[] args) throws Exception {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.2.127");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");

// 创建连接
Connection connection = factory.newConnection();

// 创建信道
Channel channel = connection.createChannel();

// 声明交换机
// 参数说明:
// exchange – 交换机的名称
// type – 交换机类型
// durable – 如果需要声明一个持久交换机,则为 true(交换机将在服务器重启后继续存在)
// autoDelete – 如果需要声明 autoDelete 交换机,则为 true(当最后一个绑定队列解除绑定后,自动删除该交换机)
// arguments – 交换的其他属性(构造参数)
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, true, false, null);

// 设置队列属性
Map<String, Object> queueArguments = new HashMap<>();
// 队列的最大优先级,范围在 0 ~ 255 之间的整数
queueArguments.put("x-max-priority", 10);

// 声明队列
// 参数说明:
// queue – 队列的名称
// durable – 如果需要声明一个持久队列,则为 true(队列将在服务器重启后继续存在)
// exclusive – 如果需要声明一个独占队列(仅限于此连接使用,连接关闭后队列自动删除),则为 true。
// autoDelete – 如果需要声明 autoDelete 队列,则为 true(服务器将在最后一个消费者断开连接以后,自动删除该队列)
// arguments – 队列的其他属性(构造参数)
// 特别注意:如果确定队列已存在,消费者可以不声明队列。但是,强烈建议无论生产者还是消费者,都应该声明队列,确保参数可控
channel.queueDeclare(QUEUE_NAME, true, false, false, queueArguments);

// 绑定交换机和队列
// 参数说明:
// queue – 队列的名称
// exchange – 交换机的名称
// routingKey – 用于绑定的 RoutingKey(如果不使用 Routingkey,可填空字符串)
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

// 消费消息时的回调
DeliverCallback deliverCallback = (consumerTag, message) -> {
String msg = new String(message.getBody(), StandardCharsets.UTF_8);
System.out.println("Successed to consume message : " + msg);

// 手动确认消息
// 参数说明:
// deliveryTag – 消息的标记
// multiple – true 表示确认所有消息,包括提供的送达标签为止的所有消息;false 仅确认提供的投放标记
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
};

// 取消消费时的回调(比如,在消费的时候队列已被删除掉)
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("Failed to consume message : " + consumerTag);
};

// 消费者延后启动(等待生产者将所有消息发送完再消费)
try {
System.out.println("消费者等待消息发送完毕...");
Thread.sleep(20000);
} catch (Exception e) {
e.printStackTrace();
}

System.out.println("消费者开始消费消息...");

// 关闭自动确认机制
boolean autoAck = false;

// 消费消息
// 参数说明:
// queue – 队列的名称
// autoAck – 如果需要服务器在消息投递后自动确认消息,则为 true;如果需要客户端手动确认消息,则为 false
// deliverCallback – 消费消息时的回调
// cancelCallback – 取消消费时的回调
channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, cancelCallback);

// 让消费者持续运行
System.out.println("按回车键退出程序:");
new Scanner(System.in).nextLine();

// 关闭信道
channel.close();
connection.close();
}

}

代码测试

  • (1) 先启动消费者应用(让交换机和队列提前创建好),然后再启动生产者应用

  • (2) 等待一段时间后,在消费者的控制台中,会输出以下内容

1
2
3
4
5
6
7
8
9
10
11
12
13
消费者等待消息发送完毕...
消费者开始消费消息...
按回车键退出程序:
Successed to consume message : info5
Successed to consume message : info0
Successed to consume message : info1
Successed to consume message : info2
Successed to consume message : info3
Successed to consume message : info4
Successed to consume message : info6
Successed to consume message : info7
Successed to consume message : info8
Successed to consume message : info9
  • (3) 观察消费者的控制台输出结果,可以发现 info5 的消息(优先级最高)会被优先消费,也就是说明优先级队列生效了