RabbitMQ 入门教程之五

大纲

前言

学习资源

版本说明

本文所有案例代码使用的各软件版本如下表所示:

组件版本说明
RabbitMQ Server3.8.26
RabbitMQ Client5.10.0
Erlang24.2
Java11

RabbitMQ 交换机

在前面的教程中,创建了一个工作队列,而且假设的是工作队列背后,每个消息都恰好交付给一个消费者(工作进程)来处理。在这一节中,将做一些完全不同的事情 - 将一个消息发送给多个消费者,这种模式称为 “发布 / 订阅”。

交换机的概念

RabbitMQ 消息传递模型的核心思想是:生产者生产的消息从不会直接发送到队列。实际上,通常生产者甚至都不知道这些消息最终发送到了哪些队列中。相反,生产者只能将消息发送到交换机(Exchange),交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将消息放入队列。交换机必须确切知道如何处理接收到的消息,比如是应该把这些消息放到特定队列,还是说把放到多个队列中,还是说应该丢弃它们,这些都由交换机的类型来决定。如下图所示,其中 P 表示生产者,X 表示交换机,红色部分表示不同的队列。

交换机的类型

RabbitMQ 一共有以下几种交换机类型:

  • 扇形交换机(Fanout),消息会被广播到所有绑定的队列,不依赖 RoutingKey。
  • 直接交换机(Direct),消息根据 RoutingKey 精确匹配队列。
  • 主题交换机(Topic),消息根据 RoutingKey 模糊匹配队列(支持通配符匹配)。
  • 头交换机(Headers),根据消息 Header 属性匹配队列。

RabbitMQ 默认自带了一些交换机,如下图所示:

默认的交换机

在前面的教程中,即使对交换机(Exchange)一无所知,但仍然能够将消息发送到队列中,这是因为使用的是默认交换机(通过空字符串 "" 进行标识)。生产者发送消息到默认交换机的代码如下:

第一个参数是交换机的名称,空字符串则表示默认或者无名称交换机。消息能路由发送到队列中,其实是由 RoutingKey(BindingKey) 决定的,如果它存在的话。如下图所示:

特别注意

当使用默认交换机(通过空字符串 "" 进行标识),在生产者发送消息时,指定的 RoutingKey 其实就是队列名称,比如:channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));

什么是绑定

什么是绑定(Binding)呢?绑定其实是交换机(Exchange)和队列(Queue)之间的桥梁,它告诉交换机和哪个队列进行了绑定。比如,下面这张图表示交换机 X 与 队列 Q1 和队列 Q2 进行了绑定。

交换机的使用

Fanout 交换机

概念介绍

扇形交换机(Fanout)是 RabbitMQ 中的一种消息广播机制,它的主要特点是:将接收到的消息广播到所有与之绑定的队列中,并且完全忽略路由键(RoutingKey),如下图所示:

  • Fanout 交换机的核心特性

    • 向所有绑定的队列广播消息。
    • 路由键(RoutingKey)会被忽略,不参与队列匹配。
    • 队列只要绑定了该交换机,就能收到消息。
    • 可以实现发布 / 订阅模式,例如日志广播、群体通知。
  • Fanout 交换机的使用场景

    • 系统日志广播:希望系统中的每个服务实例都收到日志信息进行处理或存储。
    • 多人聊天室通知:用户 A 发送消息到聊天室时,所有加入到这个聊天室的客户端都要收到。
    • 新闻推送系统:一次推送,所有订阅了该类别的客户端都收到通知。

总结

扇形交换机(Fanout)就是 "不分青红皂白,全体发货" 的那种类型。只要排好队(将队列与交换机绑定),就能接收到广播消息,不管是否真的需要,也不关心 RoutingKey。

案例代码

本节将演示如何使用 Fanout 交换机,将一个消息同时发送给多个消费者,以此实现广播的效果。这里将构建一个简单的日志系统,它将由两个程序组成:第一个程序负责发送日志消息(生产者),第二个程序负责处理消息(消费者)。其中消费者可能会有多个,事实上第一个程序发送的日志消息将广播给所有消费者,如下图所示:

代码下载

本节所需的完整案例代码,可以直接从 GitHub 下载对应章节 rabbitmq-lesson-08

  • Maven 依赖
1
2
3
4
5
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.10.0</version>
</dependency>
  • 工具类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class RabbitMQUtils {

public static ConnectionFactory connectionFactory;

static {
// 创建连接工厂
connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.2.127");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
}

/**
* 创建信道
*/
public static Channel createChannel() throws Exception {
// 创建连接
Connection connection = connectionFactory.newConnection();

// 创建信道
Channel channel = connection.createChannel();

return channel;
}

}
  • 生产者的代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import com.clay.rabbitmq.utils.RabbitMQUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;

import java.nio.charset.StandardCharsets;
import java.util.Scanner;

public class MQProducer {

// 交换机名称
public static final String EXCHANGE_NAME = "logs";

public static void main(String[] args) throws Exception {
// 创建信道
Channel channel = RabbitMQUtils.createChannel();

// 声明交换机
// 参数说明:
// exchange – 交换机的名称
// type – 交换机类型
// durable – 如果需要声明一个持久交换机,则为 true(交换机将在服务器重启后继续存在)
// autoDelete – 如果需要声明 autoDelete 交换机,则为 true(当最后一个绑定队列解除绑定后,自动删除该交换机)
// arguments – 交换的其他属性(构造参数)
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, false, false, null);

// 从控制台获取需要发送的消息
Scanner scanner = new Scanner(System.in);
// 循环发送消息
while (scanner.hasNext()) {
// 消息内容
String message = scanner.nextLine();
System.out.println("发送消息:" + message);

// 发送消息
// 参数说明:
// exchange – 要将消息发布到的交换机,空字符串表示默认交换机
// routingKey – 路由 Key
// props – 消息的其他属性,比如:使用 MessageProperties.PERSISTENT_TEXT_PLAIN 属性确保消息持久化,配合持久化队列可避免服务器重启导致消息丢失
// body – 消息内容
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes(StandardCharsets.UTF_8));
}

// 关闭信道
channel.close();
}

}
  • 消费者一的代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
import com.clay.rabbitmq.utils.RabbitMQUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.nio.charset.StandardCharsets;
import java.util.Scanner;

public class MQConsumer01 {

// 交换机名称
public static final String EXCHANGE_NAME = "logs";

public static void main(String[] args) throws Exception {
// 创建信道
Channel channel = RabbitMQUtils.createChannel();

// 声明交换机
// 参数说明:
// exchange – 交换机的名称
// type – 交换机类型
// durable – 如果需要声明一个持久交换机,则为 true(交换机将在服务器重启后继续存在)
// autoDelete – 如果需要声明 autoDelete 交换机,则为 true(当最后一个绑定队列解除绑定后,自动删除该交换机)
// arguments – 交换的其他属性(构造参数)
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, false, false, null);

// 创建临时队列(也可以声明持久化队列)
String queueName = channel.queueDeclare().getQueue();

// 绑定交换机和队列
// 参数说明:
// queue – 队列的名称
// exchange – 交换机的名称
// routingKey – 用于绑定的 RoutingKey(如果不使用 Routingkey,可填空字符串)
channel.queueBind(queueName, EXCHANGE_NAME, "");

// 消费消息时的回调
DeliverCallback deliverCallback = (consumerTag, message) -> {
String msg = new String(message.getBody(), StandardCharsets.UTF_8);

// 模拟消息的耗时处理
try {
Thread.sleep(100);
System.out.println("Successed to consume message : " + msg);
} catch (Exception e) {
e.printStackTrace();
}

// 手动确认消息
// 参数说明:
// deliveryTag – 消息的标记
// multiple – true 表示确认所有消息,包括提供的送达标签为止的所有消息;false 仅确认提供的投放标记
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
};

// 取消消费时的回调(比如,在消费的时候队列已被删除掉)
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("Failed to consume message : " + consumerTag);
};

System.out.println("消费者一等待接收消息...");

// 关闭自动确认机制
boolean autoAck = false;

// 消费消息
// 参数说明:
// queue – 队列的名称
// autoAck – 如果需要服务器在消息投递后自动确认消息,则为 true;如果需要客户端手动确认消息,则为 false
// deliverCallback – 消费消息时的回调
// cancelCallback – 取消消费时的回调
channel.basicConsume(queueName, autoAck, deliverCallback, cancelCallback);

// 让消费者持续运行
System.out.println("按回车键退出程序:");
new Scanner(System.in).nextLine();

// 关闭信道
channel.close();
}

}
  • 消费者二的代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
import com.clay.rabbitmq.utils.RabbitMQUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.nio.charset.StandardCharsets;
import java.util.Scanner;

public class MQConsumer02 {

// 交换机名称
public static final String EXCHANGE_NAME = "logs";

public static void main(String[] args) throws Exception {
// 创建信道
Channel channel = RabbitMQUtils.createChannel();

// 声明交换机
// 参数说明:
// exchange – 交换机的名称
// type – 交换机类型
// durable – 如果需要声明一个持久交换机,则为 true(交换机将在服务器重启后继续存在)
// autoDelete – 如果需要声明 autoDelete 交换机,则为 true(当最后一个绑定队列解除绑定后,自动删除该交换机)
// arguments – 交换的其他属性(构造参数)
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, false, false, null);

// 创建临时队列(也可以声明持久化队列)
String queueName = channel.queueDeclare().getQueue();

// 绑定交换机和队列
// 参数说明:
// queue – 队列的名称
// exchange – 交换机的名称
// routingKey – 用于绑定的 RoutingKey(如果不使用 Routingkey,可填空字符串)
channel.queueBind(queueName, EXCHANGE_NAME, "");

// 消费消息时的回调
DeliverCallback deliverCallback = (consumerTag, message) -> {
String msg = new String(message.getBody(), StandardCharsets.UTF_8);

// 模拟消息的耗时处理
try {
Thread.sleep(100);
System.out.println("Successed to consume message : " + msg);
} catch (Exception e) {
e.printStackTrace();
}

// 手动确认消息
// 参数说明:
// deliveryTag – 消息的标记
// multiple – true 表示确认所有消息,包括提供的送达标签为止的所有消息;false 仅确认提供的投放标记
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
};

// 取消消费时的回调(比如,在消费的时候队列已被删除掉)
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("Failed to consume message : " + consumerTag);
};

System.out.println("消费者二等待接收消息...");

// 关闭自动确认机制
boolean autoAck = false;

// 消费消息
// 参数说明:
// queue – 队列的名称
// autoAck – 如果需要服务器在消息投递后自动确认消息,则为 true;如果需要客户端手动确认消息,则为 false
// deliverCallback – 消费消息时的回调
// cancelCallback – 取消消费时的回调
channel.basicConsume(queueName, autoAck, deliverCallback, cancelCallback);

// 让消费者持续运行
System.out.println("按回车键退出程序:");
new Scanner(System.in).nextLine();

// 关闭信道
channel.close();
}

}
代码测试
  • (1) 分别启动生产者和两个消费者应用

  • (2) 在生产者的控制台中,依次输入以下内容:

1
2
3
4
AA
发送消息:AA
BB
发送消息:BB
  • (3) 在消费者一的控制台中,会输出以下内容
1
2
3
4
消费者一等待接收消息...
按回车键退出程序:
AA
BB
  • (4) 在消费者二的控制台中,会输出以下内容
1
2
3
4
消费者二等待接收消息...
按回车键退出程序:
AA
BB
  • (5) 观察消费者一和消费者二的控制台输出结果,可以发现两者都接收到相同的消息,也就是说明生产者是通过广播的方式将消息发送给所有消费者的。

Direct 交换机

概念介绍

直接交换机(Direct)是根据消息的路由键(RoutingKey)与队列绑定的绑定键(BindingKey)进行 “精确匹配”,然后将消息投递到对应的队列。如下图所示:

  • 路由键(RoutingKey):消息发送时指定的关键字。
  • 绑定键(BindingKey):队列绑定到交换机时指定的关键字。

(1) 在上面这张图中,可以看到交换机 X 绑定了两个队列,绑定类型是 direct。队列 Q1 的 BindingKey 为 orange,队列 Q2 的 BindingKey 有两个:一个 BindingKey 为 black, 另一个 BindingKey 为 green
(2) 在这种绑定情况下,生产者发布消息到交换机上,RoutingKey 为 orange 的消息会被发布到队列 Q1,而 RoutingKey 为 black 或者 green 的消息会被发布到队列 Q2,其他 RoutingKey 的消息将被丢弃。
(3) 当然,如果交换机的类型是 direct,但是它绑定的多个队列的 BindingKey 都相同(即多重绑定),在这种情况下虽然绑定类型是 direct,但是它实现的效果就和 Fanout 交换机一样了,跟广播消息差不多,如下图所示:

  • Direct 交换机的核心特性

    • 只有 RoutingKey 与绑定的 BindingKey 完全相等的队列才会收到消息。
    • RoutingKey 必须指定,用于精准匹配。
    • 一个 RoutingKey 可以绑定多个队列,实现 “多播” 的效果。
    • 适用于点对点通信、按类型精确分发、任务分配系统等。
    • 可以实现发布 / 订阅模式,能够根据不同的业务关注点(关键词)进行广播消息,比 Fanout 交换机更精细控制。
  • Direct 交换机的使用场景

    • 订单系统:根据订单状态(如 order.paidorder.shipped)将消息投递到不同的队列处理。
    • 日志分类收集:只收集 Error 日志的服务只绑定 Error 路由键。
    • 后台任务派发:将不同类型的任务分配给不同的工作线程队列。

什么是绑定

绑定(Bindings)是交换机和队列之间的桥梁。也可以这么理解:队列只对它绑定的交换机的消息感兴趣。绑定使用参数 RoutingKey 来表示,也可以称该参数为 BindingKey。创建绑定的代码为:channel.queueBind(queueName, exchangeName, "routingKey");,绑定之后的意义由交换机的类型决定。

案例代码

本节将演示如何使用 Direct 交换机,只让消费者订阅生产者发布的部分消息。例如,只把错误(Error)日志消息定向存储到日志文件(以节省磁盘空间),同时仍然能够在控制台上打印警告(Warning)和信息(Info)日志消息(如下图所示)。由于 Fanout 这种交换机并不是很灵活 - 它只能进行无意识的广播,在这里将使用 Direct 交换机,这种交换机的工作方式是:消息只会发送到它绑定的 RoutingKey 队列中去。

代码下载

本节所需的完整案例代码,可以直接从 GitHub 下载对应章节 rabbitmq-lesson-09

  • Maven 依赖
1
2
3
4
5
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.10.0</version>
</dependency>
  • 工具类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class RabbitMQUtils {

public static ConnectionFactory connectionFactory;

static {
// 创建连接工厂
connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.2.127");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
}

/**
* 创建信道
*/
public static Channel createChannel() throws Exception {
// 创建连接
Connection connection = connectionFactory.newConnection();

// 创建信道
Channel channel = connection.createChannel();

return channel;
}

}
  • 生产者的代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import com.clay.rabbitmq.utils.RabbitMQUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class MQProducer {

// 交换机名称
public static final String EXCHANGE_NAME = "logs";

public static void main(String[] args) throws Exception {
// 创建信道
Channel channel = RabbitMQUtils.createChannel();

// 声明交换机
// 参数说明:
// exchange – 交换机的名称
// type – 交换机类型
// durable – 如果需要声明一个持久交换机,则为 true(交换机将在服务器重启后继续存在)
// autoDelete – 如果需要声明 autoDelete 交换机,则为 true(当最后一个绑定队列解除绑定后,自动删除该交换机)
// arguments – 交换的其他属性(构造参数)
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true, false, null);

Map<String, String> routingKeyMap = new HashMap<>();
routingKeyMap.put("debug", "调试 Debug 信息");
routingKeyMap.put("info", "普通 Info 信息");
routingKeyMap.put("warning", "警告 Warning 信息");
routingKeyMap.put("error", "错误 Error 信息");

// 发送消息
for (Map.Entry<String, String> bindingKeyEntry : routingKeyMap.entrySet()) {
String routingKey = bindingKeyEntry.getKey();
String message = bindingKeyEntry.getValue();

// 发送消息
// 参数说明:
// exchange – 要将消息发布到的交换机,空字符串表示默认交换机
// routingKey – 路由 Key
// props – 消息的其他属性,比如:使用 MessageProperties.PERSISTENT_TEXT_PLAIN 属性确保消息持久化,配合持久化队列可避免服务器重启导致消息丢失
// body – 消息内容
channel.basicPublish(EXCHANGE_NAME, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));
}

// 关闭信道
channel.close();
}

}
  • 消费者一的代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
import com.clay.rabbitmq.utils.RabbitMQUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.nio.charset.StandardCharsets;
import java.util.Scanner;

public class MQConsumer01 {

// 交换机名称
public static final String EXCHANGE_NAME = "logs";

// 队列名称
public static final String QUEUE_NAME = "disk";

public static void main(String[] args) throws Exception {
// 创建信道
Channel channel = RabbitMQUtils.createChannel();

// 声明交换机
// 参数说明:
// exchange – 交换机的名称
// type – 交换机类型
// durable – 如果需要声明一个持久交换机,则为 true(交换机将在服务器重启后继续存在)
// autoDelete – 如果需要声明 autoDelete 交换机,则为 true(当最后一个绑定队列解除绑定后,自动删除该交换机)
// arguments – 交换的其他属性(构造参数)
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true, false, null);

// 声明队列
// 参数说明:
// queue – 队列的名称
// durable – 如果需要声明一个持久队列,则为 true(队列将在服务器重启后继续存在)
// exclusive – 如果需要声明一个独占队列(仅限于此连接使用,连接关闭后队列自动删除),则为 true。
// autoDelete – 如果需要声明 autoDelete 队列,则为 true(服务器将在最后一个消费者断开连接以后,自动删除该队列)
// arguments – 队列的其他属性(构造参数)
// 特别注意:如果确定队列已存在,消费者可以不声明队列。但是,强烈建议无论生产者还是消费者,都应该声明队列,确保参数可控
channel.queueDeclare(QUEUE_NAME, true, false, false, null);

// 绑定交换机和队列
// 参数说明:
// queue – 队列的名称
// exchange – 交换机的名称
// routingKey – 用于绑定的 RoutingKey
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");

// 消费消息时的回调
DeliverCallback deliverCallback = (consumerTag, message) -> {
String msg = new String(message.getBody(), StandardCharsets.UTF_8);

// 模拟消息的耗时处理
try {
Thread.sleep(100);
System.out.println("RoutingKey : " + message.getEnvelope().getRoutingKey() + ", Message : " + msg);
} catch (Exception e) {
e.printStackTrace();
}

// 手动确认消息
// 参数说明:
// deliveryTag – 消息的标记
// multiple – true 表示确认所有消息,包括提供的送达标签为止的所有消息;false 仅确认提供的投放标记
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
};

// 取消消费时的回调(比如,在消费的时候队列已被删除掉)
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("Failed to consume message : " + consumerTag);
};

System.out.println("消费者一等待接收消息...");

// 关闭自动确认机制
boolean autoAck = false;

// 消费消息
// 参数说明:
// queue – 队列的名称
// autoAck – 如果需要服务器在消息投递后自动确认消息,则为 true;如果需要客户端手动确认消息,则为 false
// deliverCallback – 消费消息时的回调
// cancelCallback – 取消消费时的回调
channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, cancelCallback);

// 让消费者持续运行
System.out.println("按回车键退出程序:");
new Scanner(System.in).nextLine();

// 关闭信道
channel.close();
}

}
  • 消费者二的代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
import com.clay.rabbitmq.utils.RabbitMQUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.nio.charset.StandardCharsets;
import java.util.Scanner;

public class MQConsumer02 {

// 交换机名称
public static final String EXCHANGE_NAME = "logs";

// 队列名称
public static final String QUEUE_NAME = "console";

public static void main(String[] args) throws Exception {
// 创建信道
Channel channel = RabbitMQUtils.createChannel();

// 声明交换机
// 参数说明:
// exchange – 交换机的名称
// type – 交换机类型
// durable – 如果需要声明一个持久交换机,则为 true(交换机将在服务器重启后继续存在)
// autoDelete – 如果需要声明 autoDelete 交换机,则为 true(当最后一个绑定队列解除绑定后,自动删除该交换机)
// arguments – 交换的其他属性(构造参数)
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true, false, null);

// 声明队列
// 参数说明:
// queue – 队列的名称
// durable – 如果需要声明一个持久队列,则为 true(队列将在服务器重启后继续存在)
// exclusive – 如果需要声明一个独占队列(仅限于此连接使用,连接关闭后队列自动删除),则为 true。
// autoDelete – 如果需要声明 autoDelete 队列,则为 true(服务器将在最后一个消费者断开连接以后,自动删除该队列)
// arguments – 队列的其他属性(构造参数)
// 特别注意:如果确定队列已存在,消费者可以不声明队列。但是,强烈建议无论生产者还是消费者,都应该声明队列,确保参数可控
channel.queueDeclare(QUEUE_NAME, true, false, false, null);

// 绑定交换机和队列
// 参数说明:
// queue – 队列的名称
// exchange – 交换机的名称
// routingKey – 用于绑定的 RoutingKey
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning");

// 消费消息时的回调
DeliverCallback deliverCallback = (consumerTag, message) -> {
String msg = new String(message.getBody(), StandardCharsets.UTF_8);

// 模拟消息的耗时处理
try {
Thread.sleep(100);
System.out.println("RoutingKey : " + message.getEnvelope().getRoutingKey() + ", Message : " + msg);
} catch (Exception e) {
e.printStackTrace();
}

// 手动确认消息
// 参数说明:
// deliveryTag – 消息的标记
// multiple – true 表示确认所有消息,包括提供的送达标签为止的所有消息;false 仅确认提供的投放标记
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
};

// 取消消费时的回调(比如,在消费的时候队列已被删除掉)
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("Failed to consume message : " + consumerTag);
};

System.out.println("消费者二等待接收消息...");

// 关闭自动确认机制
boolean autoAck = false;

// 消费消息
// 参数说明:
// queue – 队列的名称
// autoAck – 如果需要服务器在消息投递后自动确认消息,则为 true;如果需要客户端手动确认消息,则为 false
// deliverCallback – 消费消息时的回调
// cancelCallback – 取消消费时的回调
channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, cancelCallback);

// 让消费者持续运行
System.out.println("按回车键退出程序:");
new Scanner(System.in).nextLine();

// 关闭信道
channel.close();
}

}
代码测试
  • (1) 分别启动生产者和两个消费者应用

  • (2) 在消费者一的控制台中,会输出以下内容

1
2
3
消费者一等待接收消息...
按回车键退出程序:
RoutingKey : error, Message : 错误 Error 信息
  • (3) 在消费者二的控制台中,会输出以下内容
1
2
3
4
消费者二等待接收消息...
按回车键退出程序:
RoutingKey : warning, Message : 警告 Warning 信息
RoutingKey : info, Message : 普通 Info 信息

(4) 观察消费者一和消费者二的控制台输出结果,可以发现消费者一会接收到错误(Error)日志消息,而消费者二会接收到警告(Warning)和信息(Info)日志消息,这也说明消息只会发送到它绑定的 RoutingKey 队列中去。由于没有队列与 debug 绑定,因此调试(Debug)日志信息会丢失,即不会有消费者去消费对应的消息。

Topic 交换机

概念介绍

主题交换机(Topic)是一种功能强大的消息路由机制,允许根据消息的路由键(RoutingKey)来灵活地将消息分发给一个或多个队列。它非常适合用于发布 / 订阅模式,特别是在需要更复杂路由匹配规则(如支持通配符)的场景中。

  • 路由键(RoutingKey):消息发送时指定的关键字。
  • 绑定键(BindingKey):队列绑定到交换机时指定的关键字。

上图是两个队列的绑定关系图,它们之间的消息接收情况如下表所示:

消息的路由键(RoutingKey)消息的接收情况
quick.orange.rabbit被队列 Q1 和 队列 Q2 接收到
lazy.orange.elephant被队列 Q1 和 队列 Q2 接收到
quick.orange.fox被队列 Q1 接收到
lazy.brown.fox被队列 Q2 接收到
lazy.pink.rabbit虽然满足两个绑定,但只被队列 Q2 接收一次
quick.brown.fox不匹配任何绑定,不会被任何队列接收到,消息会被丢弃
quick.orange.male.rabbit包含四个单词,不匹配任何绑定,不会被任何队列接收到,消息会被丢弃
lazy.orange.male.rabbit包含四个单词,被队列 Q2 接收到

特别注意

  • 当队列的绑定关系是下列这些情况时,需要引起注意:
  • (1) 当一个队列的绑定键(BindingKey)是 #,那么这个队列将会接收所有消息,最终的实现效果就是 Fanout 交换机。
  • (2) 在队列的绑定键(BindingKey)中,如果没有出现 #* 通配符,那么该队列的绑定类型就是 direct,最终的实现效果就是 Direct 交换机。
  • Topic 交换机的核心特性

    • 支持模式匹配的路由键(RoutingKey)
      • 路由键(RoutingKey)不能随意写,必须是一个单词列表(最多不能超过 255 个字节),多个单词之间以 “英文点号” 分隔开
      • 这些单词可以是任意单词,例如:user.createorder.created
    • 支持使用通配符进行绑定键(BindingKey)的灵活匹配
      • 绑定键(BindingKey)可以使用通配符,例如:user.*user.#
    • 支持一条消息投递到多个队列
      • 如果多个队列绑定的绑定键(BindingKey)都匹配到某个消息的路由键(RoutingKey),那么这条消息会被复制并同时发送到多个队列中
    • 可以实现发布 / 订阅模式
      • 能够根据不同的业务关注点(关键词)进行主题式广播,比 Fanout 交换机更精细控制
    • 绑定键(BindingKey)灵活多样
      • 每个队列可以根据自己需求,绑定不同的绑定键(BindingKey),灵活配置接收哪些消息
    • 适合结构化的消息路由
      • 通常使用在对路由键(RoutingKey)有一定层级结构的场景,如日志系统、通知系统、事件总线等
    • 兼容 Direct 交换机(可当作支持通配符的 Direct 交换机来使用)
      • 当在绑定键(BindingKey)中不使用任何通配符时,Topic 交换机的行为与 Direct 交换机一致
  • Topic 交换机的通配符规则

    • Topic 交换机使用 “英文点号” 分隔的字符串作为路由键(RoutingKey)
      • 例如:user.loginorder.created
    • 绑定键(BindingKey)支持使用通配符进行匹配:
      • 支持两个通配符:
        • *:匹配一个单词
        • #:匹配零个或多个单词
      • 通配符匹配示例:
        • 路由键:user.create
        • 绑定键:user.* → 匹配成功
        • 绑定键:user.# → 匹配成功
        • 绑定键:*.create → 匹配成功
        • 绑定键:order.* → 不匹配
  • Topic 交换机的通配符使用

    • 日志系统
      • 路由键:log.error.server1
      • 绑定键:
        • log.error.* → 接收所有错误日志
        • log.# → 接收所有日志
    • 订单系统
      • 路由键:order.createdorder.paid
      • 绑定键:
        • order.* → 接收所有订单相关消息
        • order.paid → 只接收已付款的订单
  • Topic 交换机的使用场景

    • 微服务间的事件通信
    • 日志系统(比如按模块、级别、来源过滤)
    • 多个消费者根据不同规则订阅同一类消息
    • 实现类似 “标签”、” 主题订阅” 机制
案例代码

在上面的 Direct 交换机使用案例 中,改进了日志存储系统,没有使用只能进行无意识广播的 Fanout 交换机,而是使用了 Direct 交换机,从而让消费者可以有选择性地接收不同类型的日志消息。但是,Direct 交换机仍然存在局限性 - 比如说我们想接收的日志类型有 info.baseinfo.advantage,某个队列只想接收 info.base 类型的消息,那这个时候 Direct 交换机就无法实现了。这个时候就只能使用 Topic 交换机,下面将演示如何使用 Topic 交换机来实现该功能,如下图所示:

代码下载

本节所需的完整案例代码,可以直接从 GitHub 下载对应章节 rabbitmq-lesson-10

  • Maven 依赖
1
2
3
4
5
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.10.0</version>
</dependency>
  • 工具类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class RabbitMQUtils {

public static ConnectionFactory connectionFactory;

static {
// 创建连接工厂
connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.2.127");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
}

/**
* 创建信道
*/
public static Channel createChannel() throws Exception {
// 创建连接
Connection connection = connectionFactory.newConnection();

// 创建信道
Channel channel = connection.createChannel();

return channel;
}

}
  • 消费者的代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import com.clay.rabbitmq.utils.RabbitMQUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class MQProducer {

// 交换机名称
public static final String EXCHANGE_NAME = "logs";

public static void main(String[] args) throws Exception {
// 创建信道
Channel channel = RabbitMQUtils.createChannel();

// 声明交换机
// 参数说明:
// exchange – 交换机的名称
// type – 交换机类型
// durable – 如果需要声明一个持久交换机,则为 true(交换机将在服务器重启后继续存在)
// autoDelete – 如果需要声明 autoDelete 交换机,则为 true(当最后一个绑定队列解除绑定后,自动删除该交换机)
// arguments – 交换的其他属性(构造参数)
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true, false, null);

Map<String, String> routingKeyMap = new HashMap<>();
routingKeyMap.put("quick.orange.rabbit", "被队列 Q1 和 队列 Q2 接收到");
routingKeyMap.put("lazy.orange.elephant", "被队列 Q1 和 队列 Q2 接收到");
routingKeyMap.put("quick.orange.fox", "被队列 Q1 接收到");
routingKeyMap.put("lazy.brown.fox", "被队列 Q2 接收到");
routingKeyMap.put("lazy.pink.rabbit", "虽然满足两个绑定,但只被队列 Q2 接收一次");
routingKeyMap.put("quick.brown.fox", "不匹配任何绑定,不会被任何队列接收到,消息会被丢弃");
routingKeyMap.put("quick.orange.male.rabbit", "包含四个单词,不匹配任何绑定,消息会被丢弃");
routingKeyMap.put("lazy.orange.male.rabbit", "包含四个单词,被队列 Q2 接收到");

// 发送消息
for (Map.Entry<String, String> bindingKeyEntry : routingKeyMap.entrySet()) {
String routingKey = bindingKeyEntry.getKey();
String message = bindingKeyEntry.getValue();

// 发送消息
// 参数说明:
// exchange – 要将消息发布到的交换机,空字符串表示默认交换机
// routingKey – 路由 Key
// props – 消息的其他属性,比如:使用 MessageProperties.PERSISTENT_TEXT_PLAIN 属性确保消息持久化,配合持久化队列可避免服务器重启导致消息丢失
// body – 消息内容
channel.basicPublish(EXCHANGE_NAME, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));
}

// 关闭通道
channel.close();

// 关闭连接
channel.getConnection().close();
}

}
  • 消费者一的代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
import com.clay.rabbitmq.utils.RabbitMQUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.nio.charset.StandardCharsets;
import java.util.Scanner;

public class MQConsumer01 {

// 交换机名称
public static final String EXCHANGE_NAME = "logs";

// 队列名称
public static final String QUEUE_NAME = "Q1";

public static void main(String[] args) throws Exception {
// 创建信道
Channel channel = RabbitMQUtils.createChannel();

// 声明交换机
// 参数说明:
// exchange – 交换机的名称
// type – 交换机类型
// durable – 如果需要声明一个持久交换机,则为 true(交换机将在服务器重启后继续存在)
// autoDelete – 如果需要声明 autoDelete 交换机,则为 true(当最后一个绑定队列解除绑定后,自动删除该交换机)
// arguments – 交换的其他属性(构造参数)
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true, false, null);

// 声明队列
// 参数说明:
// queue – 队列的名称
// durable – 如果需要声明一个持久队列,则为 true(队列将在服务器重启后继续存在)
// exclusive – 如果需要声明一个独占队列(仅限于此连接使用,连接关闭后队列自动删除),则为 true。
// autoDelete – 如果需要声明 autoDelete 队列,则为 true(服务器将在最后一个消费者断开连接以后,自动删除该队列)
// arguments – 队列的其他属性(构造参数)
// 特别注意:如果确定队列已存在,消费者可以不声明队列。但是,强烈建议无论生产者还是消费者,都应该声明队列,确保参数可控
channel.queueDeclare(QUEUE_NAME, true, false, false, null);

// 绑定交换机和队列
// 参数说明:
// queue – 队列的名称
// exchange – 交换机的名称
// routingKey – 用于绑定的 RoutingKey
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.orange.*");

// 消费消息时的回调
DeliverCallback deliverCallback = (consumerTag, message) -> {
String msg = new String(message.getBody(), StandardCharsets.UTF_8);

// 模拟消息的耗时处理
try {
Thread.sleep(100);
System.out.println("RoutingKey : " + message.getEnvelope().getRoutingKey() + ", Message : " + msg);
} catch (Exception e) {
e.printStackTrace();
}

// 手动确认消息
// 参数说明:
// deliveryTag – 消息的标记
// multiple – true 表示确认所有消息,包括提供的送达标签为止的所有消息;false 仅确认提供的投放标记
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
};

// 取消消费时的回调(比如,在消费的时候队列已被删除掉)
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("Failed to consume message : " + consumerTag);
};

System.out.println("消费者一等待接收消息...");

// 关闭自动确认机制
boolean autoAck = false;

// 消费消息
// 参数说明:
// queue – 队列的名称
// autoAck – 如果需要服务器在消息投递后自动确认消息,则为 true;如果需要客户端手动确认消息,则为 false
// deliverCallback – 消费消息时的回调
// cancelCallback – 取消消费时的回调
channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, cancelCallback);

// 让消费者持续运行
System.out.println("按回车键退出程序:");
new Scanner(System.in).nextLine();

// 关闭信道
channel.close();
}

}
  • 消费者二的代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
import com.clay.rabbitmq.utils.RabbitMQUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.nio.charset.StandardCharsets;
import java.util.Scanner;

public class MQConsumer02 {

// 交换机名称
public static final String EXCHANGE_NAME = "logs";

// 队列名称
public static final String QUEUE_NAME = "Q2";

public static void main(String[] args) throws Exception {
// 创建信道
Channel channel = RabbitMQUtils.createChannel();

// 声明交换机
// 参数说明:
// exchange – 交换机的名称
// type – 交换机类型
// durable – 如果需要声明一个持久交换机,则为 true(交换机将在服务器重启后继续存在)
// autoDelete – 如果需要声明 autoDelete 交换机,则为 true(当最后一个绑定队列解除绑定后,自动删除该交换机)
// arguments – 交换的其他属性(构造参数)
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true, false, null);

// 声明队列
// 参数说明:
// queue – 队列的名称
// durable – 如果需要声明一个持久队列,则为 true(队列将在服务器重启后继续存在)
// exclusive – 如果需要声明一个独占队列(仅限于此连接使用,连接关闭后队列自动删除),则为 true。
// autoDelete – 如果需要声明 autoDelete 队列,则为 true(服务器将在最后一个消费者断开连接以后,自动删除该队列)
// arguments – 队列的其他属性(构造参数)
// 特别注意:如果确定队列已存在,消费者可以不声明队列。但是,强烈建议无论生产者还是消费者,都应该声明队列,确保参数可控
channel.queueDeclare(QUEUE_NAME, true, false, false, null);

// 绑定交换机和队列
// 参数说明:
// queue – 队列的名称
// exchange – 交换机的名称
// routingKey – 用于绑定的 RoutingKey
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*.rabbit");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "lazy.#");

// 消费消息时的回调
DeliverCallback deliverCallback = (consumerTag, message) -> {
String msg = new String(message.getBody(), StandardCharsets.UTF_8);

// 模拟消息的耗时处理
try {
Thread.sleep(100);
System.out.println("RoutingKey : " + message.getEnvelope().getRoutingKey() + ", Message : " + msg);
} catch (Exception e) {
e.printStackTrace();
}

// 手动确认消息
// 参数说明:
// deliveryTag – 消息的标记
// multiple – true 表示确认所有消息,包括提供的送达标签为止的所有消息;false 仅确认提供的投放标记
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
};

// 取消消费时的回调(比如,在消费的时候队列已被删除掉)
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("Failed to consume message : " + consumerTag);
};

System.out.println("消费者二等待接收消息...");

// 关闭自动确认机制
boolean autoAck = false;

// 消费消息
// 参数说明:
// queue – 队列的名称
// autoAck – 如果需要服务器在消息投递后自动确认消息,则为 true;如果需要客户端手动确认消息,则为 false
// deliverCallback – 消费消息时的回调
// cancelCallback – 取消消费时的回调
channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, cancelCallback);

// 让消费者持续运行
System.out.println("按回车键退出程序:");
new Scanner(System.in).nextLine();

// 关闭信道
channel.close();
}

}
代码测试

(1) 分别启动生产者和两个消费者应用

(2) 在消费者一的控制台中,会输出以下内容

1
2
3
4
5
消费者一等待接收消息...
按回车键退出程序:
RoutingKey : lazy.orange.elephant, Message : 被队列 Q1 和 队列 Q2 接收到
RoutingKey : quick.orange.rabbit, Message : 被队列 Q1 和 队列 Q2 接收到
RoutingKey : quick.orange.fox, Message : 被队列 Q1 接收到

(3) 在消费者二的控制台中,会输出以下内容

1
2
3
4
5
6
7
消费者二等待接收消息...
按回车键退出程序:
RoutingKey : lazy.orange.elephant, Message : 被队列 Q1 和 队列 Q2 接收到
RoutingKey : quick.orange.rabbit, Message : 被队列 Q1 和 队列 Q2 接收到
RoutingKey : lazy.brown.fox, Message : 被队列 Q2 接收到
RoutingKey : lazy.pink.rabbit, Message : 虽然满足两个绑定,但只被队列 Q2 接收一次
RoutingKey : lazy.orange.male.rabbit, Message : 包含四个单词,被队列 Q2 接收到

(4) 观察消费者一和消费者二的控制台输出结果,可以发现消费者一和消费者二会接收到相同或不同的消息,这也说明绑定键(BindingKey)的通配符规则生效了。