大纲 前言 学习资源 版本说明 本文所有案例代码使用的各软件版本如下表所示:
组件 版本 说明 RabbitMQ Server 3.8.26
RabbitMQ Client 5.10.0
Erlang 24.2
Java 11
RabbitMQ 交换机 在前面的教程中,创建了一个工作队列,而且假设的是工作队列背后,每个消息都恰好交付给一个消费者(工作进程)来处理。在这一节中,将做一些完全不同的事情 - 将一个消息发送给多个消费者,这种模式称为 “发布 / 订阅”。
交换机的概念 RabbitMQ 消息传递模型的核心思想是:生产者生产的消息从不会直接发送到队列。 实际上,通常生产者甚至都不知道这些消息最终发送到了哪些队列中。相反,生产者只能将消息发送到交换机(Exchange),交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将消息放入队列。交换机必须确切知道如何处理接收到的消息,比如是应该把这些消息放到特定队列,还是说把放到多个队列中,还是说应该丢弃它们,这些都由交换机的类型来决定。如下图所示,其中 P
表示生产者,X
表示交换机,红色部分表示不同的队列。
交换机的类型 RabbitMQ 一共有以下几种交换机类型:
扇形交换机(Fanout),消息会被广播到所有绑定的队列,不依赖 RoutingKey。 直接交换机(Direct),消息根据 RoutingKey 精确匹配队列。 主题交换机(Topic),消息根据 RoutingKey 模糊匹配队列(支持通配符匹配)。 头交换机(Headers),根据消息 Header 属性匹配队列。 RabbitMQ 默认自带了一些交换机,如下图所示:
默认的交换机 在前面的教程中,即使对交换机(Exchange)一无所知,但仍然能够将消息发送到队列中,这是因为使用的是默认交换机(通过空字符串 ""
进行标识)。生产者发送消息到默认交换机的代码如下:
第一个参数是交换机的名称,空字符串则表示默认或者无名称交换机。消息能路由发送到队列中,其实是由 RoutingKey(BindingKey)
决定的,如果它存在的话。如下图所示:
特别注意
当使用默认交换机(通过空字符串 ""
进行标识),在生产者发送消息时,指定的 RoutingKey
其实就是队列名称,比如:channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
什么是绑定 什么是绑定(Binding)呢?绑定其实是交换机(Exchange)和队列(Queue)之间的桥梁,它告诉交换机和哪个队列进行了绑定。比如,下面这张图表示交换机 X 与 队列 Q1 和队列 Q2 进行了绑定。
交换机的使用 Fanout 交换机 概念介绍 扇形交换机(Fanout)是 RabbitMQ 中的一种消息广播机制,它的主要特点是:将接收到的消息广播到所有与之绑定的队列中,并且完全忽略路由键(RoutingKey), 如下图所示:
Fanout 交换机的核心特性
向所有绑定的队列广播消息。 路由键(RoutingKey)会被忽略,不参与队列匹配。 队列只要绑定了该交换机,就能收到消息。 可以实现发布 / 订阅模式,例如日志广播、群体通知。 Fanout 交换机的使用场景
系统日志广播:希望系统中的每个服务实例都收到日志信息进行处理或存储。 多人聊天室通知:用户 A 发送消息到聊天室时,所有加入到这个聊天室的客户端都要收到。 新闻推送系统:一次推送,所有订阅了该类别的客户端都收到通知。 总结
扇形交换机(Fanout)就是 "不分青红皂白,全体发货" 的那种类型。只要排好队(将队列与交换机绑定),就能接收到广播消息,不管是否真的需要,也不关心 RoutingKey。
案例代码 本节将演示如何使用 Fanout 交换机,将一个消息同时发送给多个消费者,以此实现广播的效果。这里将构建一个简单的日志系统,它将由两个程序组成:第一个程序负责发送日志消息(生产者),第二个程序负责处理消息(消费者)。其中消费者可能会有多个,事实上第一个程序发送的日志消息将广播给所有消费者,如下图所示:
代码下载
本节所需的完整案例代码,可以直接从 GitHub 下载对应章节 rabbitmq-lesson-08
。
1 2 3 4 5 <dependency > <groupId > com.rabbitmq</groupId > <artifactId > amqp-client</artifactId > <version > 5.10.0</version > </dependency >
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;public class RabbitMQUtils { public static ConnectionFactory connectionFactory; static { connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.2.127" ); connectionFactory.setPort(5672 ); connectionFactory.setUsername("admin" ); connectionFactory.setPassword("admin" ); } public static Channel createChannel () throws Exception { Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); return channel; } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 import com.clay.rabbitmq.utils.RabbitMQUtils;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import java.nio.charset.StandardCharsets;import java.util.Scanner;public class MQProducer { public static final String EXCHANGE_NAME = "logs" ; public static void main (String[] args) throws Exception { Channel channel = RabbitMQUtils.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, false , false , null ); Scanner scanner = new Scanner(System.in); while (scanner.hasNext()) { String message = scanner.nextLine(); System.out.println("发送消息:" + message); channel.basicPublish(EXCHANGE_NAME, "" , null , message.getBytes(StandardCharsets.UTF_8)); } channel.close(); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 import com.clay.rabbitmq.utils.RabbitMQUtils;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.CancelCallback;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import java.nio.charset.StandardCharsets;import java.util.Scanner;public class MQConsumer01 { public static final String EXCHANGE_NAME = "logs" ; public static void main (String[] args) throws Exception { Channel channel = RabbitMQUtils.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, false , false , null ); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, "" ); DeliverCallback deliverCallback = (consumerTag, message) -> { String msg = new String(message.getBody(), StandardCharsets.UTF_8); try { Thread.sleep(100 ); System.out.println("Successed to consume message : " + msg); } catch (Exception e) { e.printStackTrace(); } channel.basicAck(message.getEnvelope().getDeliveryTag(), false ); }; CancelCallback cancelCallback = (consumerTag) -> { System.out.println("Failed to consume message : " + consumerTag); }; System.out.println("消费者一等待接收消息..." ); boolean autoAck = false ; channel.basicConsume(queueName, autoAck, deliverCallback, cancelCallback); System.out.println("按回车键退出程序:" ); new Scanner(System.in).nextLine(); channel.close(); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 import com.clay.rabbitmq.utils.RabbitMQUtils;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.CancelCallback;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import java.nio.charset.StandardCharsets;import java.util.Scanner;public class MQConsumer02 { public static final String EXCHANGE_NAME = "logs" ; public static void main (String[] args) throws Exception { Channel channel = RabbitMQUtils.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, false , false , null ); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, "" ); DeliverCallback deliverCallback = (consumerTag, message) -> { String msg = new String(message.getBody(), StandardCharsets.UTF_8); try { Thread.sleep(100 ); System.out.println("Successed to consume message : " + msg); } catch (Exception e) { e.printStackTrace(); } channel.basicAck(message.getEnvelope().getDeliveryTag(), false ); }; CancelCallback cancelCallback = (consumerTag) -> { System.out.println("Failed to consume message : " + consumerTag); }; System.out.println("消费者二等待接收消息..." ); boolean autoAck = false ; channel.basicConsume(queueName, autoAck, deliverCallback, cancelCallback); System.out.println("按回车键退出程序:" ); new Scanner(System.in).nextLine(); channel.close(); } }
代码测试 (1) 分别启动生产者和两个消费者应用
(2) 在生产者的控制台中,依次输入以下内容:
1 2 3 4 消费者一等待接收消息... 按回车键退出程序: AA BB
1 2 3 4 消费者二等待接收消息... 按回车键退出程序: AA BB
(5) 观察消费者一和消费者二的控制台输出结果,可以发现两者都接收到相同的消息,也就是说明生产者是通过广播的方式将消息发送给所有消费者的。 Direct 交换机 概念介绍 直接交换机(Direct)是根据消息的路由键(RoutingKey)与队列绑定的绑定键(BindingKey)进行 “精确匹配”,然后将消息投递到对应的队列。如下图所示:
路由键(RoutingKey):消息发送时指定的关键字。 绑定键(BindingKey):队列绑定到交换机时指定的关键字。
(1) 在上面这张图中,可以看到交换机 X 绑定了两个队列,绑定类型是 direct
。队列 Q1 的 BindingKey 为 orange
,队列 Q2 的 BindingKey 有两个:一个 BindingKey 为 black
, 另一个 BindingKey 为 green
。 (2) 在这种绑定情况下,生产者发布消息到交换机上,RoutingKey 为 orange
的消息会被发布到队列 Q1,而 RoutingKey 为 black
或者 green
的消息会被发布到队列 Q2,其他 RoutingKey 的消息将被丢弃。(3) 当然,如果交换机的类型是 direct
,但是它绑定的多个队列的 BindingKey 都相同(即多重绑定),在这种情况下虽然绑定类型是 direct
,但是它实现的效果就和 Fanout 交换机一样了,跟广播消息差不多,如下图所示:
Direct 交换机的核心特性
只有 RoutingKey 与绑定的 BindingKey 完全相等的队列才会收到消息。 RoutingKey 必须指定,用于精准匹配。 一个 RoutingKey 可以绑定多个队列,实现 “多播” 的效果。 适用于点对点通信、按类型精确分发、任务分配系统等。 可以实现发布 / 订阅模式,能够根据不同的业务关注点(关键词)进行广播消息,比 Fanout 交换机更精细控制。 Direct 交换机的使用场景
订单系统:根据订单状态(如 order.paid
、order.shipped
)将消息投递到不同的队列处理。 日志分类收集:只收集 Error 日志的服务只绑定 Error 路由键。 后台任务派发:将不同类型的任务分配给不同的工作线程队列。 什么是绑定
绑定(Bindings)是交换机和队列之间的桥梁。也可以这么理解:队列只对它绑定的交换机的消息感兴趣。绑定使用参数 RoutingKey 来表示,也可以称该参数为 BindingKey 。创建绑定的代码为:channel.queueBind(queueName, exchangeName, "routingKey");
,绑定之后的意义由交换机的类型决定。
案例代码 本节将演示如何使用 Direct 交换机,只让消费者订阅生产者发布的部分消息。例如,只把错误(Error)日志消息定向存储到日志文件(以节省磁盘空间),同时仍然能够在控制台上打印警告(Warning)和信息(Info)日志消息(如下图所示)。由于 Fanout 这种交换机并不是很灵活 - 它只能进行无意识的广播,在这里将使用 Direct 交换机,这种交换机的工作方式是:消息只会发送到它绑定的 RoutingKey 队列中去。
代码下载
本节所需的完整案例代码,可以直接从 GitHub 下载对应章节 rabbitmq-lesson-09
。
1 2 3 4 5 <dependency > <groupId > com.rabbitmq</groupId > <artifactId > amqp-client</artifactId > <version > 5.10.0</version > </dependency >
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;public class RabbitMQUtils { public static ConnectionFactory connectionFactory; static { connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.2.127" ); connectionFactory.setPort(5672 ); connectionFactory.setUsername("admin" ); connectionFactory.setPassword("admin" ); } public static Channel createChannel () throws Exception { Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); return channel; } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 import com.clay.rabbitmq.utils.RabbitMQUtils;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.MessageProperties;import java.nio.charset.StandardCharsets;import java.util.HashMap;import java.util.Map;public class MQProducer { public static final String EXCHANGE_NAME = "logs" ; public static void main (String[] args) throws Exception { Channel channel = RabbitMQUtils.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true , false , null ); Map<String, String> routingKeyMap = new HashMap<>(); routingKeyMap.put("debug" , "调试 Debug 信息" ); routingKeyMap.put("info" , "普通 Info 信息" ); routingKeyMap.put("warning" , "警告 Warning 信息" ); routingKeyMap.put("error" , "错误 Error 信息" ); for (Map.Entry<String, String> bindingKeyEntry : routingKeyMap.entrySet()) { String routingKey = bindingKeyEntry.getKey(); String message = bindingKeyEntry.getValue(); channel.basicPublish(EXCHANGE_NAME, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8)); } channel.close(); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 import com.clay.rabbitmq.utils.RabbitMQUtils;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.CancelCallback;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import java.nio.charset.StandardCharsets;import java.util.Scanner;public class MQConsumer01 { public static final String EXCHANGE_NAME = "logs" ; public static final String QUEUE_NAME = "disk" ; public static void main (String[] args) throws Exception { Channel channel = RabbitMQUtils.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true , false , null ); channel.queueDeclare(QUEUE_NAME, true , false , false , null ); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error" ); DeliverCallback deliverCallback = (consumerTag, message) -> { String msg = new String(message.getBody(), StandardCharsets.UTF_8); try { Thread.sleep(100 ); System.out.println("RoutingKey : " + message.getEnvelope().getRoutingKey() + ", Message : " + msg); } catch (Exception e) { e.printStackTrace(); } channel.basicAck(message.getEnvelope().getDeliveryTag(), false ); }; CancelCallback cancelCallback = (consumerTag) -> { System.out.println("Failed to consume message : " + consumerTag); }; System.out.println("消费者一等待接收消息..." ); boolean autoAck = false ; channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, cancelCallback); System.out.println("按回车键退出程序:" ); new Scanner(System.in).nextLine(); channel.close(); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 import com.clay.rabbitmq.utils.RabbitMQUtils;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.CancelCallback;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import java.nio.charset.StandardCharsets;import java.util.Scanner;public class MQConsumer02 { public static final String EXCHANGE_NAME = "logs" ; public static final String QUEUE_NAME = "console" ; public static void main (String[] args) throws Exception { Channel channel = RabbitMQUtils.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true , false , null ); channel.queueDeclare(QUEUE_NAME, true , false , false , null ); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info" ); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning" ); DeliverCallback deliverCallback = (consumerTag, message) -> { String msg = new String(message.getBody(), StandardCharsets.UTF_8); try { Thread.sleep(100 ); System.out.println("RoutingKey : " + message.getEnvelope().getRoutingKey() + ", Message : " + msg); } catch (Exception e) { e.printStackTrace(); } channel.basicAck(message.getEnvelope().getDeliveryTag(), false ); }; CancelCallback cancelCallback = (consumerTag) -> { System.out.println("Failed to consume message : " + consumerTag); }; System.out.println("消费者二等待接收消息..." ); boolean autoAck = false ; channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, cancelCallback); System.out.println("按回车键退出程序:" ); new Scanner(System.in).nextLine(); channel.close(); } }
代码测试 (1) 分别启动生产者和两个消费者应用
(2) 在消费者一的控制台中,会输出以下内容
1 2 3 消费者一等待接收消息... 按回车键退出程序: RoutingKey : error, Message : 错误 Error 信息
1 2 3 4 消费者二等待接收消息... 按回车键退出程序: RoutingKey : warning, Message : 警告 Warning 信息 RoutingKey : info, Message : 普通 Info 信息
(4) 观察消费者一和消费者二的控制台输出结果,可以发现消费者一会接收到错误(Error)日志消息,而消费者二会接收到警告(Warning)和信息(Info)日志消息,这也说明消息只会发送到它绑定的 RoutingKey 队列中去。由于没有队列与 debug
绑定,因此调试(Debug)日志信息会丢失,即不会有消费者去消费对应的消息。
Topic 交换机 概念介绍 主题交换机(Topic)是一种功能强大的消息路由机制,允许根据消息的路由键(RoutingKey)来灵活地将消息分发给一个或多个队列。它非常适合用于发布 / 订阅模式,特别是在需要更复杂路由匹配规则(如支持通配符)的场景中。
路由键(RoutingKey):消息发送时指定的关键字。 绑定键(BindingKey):队列绑定到交换机时指定的关键字。
上图是两个队列的绑定关系图,它们之间的消息接收情况如下表所示:
消息的路由键(RoutingKey) 消息的接收情况 quick.orange.rabbit
被队列 Q1 和 队列 Q2 接收到 lazy.orange.elephant
被队列 Q1 和 队列 Q2 接收到 quick.orange.fox
被队列 Q1 接收到 lazy.brown.fox
被队列 Q2 接收到 lazy.pink.rabbit
虽然满足两个绑定,但只被队列 Q2 接收一次 quick.brown.fox
不匹配任何绑定,不会被任何队列接收到,消息会被丢弃 quick.orange.male.rabbit
包含四个单词,不匹配任何绑定,不会被任何队列接收到,消息会被丢弃 lazy.orange.male.rabbit
包含四个单词,被队列 Q2 接收到
特别注意
当队列的绑定关系是下列这些情况时,需要引起注意: (1) 当一个队列的绑定键(BindingKey)是 #
,那么这个队列将会接收所有消息,最终的实现效果就是 Fanout 交换机。 (2) 在队列的绑定键(BindingKey)中,如果没有出现 #
和 *
通配符,那么该队列的绑定类型就是 direct
,最终的实现效果就是 Direct 交换机。 Topic 交换机的核心特性
支持模式匹配的路由键(RoutingKey)路由键(RoutingKey)不能随意写,必须是一个单词列表(最多不能超过 255 个字节),多个单词之间以 “英文点号” 分隔开 这些单词可以是任意单词,例如:user.create
、order.created
支持使用通配符进行绑定键(BindingKey)的灵活匹配绑定键(BindingKey)可以使用通配符,例如:user.*
、user.#
支持一条消息投递到多个队列如果多个队列绑定的绑定键(BindingKey)都匹配到某个消息的路由键(RoutingKey),那么这条消息会被复制并同时发送到多个队列中 可以实现发布 / 订阅模式能够根据不同的业务关注点(关键词)进行主题式广播,比 Fanout 交换机更精细控制 绑定键(BindingKey)灵活多样每个队列可以根据自己需求,绑定不同的绑定键(BindingKey),灵活配置接收哪些消息 适合结构化的消息路由通常使用在对路由键(RoutingKey)有一定层级结构的场景,如日志系统、通知系统、事件总线等 兼容 Direct 交换机(可当作支持通配符的 Direct 交换机来使用)当在绑定键(BindingKey)中不使用任何通配符时,Topic 交换机的行为与 Direct 交换机一致 Topic 交换机的通配符规则
Topic 交换机使用 “英文点号” 分隔的字符串作为路由键(RoutingKey)例如:user.login
、order.created
绑定键(BindingKey)支持使用通配符进行匹配:支持两个通配符: 通配符匹配示例:路由键:user.create
绑定键:user.*
→ 匹配成功 绑定键:user.#
→ 匹配成功 绑定键:*.create
→ 匹配成功 绑定键:order.*
→ 不匹配 Topic 交换机的通配符使用
日志系统路由键:log.error.server1
绑定键:log.error.*
→ 接收所有错误日志log.#
→ 接收所有日志 订单系统路由键:order.created
、order.paid
绑定键:order.*
→ 接收所有订单相关消息order.paid
→ 只接收已付款的订单 Topic 交换机的使用场景
微服务间的事件通信 日志系统(比如按模块、级别、来源过滤) 多个消费者根据不同规则订阅同一类消息 实现类似 “标签”、” 主题订阅” 机制 案例代码 在上面的 Direct 交换机使用案例 中,改进了日志存储系统,没有使用只能进行无意识广播的 Fanout 交换机,而是使用了 Direct 交换机,从而让消费者可以有选择性地接收不同类型的日志消息。但是,Direct 交换机仍然存在局限性 - 比如说我们想接收的日志类型有 info.base
和 info.advantage
,某个队列只想接收 info.base
类型的消息,那这个时候 Direct 交换机就无法实现了。这个时候就只能使用 Topic 交换机,下面将演示如何使用 Topic 交换机来实现该功能,如下图所示:
代码下载
本节所需的完整案例代码,可以直接从 GitHub 下载对应章节 rabbitmq-lesson-10
。
1 2 3 4 5 <dependency > <groupId > com.rabbitmq</groupId > <artifactId > amqp-client</artifactId > <version > 5.10.0</version > </dependency >
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;public class RabbitMQUtils { public static ConnectionFactory connectionFactory; static { connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.2.127" ); connectionFactory.setPort(5672 ); connectionFactory.setUsername("admin" ); connectionFactory.setPassword("admin" ); } public static Channel createChannel () throws Exception { Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); return channel; } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 import com.clay.rabbitmq.utils.RabbitMQUtils;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.MessageProperties;import java.nio.charset.StandardCharsets;import java.util.HashMap;import java.util.Map;public class MQProducer { public static final String EXCHANGE_NAME = "logs" ; public static void main (String[] args) throws Exception { Channel channel = RabbitMQUtils.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true , false , null ); Map<String, String> routingKeyMap = new HashMap<>(); routingKeyMap.put("quick.orange.rabbit" , "被队列 Q1 和 队列 Q2 接收到" ); routingKeyMap.put("lazy.orange.elephant" , "被队列 Q1 和 队列 Q2 接收到" ); routingKeyMap.put("quick.orange.fox" , "被队列 Q1 接收到" ); routingKeyMap.put("lazy.brown.fox" , "被队列 Q2 接收到" ); routingKeyMap.put("lazy.pink.rabbit" , "虽然满足两个绑定,但只被队列 Q2 接收一次" ); routingKeyMap.put("quick.brown.fox" , "不匹配任何绑定,不会被任何队列接收到,消息会被丢弃" ); routingKeyMap.put("quick.orange.male.rabbit" , "包含四个单词,不匹配任何绑定,消息会被丢弃" ); routingKeyMap.put("lazy.orange.male.rabbit" , "包含四个单词,被队列 Q2 接收到" ); for (Map.Entry<String, String> bindingKeyEntry : routingKeyMap.entrySet()) { String routingKey = bindingKeyEntry.getKey(); String message = bindingKeyEntry.getValue(); channel.basicPublish(EXCHANGE_NAME, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8)); } channel.close(); channel.getConnection().close(); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 import com.clay.rabbitmq.utils.RabbitMQUtils;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.CancelCallback;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import java.nio.charset.StandardCharsets;import java.util.Scanner;public class MQConsumer01 { public static final String EXCHANGE_NAME = "logs" ; public static final String QUEUE_NAME = "Q1" ; public static void main (String[] args) throws Exception { Channel channel = RabbitMQUtils.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true , false , null ); channel.queueDeclare(QUEUE_NAME, true , false , false , null ); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.orange.*" ); DeliverCallback deliverCallback = (consumerTag, message) -> { String msg = new String(message.getBody(), StandardCharsets.UTF_8); try { Thread.sleep(100 ); System.out.println("RoutingKey : " + message.getEnvelope().getRoutingKey() + ", Message : " + msg); } catch (Exception e) { e.printStackTrace(); } channel.basicAck(message.getEnvelope().getDeliveryTag(), false ); }; CancelCallback cancelCallback = (consumerTag) -> { System.out.println("Failed to consume message : " + consumerTag); }; System.out.println("消费者一等待接收消息..." ); boolean autoAck = false ; channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, cancelCallback); System.out.println("按回车键退出程序:" ); new Scanner(System.in).nextLine(); channel.close(); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 import com.clay.rabbitmq.utils.RabbitMQUtils;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.CancelCallback;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import java.nio.charset.StandardCharsets;import java.util.Scanner;public class MQConsumer02 { public static final String EXCHANGE_NAME = "logs" ; public static final String QUEUE_NAME = "Q2" ; public static void main (String[] args) throws Exception { Channel channel = RabbitMQUtils.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true , false , null ); channel.queueDeclare(QUEUE_NAME, true , false , false , null ); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*.rabbit" ); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "lazy.#" ); DeliverCallback deliverCallback = (consumerTag, message) -> { String msg = new String(message.getBody(), StandardCharsets.UTF_8); try { Thread.sleep(100 ); System.out.println("RoutingKey : " + message.getEnvelope().getRoutingKey() + ", Message : " + msg); } catch (Exception e) { e.printStackTrace(); } channel.basicAck(message.getEnvelope().getDeliveryTag(), false ); }; CancelCallback cancelCallback = (consumerTag) -> { System.out.println("Failed to consume message : " + consumerTag); }; System.out.println("消费者二等待接收消息..." ); boolean autoAck = false ; channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, cancelCallback); System.out.println("按回车键退出程序:" ); new Scanner(System.in).nextLine(); channel.close(); } }
代码测试 (1) 分别启动生产者和两个消费者应用
(2) 在消费者一的控制台中,会输出以下内容
1 2 3 4 5 消费者一等待接收消息... 按回车键退出程序: RoutingKey : lazy.orange.elephant, Message : 被队列 Q1 和 队列 Q2 接收到 RoutingKey : quick.orange.rabbit, Message : 被队列 Q1 和 队列 Q2 接收到 RoutingKey : quick.orange.fox, Message : 被队列 Q1 接收到
(3) 在消费者二的控制台中,会输出以下内容
1 2 3 4 5 6 7 消费者二等待接收消息... 按回车键退出程序: RoutingKey : lazy.orange.elephant, Message : 被队列 Q1 和 队列 Q2 接收到 RoutingKey : quick.orange.rabbit, Message : 被队列 Q1 和 队列 Q2 接收到 RoutingKey : lazy.brown.fox, Message : 被队列 Q2 接收到 RoutingKey : lazy.pink.rabbit, Message : 虽然满足两个绑定,但只被队列 Q2 接收一次 RoutingKey : lazy.orange.male.rabbit, Message : 包含四个单词,被队列 Q2 接收到
(4) 观察消费者一和消费者二的控制台输出结果,可以发现消费者一和消费者二会接收到相同或不同的消息,这也说明绑定键(BindingKey)的通配符规则生效了。