大纲
前言
学习资源
RabbitMQ 发布确认
背景介绍
在生产环境中,由于一些不明原因导致 RabbitMQ 重启,在 RabbitMQ 重启期间生产者投递消息失败,导致消息丢失,需要手动处理和恢复。那么,如何才能实现 RabbitMQ 的消息可靠投递呢?特别是在这样比较极端的情况,比如 RabbitMQ 集群不可用的时候,无法投递的消息该如何处理呢?这时候就需要使用到 RabbitMQ 的发布确认机制来解决。
概念介绍
发布确认机制是 RabbitMQ 提供的一种用于确认生产者是否将消息成功发送到交换机的机制。它允许生产者在发送消息后收到 Broker 的确认(ACK)或否认(NACK)反馈,从而确认消息是否真正被 RabbitMQ 接收和处理。默认情况下,RabbitMQ 的消息发送是 “发出去就不管了” 的模型(即 Fire-And-Forget)。在网络抖动、服务重启、资源满载等异常情况下,消息有可能根本没被 RabbitMQ 接收到,就悄悄丢失了。发布确认机制就是为了解决这个问题。
实现方案
RabbitMQ 消息发布确认的常见实现方案如下图所示:
![]()
幂等消费
消息发送失败,在消息重新发送后,为了避免消费者在消费消息时可能出现重复消费的问题,消费者需要实现幂等消费。
使用案例
这里将演示 SpringBoot 项目如何使用 RabbitMQ 的发布确认机制,队列与交换机的绑定关系如下图所示:
![]()
代码下载
本节所需的完整案例代码,可以直接从 GitHub 下载对应章节 rabbitmq-lesson-16
。
版本说明
本案例使用的各软件版本如下表所示:
组件 | 版本 | 说明 |
---|
RabbitMQ Server | 3.8.26 | RabbitMQ 服务器 |
SpringBoot | 2.7.18 | |
Erlang | 24.2 | |
Java | 11 | |
案例代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| <properties> <spring-boot.version>2.7.18</spring-boot.version> </properties>
<dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-dependencies</artifactId> <version>${spring-boot.version}</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies>
|
1 2 3 4 5 6 7 8 9 10 11 12 13
| server: port: 8080
spring: application: name: rabbit-application rabbitmq: host: 192.168.2.127 port: 5672 password: admin username: admin virtual-host: / publisher-confirm-type: correlated
|
参数 publisher-confirm-type 的使用说明
上面的 publisher-confirm-type
参数用于确认消息是否成功到达交换机(Exchange),也叫做发布确认机制(Publisher Confirms)。该参数的可选值及其含义如下:
可选值 | 含义 |
---|
none | 禁用发布确认机制,是默认值。 |
correlated | 启用基于 CorrelationData 的发布确认机制,会调用自定义实现的 RabbitTemplate.ConfirmCallback 接口中的 confirm() 方法。 |
simple | 启用简单模式的发布确认机制。Spring 会使用内部的发布确认逻辑(不是基于回调的),但不会像 correlated 那样提供精细的回调。 |
特别注意
当 publisher-confirm-type
的值设置为 simple
,经测试有两种效果。其一效果和 correlated
值一样会触发 RabbitTemplate.ConfirmCallback
接口的回调方法;其二效果在发布消息成功后,当使用 RabbitTemplate 调用 waitForConfirms()
或 waitForConfirmsOrDie()
方法等待 Broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,此时需要注意的是 waitForConfirmsOrDie()
方法如果返回 false
,则会关闭 Channel,导致接下来无法再发送消息到 Broker。
- 消息发布确认的回调实现类(实现了
RabbitTemplate.ConfirmCallback
接口)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Component;
@Slf4j @Component public class CustomConfirmCallback implements RabbitTemplate.ConfirmCallback {
@Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { String msgId = correlationData != null ? correlationData.getId() : ""; if (ack) { log.info("ID 为 {} 的消息成功发送到交换机", msgId); } else { log.error("ID 为 {} 的消息发送到交换机失败,原因:{}", msgId, cause); } }
}
|
特别注意
实现 RabbitTemplate.ConfirmCallback
接口后,如果消息成功发送到交换机,但是交换机根据 RoutingKey 无法将消息路由到匹配的队列,消息会被丢弃,但 ack
参数的值仍然为 true
。换句话说,在仅启用发布确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息;如果交换机在投递消息时,发现消息不可路由(比如路由键写错、没有匹配的队列等),那么消息会被直接丢弃,此时消息生产者是不知道消息被丢弃这个事件的。如果需要解决这个问题,则需要使用到 RabbitMQ 的消息退回机制或者备份交换机。
- RabbitTemplate 的配置类(设置 RabbitTemplate 消息发布确认的回调实现类)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
@Configuration public class RabbitConfig {
@Autowired private RabbitTemplate.ConfirmCallback confirmCallback;
@Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setConfirmCallback(confirmCallback); return rabbitTemplate; }
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.QueueBuilder; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
@Configuration public class QueueConfig {
public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
public static final String CONFIRM_QUEUE_ROUTING_KEY = "key1";
@Bean("confirmExchange") public DirectExchange confirmExchange() { return new DirectExchange(CONFIRM_EXCHANGE_NAME); }
@Bean("confirmQueue") public Queue confirmQueue() { return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build(); }
@Bean public Binding bindingQueue(@Qualifier("confirmQueue") Queue confirmQueue, @Qualifier("confirmExchange") DirectExchange confirmExchange) { return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_QUEUE_ROUTING_KEY); }
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| import com.clay.rabbitmq.config.QueueConfig; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource; import java.util.Date; import java.util.UUID;
@Slf4j @RestController @RequestMapping("/confirm") public class ProducerController {
@Resource private RabbitTemplate rabbitTemplate;
@GetMapping("/sendMsg/{msg}") public String sendMsg(@PathVariable("msg") String message) { log.info("当前时间: {}, 发送一条信息给队列: {}", new Date(), message); CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); rabbitTemplate.convertAndSend(QueueConfig.CONFIRM_EXCHANGE_NAME, QueueConfig.CONFIRM_QUEUE_ROUTING_KEY, message, correlationData); return "success"; }
@GetMapping("/sendNoExchangeMsg/{msg}") public String sendNoExchangeMsg(@PathVariable("msg") String message) { log.info("当前时间: {}, 发送一条信息给队列: {}", new Date(), message); CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); rabbitTemplate.convertAndSend("xxxx", QueueConfig.CONFIRM_QUEUE_ROUTING_KEY, message, correlationData); return "success"; }
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| import com.clay.rabbitmq.config.QueueConfig; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets; import java.util.Date;
@Slf4j @Component public class QueueConsumer {
@RabbitListener(queues = QueueConfig.CONFIRM_QUEUE_NAME) public void receiveMsg(Message message, Channel channel) { String msg = new String(message.getBody(), StandardCharsets.UTF_8); log.info("当前时间: {}, 接收到信息: {}", new Date(), msg); }
}
|
1 2 3 4 5 6 7 8 9 10 11
| import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication public class RabbitApplication {
public static void main(String[] args) { SpringApplication.run(RabbitApplication.class, args); }
}
|
特别注意
Spring Boot 2.x
之前,需要在主启动类上添加 @EnableRabbit
注解。
代码测试
1 2 3
| INFO 51355 --- [nio-8080-exec-1] c.c.r.controller.ProducerController : 当前时间: Wed Apr 22 21:07:01 CST 2021, 发送一条信息给队列: hello INFO 51355 --- [ntContainer#0-1] c.clay.rabbitmq.consumer.QueueConsumer : 当前时间: Wed Apr 22 21:07:01 CST 2021, 接收到信息: hello INFO 51355 --- [nectionFactory1] c.c.r.callback.CustomConfirmCallback : ID 为 4520dc03-0c8a-425d-8230-8440cffa3d12 的消息成功发送到交换机
|
- (3) 通过浏览器调用接口
http://127.0.0.1:8080/confirm/sendNoExchangeMsg/hello
,SpringBoot 应用会在控制台打印以下日志信息:
1 2
| INFO 51355 --- [nio-8080-exec-1] c.c.r.controller.ProducerController : 当前时间: Wed Apr 22 21:08:15 CST 2021, 发送一条信息给队列: hello INFO 51355 --- [nectionFactory1] c.c.r.callback.CustomConfirmCallback : ID 为 56a86b63-2641-4f04-b96b-ef8e64ac20ce 的消息发送到交换机失败,原因:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'xxxx' in vhost '/', class-id=60, method-id=40)
|
- (4) 在第二次调用发送消息的接口时,由于交换机名称不正确,消息无法发送到任何交换机,导致消息投递失败,最终 Broker 会发送消息投递失败的通知(NACK)给生产者。
底层实现
在 SpringBoot 项目中,推荐使用 publisher-confirm-type: correlated
+ RabbitTemplate.ConfirmCallback
来实现 RabbitMQ 的发布确认机制。它们本质上就是对 RabbitMQ 原生 API 的封装,也就是封装了 channel.confirmSelect()
+ channel.addConfirmListener(ackCallback, nackCallback)
,都属于异步确认发布操作;省去了开发者手动管理 deliveryTag
、注册监听器、处理多线程等麻烦,适合绝大多数业务开发场景。其底层实现原理如下:
- 当配置了
publisher-confirm-type: correlated
,Spring 内部实际上就是帮开发者开启了 channel.confirmSelect()
,并注册了一个 ConfirmListener
监听器。 - 开发者自定义实现的
RabbitTemplate.ConfirmCallback
是 Spring 把 ConfirmListener
回调事件封装后再分发出来的。 - 也就是说,开发者使用的是 Spring 管理版的
channel.addConfirmListener(ackCallback, nackCallback)
,只是更高级、更好用而已。
RabbitMQ 消息退回
背景介绍
通过上面的案例,可以知道在仅启用 RabbitMQ 发布确认机制的情况下,交换机接收到消息后,会直接给生产者发送确认消息;如果交换机在投递消息时,发现消息不可路由(比如路由键写错、没有匹配的队列等),那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的。为了解决这个问题,需要使用到 RabbitMQ 的消息退回机制;通过设置 mandatory = true
,可以让消息在投递过程中出现无法路由的时候,将消息退回给生产者。
概念介绍
RabbitMQ 的消息退回机制是指当消息成功发送到了交换机,但由于某些原因交换机无法将消息路由到任何队列(比如路由键写错、没有匹配的队列等),RabbitMQ 将这条无法被路由的消息退回给生产者的一种机制。默认情况下,在不使用消息退回机制时(比如:不设置 mandatory = true
),RabbitMQ 会直接丢弃无法路由的消息,不会有任何提示信息。特别注意:消息退回机制只在以下三个条件同时满足时才会生效:
- 消息已成功到达交换机
- 交换机无法将消息路由到匹配的队列
- 设置了
mandatory = true
(否则消息会被直接丢弃)
使用案例
这里将演示 SpringBoot 项目如何使用 RabbitMQ 的发布确认机制和消息退回机制,队列与交换机的绑定关系如下图所示:
![]()
代码下载
本节所需的完整案例代码,可以直接从 GitHub 下载对应章节 rabbitmq-lesson-17
。
版本说明
本案例使用的各软件版本如下表所示:
组件 | 版本 | 说明 |
---|
RabbitMQ Server | 3.8.26 | RabbitMQ 服务器 |
SpringBoot | 2.7.18 | |
Erlang | 24.2 | |
Java | 11 | |
案例代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| <properties> <spring-boot.version>2.7.18</spring-boot.version> </properties>
<dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-dependencies</artifactId> <version>${spring-boot.version}</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies>
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| server: port: 8080
spring: application: name: rabbit-application rabbitmq: host: 192.168.2.127 port: 5672 password: admin username: admin virtual-host: / publisher-confirm-type: correlated publisher-returns: true
|
参数 publisher-confirm-type 的使用说明
上面的 publisher-confirm-type
参数用于确认消息是否成功到达交换机(Exchange),也叫做发布确认机制(Publisher Confirms)。该参数的可选值及其含义如下:
可选值 | 含义 |
---|
none | 禁用发布确认机制,是默认值。 |
correlated | 启用基于 CorrelationData 的发布确认机制,会调用自定义实现的 RabbitTemplate.ConfirmCallback 接口中的 confirm() 方法。 |
simple | 启用简单模式的发布确认机制。Spring 会使用内部的发布确认逻辑(不是基于回调的),但不会像 correlated 那样提供精细的回调。 |
特别注意
当 publisher-confirm-type
的值设置为 simple
,经测试有两种效果。其一效果和 correlated
值一样会触发 RabbitTemplate.ConfirmCallback
接口的回调方法;其二效果在发布消息成功后,当使用 RabbitTemplate 调用 waitForConfirms()
或 waitForConfirmsOrDie()
方法等待 Broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,此时需要注意的是 waitForConfirmsOrDie()
方法如果返回 false
,则会关闭 Channel,导致接下来无法再发送消息到 Broker。
参数 publisher-returns 的使用说明
上面的 publisher-returns
参数用于启用 RabbitMQ 的消息退回机制。通过 publisher-returns: true
启用消息退回机制后,再配合 rabbitTemplate.setMandatory(true);
,就可以让自定义实现的 ReturnsCallback
回调方法被触发,这样就能知道哪条消息发送失败了。
特别注意
publisher-returns: true
启用的是 RabbitMQ 的消息退回机制。rabbitTemplate.setMandatory(true);
是告诉 RabbitMQ:"如果消息不能路由成功,就把消息退回给生产者,不要直接丢弃"。- 两者必须搭配使用,
publisher-returns
控制消息退回功能是否启用,mandatory
控制消息是否可退回,然后才能在 RabbitTemplate.ReturnsCallback
中接收到消息退回的通知。
- 消息发布确认的回调实现类(实现了
RabbitTemplate.ConfirmCallback
接口)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Component;
@Slf4j @Component public class CustomConfirmCallback implements RabbitTemplate.ConfirmCallback {
@Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { String msgId = correlationData != null ? correlationData.getId() : ""; if (ack) { log.info("ID 为 {} 的消息成功发送到交换机", msgId); } else { log.error("ID 为 {} 的消息发送到交换机失败,原因:{}", msgId, cause); } }
}
|
特别注意
实现 RabbitTemplate.ConfirmCallback
接口后,如果消息成功发送到交换机,但是交换机根据 RoutingKey 无法将消息路由到匹配的队列,消息会被丢弃,但 ack
参数的值仍然为 true
。换句话说,在仅启用发布确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息;如果交换机在投递消息时,发现消息不可路由(比如路由键写错、没有匹配的队列等),那么消息会被直接丢弃,此时消息生产者是不知道消息被丢弃这个事件的。如果需要解决这个问题,则需要使用到 RabbitMQ 的消息退回机制或者备份交换机。
- 消息退回的回调实现类(实现了
RabbitTemplate.ReturnsCallback
接口)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.ReturnedMessage; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Component; import java.nio.charset.StandardCharsets;
@Slf4j @Component public class CustomReturnsCallback implements RabbitTemplate.ReturnsCallback {
@Override public void returnedMessage(ReturnedMessage returned) { String cause = returned.getReplyText(); String exchagne = returned.getExchange(); String routingKey = returned.getRoutingKey(); String message = new String(returned.getMessage().getBody(), StandardCharsets.UTF_8); log.error("消息 {} 被交换机 {} 退回, 路由键: {}, 退回原因: {}", message, exchagne, routingKey, cause); }
}
|
- RabbitTemplate 的配置类(设置 RabbitTemplate 消息发布确认和消息退回的回调实现类)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
@Configuration public class RabbitConfig {
@Autowired private RabbitTemplate.ConfirmCallback confirmCallback;
@Autowired private RabbitTemplate.ReturnsCallback returnsCallback;
@Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setConfirmCallback(confirmCallback);
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnsCallback(returnsCallback);
return rabbitTemplate; }
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.QueueBuilder; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
@Configuration public class QueueConfig {
public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
public static final String CONFIRM_QUEUE_ROUTING_KEY = "key1";
@Bean("confirmExchange") public DirectExchange confirmExchange() { return new DirectExchange(CONFIRM_EXCHANGE_NAME); }
@Bean("confirmQueue") public Queue confirmQueue() { return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build(); }
@Bean public Binding bindingQueue(@Qualifier("confirmQueue") Queue confirmQueue, @Qualifier("confirmExchange") DirectExchange confirmExchange) { return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_QUEUE_ROUTING_KEY); }
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| import com.clay.rabbitmq.config.QueueConfig; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource; import java.util.Date; import java.util.UUID;
@Slf4j @RestController @RequestMapping("/confirm") public class ProducerController {
@Resource private RabbitTemplate rabbitTemplate;
@GetMapping("/sendMsg/{msg}") public String sendMsg(@PathVariable("msg") String message) { log.info("当前时间: {}, 发送一条信息给队列: {}", new Date(), message); CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); rabbitTemplate.convertAndSend(QueueConfig.CONFIRM_EXCHANGE_NAME, QueueConfig.CONFIRM_QUEUE_ROUTING_KEY, message, correlationData); return "success"; }
@GetMapping("/sendNoRouteMsg/{msg}") public String sendNoRouteMsg(@PathVariable("msg") String message) { log.info("当前时间: {}, 发送一条信息给队列: {}", new Date(), message); CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); rabbitTemplate.convertAndSend(QueueConfig.CONFIRM_EXCHANGE_NAME, "", message, correlationData); return "success"; }
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| import com.clay.rabbitmq.config.QueueConfig; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets; import java.util.Date;
@Slf4j @Component public class QueueConsumer {
@RabbitListener(queues = QueueConfig.CONFIRM_QUEUE_NAME) public void receiveMsg(Message message, Channel channel) { String msg = new String(message.getBody(), StandardCharsets.UTF_8); log.info("当前时间: {}, 接收到信息: {}", new Date(), msg); }
}
|
1 2 3 4 5 6 7 8 9 10 11
| import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication public class RabbitApplication {
public static void main(String[] args) { SpringApplication.run(RabbitApplication.class, args); }
}
|
特别注意
Spring Boot 2.x
之前,需要在主启动类上添加 @EnableRabbit
注解。
代码测试
1 2 3
| INFO 51355 --- [nio-8080-exec-1] c.c.r.controller.ProducerController : 当前时间: Wed Apr 22 22:07:01 CST 2021, 发送一条信息给队列: hello INFO 51355 --- [ntContainer#0-1] c.clay.rabbitmq.consumer.QueueConsumer : 当前时间: Wed Apr 22 22:07:01 CST 2021, 接收到信息: hello INFO 51355 --- [nectionFactory1] c.c.r.callback.CustomConfirmCallback : ID 为 4520dc03-0c8a-425d-8230-8440cffa3d12 的消息成功发送到交换机
|
- (3) 通过浏览器调用接口
http://127.0.0.1:8080/confirm/sendNoRouteMsg/hello
,SpringBoot 应用会在控制台打印以下日志信息:
1 2 3
| INFO 71199 --- [nio-8080-exec-1] c.c.r.controller.ProducerController : 当前时间: Wed Apr 22 22:27:23 CST 2021, 发送一条信息给队列: hello INFO 71199 --- [nectionFactory1] c.c.r.callback.CustomConfirmCallback : ID 为 a08f5856-e75e-496b-b95b-a1cb6220b954 的消息成功发送到交换机 ERROR 71199 --- [nectionFactory1] c.c.r.callback.CustomReturnsCallback : 消息 hello 被交换机 confirm.exchange 退回, 路由键: xxxx, 退回原因: NO_ROUTE
|
- (4) 在第二次调用发送消息的接口时,由于路由键(RoutingKey)不正确,交换机无法将消息路由到任何队列,导致消息投递失败,最终 Broker 会将消息退回给生产者。
RabbitMQ 备份交换机
背景介绍
在 RabbitMQ 中,开启 mandatory
参数并结合消息退回机制,确实可以帮助我们感知那些无法被正常路由的消息,从而在消息无法投递到队列时及时发现并进行处理。然而,现实中的处理过程往往并不理想。多数情况下,我们只能简单地打印一条日志,或触发一个告警,然后手动介入处理。这种方式不仅效率低下,而且当生产者服务以集群方式部署在多台机器上时,手动收集日志的成本较高、容易出错,处理体验非常不优雅。更重要的是,一旦启用消息退回机制,就意味着生产者需要额外编写代码逻辑来处理这些被退回的消息,增加了系统的复杂度。此时我们面临一个两难选择:要么丢弃消息,要么增加额外的开发和维护负担。那么,有没有一种方式,既能防止消息丢失,又能不增加生产者的复杂性呢?前面我们提到过死信队列,它可以用来保存那些被消费者拒绝或处理失败的消息。但死信队列只适用于进入队列之后的异常处理,而无法覆盖根本未进入队列的情况 —— 也就是说,那些因为找不到匹配队列而直接被交换机退回的消息,是无法被死信机制捕获的。此时,就需要使用 RabbitMQ 提供的备份交换机(Alternate Exchange)机制来解决这个问题。
概念介绍
备份交换机(Alternate Exchange)可以理解为一个交换机的 “兜底方案” 或 “备用出口”。当某个交换机收到一条消息,而这条消息没有匹配任何已绑定的队列时,RabbitMQ 会自动将这条消息转发给该交换机所指定的备份交换机,由备份交换机来处理这类 “不可路由的消息”。通常,备份交换机会设置为 Fanout
类型,以便将消息广播投递到其下绑定的所有队列中。我们只需在备份交换机上绑定一个(或多个)” 回收队列 / 备份队列”,即可集中存储这些不可路由的消息。同时,还可以在备份交换机上额外绑定一个报警队列,由专门的消费者监听该队列,用于实时触发监控和告警。这样一来,不仅避免了消息丢失,也无需在生产者端增加任何额外的处理逻辑,实现了消息可靠性的同时保持了系统简洁性。
特别注意
当 RabbitMQ 的消息退回机制和备份交换机同时启用的时候,备份交换机的优先级更高;也就是说,无法路由的消息会被转发给备份交换机处理,而不会直接退回给消息生产者。
使用案例
这里将演示 SpringBoot 项目如何使用 RabbitMQ 的发布确认机制 + 消息退回机制 + 备份交换机,队列与交换机的绑定关系如下图所示:
![]()
代码下载
本节所需的完整案例代码,可以直接从 GitHub 下载对应章节 rabbitmq-lesson-18
。
版本说明
本案例使用的各软件版本如下表所示:
组件 | 版本 | 说明 |
---|
RabbitMQ Server | 3.8.26 | RabbitMQ 服务器 |
SpringBoot | 2.7.18 | |
Erlang | 24.2 | |
Java | 11 | |
案例代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| <properties> <spring-boot.version>2.7.18</spring-boot.version> </properties>
<dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-dependencies</artifactId> <version>${spring-boot.version}</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies>
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| server: port: 8080
spring: application: name: rabbit-application rabbitmq: host: 192.168.2.127 port: 5672 password: admin username: admin virtual-host: / publisher-confirm-type: correlated publisher-returns: true
|
- 消息发布确认的回调实现类(实现了
RabbitTemplate.ConfirmCallback
接口)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Component;
@Slf4j @Component public class CustomConfirmCallback implements RabbitTemplate.ConfirmCallback {
@Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { String msgId = correlationData != null ? correlationData.getId() : ""; if (ack) { log.info("ID 为 {} 的消息成功发送到交换机", msgId); } else { log.error("ID 为 {} 的消息发送到交换机失败,原因:{}", msgId, cause); } }
}
|
特别注意
实现 RabbitTemplate.ConfirmCallback
接口后,如果消息成功发送到交换机,但是交换机根据 RoutingKey 无法将消息路由到匹配的队列,消息会被丢弃,但 ack
参数的值仍然为 true
。换句话说,在仅启用发布确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息;如果交换机在投递消息时,发现消息不可路由(比如路由键写错、没有匹配的队列等),那么消息会被直接丢弃,此时消息生产者是不知道消息被丢弃这个事件的。如果需要解决这个问题,则需要使用到 RabbitMQ 的消息退回机制或者备份交换机。
- 消息退回的回调实现类(实现了
RabbitTemplate.ReturnsCallback
接口)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.ReturnedMessage; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Component; import java.nio.charset.StandardCharsets;
@Slf4j @Component public class CustomReturnsCallback implements RabbitTemplate.ReturnsCallback {
@Override public void returnedMessage(ReturnedMessage returned) { String cause = returned.getReplyText(); String exchagne = returned.getExchange(); String routingKey = returned.getRoutingKey(); String message = new String(returned.getMessage().getBody(), StandardCharsets.UTF_8); log.error("消息 {} 被交换机 {} 退回, 路由键: {}, 退回原因: {}", message, exchagne, routingKey, cause); }
}
|
- RabbitTemplate 的配置类(设置 RabbitTemplate 消息发布确认和消息退回的回调实现类)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
@Configuration public class RabbitConfig {
@Autowired private RabbitTemplate.ConfirmCallback confirmCallback;
@Autowired private RabbitTemplate.ReturnsCallback returnsCallback;
@Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setConfirmCallback(confirmCallback);
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnsCallback(returnsCallback);
return rabbitTemplate; }
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89
| import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.ExchangeBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.QueueBuilder; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
import java.util.HashMap; import java.util.Map;
@Configuration public class QueueConfig {
public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
public static final String CONFIRM_QUEUE_ROUTING_KEY = "key1";
public static final String BACKUP_EXCHANGE_NAME = "backup.exchange";
public static final String BACKUP_QUEUE_NAME = "backup.queue";
public static final String WARNING_QUEUE_NAME = "warning.queue";
@Bean("confirmExchange") public DirectExchange confirmExchange() { Map<String, Object> arguments = new HashMap<>(); arguments.put("alternate-exchange", BACKUP_EXCHANGE_NAME); return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME).durable(true).withArguments(arguments).build(); }
@Bean("confirmQueue") public Queue confirmQueue() { return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build(); }
@Bean public Binding bindingQueue(@Qualifier("confirmQueue") Queue confirmQueue, @Qualifier("confirmExchange") DirectExchange confirmExchange) { return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_QUEUE_ROUTING_KEY); }
@Bean("backupExchange") public FanoutExchange backupExchange() { return new FanoutExchange(BACKUP_EXCHANGE_NAME); }
@Bean("backupQueue") public Queue backupQueue() { return QueueBuilder.durable(BACKUP_QUEUE_NAME).build(); }
@Bean("warningQueue") public Queue warningQueue() { return QueueBuilder.durable(WARNING_QUEUE_NAME).build(); }
@Bean public Binding bindingBackupQueue(@Qualifier("backupQueue") Queue backupQueue, @Qualifier("backupExchange") FanoutExchange backupExchange) { return BindingBuilder.bind(backupQueue).to(backupExchange); }
@Bean public Binding bindingWarningQueue(@Qualifier("warningQueue") Queue warningQueue, @Qualifier("backupExchange") FanoutExchange backupExchange) { return BindingBuilder.bind(warningQueue).to(backupExchange); }
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| import com.clay.rabbitmq.config.QueueConfig; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource; import java.util.Date; import java.util.UUID;
@Slf4j @RestController @RequestMapping("/confirm") public class ProducerController {
@Resource private RabbitTemplate rabbitTemplate;
@GetMapping("/sendMsg/{msg}") public String sendMsg(@PathVariable("msg") String message) { log.info("当前时间: {}, 发送一条信息给队列: {}", new Date(), message); CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); rabbitTemplate.convertAndSend(QueueConfig.CONFIRM_EXCHANGE_NAME, QueueConfig.CONFIRM_QUEUE_ROUTING_KEY, message, correlationData); return "success"; }
@GetMapping("/sendNoRouteMsg/{msg}") public String sendNoRouteMsg(@PathVariable("msg") String message) { log.info("当前时间: {}, 发送一条信息给队列: {}", new Date(), message); CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); rabbitTemplate.convertAndSend(QueueConfig.CONFIRM_EXCHANGE_NAME, "", message, correlationData); return "success"; }
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| import com.clay.rabbitmq.config.QueueConfig; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets; import java.util.Date;
@Slf4j @Component public class CommonQueueConsumer {
@RabbitListener(queues = QueueConfig.CONFIRM_QUEUE_NAME) public void receiveMsg(Message message, Channel channel) { String msg = new String(message.getBody(), StandardCharsets.UTF_8); log.info("当前时间: {}, 接收到信息: {}", new Date(), msg); }
}
|
1 2 3 4 5 6 7 8 9 10 11
| @Slf4j @Component public class WarningQueueConsumer {
@RabbitListener(queues = QueueConfig.WARNING_QUEUE_NAME) public void receiveMsg(Message message, Channel channel) { String msg = new String(message.getBody(), StandardCharsets.UTF_8); log.error("当前时间: {}, 报警发现不可路由消息: {}", new Date(), msg); }
}
|
1 2 3 4 5 6 7 8 9 10 11
| import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication public class RabbitApplication {
public static void main(String[] args) { SpringApplication.run(RabbitApplication.class, args); }
}
|
特别注意
Spring Boot 2.x
之前,需要在主启动类上添加 @EnableRabbit
注解。
代码测试
1 2 3
| INFO 51355 --- [nio-8080-exec-1] c.c.r.controller.ProducerController : 当前时间: Wed Apr 22 22:07:01 CST 2021, 发送一条信息给队列: hello INFO 51355 --- [ntContainer#0-1] c.clay.rabbitmq.consumer.QueueConsumer : 当前时间: Wed Apr 22 22:07:01 CST 2021, 接收到信息: hello INFO 51355 --- [nectionFactory1] c.c.r.callback.CustomConfirmCallback : ID 为 4520dc03-0c8a-425d-8230-8440cffa3d12 的消息成功发送到交换机
|
- (3) 通过浏览器调用接口
http://127.0.0.1:8080/confirm/sendNoRouteMsg/hello
,SpringBoot 应用会在控制台打印以下日志信息:
1 2 3
| INFO 71199 --- [nio-8080-exec-1] c.c.r.controller.ProducerController : 当前时间: Wed Apr 22 22:27:23 CST 2021, 发送一条信息给队列: hello ERROR 44139 --- [ntContainer#1-1] c.c.r.consumer.WarningQueueConsumer : 当前时间: Wed Apr 22 22:27:23 CST 2021, 报警发现不可路由消息: hello INFO 71199 --- [nectionFactory1] c.c.r.callback.CustomConfirmCallback : ID 为 a08f5856-e75e-496b-b95b-a1cb6220b954 的消息成功发送到交换机
|
- (4) 在第二次调用发送消息的接口时,由于路由键(RoutingKey)不正确,交换机无法将消息路由到任何队列,消息转而发送到备份交换机,接着消息被投递到备份队列和报警队列,最终监听报警队列的消费者会消费掉消息,而备份队列同时会将消息存储起来(如下图所示)。
![]()
RabbitMQ 其他知识
幂等性
幂等性的核心概念
幂等性(Idempotency)指的是一个操作无论执行多少次,产生的副作用和结果都是一样的,就像只执行了一次一样。幂等性可以保证在网络异常、重复请求等情况下,多次执行操作不会导致数据错误或系统不一致。举个最简单的例子,那就是支付,用户购买商品后支付,支付扣款成功,但是返回结果的时候网络异常,此时钱已经扣了;用户再次点击按钮,此时会进行第二次扣款,返回结果成功,用户查询余额发现多扣钱了,流水记录也变成了两条。在以前的单体应用系统中,通常只需要把数据操作放入同一个事务中即可,发生错误立即回滚,但是在响应客户端的时候也有可能出现网络中断等意外。这时候就需要保证支付操作的幂等性,否则就会出现前面提到的重复扣款情况。
幂等性的实现方案
- 业务层设计时避免副作用,如重复扣款、重复下单
- 使用唯一请求 ID / 幂等键(Idempotency Key)
- 利用数据库的唯一约束(如唯一索引)
- 在处理前判断操作是否已经执行
幂等性的使用场景
- HTTP 接口
- `GET`:天然幂等,获取资源,不会改变状态。
- `PUT`:幂等,多次更新资源内容,结果一致。
- `POST`:非幂等,可能每次都创建新资源。但是,可以人为让 `POST` 变成幂等,比如通过幂等键(Idempotency Key) 控制重复提交。
- 消息系统
- 消息可能重复投递,消费端需要确保幂等消费,比如处理订单时避免重复扣库存。
- 数据库操作
- 更新操作(如设置状态为 "已完成")可设计为幂等。
- 插入操作可通过 "唯一约束" 或 "先查后插" 实现幂等。
消息重复消费
消息重复消费的概念
当消息队列(MQ)将一条消息发送给消费者后,消费者在成功处理完该消息并准备向 MQ 返回确认(ACK)时,恰好发生了网络中断,导致 MQ 并未收到这条 ACK 确认信息。此时,MQ 会认为该消息未被成功消费,因此会将该消息重新分发给其他消费者,或者在网络恢复后再次发送给原消费者。实际上,该消费者可能已经完成了这条消息的处理,这就会导致同一条消息被重复消费的问题。因此,为了确保系统的正确性,消费者在处理消息时必须具备幂等性,以避免因消息重复投递而产生的业务逻辑错误。
消息重复消费的解决
为了解决 MQ 消息重复消费的问题,消费者通常需要实现幂等性机制。常见的做法是为每条消息分配一个全局唯一标识符,例如使用时间戳、UUID,或者由业务系统按特定规则生成的唯一 ID。在订单类等业务场景中,也可以直接使用 MQ 消息自带的消息 ID 作为唯一标识。消费者在接收到消息时,首先根据该 ID 判断该消息是否已经被消费过,如果是,则跳过处理;如果没有,则继续执行业务逻辑,并记录该 ID,防止重复消费。这种方式可以有效保证在网络异常、消息重试等情况下,系统依然保持数据一致性和业务正确性。目前业界主流的幂等性实现方式主要有两种,第一种是使用 Reids 的原子性操作,第二种是使用唯一 ID + 指纹码机制。
Reids 的原子性操作
利用 Redis 的原子性(如 SETNX
命令或 Lua 脚本)实现对消息 ID 的唯一性判断,天生具有幂等性,从而快速、高效地实现幂等校验,适用于高并发场景。
唯一 ID + 指纹码机制
指纹码是指根据一定的业务规则生成的唯一标识,一般由时间戳、业务字段、或其他服务提供的唯一信息拼接而成。这种标识不一定是 MQ 系统自动生成的,而是由具体的业务逻辑组合而来,核心要求是保证其全局唯一性。在幂等性实现中,可以通过查询数据库,判断该指纹码是否已存在来决定是否执行后续操作。这种方式的优势在于实现简单:只需拼接生成唯一标识并进行一次查询判断即可;劣势则是在高并发场景下,如果依赖单个数据库,容易出现写入性能瓶颈。当然,也可以通过分库分表等方式来提升系统的并发能力,但这种实现方式相对复杂,并不是最推荐的解决方案。因此,指纹码机制适用于中小规模系统或并发量不高的业务场景,若要应用于高并发系统,则需要搭配更高效的存储和查询策略。