大纲
前言
学习资源
版本说明
本文所有案例代码使用的各软件版本如下表所示:
组件 | 版本 | 说明 |
---|
RabbitMQ Server | 3.8.26 | |
RabbitMQ Client | 5.10.0 | |
Erlang | 24.2 | |
Java | 11 | |
RabbitMQ 使用案例
工作模式 | 别名 | 说明 | 对应的交换机 |
---|
简单模式 | Simple Queue 模式 | 只有一个生产者和一个消费者 | 默认交换机(使用空字符串 “” 进行标识) |
工作队列模式 | Work Queue 模式 | 一个生产者,多个消费者,同一条消息只会被一个消费者成功消费 | 默认交换机(使用空字符串 “” 进行标识) |
发布 / 订阅模式 | Fanout 模式 | 一个生产者发送的消息会被多个消费者获取,不依赖路由键(RoutingKey) | 扇形交换机(Fanout) |
路由模式 | Direct 模式 | 发送消息到交换机,并且要指定路由键(RoutingKey),消费者在将队列绑定到交换机时需要指定路由键(RoutingKey) | 直接交换机(Direct) |
主题模式 | Topic 模式 | 根据主题进行匹配,此时队列需要绑定在一个模式上,* 代表一个单词,# 代表零个或多个单词 | 主题交换机(Topic) |
RPC 模式 | | 使用 MQ 可以实现 RPC 的异步调用 | 直接交换机(Direct) |
发布者确认模式 | | RabbitMQ 确保消息可靠投递的机制 | |
简单队列模式
概念介绍
- 这种模式只有一个生产者、一个队列、一个消费者,也被称为 “简单队列模式”。
- 消息产生者将消息放入队列,消息的消费者监听消息队列,如果队列中有消息,就消费掉,消息被消费后,会自动从队列中删除。
- 存在隐患,比如:消息可能没有被消费者正确处理,但在队列中已经被删除了,造成消息的丢失。
- 应用场景:聊天(中间需要有一个过度的服务器 - P 端 与 C 端)。
![]()
案例代码
代码下载
本节所需的完整案例代码,可以直接从 GitHub 下载对应章节 rabbitmq-lesson-01
。
1 2 3 4 5
| <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.10.0</version> </dependency>
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties; import java.nio.charset.StandardCharsets;
public class MQProducer {
public static final String QUEUE_NAME = "test";
public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.2.127"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("admin");
try ( Connection connection = factory.newConnection(); Channel channel = connection.createChannel() ) { channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, "hello".getBytes(StandardCharsets.UTF_8)); } }
}
|
独占队列的使用说明
- 当队列被声明为
exclusive
时,该队列仅对首次声明它的连接可见,其他连接(无论是同一客户端还是其他客户端)都无法访问该队列,并且当连接关闭时队列会被自动删除(即使设置了 durable=true
)。 - 这对于临时队列很有用,比如用于临时性任务(如 RPC 响应队列)。但是,如果是长期使用的队列,设置为
exclusive
会导致其他客户端无法访问。 - 如果队列被声明为
exclusive
,当其他连接尝试声明同名队列,并向其发送 / 消费消息,RabbitMQ 会返回错误。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
| import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; import java.nio.charset.StandardCharsets; import java.util.Scanner;
public class MQConsumer {
public static final String QUEUE_NAME = "test";
public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.2.127"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("admin");
Connection connection = factory.newConnection(); Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
DeliverCallback deliverCallback = (consumerTag, message) -> { String msg = new String(message.getBody(), StandardCharsets.UTF_8); System.out.println("Successed to consume message : " + msg); };
CancelCallback cancelCallback = (consumerTag) -> { System.out.println("Failed to consume message : " + consumerTag); };
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
System.out.println("按回车键退出程序:"); new Scanner(System.in).nextLine();
channel.close(); connection.close(); }
}
|
代码测试
(1) 分别启动消费者和生产者应用
(2) 在消费者的控制台中,会输出以下内容:
工作队列模式
概念介绍
- 这种模式(竞争资源 - 在工作进程之间分发任务)有一个生产者、一个队列、多个消费者,同一条消息只会被一个消费者成功消费。
- 消息生产者将消息放入队列,消费者可以有多个。比如:消费者 1 与 消费者 2 同时监听同一个队列,消息被消费时,两个消费者共同争抢当前的消息队列内容,谁先抢到谁负责消费消息。
- 存在隐患:高并发情况下,可能会发生某一个消息被多个消费者共同消费。
- 应用场景:抢红包、大型项目中的资源调度(任务分配系统不需知道哪一个任务执行系统处于空闲状态,直接将任务放进到消息队列中,空闲的任务执行系统自动争抢任务)。
![]()
提示
工作队列(又称任务队列)的主要思想是避免资源密集型任务立即执行后,而不得不等待它完成的场景。相反,我们安排任务在之后执行,可以将任务封装为消息并将其发送到队列,在后台运行的工作线程将获取任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务(如下图所示),类似并行计算。
![]()
案例代码
本节将启动两个接收消息的线程(消费者)和一个发送消息的线程(生产者),以此验证两个工作线程是如何工作的(默认是由 RabbitMQ 轮询分发消息)。值得一提的是,下述代码本质上跟上面的 简单队列模式的案例代码 并没有根本区别,只是增加了一个消费者而已。
代码下载
本节所需的完整案例代码,可以直接从 GitHub 下载对应章节 rabbitmq-lesson-02
。
1 2 3 4 5
| <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.10.0</version> </dependency>
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;
public class RabbitMQUtils {
public static ConnectionFactory connectionFactory;
static { connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.2.127"); connectionFactory.setPort(5672); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); }
public static Channel createChannel() throws Exception { Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
return channel; }
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| import com.clay.rabbitmq.utils.RabbitMQUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.MessageProperties; import java.nio.charset.StandardCharsets; import java.util.Scanner;
public class MQProducer {
public static final String QUEUE_NAME = "test";
public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtils.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
Scanner scanner = new Scanner(System.in); while (scanner.hasNext()) { String message = scanner.nextLine(); System.out.println("发送消息:" + message);
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8)); }
channel.close(); }
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
| import com.clay.rabbitmq.utils.RabbitMQUtils; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; import java.nio.charset.StandardCharsets; import java.util.Scanner;
public class MQConsumer01 {
public static final String QUEUE_NAME = "test";
public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtils.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
DeliverCallback deliverCallback = (consumerTag, message) -> { String msg = new String(message.getBody(), StandardCharsets.UTF_8); System.out.println("Successed to consume message : " + msg); };
CancelCallback cancelCallback = (consumerTag) -> { System.out.println("Failed to consume message : " + consumerTag); };
System.out.println("消费者一等待接收消息...");
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
System.out.println("按回车键退出程序:"); new Scanner(System.in).nextLine();
channel.close(); }
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
| import com.clay.rabbitmq.utils.RabbitMQUtils; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; import java.nio.charset.StandardCharsets; import java.util.Scanner;
public class MQConsumer02 {
public static final String QUEUE_NAME = "test";
public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtils.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
DeliverCallback deliverCallback = (consumerTag, message) -> { String msg = new String(message.getBody(), StandardCharsets.UTF_8); System.out.println("Successed to consume message : " + msg); };
CancelCallback cancelCallback = (consumerTag) -> { System.out.println("Failed to consume message : " + consumerTag); };
System.out.println("消费者二等待接收消息...");
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
System.out.println("按回车键退出程序:"); new Scanner(System.in).nextLine();
channel.close(); }
}
|
代码测试
(1) 分别启动生产者和两个消费者应用
(2) 在生产者的控制台中,依次输入以下内容:
1 2 3 4 5 6 7 8
| AA 发送消息:AA BB 发送消息:BB CC 发送消息:CC DD 发送消息:DD
|
1 2 3 4
| 消费者一等待接收消息... 按回车键退出程序: Successed to consume message : AA Successed to consume message : CC
|
1 2 3 4
| 消费者二等待接收消息... 按回车键退出程序: Successed to consume message : BB Successed to consume message : DD
|
- (5) 观察上面两个消费者的控制台输出结果,可以发现消息默认是由 RabbitMQ 轮询分发的,而且同一条消息只会被一个消费者消费到,不会出现重复消费的情况
发布 / 订阅模式
概念介绍
- 这种模式(共享资源 - 将消息同时发送给多个消费者)有一个生产者、一个交换机、多个队列、多个消费者,也被称为 “交换机模式”。
- 每个消费队列中的消息内容都一致,且每个消费者都会从自己的消息队列的第一个消息开始消费,直到最后一个消息。
- 交换机是 RabbitMQ 中的内部组件。消息生产者将消息发送给 RabbitMQ 后,RabbitMQ 会根据订阅的消费者个数,自动生成对应数量的消息队列,这样每个消费者都能获取生产者发送的全部消息。
- 实际上,前两种模式也使用了交换机,只是使用了采用默认设置的交换机。交换机参数是可以配置的,如果消息配置的交换机参数和 RabbitMQ 队列绑定(Binding)的交换机名称相同,则转发消息,否则丢弃消息。
- 存在隐患:一旦消费者断开与 RabbitMQ 的连接,队列就会消失。如果消费者数目很多,对于 RabbitMQ 而言,也是个重大负担;因为发布 / 订阅模式是个长连接,占用并发数,且每个消费者分配一个队列会占用大量资源。
- 应用场景:邮件群发、群聊、广播通信。
![]()
案例代码
路由模式
概念介绍
- 这种模式(选择性地接收消息)有一个生产者、一个交换机、多个队列、多个消息消费者,也被称为 “Routing 转发模式”。
- 一个交换机绑定多个消息队列,每个消息队列都有自己唯一的 RoutingKey,每一个消息队列都被一个消费者监听。
- 消息生产者将消息发送给交换机,交换机按照路由的 Key 进行判断,将消息推送到与之绑定的队列。即交换机根据路由的 Key,匹配上路由 Key 对应的消息队列,最后由监听队列的消费者消费消息。
![]()
案例代码
主题模式
概念介绍
- 这种模式(基于 Topic 接收消息)有一个生产者、一个交换机、多个队列、多个消息消费者,也被称为 “主题转发模式” 或者 “模糊匹配队列模式”,本质上是路由模式的一种(只是在路由功能的基础上增加了模糊匹配)。
- 一个交换机绑定多个消息队列,每个消息队列都有自己唯一的 RoutingKey,每一个消息队列都被一个消费者监听。
- 每个消息队列自己唯一的 Routekey 不是一个确定值,更像是正则表达式对应的匹配规则。
- 消息生产者将消息发送给交换机,交换机根据 RoutingKey 模糊匹配到对应的队列,由监听队列的消费者消费消息。
- 路由模糊匹配的规则:
#
和 *
都是代表通配符。*
代表一个单词,#
代表零个或多个单词。
![]()
案例代码
发布确认模式
概念介绍
- 这种模式是 RabbitMQ 确保消息可靠投递的机制。
- 生产者发送消息后,Broker 会异步返回 ACK(
channel.basicAck()
)表示消息已被接收并处理(如持久化到磁盘),若因故障或无法路由导致失败则返回 Nack(channel.basicNack()
)。 - 相比事务机制,该模式性能更高,适用于要求消息严格防丢失的场景,需要通过
channel.confirmSelect()
启用并监听确认结果,常配合消息持久化、重试等策略一起使用。
案例代码