RabbitMQ 入门教程之十一

大纲

前言

学习资源

业务背景

在一个典型的分布式系统中,存在地理上分布较远的多个节点,例如部署在北京的 RabbitMQ 节点(Broker 北京)和部署在深圳的 RabbitMQ 节点(Broker 深圳)。由于两地之间网络延迟较大,跨地域通信时延是必须面对的问题。假设有一个部署在北京的数据生产方(Client 北京),需要连接到本地的 Broker 北京,并向其中的交换机 exchangeA 发送消息。由于网络延迟极小,即便启用了发布确认机制或事务机制,Client 北京也能迅速地将消息发送至 exchangeA,并及时收到确认响应,整体通信效率较高。然而,当另一个部署在深圳的业务系统(Client 深圳)也需要向 exchangeA 发送消息时,就面临了显著的延迟问题。由于 Client 深圳需跨地域连接到 Broker 北京,消息的发送及等待确认过程会受到网络延迟的影响,尤其在启用发布确认机制或事务机制时,延迟变得更加明显。这不仅会导致消息发送线程响应变慢,降低吞吐性能,严重时甚至可能造成线程阻塞,影响整体业务的稳定性。一种看似可行的方案是将 Client 深圳迁移部署到北京的机房,从而缩短其与 Broker 北京之间的通信延迟。然而,这种做法往往会引发新的问题。例如,Client 深圳可能还依赖部署在深圳的其他服务,迁移到北京后又会增加访问这些服务的网络延迟。此外,完全将所有业务系统集中部署在同一个机房,不仅违背了分布式部署和负载均衡的初衷,也严重影响了系统的容灾能力与灵活性。为了解决跨地域通信延迟带来的问题,RabbitMQ 提供了 Federation 插件和 Shovel 插件。通过 Shovel 插件,可以在 Broker 北京和 Broker 深圳之间建立推送通道,使得 Client 深圳可以就近连接到本地的 Broker 深圳,并将消息通过 Shovel 插件自动转发至 Broker 北京的 exchangeA。这样既避免了直接跨地域通信造成的高延迟,又保留了系统架构的灵活性与容灾能力,是一种高效且实用的方案。

RabbitMQ Federated 插件

概念介绍

Federated Plugin(联邦插件)是 RabbitMQ 官方提供的一种跨节点或者跨集群消息桥接机制,主要用于在不同 RabbitMQ 节点或集群之间转发消息,而不需要将它们合并成一个集群(如下图所示)。

  • Federated 插件的桥接方式

    • Federation Exchange:联邦交换机,将远程交换机中的消息拉取(Pull)并复制到本地交换机。
    • Federation Queue:联邦队列,将远程队列中的消息拉取(Pull)并复制到本地队列。
  • Federated 插件的核心作用

    • 在多个 RabbitMQ 节点(或者集群)之间建立松耦合的消息流通通道。
    • 实现消息从一个集群的交换机或队列自动桥接到另一个集群。
    • 不要求各个集群之间网络完全互通或者节点角色一致。
    • 适用于地理分布式部署、跨数据中心传输、云混合架构等场景。
  • Federated 插件的核心特性

    • 松耦合架构:各集群独立运行,互不依赖。
    • 灵活配置:按需定义哪些交换机或队列联邦同步。
    • 适应网络限制:可通过 NAT、防火墙、VPN 等中介连接。
    • 按需消息传输:只有在本地有消费者订阅时才开始拉取消息(避免浪费)。
  • Federated 插件的使用场景

    • 跨数据中心消息传递:比如有两个数据中心,各自有独立的 RabbitMQ 集群,通过 Federation 插件可以把消息从一个数据中心传给另一个数据中心。
    • 跨组织通信:不同组织、子公司、服务系统之间 RabbitMQ 独立部署的场景下互联互通。
    • 可控的数据同步:Federation 插件是基于拉取(Pull)的方式,不会对远程 RabbitMQ 造成压力,也能做流量控制。
    • 异地灾备:如果一个 RabbitMQ 出问题,可以将消息同步到另一个 RabbitMQ 做备份处理(不是实时高可用,但可以保证消息冗余)。
  • Federated 插件与集群的区别

Federated 插件 RabbitMQ 集群
节点彼此独立节点组成统一集群
松耦合强耦合(共享元数据)
可跨网络、跨数据中心需低延迟、稳定内网
消息通过桥接拉取(Pull)消息自动路由同步

特别注意

  • 使用 Federated 插件(包括联邦交换机、联邦队列)时,消息都是下游(Downstream)通过拉取方式获取的,而不是上游(Upstream)主动推送消息,所以消息获取方式都是拉模式(Pull-Based)。

专业术语

在 RabbitMQ 的 Federated Plugin(联邦插件)中,Federated Link 是从 Downstream(本地)连接到 Upstream(远程):

  • Upstream(上游):被订阅的远程 RabbitMQ 节点(即数据源)。
  • Downstream(下游):发起连接、接收或转发消息的本地 RabbitMQ 节点。
  • Federation Link:这是 Downstream(下游)主动拉取消息的桥梁。

特别注意

  • 在 Federated Exchange(联邦交换机)中,下游从上游的交换机中拉取消息,并重新发布到本地交换机,供本地消费者消费。
  • 在 Federated Queue(联邦队列)中,当本地有消费者订阅时,下游从上游的队列中拉取消息,并重新传递到本地队列,供本地消费者消费。

安装插件

  • 由于 RabbitMQ 默认自带了 Federated 插件,包括 rabbitmq_federationrabbitmq_federation_management,因此只需要在所需的 RabbitMQ 节点上分别启用 Federated 插件即可,如下所示:
1
2
3
4
5
# 启用 rabbitmq_federation 插件
rabbitmq-plugins enable rabbitmq_federation

# 启用 rabbitmq_federation_management 插件
rabbitmq-plugins enable rabbitmq_federation_management
  • 当 Federated 插件成功启用后,在 RabbitMQ 节点的控制台页面中,可以看到以下内容:

使用说明

Federated 插件的使用步骤:

  • (1) 启用 rabbitmq_federationrabbitmq_federation_management 插件
  • (2) 配置远程服务器(Upstream)
  • (3) 创建 Federated Exchange 或 Federated Queue(通过管理界面或者命令行创建策略)
  • (4) 消费者连接本地队列即可间接消费远程消息

Federated Exchagne

概念介绍

Federated Exchange(联邦交换机)是 Federation 插件提供的一种机制,用于在不同 RabbitMQ 节点(或者集群)之间联通交换机,使消息可以从一个交换机自动路由到另一个交换机中。值得一提的是,联邦交换机可以让消息在多个 RabbitMQ 节点(或者集群)中实现交换机级别的广播复制。

  • Federated Exchange 的工作原理

    • 在两个或多个 RabbitMQ 节点(或者集群)中,各自定义一个名称相同的交换机
    • 本地交换机(Downstream)通过 federation upstream 指定远程交换机(Upstream)。
    • 当有消息发布到远程交换机(Upstream)时,本地交换机(Downstream)会通过联邦链接从远程交换机(Upstream)将消息拉取(Pull)到本地,消息就会像正常发布到本地交换机那样进行路由,最后分发到本地绑定的队列中。
  • Federated Exchange 的核心特性

    • 主动拉取:本地交换机(Downstream)主动连接远程交换机(Upstream),从远程拉取发布的消息。
    • 拓扑独立:不需要队列之间互联,只需交换机保持同步,适用于解耦多个站点。
    • 灵活匹配:联邦可应用于任意交换机类型,比如支持 fanoutdirecttopicheaders 等类型的交换机。
    • 消息原样保留:被拉取的消息会重新发布到本地交换机,保留原始的 RoutingKey、Headers 等属性。
  • Federated Exchange 的注意事项

    • 消息是拉取式同步:下游节点通过 AMQP Consumer 从上游获取消息,若连接断开则消息不会缓存,断开期间消息会丢失(除非 Upstream 队列持久化)。
    • 存在重复消息可能:若配置多个 Federated Upstream,可能导致消息重复,需要业务层实现幂等消费。
    • 不具备消费 ACK 保证机制:消息仅转发一次,不能确认是否被成功消费,不具备像 Queue 那样的可靠性保障。
    • 本地需持久化:联邦交换机不存储消息,需要配合绑定本地持久化队列实现落盘。
  • Federated Exchange 的适用场景

    • 多数据中心同步广播消息(如总部通知分部)。
    • 多区域事件发布(如多地游戏服务器的事件联播)。
    • 跨网络隔离系统同步数据(比如生产网到测试网)。
  • Federated Exchange 与其他机制对比

特性 Federated ExchangeFederated QueueShovel 插件
传输模式拉取(Pull)拉取(Pull)拉取(Pull)
联通对象交换机 → 交换机队列 → 队列队列 → 交换机 / 队列
典型场景发布 / 订阅场景下的消息广播同步拉取型任务聚合(如分布式任务)精准的数据迁移与桥接

使用案例

本节将演示如何使用 Federation 插件中的 Federation Exchange(联邦交换机),整体工作流程如下图所示:

提示

  • 在上面的工作流程图中,使用了 Federation Exchange(联邦交换机),当发送消息到独立节点一(node1 upstream)的交换机 fed_exchange 时,RabbitMQ 会将消息发送给独立节点一中的本地队列 node1_queue。同时,联邦交换机还会从独立节点一的交换机拉取(Pull)消息,并发送给独立节点二(node2 upstream)的交换机 fed_exchange,然后独立节点二的交换机 fed_exchange 再将消息发送到本地队列 node2_queue。这相当于独立节点二可以消费到跟独立节点一相同的消息,从而让消息可以在多个 RabbitMQ 节点(或者集群)中实现交换机级别的广播复制。
版本说明

本案例使用的各软件版本如下表所示:

组件版本说明
RabbitMQ Server3.8.26
RabbitMQ Client5.10.0
Erlang24.2
Java11
节点说明

这里简单使用两个独立的 RabbitMQ 节点(非同一个集群),并且都已经启用 Federated 插件,如下表所示:

节点的名称节点的 IP 节点暴露的端口节点的主机名称节点的控制台访问地址说明
独立节点一 192.168.2.143567315673rabbitmq-node1http://192.168.2.143:15673作为 Upstream(上游)
独立节点二 192.168.2.144567415674rabbitmq-node2http://192.168.2.144:15674作为 Downstream(下游)
创建 Exchange
  • 在 RabbitMQ 独立节点一(rabbitmq-node1)中,创建对应的交换机和队列,并建立绑定关系,可以通过 Web 控制台或者 Java 代码进行创建,同时让消费者订阅队列中的消息。特别注意,在 RabbitMQ 两个独立节点中,交换机的名称必须完全一致。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;

/**
* RabbitMQ 独立节点一(Upstream)的消费者
*/
public class MQConsumerNode1 {

// 交换机的名称
public static final String EXCHANGE_NAME = "fed_exchange";

// 队列的名称
public static final String QUEUE_NAME = "node1_queue";

// 队列的路由键(绑定键)
public static final String QUEUE_ROUTING_KEY = "routekey";

public static void main(String[] args) throws Exception {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.2.143");
factory.setPort(5673);
factory.setUsername("admin");
factory.setPassword("admin");

// 创建连接
Connection connection = factory.newConnection();

// 创建信道
Channel channel = connection.createChannel();

// 声明交换机
// 参数说明:
// exchange – 交换机的名称
// type – 交换机类型
// durable – 如果需要声明一个持久交换机,则为 true(交换机将在服务器重启后继续存在)
// autoDelete – 如果需要声明 autoDelete 交换机,则为 true(当最后一个绑定队列解除绑定后,自动删除该交换机)
// arguments – 交换的其他属性(构造参数)
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true, false, null);

// 声明队列
// 参数说明:
// queue – 队列的名称
// durable – 如果需要声明一个持久队列,则为 true(队列将在服务器重启后继续存在)
// exclusive – 如果需要声明一个独占队列(仅限于此连接使用,连接关闭后队列自动删除),则为 true。
// autoDelete – 如果需要声明 autoDelete 队列,则为 true(服务器将在最后一个消费者断开连接以后,自动删除该队列)
// arguments – 队列的其他属性(构造参数)
// 特别注意:如果确定队列已存在,消费者可以不声明队列。但是,强烈建议无论生产者还是消费者,都应该声明队列,确保参数可控
channel.queueDeclare(QUEUE_NAME, true, false, false, null);

// 绑定交换机和队列
// 参数说明:
// queue – 队列的名称
// exchange – 交换机的名称
// routingKey – 用于绑定的 RoutingKey(如果不使用 Routingkey,可填空字符串)
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, QUEUE_ROUTING_KEY);

// 消费消息时的回调
DeliverCallback deliverCallback = (consumerTag, message) -> {
String msg = new String(message.getBody(), StandardCharsets.UTF_8);
System.out.println("Successed to consume message : " + msg);

// 手动确认消息
// 参数说明:
// deliveryTag – 消息的标记
// multiple – true 表示确认所有消息,包括提供的送达标签为止的所有消息;false 仅确认提供的投放标记
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
};

// 取消消费时的回调(比如,在消费的时候队列已被删除掉)
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("Failed to consume message : " + consumerTag);
};

System.out.println("消费者开始消费消息...");

// 关闭自动确认机制
boolean autoAck = false;

// 消费消息
// 参数说明:
// queue – 队列的名称
// autoAck – 如果需要服务器在消息投递后自动确认消息,则为 true;如果需要客户端手动确认消息,则为 false
// deliverCallback – 消费消息时的回调
// cancelCallback – 取消消费时的回调
channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, cancelCallback);

// 让消费者持续运行
System.out.println("按回车键退出程序:");
new Scanner(System.in).nextLine();

// 关闭信道
channel.close();
connection.close();
}

}
  • 在 RabbitMQ 独立节点二(rabbitmq-node2)中,创建对应的交换机和队列,并建立绑定关系,可以通过 Web 控制台或者 Java 代码进行创建,同时让消费者订阅队列中的消息。特别注意,在 RabbitMQ 两个独立节点中,交换机的名称必须完全一致。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;

/**
* RabbitMQ 独立节点二(Downstream)的消费者
*/
public class MQConsumerNode2 {

// 交换机的名称
public static final String EXCHANGE_NAME = "fed_exchange";

// 队列的名称
public static final String QUEUE_NAME = "node2_queue";

// 队列的路由键(绑定键)
public static final String QUEUE_ROUTING_KEY = "routekey";

public static void main(String[] args) throws Exception {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.2.144");
factory.setPort(5674);
factory.setUsername("admin");
factory.setPassword("admin");

// 创建连接
Connection connection = factory.newConnection();

// 创建信道
Channel channel = connection.createChannel();

// 声明交换机
// 参数说明:
// exchange – 交换机的名称
// type – 交换机类型
// durable – 如果需要声明一个持久交换机,则为 true(交换机将在服务器重启后继续存在)
// autoDelete – 如果需要声明 autoDelete 交换机,则为 true(当最后一个绑定队列解除绑定后,自动删除该交换机)
// arguments – 交换的其他属性(构造参数)
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true, false, null);

// 声明队列
// 参数说明:
// queue – 队列的名称
// durable – 如果需要声明一个持久队列,则为 true(队列将在服务器重启后继续存在)
// exclusive – 如果需要声明一个独占队列(仅限于此连接使用,连接关闭后队列自动删除),则为 true。
// autoDelete – 如果需要声明 autoDelete 队列,则为 true(服务器将在最后一个消费者断开连接以后,自动删除该队列)
// arguments – 队列的其他属性(构造参数)
// 特别注意:如果确定队列已存在,消费者可以不声明队列。但是,强烈建议无论生产者还是消费者,都应该声明队列,确保参数可控
channel.queueDeclare(QUEUE_NAME, true, false, false, null);

// 绑定交换机和队列
// 参数说明:
// queue – 队列的名称
// exchange – 交换机的名称
// routingKey – 用于绑定的 RoutingKey(如果不使用 Routingkey,可填空字符串)
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, QUEUE_ROUTING_KEY);

// 消费消息时的回调
DeliverCallback deliverCallback = (consumerTag, message) -> {
String msg = new String(message.getBody(), StandardCharsets.UTF_8);
System.out.println("Successed to consume message : " + msg);

// 手动确认消息
// 参数说明:
// deliveryTag – 消息的标记
// multiple – true 表示确认所有消息,包括提供的送达标签为止的所有消息;false 仅确认提供的投放标记
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
};

// 取消消费时的回调(比如,在消费的时候队列已被删除掉)
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("Failed to consume message : " + consumerTag);
};

System.out.println("消费者开始消费消息...");

// 关闭自动确认机制
boolean autoAck = false;

// 消费消息
// 参数说明:
// queue – 队列的名称
// autoAck – 如果需要服务器在消息投递后自动确认消息,则为 true;如果需要客户端手动确认消息,则为 false
// deliverCallback – 消费消息时的回调
// cancelCallback – 取消消费时的回调
channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, cancelCallback);

// 让消费者持续运行
System.out.println("按回车键退出程序:");
new Scanner(System.in).nextLine();

// 关闭信道
channel.close();
connection.close();
}

}
添加 Upstream
  • 在 RabbitMQ 独立节点二(rabbitmq-node2)的 Web 控制台中,添加节点一(rabbitmq-node1)作为 Upstream。

参数说明:
Name:Upstream 的名称
URI:Upstream 的 URI 地址,其中 amqp:// 是协议前缀,admin:admin 用户名和密码,@rabbitmq-node1 是主机名称。
Acknowledgement Mode:下游从上游拉取消息后,何时向上游确认(ACK)这条消息已被 “接收” 了。它控制的是 Federation Exchange 与上游的消息拉取确认行为,而不是消费者消费消息的 ACK 行为。

特别注意

Acknowledgement Mode 不是指消费者的 ACK 机制,也不等同于 RabbitMQ Queue 中的消息消费确认机制,只是针对 Federation Exchange 拉取消息的内部行为。Acknowledgement Mode 的三种取值含义如下表所示。

确认模式是否等待下游 Confirm 是否等待消费者处理具体含义风险 / 特点
on-confirm✅ 等待 confirm❌ 不管默认值,Federation Exchange 拉取到消息后,等下游的 Exchange 成功接收(RabbitMQ 层面 Confirm)后才向上游 ACK 最安全,可靠性最高,推荐用于关键业务
on-publish❌ 不等待❌ 不管拉取到消息并成功发布(指投递动作没有抛出错误,如交换机不存在、连接中断等)到下游的 Exchange 后立即 ACK,不等待 Confirm 性能优于 on-confirm,风险略高
no-ack❌ 不等待❌ 不管拉取到消息立即 ACK,不管下游的 Exchange 是否成功接收消息风险最大,易丢消息(断线、未投递成功等)
添加策略(Policy)
  • 在 RabbitMQ 独立节点二(rabbitmq-node2)的 Web 控制台中,添加策略(Policy),让名称为 node1-as-upstream 的 Upstream 应用在所有以 fed_ 开头的交换机。

查看联邦交换机
  • 当 Upstream 和策略(Policy)都添加完成,并且交换机也创建完成后,如果一切正常运行,那么在独立节点二(rabbitmq-node2)的 Web 控制台中,可以在交换机列表中看到联邦交换机。

  • 当 Upstream 和策略(Policy)都添加完成,并且交换机也创建完成后,如果一切正常运行,那么在独立节点二(rabbitmq-node2)的 Web 控制台中可以看到联邦状态显示正常。

特别注意

只有独立节点二(rabbitmq-node2)中的交换机已经创建好,RabbitMQ 的 Web 控制台才会正常显示对应的联邦交换机和联邦状态。

测试联邦交换机
  • 创建一个生产者,发送消息到独立节点一(rabbitmq-node1)中的交换机
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import java.nio.charset.StandardCharsets;

/**
* RabbitMQ 独立节点一(Upstream)的生产者
*/
public class MQProducerNode1 {

// 交换机的名称
public static final String EXCHANGE_NAME = "fed_exchange";

// 队列的路由键(绑定键)
public static final String QUEUE_ROUTING_KEY = "routekey";

public static void main(String[] args) throws Exception {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.2.143");
factory.setPort(5673);
factory.setUsername("admin");
factory.setPassword("admin");

// 使用 try-with-resources 自动关闭连接和通道,确保资源释放
try (
// 创建连接
Connection connection = factory.newConnection();
// 创建信道
Channel channel = connection.createChannel()
) {
// 声明交换机
// 参数说明:
// exchange – 交换机的名称
// type – 交换机类型
// durable – 如果需要声明一个持久交换机,则为 true(交换机将在服务器重启后继续存在)
// autoDelete – 如果需要声明 autoDelete 交换机,则为 true(当最后一个绑定队列解除绑定后,自动删除该交换机)
// arguments – 交换的其他属性(构造参数)
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true, false, null);

// 发送消息
// 参数说明:
// exchange – 要将消息发布到的交换机,空字符串表示默认交换机
// routingKey – 路由 Key
// props – 消息的其他属性,比如:使用 MessageProperties.PERSISTENT_TEXT_PLAIN 属性确保消息持久化,配合持久化队列可避免服务器重启导致消息丢失
// body – 消息内容
channel.basicPublish(EXCHANGE_NAME, QUEUE_ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, "hello".getBytes(StandardCharsets.UTF_8));
}
}

}
  • 代码测试结果:启动所有生产者和消费者应用,由于在独立节点二(rabbitmq-node2)中配置了 Federation Exchange(联邦交换机),当发送消息到 RabbitMQ 独立节点一(rabbitmq-node1)中的交换机时,除了独立节点一(rabbitmq-node1)中的消费者可以接收到消息,独立节点二(rabbitmq-node2)中的消费者也可以接收到消息;即对于同一条消息,会被两个独立节点中的消费者同时消费。这就说明使用 Federation Exchange(联邦交换机)后,消息可以在多个 RabbitMQ 节点(或者集群)中实现交换机级别的广播复制。

Federated Queue

概念介绍

Federated Queue(联邦队列)是 Federation 插件提供的一种机制,用于在一个队列与另一个远程队列之间建立连接,将远程队列的消息拉取(Pull)到本地队列中 的一种机制。它不会复制队列的元数据(如绑定、消费者等),只是将消息从远程传输到本地,消费者只需要订阅本地队列即可消费远程消息。一个联邦队列可以连接一个或者多个上游队列(Upstream Queue), 并从这些上游队列中获取消息以满足本地消费者消费消息的需求。值得一提的是,Federated Queue 可以让多个 RabbitMQ 节点(或者集群)共享消费同一个逻辑队列的数据,即可以在跨节点的多个消费者之间轮询分发消息(效果类似 RabbitMQ 的工作队列模式)。

  • Federated Queue 的工作原理

    • 联邦队列通过 HTTP API 从远程 RabbitMQ 节点拉取(Pull)目标队列的消息。
    • 使用一个 “后台工作者”(Federation Link)定期拉取新消息。
    • 拉取的消息会缓存在本地队列,供本地消费者消费。
    • 联邦队列是基于拉模式(Pull-Based),而非主动推送。
  • Federated Queue 的核心特性

    • 解耦:远程与本地队列解耦,连接断开时可自动恢复。
    • 灵活性:支持配置哪些消息被拉取(如使用 RoutingKey 或 Headers)。
    • 异地容错:适合跨区域部署(如一个数据中心消费另一个数据中心的数据)。
    • 只拉不推:Federated Queue 是拉取(Pull)消息,然后供消费者消费。
  • Federated Queue 的注意事项

    • 联邦队列不适用于要求高实时性、低延迟的场景。
    • 如果远程队列消息量太大,本地可能会堆积消息。
    • 发生网络异常时,消息同步会暂停,可能造成消费延迟。
  • Federated Queue 与普通 Queue 对比

特性 Federated Queue 普通 Queue
消息方向拉取(Pull)本地消息处理
消息中继
延迟控制有延迟(受网络和速率影响)本地处理较快
使用场景跨地区数据聚合常规消息传递

提示

一个联邦队列可以连接一个或者多个上游队列(Upstream Queue),并从这些上游队列中获取消息,以满足本地消费者消费消息的需求。

使用案例

本节将演示如何使用 Federation 插件中的 Federation Queue(联邦队列),整体工作流程如下图所示:

提示

  • 在上面的工作流程图中,使用了 Federation Queue(联邦队列),当发送消息到独立节点一(node1 upstream)的队列时,RabbitMQ 会轮询分发消息给独立节点一(node1 upstream)和独立节点二(node2 upstream)的消费者。
  • 举个例子,当发送消息 A 和消息 B 到独立节点一的队列时,独立节点一的消费者会接收到消息 A,而独立节点二的消费者会接收到消息 B,消息不会被重复消费。
版本说明

本案例使用的各软件版本如下表所示:

组件版本说明
RabbitMQ Server3.8.26
RabbitMQ Client5.10.0
Erlang24.2
Java11
节点说明

这里简单使用两个独立的 RabbitMQ 节点(非同一个集群),并且都已经启用 Federated 插件,如下表所示:

节点的名称节点的 IP 节点暴露的端口节点的主机名称节点的控制台访问地址说明
独立节点一 192.168.2.143567315673rabbitmq-node1http://192.168.2.143:15673作为 Upstream(上游)
独立节点二 192.168.2.144567415674rabbitmq-node2http://192.168.2.144:15674作为 Downstream(下游)
创建 Queue
  • 在 RabbitMQ 独立节点一(rabbitmq-node1)中,创建对应的交换机和队列,并建立绑定关系,可以通过 Web 控制台或者 Java 代码进行创建,同时让消费者订阅队列中的消息。特别注意,在 RabbitMQ 两个独立节点中,队列的名称必须完全一致。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;

/**
* RabbitMQ 独立节点一(Upstream)的消费者
*/
public class MQConsumerNode1 {

// 交换机的名称
public static final String EXCHANGE_NAME = "node1_exchange";

// 队列的名称
public static final String QUEUE_NAME = "fed.queue";

// 队列的路由键(绑定键)
public static final String QUEUE_ROUTING_KEY = "routekey";

public static void main(String[] args) throws Exception {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.2.148");
factory.setPort(5673);
factory.setUsername("admin");
factory.setPassword("admin");

// 创建连接
Connection connection = factory.newConnection();

// 创建信道
Channel channel = connection.createChannel();

// 声明交换机
// 参数说明:
// exchange – 交换机的名称
// type – 交换机类型
// durable – 如果需要声明一个持久交换机,则为 true(交换机将在服务器重启后继续存在)
// autoDelete – 如果需要声明 autoDelete 交换机,则为 true(当最后一个绑定队列解除绑定后,自动删除该交换机)
// arguments – 交换的其他属性(构造参数)
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true, false, null);

// 声明队列
// 参数说明:
// queue – 队列的名称
// durable – 如果需要声明一个持久队列,则为 true(队列将在服务器重启后继续存在)
// exclusive – 如果需要声明一个独占队列(仅限于此连接使用,连接关闭后队列自动删除),则为 true。
// autoDelete – 如果需要声明 autoDelete 队列,则为 true(服务器将在最后一个消费者断开连接以后,自动删除该队列)
// arguments – 队列的其他属性(构造参数)
// 特别注意:如果确定队列已存在,消费者可以不声明队列。但是,强烈建议无论生产者还是消费者,都应该声明队列,确保参数可控
channel.queueDeclare(QUEUE_NAME, true, false, false, null);

// 绑定交换机和队列
// 参数说明:
// queue – 队列的名称
// exchange – 交换机的名称
// routingKey – 用于绑定的 RoutingKey(如果不使用 Routingkey,可填空字符串)
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, QUEUE_ROUTING_KEY);

// 消费消息时的回调
DeliverCallback deliverCallback = (consumerTag, message) -> {
String msg = new String(message.getBody(), StandardCharsets.UTF_8);
System.out.println("Successed to consume message : " + msg);

// 手动确认消息
// 参数说明:
// deliveryTag – 消息的标记
// multiple – true 表示确认所有消息,包括提供的送达标签为止的所有消息;false 仅确认提供的投放标记
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
};

// 取消消费时的回调(比如,在消费的时候队列已被删除掉)
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("Failed to consume message : " + consumerTag);
};

System.out.println("消费者开始消费消息...");

// 关闭自动确认机制
boolean autoAck = false;

// 消费消息
// 参数说明:
// queue – 队列的名称
// autoAck – 如果需要服务器在消息投递后自动确认消息,则为 true;如果需要客户端手动确认消息,则为 false
// deliverCallback – 消费消息时的回调
// cancelCallback – 取消消费时的回调
channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, cancelCallback);

// 让消费者持续运行
System.out.println("按回车键退出程序:");
new Scanner(System.in).nextLine();

// 关闭信道
channel.close();
connection.close();
}

}
  • 在 RabbitMQ 独立节点二(rabbitmq-node2)中,创建对应的交换机和队列,并建立绑定关系,可以通过 Web 控制台或者 Java 代码进行创建,同时让消费者订阅队列中的消息。特别注意,在 RabbitMQ 两个独立节点中,队列的名称必须完全一致。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;

/**
* RabbitMQ 独立节点二(Downstream)的消费者
*/
public class MQConsumerNode2 {

// 交换机的名称
public static final String EXCHANGE_NAME = "node2_exchange";

// 队列的名称
public static final String QUEUE_NAME = "fed.queue";

// 队列的路由键(绑定键)
public static final String QUEUE_ROUTING_KEY = "routekey";

public static void main(String[] args) throws Exception {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.2.148");
factory.setPort(5674);
factory.setUsername("admin");
factory.setPassword("admin");

// 创建连接
Connection connection = factory.newConnection();

// 创建信道
Channel channel = connection.createChannel();

// 声明交换机
// 参数说明:
// exchange – 交换机的名称
// type – 交换机类型
// durable – 如果需要声明一个持久交换机,则为 true(交换机将在服务器重启后继续存在)
// autoDelete – 如果需要声明 autoDelete 交换机,则为 true(当最后一个绑定队列解除绑定后,自动删除该交换机)
// arguments – 交换的其他属性(构造参数)
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true, false, null);

// 声明队列
// 参数说明:
// queue – 队列的名称
// durable – 如果需要声明一个持久队列,则为 true(队列将在服务器重启后继续存在)
// exclusive – 如果需要声明一个独占队列(仅限于此连接使用,连接关闭后队列自动删除),则为 true。
// autoDelete – 如果需要声明 autoDelete 队列,则为 true(服务器将在最后一个消费者断开连接以后,自动删除该队列)
// arguments – 队列的其他属性(构造参数)
// 特别注意:如果确定队列已存在,消费者可以不声明队列。但是,强烈建议无论生产者还是消费者,都应该声明队列,确保参数可控
channel.queueDeclare(QUEUE_NAME, true, false, false, null);

// 绑定交换机和队列
// 参数说明:
// queue – 队列的名称
// exchange – 交换机的名称
// routingKey – 用于绑定的 RoutingKey(如果不使用 Routingkey,可填空字符串)
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, QUEUE_ROUTING_KEY);

// 消费消息时的回调
DeliverCallback deliverCallback = (consumerTag, message) -> {
String msg = new String(message.getBody(), StandardCharsets.UTF_8);
System.out.println("Successed to consume message : " + msg);

// 手动确认消息
// 参数说明:
// deliveryTag – 消息的标记
// multiple – true 表示确认所有消息,包括提供的送达标签为止的所有消息;false 仅确认提供的投放标记
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
};

// 取消消费时的回调(比如,在消费的时候队列已被删除掉)
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("Failed to consume message : " + consumerTag);
};

System.out.println("消费者开始消费消息...");

// 关闭自动确认机制
boolean autoAck = false;

// 消费消息
// 参数说明:
// queue – 队列的名称
// autoAck – 如果需要服务器在消息投递后自动确认消息,则为 true;如果需要客户端手动确认消息,则为 false
// deliverCallback – 消费消息时的回调
// cancelCallback – 取消消费时的回调
channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, cancelCallback);

// 让消费者持续运行
System.out.println("按回车键退出程序:");
new Scanner(System.in).nextLine();

// 关闭信道
channel.close();
connection.close();
}

}
添加 Upstream
  • 在 RabbitMQ 独立节点二(rabbitmq-node2)的 Web 控制台中,添加节点一(rabbitmq-node1)作为 Upstream。

参数说明:
Name:Upstream 的名称
URI:Upstream 的 URI 地址,其中 amqp:// 是协议前缀,admin:admin 用户名和密码,@rabbitmq-node1 是主机名称。
Acknowledgement Mode:下游从上游拉取消息后,何时向上游确认(ACK)这条消息已被 “接收” 了。它控制的是 Federation Queue 与上游的消息拉取确认行为,而不是消费者消费消息的 ACK 行为。

特别注意

Acknowledgement Mode 不是指消费者的 ACK 机制,也不等同于 RabbitMQ Queue 中的消息消费确认机制,只是针对 Federation Queue 拉取消息的内部行为。Acknowledgement Mode 的三种取值含义如下表所示。

确认模式是否等待下游 Confirm 是否等待消费者处理具体含义风险 / 特点
on-confirm✅ 等待 confirm❌ 不管默认值,Federation Queue 拉取到消息后,等下游的 Queue 成功接收(指消息已经被 RabbitMQ 成功、安全地写入下游队列的存储结构中,包括内存或磁盘)后才向上游 ACK 最安全,可靠性最高,推荐用于关键业务
on-publish❌ 不等待❌ 不管拉取到消息并成功发布(指投递动作没有抛出错误,如队列不存在、连接中断等)到下游的 Queue 后立即 ACK,不等待 Confirm 性能优于 on-confirm,风险略高
no-ack❌ 不等待❌ 不管拉取到消息立即 ACK,不管下游的 Queue 是否成功接收消息风险最大,易丢消息(断线、未投递成功等)
添加策略(Policy)
  • 在 RabbitMQ 独立节点二(rabbitmq-node2)的 Web 控制台中,添加策略(Policy),让名称为 node1-as-upstream 的 Upstream 应用在所有以 fed. 开头的队列。

查看联邦队列
  • 当 Upstream 和策略(Policy)都添加完成,并且队列也创建完成后,如果一切正常运行,那么在独立节点二(rabbitmq-node2)的 Web 控制台中可以看到联邦状态显示正常。

特别注意

只有独立节点二(rabbitmq-node2)中的队列已经创建好,RabbitMQ 的 Web 控制台才会正常显示对应的联邦状态。

测试联邦队列
  • 创建一个生产者,发送消息到独立节点一(rabbitmq-node1)中的队列
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import java.nio.charset.StandardCharsets;

/**
* RabbitMQ 独立节点一(Upstream)的生产者
*/
public class MQProducerNode1 {

// 交换机的名称
public static final String EXCHANGE_NAME = "node1_exchange";

// 队列的路由键(绑定键)
public static final String QUEUE_ROUTING_KEY = "routekey";

public static void main(String[] args) throws Exception {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.2.148");
factory.setPort(5673);
factory.setUsername("admin");
factory.setPassword("admin");

// 使用 try-with-resources 自动关闭连接和通道,确保资源释放
try (
// 创建连接
Connection connection = factory.newConnection();
// 创建信道
Channel channel = connection.createChannel()
) {
// 声明交换机
// 参数说明:
// exchange – 交换机的名称
// type – 交换机类型
// durable – 如果需要声明一个持久交换机,则为 true(交换机将在服务器重启后继续存在)
// autoDelete – 如果需要声明 autoDelete 交换机,则为 true(当最后一个绑定队列解除绑定后,自动删除该交换机)
// arguments – 交换的其他属性(构造参数)
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true, false, null);

// 发送消息
// 参数说明:
// exchange – 要将消息发布到的交换机,空字符串表示默认交换机
// routingKey – 路由 Key
// props – 消息的其他属性,比如:使用 MessageProperties.PERSISTENT_TEXT_PLAIN 属性确保消息持久化,配合持久化队列可避免服务器重启导致消息丢失
// body – 消息内容
channel.basicPublish(EXCHANGE_NAME, QUEUE_ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, "hello".getBytes(StandardCharsets.UTF_8));
}
}

}
  • 代码测试结果:启动所有生产者和消费者应用,由于在独立节点二(rabbitmq-node2)中配置了 Federation Queue(联邦队列),当发送消息到 RabbitMQ 独立节点一(rabbitmq-node1)的队列时,RabbitMQ 会轮询分发消息给独立节点一(rabbitmq-node1)和独立节点二(rabbitmq-node2)的消费者。举个例子,当发送消息 A 和消息 B 到独立节点一的队列时,独立节点一的消费者会接收到消息 A,而独立节点二的消费者会接收到消息 B,消息不会被重复消费。这就说明使用 Federation Queue(联邦队列)后,可以在多个 RabbitMQ 节点(或者集群)之间为单个队列提供均衡负载的功能,即在跨节点的多个消费者之间轮询分发消息(效果类似 RabbitMQ 的工作队列模式)。

RabbitMQ Shovel 插件

概述介绍

RabbitMQ 的 Shovel 插件用于实现不同 RabbitMQ 服务器或虚拟主机(Vhost)之间的消息转发,其核心作用是 “铲子” 式地将消息从一个队列 “铲” 到另一个位置。Shovel 插件能够从一个 Broker 中的队列(作为源端,即 Source)中拉取(Pull)消息,并将其转发至另一个 Broker 中的交换机 / 队列(作为目标端,即 Destination)。源端的队列和目标端的交换机 / 队列既可以分别位于不同的 Broker,也可以位于同一个 Broker 中。这种机制使得 Shovel 插件特别适用于跨数据中心消息传输、系统迁移、灾备同步等场景,确保消息可靠地在 RabbitMQ 实例间流动。简而言之,Shovel 是一个 “拉取消息再发送出去” 的搬运工,即基于拉取(Pull)模式从源端的队列获取消息,再推送(Push)消息到目标端的交换机 / 队列。

  • Shovel 插件的工作原理

    • Shovel 连接到源服务器(Source)并拉取指定队列。
    • 拉取到消息后,将消息转发到目标服务器(Destination)。
    • 确保目标端写入成功后,才确认(ACK)消费源端消息(保证消息不丢)。
    • 若出现连接中断,会自动重试连接并继续搬运。
  • Shovel 插件的核心特性

    • 跨实例传输:支持在不同 RabbitMQ 实例间转发消息
    • 持续运行:它是一种持续运行的转发机制(非一次性)
    • 可靠性:支持自动重连、持久化、确认机制
    • 可配置性:支持静态配置(Static Shovel)和动态配置(Dynamic Shovel)
    • 支持 AMQP 0.9.1:兼容 RabbitMQ 默认协议
  • Shovel 插件的注意事项

    • 大量 Shovel 实例会消耗较多连接资源、通道和内存,建议限制并发 Shovel 数量。
    • 消息顺序不能百分百保证,特别在连接中断重连、存在多个 Shovel 的情况下。
    • 若业务对消息顺序、消息重复敏感,则需要在消费端做幂等性处理或顺序校验。
    • 可通过 RabbitMQ 管理控制台或 HTTP API 监控 Shovel 状态,推荐集成监控系统报警异常。
  • Shovel 插件的使用场景

    • 跨数据中心的消息转发
    • 消息系统迁移
    • 多活集群的消息同步
    • 灾备系统的消息复制
  • Shovel 插件的配置说明

    • 源端和目标端都应设置合理的权限控制,避免越权访问。
    • 参数 ack-mode 建议设置为 on-confirmon-publish,以保证消息成功写入目标端后再确认源端消费,避免消息丢失。
    • 可以设置 prefetch-count(预取数量)来控制一次拉取的消息数量,避免消息积压。
    • 连接异常时,Shovel 会自动尝试重连,使用 reconnect-delay 参数设置重连间隔,比如 reconnect-delay: 5
    • 静态配置(Static Shovel):通过 rabbitmq.conf 文件或环境变量配置,适合在服务启动前就确定连接和队列信息的场景。
    • 动态配置(Dynamic Shovel):使用 RabbitMQ Management UI 或 HTTP API 动态创建和管理 Shovel,适合运行时灵活调整。

总结

  • Shovel 插件是基于消费机制实现的,它从队列中 "消费" 消息(就像一个消费者一样),所以消息源(源端 - Source)必须是一个队列,消息目标(目标端 - Destination)则通常是交换机,间接也支持目标端为队列。
  • 在 RabbitMQ 控制台的配置页面中,虽然 Shovel 的源端(Source)可以选择是交换机,但本质上它依然是通过中间队列消费消息。Shovel 插件不能作为 "监听交换机" 的工具,而是从现有队列中读取消息进行再发布。

Shovel 插件与 Federated 插件的对比

特性 / 机制 Shovel 插件 Federated Exchange(联邦交换机)Federated Queue(联邦队列)
连接发起方目标端主动连接源端目标端主动连接源端目标端主动连接源端
消息获取模式从源队列拉取(Pull)从源交换机拉取(Pull)从源队列拉取并消费
源端行为类似普通 Broker,无特殊行为类似普通 Broker,无特殊行为类似普通 Broker,无特殊行为
目标端行为拉取消息 → 发布到目标交换机 / 队列拉取消息 → 再次路由到本地交换机拉取消息 → 直接投递到本地队列
支持方向支持队列到交换机、队列到队列仅支持交换机级别联邦仅支持队列级别联邦
拉取粒度手动配置:逐条、批量、确认等自动控制,类似 AMQP 消费者自动控制,类似 AMQP 消费者
使用方式插件 + 显式配置(静态 / 动态)插件 + 通过策略自动应用插件 + 通过策略自动应用
适用场景精细控制、混合部署、定制路由多站点广播、发布同步多个节点共享消费同一个逻辑队列的数据(即轮询分发消息给多个消费者)
消息可靠性控制高(支持 ACK、持久化等)中等(依赖 AMQP 投递机制)中等(依赖消费确认机制)

安装插件

  • 由于 RabbitMQ 默认自带了 Shovel 插件,包括 rabbitmq_shovelrabbitmq_shovel_management,因此只需要在所需的 RabbitMQ 节点上分别启用 Shovel 插件即可,如下所示:
1
2
3
4
5
# 启用 rabbitmq_shovel 插件
rabbitmq-plugins enable rabbitmq_shovel

# 启用 rabbitmq_shovel_management 插件
rabbitmq-plugins enable rabbitmq_shovel_management
  • 当 Shovel 插件成功启用后,在 RabbitMQ 节点的控制台页面中,可以看到以下内容:

使用案例

本节将演示如何使用 Shovel 插件,整体工作流程如下图所示:

版本说明

本案例使用的各软件版本如下表所示:

组件版本说明
RabbitMQ Server3.8.26
RabbitMQ Client5.10.0
Erlang24.2
Java11

节点说明

这里简单使用两个独立的 RabbitMQ 节点(非同一个集群),并且都已经启用 Shovel 插件,如下表所示:

节点的名称节点的 IP 节点暴露的端口节点的主机名称节点的控制台访问地址说明
独立节点一 192.168.2.143567315673rabbitmq-node1http://192.168.2.143:15673作为源端(Source)
独立节点二 192.168.2.144567415674rabbitmq-node2http://192.168.2.144:15674作为目标端(Destination)

创建队列

  • 在 RabbitMQ 独立节点一(rabbitmq-node1)中,创建对应的交换机和队列,并建立绑定关系,可以通过 Web 控制台或者 Java 代码进行创建,同时让消费者订阅队列中的消息。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;

/**
* RabbitMQ 独立节点一(Source)的消费者
*/
public class MQConsumerNode1 {

// 交换机的名称
public static final String EXCHANGE_NAME = "shovel1_exchange";

// 队列的名称
public static final String QUEUE_NAME = "Q1";

// 队列的路由键(绑定键)
public static final String QUEUE_ROUTING_KEY = "routekey.q1";

public static void main(String[] args) throws Exception {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.2.148");
factory.setPort(5673);
factory.setUsername("admin");
factory.setPassword("admin");

// 创建连接
Connection connection = factory.newConnection();

// 创建信道
Channel channel = connection.createChannel();

// 声明交换机
// 参数说明:
// exchange – 交换机的名称
// type – 交换机类型
// durable – 如果需要声明一个持久交换机,则为 true(交换机将在服务器重启后继续存在)
// autoDelete – 如果需要声明 autoDelete 交换机,则为 true(当最后一个绑定队列解除绑定后,自动删除该交换机)
// arguments – 交换的其他属性(构造参数)
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true, false, null);

// 声明队列
// 参数说明:
// queue – 队列的名称
// durable – 如果需要声明一个持久队列,则为 true(队列将在服务器重启后继续存在)
// exclusive – 如果需要声明一个独占队列(仅限于此连接使用,连接关闭后队列自动删除),则为 true。
// autoDelete – 如果需要声明 autoDelete 队列,则为 true(服务器将在最后一个消费者断开连接以后,自动删除该队列)
// arguments – 队列的其他属性(构造参数)
// 特别注意:如果确定队列已存在,消费者可以不声明队列。但是,强烈建议无论生产者还是消费者,都应该声明队列,确保参数可控
channel.queueDeclare(QUEUE_NAME, true, false, false, null);

// 绑定交换机和队列
// 参数说明:
// queue – 队列的名称
// exchange – 交换机的名称
// routingKey – 用于绑定的 RoutingKey(如果不使用 Routingkey,可填空字符串)
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, QUEUE_ROUTING_KEY);

// 消费消息时的回调
DeliverCallback deliverCallback = (consumerTag, message) -> {
String msg = new String(message.getBody(), StandardCharsets.UTF_8);
System.out.println("Successed to consume message : " + msg);

// 手动确认消息
// 参数说明:
// deliveryTag – 消息的标记
// multiple – true 表示确认所有消息,包括提供的送达标签为止的所有消息;false 仅确认提供的投放标记
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
};

// 取消消费时的回调(比如,在消费的时候队列已被删除掉)
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("Failed to consume message : " + consumerTag);
};

System.out.println("消费者开始消费消息...");

// 关闭自动确认机制
boolean autoAck = false;

// 消费消息
// 参数说明:
// queue – 队列的名称
// autoAck – 如果需要服务器在消息投递后自动确认消息,则为 true;如果需要客户端手动确认消息,则为 false
// deliverCallback – 消费消息时的回调
// cancelCallback – 取消消费时的回调
channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, cancelCallback);

// 让消费者持续运行
System.out.println("按回车键退出程序:");
new Scanner(System.in).nextLine();

// 关闭信道
channel.close();
connection.close();
}

}
  • 在 RabbitMQ 独立节点二(rabbitmq-node2)中,创建对应的交换机和队列,并建立绑定关系,可以通过 Web 控制台或者 Java 代码进行创建,同时让消费者订阅队列中的消息。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;

/**
* RabbitMQ 独立节点二(Destination)的消费者
*/
public class MQConsumerNode2 {

// 交换机的名称
public static final String EXCHANGE_NAME = "shovel2_exchange";

// 队列的名称
public static final String QUEUE_NAME = "Q2";

// 队列的路由键(绑定键)
public static final String QUEUE_ROUTING_KEY = "routekey.q2";

public static void main(String[] args) throws Exception {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.2.148");
factory.setPort(5674);
factory.setUsername("admin");
factory.setPassword("admin");

// 创建连接
Connection connection = factory.newConnection();

// 创建信道
Channel channel = connection.createChannel();

// 声明交换机
// 参数说明:
// exchange – 交换机的名称
// type – 交换机类型
// durable – 如果需要声明一个持久交换机,则为 true(交换机将在服务器重启后继续存在)
// autoDelete – 如果需要声明 autoDelete 交换机,则为 true(当最后一个绑定队列解除绑定后,自动删除该交换机)
// arguments – 交换的其他属性(构造参数)
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true, false, null);

// 声明队列
// 参数说明:
// queue – 队列的名称
// durable – 如果需要声明一个持久队列,则为 true(队列将在服务器重启后继续存在)
// exclusive – 如果需要声明一个独占队列(仅限于此连接使用,连接关闭后队列自动删除),则为 true。
// autoDelete – 如果需要声明 autoDelete 队列,则为 true(服务器将在最后一个消费者断开连接以后,自动删除该队列)
// arguments – 队列的其他属性(构造参数)
// 特别注意:如果确定队列已存在,消费者可以不声明队列。但是,强烈建议无论生产者还是消费者,都应该声明队列,确保参数可控
channel.queueDeclare(QUEUE_NAME, true, false, false, null);

// 绑定交换机和队列
// 参数说明:
// queue – 队列的名称
// exchange – 交换机的名称
// routingKey – 用于绑定的 RoutingKey(如果不使用 Routingkey,可填空字符串)
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, QUEUE_ROUTING_KEY);

// 消费消息时的回调
DeliverCallback deliverCallback = (consumerTag, message) -> {
String msg = new String(message.getBody(), StandardCharsets.UTF_8);
System.out.println("Successed to consume message : " + msg);

// 手动确认消息
// 参数说明:
// deliveryTag – 消息的标记
// multiple – true 表示确认所有消息,包括提供的送达标签为止的所有消息;false 仅确认提供的投放标记
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
};

// 取消消费时的回调(比如,在消费的时候队列已被删除掉)
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("Failed to consume message : " + consumerTag);
};

System.out.println("消费者开始消费消息...");

// 关闭自动确认机制
boolean autoAck = false;

// 消费消息
// 参数说明:
// queue – 队列的名称
// autoAck – 如果需要服务器在消息投递后自动确认消息,则为 true;如果需要客户端手动确认消息,则为 false
// deliverCallback – 消费消息时的回调
// cancelCallback – 取消消费时的回调
channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, cancelCallback);

// 让消费者持续运行
System.out.println("按回车键退出程序:");
new Scanner(System.in).nextLine();

// 关闭信道
channel.close();
connection.close();
}

}

添加 Shovel

  • 在 RabbitMQ 控制台的配置页面中,添加 Shovel 时可以将源端(Source)和目标端(Destination)设置为交换机或者队列,其中 RabbitMQ 的默认行为如下:
源端的选择默认行为
如果源端(Source)选择的是 QueueRabbitMQ 会在创建 Shovel 之前,预先自动创建该队列(如果队列不存在就自动创建)
如果源端(Source)选择的是 ExchangeRabbitMQ 不会预先自动创建该交换机,但如果源端(Source)是 Exchange,会自动创建一个内部队列,并绑定到这个 Exchange,以便从 Exchange 拉取消息
  • 在 RabbitMQ 独立节点二(rabbitmq-node2)的 Web 控制台中,添加 Shovel,如下图所示:

参数说明:
Name:Shovel 的名称
URI:节点 的 URI 地址,其中 amqp:// 是协议前缀,admin:admin 用户名和密码,@rabbitmq-node1 是主机名称。

特别注意

Shovel 是 "谁配置,谁搬运",一般推荐将 Shovel 添加在目标端(Destination)节点上,也可以添加在中立第三方节点。这样目标端可以主动拉取消息,更安全可控,易于统一监控与运维。

查看 Shovel

  • 当 Shovel 添加完成后,并且交换机 / 队列也创建完成后,如果一切正常运行,那么在独立节点二(rabbitmq-node2)的 Web 控制台中可以看到 Shovel 状态显示正常。

测试 Shovel

  • 创建一个生产者,发送消息到独立节点一(rabbitmq-node1)中的队列
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import java.nio.charset.StandardCharsets;

/**
* RabbitMQ 独立节点一(Source)的生产者
*/
public class MQProducerNode1 {

// 交换机的名称
public static final String EXCHANGE_NAME = "shovel1_exchange";

// 队列的路由键(绑定键)
public static final String QUEUE_ROUTING_KEY = "routekey.q1";

public static void main(String[] args) throws Exception {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.2.148");
factory.setPort(5673);
factory.setUsername("admin");
factory.setPassword("admin");

// 使用 try-with-resources 自动关闭连接和通道,确保资源释放
try (
// 创建连接
Connection connection = factory.newConnection();
// 创建信道
Channel channel = connection.createChannel()
) {
// 声明交换机
// 参数说明:
// exchange – 交换机的名称
// type – 交换机类型
// durable – 如果需要声明一个持久交换机,则为 true(交换机将在服务器重启后继续存在)
// autoDelete – 如果需要声明 autoDelete 交换机,则为 true(当最后一个绑定队列解除绑定后,自动删除该交换机)
// arguments – 交换的其他属性(构造参数)
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true, false, null);

// 发送消息
// 参数说明:
// exchange – 要将消息发布到的交换机,空字符串表示默认交换机
// routingKey – 路由 Key
// props – 消息的其他属性,比如:使用 MessageProperties.PERSISTENT_TEXT_PLAIN 属性确保消息持久化,配合持久化队列可避免服务器重启导致消息丢失
// body – 消息内容
channel.basicPublish(EXCHANGE_NAME, QUEUE_ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, "hello".getBytes(StandardCharsets.UTF_8));
}
}

}
  • 代码测试结果:启动所有生产者和消费者应用,由于在独立节点二(rabbitmq-node2)中配置了 Shovel,且源端(Source)和目标端(Destination)选择的都是队列,当发送消息到 RabbitMQ 独立节点一(rabbitmq-node1)的队列时,RabbitMQ 会轮询分发消息给独立节点一(rabbitmq-node1)和独立节点二(rabbitmq-node2)的消费者。举个例子,当发送消息 A 和消息 B 到独立节点一的队列时,独立节点一的消费者会接收到消息 A,而独立节点二的消费者会接收到消息 B,消息不会被重复消费。

使用总结说明

在 RabbitMQ 的 Web 控制台中,对于 Shovel 的源端(Source)和目标端(Destination)的不同配置,有着不同的运行效果:

源端选择目标端选择实际运行效果运行效果类比
队列队列从源队列消费消息,重新发布到目标队列(目标队列可在不同实例),实现真正的 “消息搬运” 运行效果类似 Federated Queue(联邦队列),可以让多个 RabbitMQ 节点(或者集群)共享消费同一个逻辑队列的数据,即可以在跨节点的多个消费者之间轮询分发消息
交换机交换机自动绑定一个临时队列到源交换机 → 从该队列消费消息 → 发布到目标交换机运行效果类似 Federated Exchange(联邦交换机),可实现广播复制效果
队列交换机从源队列消费消息,重新发布到目标交换机,再由目标交换机路由到绑定的队列最常见用法,灵活可靠
交换机队列自动绑定一个临时队列到源交换机 → 从该队列消费消息 → 直接投递到目标队列支持,但配置上需要确保匹配路由键