RabbitMQ 入门教程之十一
大纲
- RabbitMQ 入门教程之一、RabbitMQ 入门教程之二、RabbitMQ 入门教程之三
- RabbitMQ 入门教程之四、RabbitMQ 入门教程之五、RabbitMQ 入门教程之六
- RabbitMQ 入门教程之七、RabbitMQ 入门教程之八、RabbitMQ 入门教程之九
- RabbitMQ 入门教程之十、RabbitMQ 入门教程之十一
前言
学习资源
业务背景
在一个典型的分布式系统中,存在地理上分布较远的多个节点,例如部署在北京的 RabbitMQ 节点(Broker 北京)和部署在深圳的 RabbitMQ 节点(Broker 深圳)。由于两地之间网络延迟较大,跨地域通信时延是必须面对的问题。假设有一个部署在北京的数据生产方(Client 北京),需要连接到本地的 Broker 北京,并向其中的交换机 exchangeA 发送消息。由于网络延迟极小,即便启用了发布确认机制或事务机制,Client 北京也能迅速地将消息发送至 exchangeA,并及时收到确认响应,整体通信效率较高。然而,当另一个部署在深圳的业务系统(Client 深圳)也需要向 exchangeA 发送消息时,就面临了显著的延迟问题。由于 Client 深圳需跨地域连接到 Broker 北京,消息的发送及等待确认过程会受到网络延迟的影响,尤其在启用发布确认机制或事务机制时,延迟变得更加明显。这不仅会导致消息发送线程响应变慢,降低吞吐性能,严重时甚至可能造成线程阻塞,影响整体业务的稳定性。一种看似可行的方案是将 Client 深圳迁移部署到北京的机房,从而缩短其与 Broker 北京之间的通信延迟。然而,这种做法往往会引发新的问题。例如,Client 深圳可能还依赖部署在深圳的其他服务,迁移到北京后又会增加访问这些服务的网络延迟。此外,完全将所有业务系统集中部署在同一个机房,不仅违背了分布式部署和负载均衡的初衷,也严重影响了系统的容灾能力与灵活性。为了解决跨地域通信延迟带来的问题,RabbitMQ 提供了 Federation 插件和 Shovel 插件。通过 Shovel 插件,可以在 Broker 北京和 Broker 深圳之间建立推送通道,使得 Client 深圳可以就近连接到本地的 Broker 深圳,并将消息通过 Shovel 插件自动转发至 Broker 北京的 exchangeA。这样既避免了直接跨地域通信造成的高延迟,又保留了系统架构的灵活性与容灾能力,是一种高效且实用的方案。
RabbitMQ Federated 插件
概念介绍
Federated Plugin(联邦插件)是 RabbitMQ 官方提供的一种跨节点或者跨集群消息桥接机制,主要用于在不同 RabbitMQ 节点或集群之间转发消息,而不需要将它们合并成一个集群(如下图所示)。
Federated 插件的桥接方式
Federation Exchange
:联邦交换机,将远程交换机中的消息拉取(Pull)并复制到本地交换机。Federation Queue
:联邦队列,将远程队列中的消息拉取(Pull)并复制到本地队列。
Federated 插件的核心作用
- 在多个 RabbitMQ 节点(或者集群)之间建立松耦合的消息流通通道。
- 实现消息从一个集群的交换机或队列自动桥接到另一个集群。
- 不要求各个集群之间网络完全互通或者节点角色一致。
- 适用于地理分布式部署、跨数据中心传输、云混合架构等场景。
Federated 插件的核心特性
- 松耦合架构:各集群独立运行,互不依赖。
- 灵活配置:按需定义哪些交换机或队列联邦同步。
- 适应网络限制:可通过 NAT、防火墙、VPN 等中介连接。
- 按需消息传输:只有在本地有消费者订阅时才开始拉取消息(避免浪费)。
Federated 插件的使用场景
- 跨数据中心消息传递:比如有两个数据中心,各自有独立的 RabbitMQ 集群,通过 Federation 插件可以把消息从一个数据中心传给另一个数据中心。
- 跨组织通信:不同组织、子公司、服务系统之间 RabbitMQ 独立部署的场景下互联互通。
- 可控的数据同步:Federation 插件是基于拉取(Pull)的方式,不会对远程 RabbitMQ 造成压力,也能做流量控制。
- 异地灾备:如果一个 RabbitMQ 出问题,可以将消息同步到另一个 RabbitMQ 做备份处理(不是实时高可用,但可以保证消息冗余)。
Federated 插件与集群的区别
Federated 插件 | RabbitMQ 集群 |
---|---|
节点彼此独立 | 节点组成统一集群 |
松耦合 | 强耦合(共享元数据) |
可跨网络、跨数据中心 | 需低延迟、稳定内网 |
消息通过桥接拉取(Pull) | 消息自动路由同步 |
特别注意
- 使用 Federated 插件(包括联邦交换机、联邦队列)时,消息都是下游(Downstream)通过拉取方式获取的,而不是上游(Upstream)主动推送消息,所以消息获取方式都是拉模式(Pull-Based)。
专业术语
在 RabbitMQ 的 Federated Plugin(联邦插件)中,Federated Link 是从 Downstream(本地)连接到 Upstream(远程):
Upstream(上游)
:被订阅的远程 RabbitMQ 节点(即数据源)。Downstream(下游)
:发起连接、接收或转发消息的本地 RabbitMQ 节点。Federation Link
:这是 Downstream(下游)主动拉取消息的桥梁。
特别注意
- 在 Federated Exchange(联邦交换机)中,下游从上游的交换机中拉取消息,并重新发布到本地交换机,供本地消费者消费。
- 在 Federated Queue(联邦队列)中,当本地有消费者订阅时,下游从上游的队列中拉取消息,并重新传递到本地队列,供本地消费者消费。
安装插件
- 由于 RabbitMQ 默认自带了 Federated 插件,包括
rabbitmq_federation
和rabbitmq_federation_management
,因此只需要在所需的 RabbitMQ 节点上分别启用 Federated 插件即可,如下所示:
1 | # 启用 rabbitmq_federation 插件 |
- 当 Federated 插件成功启用后,在 RabbitMQ 节点的控制台页面中,可以看到以下内容:
使用说明
Federated 插件的使用步骤:
- (1) 启用
rabbitmq_federation
和rabbitmq_federation_management
插件 - (2) 配置远程服务器(Upstream)
- (3) 创建 Federated Exchange 或 Federated Queue(通过管理界面或者命令行创建策略)
- (4) 消费者连接本地队列即可间接消费远程消息
Federated Exchagne
概念介绍
Federated Exchange(联邦交换机)是 Federation 插件提供的一种机制,用于在不同 RabbitMQ 节点(或者集群)之间联通交换机,使消息可以从一个交换机自动路由到另一个交换机中。值得一提的是,联邦交换机可以让消息在多个 RabbitMQ 节点(或者集群)中实现交换机级别的广播复制。
Federated Exchange 的工作原理
- 在两个或多个 RabbitMQ 节点(或者集群)中,各自定义一个名称相同的交换机。
- 本地交换机(Downstream)通过
federation upstream
指定远程交换机(Upstream)。 - 当有消息发布到远程交换机(Upstream)时,本地交换机(Downstream)会通过联邦链接从远程交换机(Upstream)将消息拉取(Pull)到本地,消息就会像正常发布到本地交换机那样进行路由,最后分发到本地绑定的队列中。
Federated Exchange 的核心特性
- 主动拉取:本地交换机(Downstream)主动连接远程交换机(Upstream),从远程拉取发布的消息。
- 拓扑独立:不需要队列之间互联,只需交换机保持同步,适用于解耦多个站点。
- 灵活匹配:联邦可应用于任意交换机类型,比如支持
fanout
、direct
、topic
、headers
等类型的交换机。 - 消息原样保留:被拉取的消息会重新发布到本地交换机,保留原始的 RoutingKey、Headers 等属性。
Federated Exchange 的注意事项
- 消息是拉取式同步:下游节点通过 AMQP Consumer 从上游获取消息,若连接断开则消息不会缓存,断开期间消息会丢失(除非 Upstream 队列持久化)。
- 存在重复消息可能:若配置多个 Federated Upstream,可能导致消息重复,需要业务层实现幂等消费。
- 不具备消费 ACK 保证机制:消息仅转发一次,不能确认是否被成功消费,不具备像 Queue 那样的可靠性保障。
- 本地需持久化:联邦交换机不存储消息,需要配合绑定本地持久化队列实现落盘。
Federated Exchange 的适用场景
- 多数据中心同步广播消息(如总部通知分部)。
- 多区域事件发布(如多地游戏服务器的事件联播)。
- 跨网络隔离系统同步数据(比如生产网到测试网)。
Federated Exchange 与其他机制对比
特性 | Federated Exchange | Federated Queue | Shovel 插件 |
---|---|---|---|
传输模式 | 拉取(Pull) | 拉取(Pull) | 拉取(Pull) |
联通对象 | 交换机 → 交换机 | 队列 → 队列 | 队列 → 交换机 / 队列 |
典型场景 | 发布 / 订阅场景下的消息广播同步 | 拉取型任务聚合(如分布式任务) | 精准的数据迁移与桥接 |
使用案例
本节将演示如何使用 Federation 插件中的 Federation Exchange(联邦交换机),整体工作流程如下图所示:
提示
- 在上面的工作流程图中,使用了 Federation Exchange(联邦交换机),当发送消息到独立节点一(
node1 upstream
)的交换机fed_exchange
时,RabbitMQ 会将消息发送给独立节点一中的本地队列node1_queue
。同时,联邦交换机还会从独立节点一的交换机拉取(Pull)消息,并发送给独立节点二(node2 upstream
)的交换机fed_exchange
,然后独立节点二的交换机fed_exchange
再将消息发送到本地队列node2_queue
。这相当于独立节点二可以消费到跟独立节点一相同的消息,从而让消息可以在多个 RabbitMQ 节点(或者集群)中实现交换机级别的广播复制。
版本说明
本案例使用的各软件版本如下表所示:
组件 | 版本 | 说明 |
---|---|---|
RabbitMQ Server | 3.8.26 | |
RabbitMQ Client | 5.10.0 | |
Erlang | 24.2 | |
Java | 11 |
节点说明
这里简单使用两个独立的 RabbitMQ 节点(非同一个集群),并且都已经启用 Federated 插件,如下表所示:
节点的名称 | 节点的 IP | 节点暴露的端口 | 节点的主机名称 | 节点的控制台访问地址 | 说明 |
---|---|---|---|---|---|
独立节点一 | 192.168.2.143 | 5673 、15673 | rabbitmq-node1 | http://192.168.2.143:15673 | 作为 Upstream(上游) |
独立节点二 | 192.168.2.144 | 5674 、15674 | rabbitmq-node2 | http://192.168.2.144:15674 | 作为 Downstream(下游) |
创建 Exchange
- 在 RabbitMQ 独立节点一(
rabbitmq-node1
)中,创建对应的交换机和队列,并建立绑定关系,可以通过 Web 控制台或者 Java 代码进行创建,同时让消费者订阅队列中的消息。特别注意,在 RabbitMQ 两个独立节点中,交换机的名称必须完全一致。
1 | import com.rabbitmq.client.BuiltinExchangeType; |
- 在 RabbitMQ 独立节点二(
rabbitmq-node2
)中,创建对应的交换机和队列,并建立绑定关系,可以通过 Web 控制台或者 Java 代码进行创建,同时让消费者订阅队列中的消息。特别注意,在 RabbitMQ 两个独立节点中,交换机的名称必须完全一致。
1 | import com.rabbitmq.client.BuiltinExchangeType; |
添加 Upstream
- 在 RabbitMQ 独立节点二(
rabbitmq-node2
)的 Web 控制台中,添加节点一(rabbitmq-node1
)作为 Upstream。
参数说明:
Name
:Upstream 的名称URI
:Upstream 的 URI 地址,其中amqp://
是协议前缀,admin:admin
用户名和密码,@rabbitmq-node1
是主机名称。Acknowledgement Mode
:下游从上游拉取消息后,何时向上游确认(ACK)这条消息已被 “接收” 了。它控制的是 Federation Exchange 与上游的消息拉取确认行为,而不是消费者消费消息的 ACK 行为。
特别注意
Acknowledgement Mode 不是指消费者的 ACK 机制,也不等同于 RabbitMQ Queue 中的消息消费确认机制,只是针对 Federation Exchange 拉取消息的内部行为。Acknowledgement Mode 的三种取值含义如下表所示。
确认模式 | 是否等待下游 Confirm | 是否等待消费者处理 | 具体含义 | 风险 / 特点 |
---|---|---|---|---|
on-confirm | ✅ 等待 confirm | ❌ 不管 | 默认值,Federation Exchange 拉取到消息后,等下游的 Exchange 成功接收(RabbitMQ 层面 Confirm)后才向上游 ACK | 最安全,可靠性最高,推荐用于关键业务 |
on-publish | ❌ 不等待 | ❌ 不管 | 拉取到消息并成功发布(指投递动作没有抛出错误,如交换机不存在、连接中断等)到下游的 Exchange 后立即 ACK,不等待 Confirm | 性能优于 on-confirm ,风险略高 |
no-ack | ❌ 不等待 | ❌ 不管 | 拉取到消息立即 ACK,不管下游的 Exchange 是否成功接收消息 | 风险最大,易丢消息(断线、未投递成功等) |
添加策略(Policy)
- 在 RabbitMQ 独立节点二(
rabbitmq-node2
)的 Web 控制台中,添加策略(Policy),让名称为node1-as-upstream
的 Upstream 应用在所有以fed_
开头的交换机。
查看联邦交换机
- 当 Upstream 和策略(Policy)都添加完成,并且交换机也创建完成后,如果一切正常运行,那么在独立节点二(
rabbitmq-node2
)的 Web 控制台中,可以在交换机列表中看到联邦交换机。
- 当 Upstream 和策略(Policy)都添加完成,并且交换机也创建完成后,如果一切正常运行,那么在独立节点二(
rabbitmq-node2
)的 Web 控制台中可以看到联邦状态显示正常。
特别注意
只有独立节点二(rabbitmq-node2
)中的交换机已经创建好,RabbitMQ 的 Web 控制台才会正常显示对应的联邦交换机和联邦状态。
测试联邦交换机
- 创建一个生产者,发送消息到独立节点一(
rabbitmq-node1
)中的交换机
1 | import com.rabbitmq.client.BuiltinExchangeType; |
- 代码测试结果:启动所有生产者和消费者应用,由于在独立节点二(
rabbitmq-node2
)中配置了 Federation Exchange(联邦交换机),当发送消息到 RabbitMQ 独立节点一(rabbitmq-node1
)中的交换机时,除了独立节点一(rabbitmq-node1
)中的消费者可以接收到消息,独立节点二(rabbitmq-node2
)中的消费者也可以接收到消息;即对于同一条消息,会被两个独立节点中的消费者同时消费。这就说明使用 Federation Exchange(联邦交换机)后,消息可以在多个 RabbitMQ 节点(或者集群)中实现交换机级别的广播复制。
Federated Queue
概念介绍
Federated Queue(联邦队列)是 Federation 插件提供的一种机制,用于在一个队列与另一个远程队列之间建立连接,将远程队列的消息拉取(Pull)到本地队列中 的一种机制。它不会复制队列的元数据(如绑定、消费者等),只是将消息从远程传输到本地,消费者只需要订阅本地队列即可消费远程消息。一个联邦队列可以连接一个或者多个上游队列(Upstream Queue), 并从这些上游队列中获取消息以满足本地消费者消费消息的需求。值得一提的是,Federated Queue 可以让多个 RabbitMQ 节点(或者集群)共享消费同一个逻辑队列的数据,即可以在跨节点的多个消费者之间轮询分发消息(效果类似 RabbitMQ 的工作队列模式)。
Federated Queue 的工作原理
- 联邦队列通过 HTTP API 从远程 RabbitMQ 节点拉取(Pull)目标队列的消息。
- 使用一个 “后台工作者”(Federation Link)定期拉取新消息。
- 拉取的消息会缓存在本地队列,供本地消费者消费。
- 联邦队列是基于拉模式(Pull-Based),而非主动推送。
Federated Queue 的核心特性
- 解耦:远程与本地队列解耦,连接断开时可自动恢复。
- 灵活性:支持配置哪些消息被拉取(如使用 RoutingKey 或 Headers)。
- 异地容错:适合跨区域部署(如一个数据中心消费另一个数据中心的数据)。
- 只拉不推:Federated Queue 是拉取(Pull)消息,然后供消费者消费。
Federated Queue 的注意事项
- 联邦队列不适用于要求高实时性、低延迟的场景。
- 如果远程队列消息量太大,本地可能会堆积消息。
- 发生网络异常时,消息同步会暂停,可能造成消费延迟。
Federated Queue 与普通 Queue 对比
特性 | Federated Queue | 普通 Queue |
---|---|---|
消息方向 | 拉取(Pull) | 本地消息处理 |
消息中继 | 是 | 否 |
延迟控制 | 有延迟(受网络和速率影响) | 本地处理较快 |
使用场景 | 跨地区数据聚合 | 常规消息传递 |
提示
一个联邦队列可以连接一个或者多个上游队列(Upstream Queue),并从这些上游队列中获取消息,以满足本地消费者消费消息的需求。
使用案例
本节将演示如何使用 Federation 插件中的 Federation Queue(联邦队列),整体工作流程如下图所示:
提示
- 在上面的工作流程图中,使用了 Federation Queue(联邦队列),当发送消息到独立节点一(
node1 upstream
)的队列时,RabbitMQ 会轮询分发消息给独立节点一(node1 upstream
)和独立节点二(node2 upstream
)的消费者。 - 举个例子,当发送消息 A 和消息 B 到独立节点一的队列时,独立节点一的消费者会接收到消息 A,而独立节点二的消费者会接收到消息 B,消息不会被重复消费。
版本说明
本案例使用的各软件版本如下表所示:
组件 | 版本 | 说明 |
---|---|---|
RabbitMQ Server | 3.8.26 | |
RabbitMQ Client | 5.10.0 | |
Erlang | 24.2 | |
Java | 11 |
节点说明
这里简单使用两个独立的 RabbitMQ 节点(非同一个集群),并且都已经启用 Federated 插件,如下表所示:
节点的名称 | 节点的 IP | 节点暴露的端口 | 节点的主机名称 | 节点的控制台访问地址 | 说明 |
---|---|---|---|---|---|
独立节点一 | 192.168.2.143 | 5673 、15673 | rabbitmq-node1 | http://192.168.2.143:15673 | 作为 Upstream(上游) |
独立节点二 | 192.168.2.144 | 5674 、15674 | rabbitmq-node2 | http://192.168.2.144:15674 | 作为 Downstream(下游) |
创建 Queue
- 在 RabbitMQ 独立节点一(
rabbitmq-node1
)中,创建对应的交换机和队列,并建立绑定关系,可以通过 Web 控制台或者 Java 代码进行创建,同时让消费者订阅队列中的消息。特别注意,在 RabbitMQ 两个独立节点中,队列的名称必须完全一致。
1 | import com.rabbitmq.client.BuiltinExchangeType; |
- 在 RabbitMQ 独立节点二(
rabbitmq-node2
)中,创建对应的交换机和队列,并建立绑定关系,可以通过 Web 控制台或者 Java 代码进行创建,同时让消费者订阅队列中的消息。特别注意,在 RabbitMQ 两个独立节点中,队列的名称必须完全一致。
1 | import com.rabbitmq.client.BuiltinExchangeType; |
添加 Upstream
- 在 RabbitMQ 独立节点二(
rabbitmq-node2
)的 Web 控制台中,添加节点一(rabbitmq-node1
)作为 Upstream。
参数说明:
Name
:Upstream 的名称URI
:Upstream 的 URI 地址,其中amqp://
是协议前缀,admin:admin
用户名和密码,@rabbitmq-node1
是主机名称。Acknowledgement Mode
:下游从上游拉取消息后,何时向上游确认(ACK)这条消息已被 “接收” 了。它控制的是 Federation Queue 与上游的消息拉取确认行为,而不是消费者消费消息的 ACK 行为。
特别注意
Acknowledgement Mode 不是指消费者的 ACK 机制,也不等同于 RabbitMQ Queue 中的消息消费确认机制,只是针对 Federation Queue 拉取消息的内部行为。Acknowledgement Mode 的三种取值含义如下表所示。
确认模式 | 是否等待下游 Confirm | 是否等待消费者处理 | 具体含义 | 风险 / 特点 |
---|---|---|---|---|
on-confirm | ✅ 等待 confirm | ❌ 不管 | 默认值,Federation Queue 拉取到消息后,等下游的 Queue 成功接收(指消息已经被 RabbitMQ 成功、安全地写入下游队列的存储结构中,包括内存或磁盘)后才向上游 ACK | 最安全,可靠性最高,推荐用于关键业务 |
on-publish | ❌ 不等待 | ❌ 不管 | 拉取到消息并成功发布(指投递动作没有抛出错误,如队列不存在、连接中断等)到下游的 Queue 后立即 ACK,不等待 Confirm | 性能优于 on-confirm ,风险略高 |
no-ack | ❌ 不等待 | ❌ 不管 | 拉取到消息立即 ACK,不管下游的 Queue 是否成功接收消息 | 风险最大,易丢消息(断线、未投递成功等) |
添加策略(Policy)
- 在 RabbitMQ 独立节点二(
rabbitmq-node2
)的 Web 控制台中,添加策略(Policy),让名称为node1-as-upstream
的 Upstream 应用在所有以fed.
开头的队列。
查看联邦队列
- 当 Upstream 和策略(Policy)都添加完成,并且队列也创建完成后,如果一切正常运行,那么在独立节点二(
rabbitmq-node2
)的 Web 控制台中可以看到联邦状态显示正常。
特别注意
只有独立节点二(rabbitmq-node2
)中的队列已经创建好,RabbitMQ 的 Web 控制台才会正常显示对应的联邦状态。
测试联邦队列
- 创建一个生产者,发送消息到独立节点一(
rabbitmq-node1
)中的队列
1 | import com.rabbitmq.client.BuiltinExchangeType; |
- 代码测试结果:启动所有生产者和消费者应用,由于在独立节点二(
rabbitmq-node2
)中配置了 Federation Queue(联邦队列),当发送消息到 RabbitMQ 独立节点一(rabbitmq-node1
)的队列时,RabbitMQ 会轮询分发消息给独立节点一(rabbitmq-node1
)和独立节点二(rabbitmq-node2
)的消费者。举个例子,当发送消息 A 和消息 B 到独立节点一的队列时,独立节点一的消费者会接收到消息 A,而独立节点二的消费者会接收到消息 B,消息不会被重复消费。这就说明使用 Federation Queue(联邦队列)后,可以在多个 RabbitMQ 节点(或者集群)之间为单个队列提供均衡负载的功能,即在跨节点的多个消费者之间轮询分发消息(效果类似 RabbitMQ 的工作队列模式)。
RabbitMQ Shovel 插件
概述介绍
RabbitMQ 的 Shovel 插件用于实现不同 RabbitMQ 服务器或虚拟主机(Vhost)之间的消息转发,其核心作用是 “铲子” 式地将消息从一个队列 “铲” 到另一个位置。Shovel 插件能够从一个 Broker 中的队列(作为源端,即 Source)中拉取(Pull)消息,并将其转发至另一个 Broker 中的交换机 / 队列(作为目标端,即 Destination)。源端的队列和目标端的交换机 / 队列既可以分别位于不同的 Broker,也可以位于同一个 Broker 中。这种机制使得 Shovel 插件特别适用于跨数据中心消息传输、系统迁移、灾备同步等场景,确保消息可靠地在 RabbitMQ 实例间流动。简而言之,Shovel 是一个 “拉取消息再发送出去” 的搬运工,即基于拉取(Pull)模式从源端的队列获取消息,再推送(Push)消息到目标端的交换机 / 队列。
Shovel 插件的工作原理
- Shovel 连接到源服务器(Source)并拉取指定队列。
- 拉取到消息后,将消息转发到目标服务器(Destination)。
- 确保目标端写入成功后,才确认(ACK)消费源端消息(保证消息不丢)。
- 若出现连接中断,会自动重试连接并继续搬运。
Shovel 插件的核心特性
- 跨实例传输:支持在不同 RabbitMQ 实例间转发消息
- 持续运行:它是一种持续运行的转发机制(非一次性)
- 可靠性:支持自动重连、持久化、确认机制
- 可配置性:支持静态配置(Static Shovel)和动态配置(Dynamic Shovel)
- 支持 AMQP
0.9.1
:兼容 RabbitMQ 默认协议
Shovel 插件的注意事项
- 大量 Shovel 实例会消耗较多连接资源、通道和内存,建议限制并发 Shovel 数量。
- 消息顺序不能百分百保证,特别在连接中断重连、存在多个 Shovel 的情况下。
- 若业务对消息顺序、消息重复敏感,则需要在消费端做幂等性处理或顺序校验。
- 可通过 RabbitMQ 管理控制台或 HTTP API 监控 Shovel 状态,推荐集成监控系统报警异常。
Shovel 插件的使用场景
- 跨数据中心的消息转发
- 消息系统迁移
- 多活集群的消息同步
- 灾备系统的消息复制
Shovel 插件的配置说明
- 源端和目标端都应设置合理的权限控制,避免越权访问。
- 参数
ack-mode
建议设置为on-confirm
或on-publish
,以保证消息成功写入目标端后再确认源端消费,避免消息丢失。 - 可以设置
prefetch-count
(预取数量)来控制一次拉取的消息数量,避免消息积压。 - 连接异常时,Shovel 会自动尝试重连,使用
reconnect-delay
参数设置重连间隔,比如reconnect-delay: 5
。 - 静态配置(Static Shovel):通过
rabbitmq.conf
文件或环境变量配置,适合在服务启动前就确定连接和队列信息的场景。 - 动态配置(Dynamic Shovel):使用 RabbitMQ Management UI 或 HTTP API 动态创建和管理 Shovel,适合运行时灵活调整。
总结
- Shovel 插件是基于消费机制实现的,它从队列中 "消费" 消息(就像一个消费者一样),所以消息源(源端 - Source)必须是一个队列,消息目标(目标端 - Destination)则通常是交换机,间接也支持目标端为队列。
- 在 RabbitMQ 控制台的配置页面中,虽然 Shovel 的源端(Source)可以选择是交换机,但本质上它依然是通过中间队列消费消息。Shovel 插件不能作为 "监听交换机" 的工具,而是从现有队列中读取消息进行再发布。
Shovel 插件与 Federated 插件的对比
特性 / 机制 | Shovel 插件 | Federated Exchange(联邦交换机) | Federated Queue(联邦队列) |
---|---|---|---|
连接发起方 | 目标端主动连接源端 | 目标端主动连接源端 | 目标端主动连接源端 |
消息获取模式 | 从源队列拉取(Pull) | 从源交换机拉取(Pull) | 从源队列拉取并消费 |
源端行为 | 类似普通 Broker,无特殊行为 | 类似普通 Broker,无特殊行为 | 类似普通 Broker,无特殊行为 |
目标端行为 | 拉取消息 → 发布到目标交换机 / 队列 | 拉取消息 → 再次路由到本地交换机 | 拉取消息 → 直接投递到本地队列 |
支持方向 | 支持队列到交换机、队列到队列 | 仅支持交换机级别联邦 | 仅支持队列级别联邦 |
拉取粒度 | 手动配置:逐条、批量、确认等 | 自动控制,类似 AMQP 消费者 | 自动控制,类似 AMQP 消费者 |
使用方式 | 插件 + 显式配置(静态 / 动态) | 插件 + 通过策略自动应用 | 插件 + 通过策略自动应用 |
适用场景 | 精细控制、混合部署、定制路由 | 多站点广播、发布同步 | 多个节点共享消费同一个逻辑队列的数据(即轮询分发消息给多个消费者) |
消息可靠性控制 | 高(支持 ACK、持久化等) | 中等(依赖 AMQP 投递机制) | 中等(依赖消费确认机制) |
安装插件
- 由于 RabbitMQ 默认自带了 Shovel 插件,包括
rabbitmq_shovel
和rabbitmq_shovel_management
,因此只需要在所需的 RabbitMQ 节点上分别启用 Shovel 插件即可,如下所示:
1 | # 启用 rabbitmq_shovel 插件 |
- 当 Shovel 插件成功启用后,在 RabbitMQ 节点的控制台页面中,可以看到以下内容:
使用案例
本节将演示如何使用 Shovel 插件,整体工作流程如下图所示:
版本说明
本案例使用的各软件版本如下表所示:
组件 | 版本 | 说明 |
---|---|---|
RabbitMQ Server | 3.8.26 | |
RabbitMQ Client | 5.10.0 | |
Erlang | 24.2 | |
Java | 11 |
节点说明
这里简单使用两个独立的 RabbitMQ 节点(非同一个集群),并且都已经启用 Shovel 插件,如下表所示:
节点的名称 | 节点的 IP | 节点暴露的端口 | 节点的主机名称 | 节点的控制台访问地址 | 说明 |
---|---|---|---|---|---|
独立节点一 | 192.168.2.143 | 5673 、15673 | rabbitmq-node1 | http://192.168.2.143:15673 | 作为源端(Source) |
独立节点二 | 192.168.2.144 | 5674 、15674 | rabbitmq-node2 | http://192.168.2.144:15674 | 作为目标端(Destination) |
创建队列
- 在 RabbitMQ 独立节点一(
rabbitmq-node1
)中,创建对应的交换机和队列,并建立绑定关系,可以通过 Web 控制台或者 Java 代码进行创建,同时让消费者订阅队列中的消息。
1 | import com.rabbitmq.client.BuiltinExchangeType; |
- 在 RabbitMQ 独立节点二(
rabbitmq-node2
)中,创建对应的交换机和队列,并建立绑定关系,可以通过 Web 控制台或者 Java 代码进行创建,同时让消费者订阅队列中的消息。
1 | import com.rabbitmq.client.BuiltinExchangeType; |
添加 Shovel
- 在 RabbitMQ 控制台的配置页面中,添加 Shovel 时可以将源端(Source)和目标端(Destination)设置为交换机或者队列,其中 RabbitMQ 的默认行为如下:
源端的选择 | 默认行为 |
---|---|
如果源端(Source)选择的是 Queue | RabbitMQ 会在创建 Shovel 之前,预先自动创建该队列(如果队列不存在就自动创建) |
如果源端(Source)选择的是 Exchange | RabbitMQ 不会预先自动创建该交换机,但如果源端(Source)是 Exchange,会自动创建一个内部队列,并绑定到这个 Exchange,以便从 Exchange 拉取消息 |
- 在 RabbitMQ 独立节点二(
rabbitmq-node2
)的 Web 控制台中,添加 Shovel,如下图所示:
参数说明:
Name:Shovel 的名称
URI:节点 的 URI 地址,其中amqp://
是协议前缀,admin:admin
用户名和密码,@rabbitmq-node1
是主机名称。
特别注意
Shovel 是 "谁配置,谁搬运",一般推荐将 Shovel 添加在目标端(Destination)节点上,也可以添加在中立第三方节点。这样目标端可以主动拉取消息,更安全可控,易于统一监控与运维。
查看 Shovel
- 当 Shovel 添加完成后,并且交换机 / 队列也创建完成后,如果一切正常运行,那么在独立节点二(
rabbitmq-node2
)的 Web 控制台中可以看到 Shovel 状态显示正常。
测试 Shovel
- 创建一个生产者,发送消息到独立节点一(
rabbitmq-node1
)中的队列
1 | import com.rabbitmq.client.BuiltinExchangeType; |
- 代码测试结果:启动所有生产者和消费者应用,由于在独立节点二(
rabbitmq-node2
)中配置了 Shovel,且源端(Source)和目标端(Destination)选择的都是队列,当发送消息到 RabbitMQ 独立节点一(rabbitmq-node1
)的队列时,RabbitMQ 会轮询分发消息给独立节点一(rabbitmq-node1
)和独立节点二(rabbitmq-node2
)的消费者。举个例子,当发送消息 A 和消息 B 到独立节点一的队列时,独立节点一的消费者会接收到消息 A,而独立节点二的消费者会接收到消息 B,消息不会被重复消费。
使用总结说明
在 RabbitMQ 的 Web 控制台中,对于 Shovel 的源端(Source)和目标端(Destination)的不同配置,有着不同的运行效果:
源端选择 | 目标端选择 | 实际运行效果 | 运行效果类比 |
---|---|---|---|
队列 | 队列 | 从源队列消费消息,重新发布到目标队列(目标队列可在不同实例),实现真正的 “消息搬运” | 运行效果类似 Federated Queue(联邦队列),可以让多个 RabbitMQ 节点(或者集群)共享消费同一个逻辑队列的数据,即可以在跨节点的多个消费者之间轮询分发消息 |
交换机 | 交换机 | 自动绑定一个临时队列到源交换机 → 从该队列消费消息 → 发布到目标交换机 | 运行效果类似 Federated Exchange(联邦交换机),可实现广播复制效果 |
队列 | 交换机 | 从源队列消费消息,重新发布到目标交换机,再由目标交换机路由到绑定的队列 | 最常见用法,灵活可靠 |
交换机 | 队列 | 自动绑定一个临时队列到源交换机 → 从该队列消费消息 → 直接投递到目标队列 | 支持,但配置上需要确保匹配路由键 |