RabbitMQ 入门教程之八

大纲

前言

学习资源

RabbitMQ 发布确认

背景介绍

在生产环境中,由于一些不明原因导致 RabbitMQ 重启,在 RabbitMQ 重启期间生产者投递消息失败,导致消息丢失,需要手动处理和恢复。那么,如何才能实现 RabbitMQ 的消息可靠投递呢?特别是在这样比较极端的情况,比如 RabbitMQ 集群不可用的时候,无法投递的消息该如何处理呢?这时候就需要使用到 RabbitMQ 的发布确认机制来解决。

概念介绍

发布确认机制是 RabbitMQ 提供的一种用于确认生产者是否将消息成功发送到交换机的机制。它允许生产者在发送消息后收到 Broker 的确认(ACK)或否认(NACK)反馈,从而确认消息是否真正被 RabbitMQ 接收和处理。默认情况下,RabbitMQ 的消息发送是 “发出去就不管了” 的模型(即 Fire-And-Forget)。在网络抖动、服务重启、资源满载等异常情况下,消息有可能根本没被 RabbitMQ 接收到,就悄悄丢失了。发布确认机制就是为了解决这个问题。

实现方案

RabbitMQ 消息发布确认的常见实现方案如下图所示:

幂等消费

消息发送失败,在消息重新发送后,为了避免消费者在消费消息时可能出现重复消费的问题,消费者需要实现幂等消费。

使用案例

这里将演示 SpringBoot 项目如何使用 RabbitMQ 的发布确认机制,队列与交换机的绑定关系如下图所示:

代码下载

本节所需的完整案例代码,可以直接从 GitHub 下载对应章节 rabbitmq-lesson-16

版本说明

本案例使用的各软件版本如下表所示:

组件版本说明
RabbitMQ Server3.8.26RabbitMQ 服务器
SpringBoot2.7.18
Erlang24.2
Java11

案例代码

  • Maven 依赖
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
<properties>
<spring-boot.version>2.7.18</spring-boot.version>
</properties>

<dependencyManagement>
<dependencies>
<!-- SpringBoot -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring-boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<!--Web-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--RabbitMQ 核心依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--RabbitMQ 测试依赖-->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
<!--Lombok-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!--Test-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
  • 配置文件(application.yml
1
2
3
4
5
6
7
8
9
10
11
12
13
server:
port: 8080

spring:
application:
name: rabbit-application
rabbitmq:
host: 192.168.2.127
port: 5672
password: admin
username: admin
virtual-host: /
publisher-confirm-type: correlated

参数 publisher-confirm-type 的使用说明

上面的 publisher-confirm-type 参数用于确认消息是否成功到达交换机(Exchange),也叫做发布确认机制(Publisher Confirms)。该参数的可选值及其含义如下:

可选值含义
none禁用发布确认机制,是默认值。
correlated启用基于 CorrelationData 的发布确认机制,会调用自定义实现的 RabbitTemplate.ConfirmCallback 接口中的 confirm() 方法。
simple启用简单模式的发布确认机制。Spring 会使用内部的发布确认逻辑(不是基于回调的),但不会像 correlated 那样提供精细的回调。

特别注意

publisher-confirm-type 的值设置为 simple,经测试有两种效果。其一效果和 correlated 值一样会触发 RabbitTemplate.ConfirmCallback 接口的回调方法;其二效果在发布消息成功后,当使用 RabbitTemplate 调用 waitForConfirms()waitForConfirmsOrDie() 方法等待 Broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,此时需要注意的是 waitForConfirmsOrDie() 方法如果返回 false,则会关闭 Channel,导致接下来无法再发送消息到 Broker。


  • 消息发布确认的回调实现类(实现了 RabbitTemplate.ConfirmCallback 接口)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class CustomConfirmCallback implements RabbitTemplate.ConfirmCallback {

/**
* 消息发送到交换机后的回调函数,用于确认消息是否成功到达交换机
* <p> 特别注意:如果消息成功发送到交换机,但是交换机根据 RoutingKey 无法将消息路由到匹配的队列,消息会被丢弃,但 ack 参数的值仍然为 true
*
* @param correlationData 关联数据,用于唯一标识发送的消息,可以在发送消息时设置,用于跟踪消息的状态
* @param ack 表示消息是否成功到达交换机。true 表示成功,false 表示失败
* @param cause 如果 ack 为 false,该字段表示失败的原因;如果 ack 为 true,该字段通常为 null
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String msgId = correlationData != null ? correlationData.getId() : "";
if (ack) {
log.info("ID 为 {} 的消息成功发送到交换机", msgId);
} else {
log.error("ID 为 {} 的消息发送到交换机失败,原因:{}", msgId, cause);
}
}

}

特别注意

实现 RabbitTemplate.ConfirmCallback 接口后,如果消息成功发送到交换机,但是交换机根据 RoutingKey 无法将消息路由到匹配的队列,消息会被丢弃,但 ack 参数的值仍然为 true。换句话说,在仅启用发布确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息;如果交换机在投递消息时,发现消息不可路由(比如路由键写错、没有匹配的队列等),那么消息会被直接丢弃,此时消息生产者是不知道消息被丢弃这个事件的。如果需要解决这个问题,则需要使用到 RabbitMQ 的消息退回机制或者备份交换机。

  • RabbitTemplate 的配置类(设置 RabbitTemplate 消息发布确认的回调实现类)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

@Autowired
private RabbitTemplate.ConfirmCallback confirmCallback;

/**
* 配置 RabbitTemplate Bean,用于自定义消息发送行为,比如发布确认处理
*/
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
// 设置发布确认回调,用于确认消息是否成功到达交换机,需要在配置中启用 spring.rabbitmq.publisher-confirm-type=correlated
rabbitTemplate.setConfirmCallback(confirmCallback);
return rabbitTemplate;
}

}
  • Java 配置类(负责队列、交换机的声明和绑定)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class QueueConfig {

// 交换机的名称
public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";

// 队列的名称
public static final String CONFIRM_QUEUE_NAME = "confirm.queue";

// 队列的路由键(绑定键)
public static final String CONFIRM_QUEUE_ROUTING_KEY = "key1";

// 声明交换机
@Bean("confirmExchange")
public DirectExchange confirmExchange() {
return new DirectExchange(CONFIRM_EXCHANGE_NAME);
}

// 声明队列
@Bean("confirmQueue")
public Queue confirmQueue() {
return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
}

// 绑定交换机和队列
@Bean
public Binding bindingQueue(@Qualifier("confirmQueue") Queue confirmQueue, @Qualifier("confirmExchange") DirectExchange confirmExchange) {
return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_QUEUE_ROUTING_KEY);
}

}
  • 生产者代码(负责发送消息)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import com.clay.rabbitmq.config.QueueConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.util.Date;
import java.util.UUID;

@Slf4j
@RestController
@RequestMapping("/confirm")
public class ProducerController {

@Resource
private RabbitTemplate rabbitTemplate;

@GetMapping("/sendMsg/{msg}")
public String sendMsg(@PathVariable("msg") String message) {
log.info("当前时间: {}, 发送一条信息给队列: {}", new Date(), message);
// 指定消息 ID
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 发送消息
rabbitTemplate.convertAndSend(QueueConfig.CONFIRM_EXCHANGE_NAME, QueueConfig.CONFIRM_QUEUE_ROUTING_KEY, message, correlationData);
return "success";
}

@GetMapping("/sendNoExchangeMsg/{msg}")
public String sendNoExchangeMsg(@PathVariable("msg") String message) {
log.info("当前时间: {}, 发送一条信息给队列: {}", new Date(), message);
// 指定消息 ID
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 发送消息(特意不指定错误的交换机,从而验证消息是否会被确认发布)
rabbitTemplate.convertAndSend("xxxx", QueueConfig.CONFIRM_QUEUE_ROUTING_KEY, message, correlationData);
return "success";
}

}
  • 消费者代码(负责消费消息)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import com.clay.rabbitmq.config.QueueConfig;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;
import java.util.Date;

@Slf4j
@Component
public class QueueConsumer {

@RabbitListener(queues = QueueConfig.CONFIRM_QUEUE_NAME)
public void receiveMsg(Message message, Channel channel) {
String msg = new String(message.getBody(), StandardCharsets.UTF_8);
log.info("当前时间: {}, 接收到信息: {}", new Date(), msg);
}

}
  • 主启动类
1
2
3
4
5
6
7
8
9
10
11
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class RabbitApplication {

public static void main(String[] args) {
SpringApplication.run(RabbitApplication.class, args);
}

}

特别注意

Spring Boot 2.x 之前,需要在主启动类上添加 @EnableRabbit 注解。

代码测试

  • (1) 启动 SpringBoot 应用

  • (2) 通过浏览器调用接口 http://127.0.0.1:8080/confirm/sendMsg/hello,SpringBoot 应用会在控制台打印以下日志信息:

1
2
3
INFO 51355 --- [nio-8080-exec-1] c.c.r.controller.ProducerController      : 当前时间: Wed Apr 22 21:07:01 CST 2021, 发送一条信息给队列: hello
INFO 51355 --- [ntContainer#0-1] c.clay.rabbitmq.consumer.QueueConsumer : 当前时间: Wed Apr 22 21:07:01 CST 2021, 接收到信息: hello
INFO 51355 --- [nectionFactory1] c.c.r.callback.CustomConfirmCallback : ID 为 4520dc03-0c8a-425d-8230-8440cffa3d12 的消息成功发送到交换机
  • (3) 通过浏览器调用接口 http://127.0.0.1:8080/confirm/sendNoExchangeMsg/hello,SpringBoot 应用会在控制台打印以下日志信息:
1
2
INFO 51355 --- [nio-8080-exec-1] c.c.r.controller.ProducerController      : 当前时间: Wed Apr 22 21:08:15 CST 2021, 发送一条信息给队列: hello
INFO 51355 --- [nectionFactory1] c.c.r.callback.CustomConfirmCallback : ID 为 56a86b63-2641-4f04-b96b-ef8e64ac20ce 的消息发送到交换机失败,原因:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'xxxx' in vhost '/', class-id=60, method-id=40)
  • (4) 在第二次调用发送消息的接口时,由于交换机名称不正确,消息无法发送到任何交换机,导致消息投递失败,最终 Broker 会发送消息投递失败的通知(NACK)给生产者。

底层实现

在 SpringBoot 项目中,推荐使用 publisher-confirm-type: correlated + RabbitTemplate.ConfirmCallback 来实现 RabbitMQ 的发布确认机制。它们本质上就是对 RabbitMQ 原生 API 的封装,也就是封装了 channel.confirmSelect() + channel.addConfirmListener(ackCallback, nackCallback),都属于异步确认发布操作;省去了开发者手动管理 deliveryTag、注册监听器、处理多线程等麻烦,适合绝大多数业务开发场景。其底层实现原理如下:

  • 当配置了 publisher-confirm-type: correlated,Spring 内部实际上就是帮开发者开启了 channel.confirmSelect(),并注册了一个 ConfirmListener 监听器。
  • 开发者自定义实现的 RabbitTemplate.ConfirmCallback 是 Spring 把 ConfirmListener 回调事件封装后再分发出来的。
  • 也就是说,开发者使用的是 Spring 管理版的 channel.addConfirmListener(ackCallback, nackCallback),只是更高级、更好用而已。

RabbitMQ 消息退回

背景介绍

通过上面的案例,可以知道在仅启用 RabbitMQ 发布确认机制的情况下,交换机接收到消息后,会直接给生产者发送确认消息;如果交换机在投递消息时,发现消息不可路由(比如路由键写错、没有匹配的队列等),那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的。为了解决这个问题,需要使用到 RabbitMQ 的消息退回机制;通过设置 mandatory = true,可以让消息在投递过程中出现无法路由的时候,将消息退回给生产者。

概念介绍

RabbitMQ 的消息退回机制是指当消息成功发送到了交换机,但由于某些原因交换机无法将消息路由到任何队列(比如路由键写错、没有匹配的队列等),RabbitMQ 将这条无法被路由的消息退回给生产者的一种机制。默认情况下,在不使用消息退回机制时(比如:不设置 mandatory = true),RabbitMQ 会直接丢弃无法路由的消息,不会有任何提示信息。特别注意:消息退回机制只在以下三个条件同时满足时才会生效:

  • 消息已成功到达交换机
  • 交换机无法将消息路由到匹配的队列
  • 设置了 mandatory = true(否则消息会被直接丢弃)

使用案例

这里将演示 SpringBoot 项目如何使用 RabbitMQ 的发布确认机制和消息退回机制,队列与交换机的绑定关系如下图所示:

代码下载

本节所需的完整案例代码,可以直接从 GitHub 下载对应章节 rabbitmq-lesson-17

版本说明

本案例使用的各软件版本如下表所示:

组件版本说明
RabbitMQ Server3.8.26RabbitMQ 服务器
SpringBoot2.7.18
Erlang24.2
Java11

案例代码

  • Maven 依赖
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
<properties>
<spring-boot.version>2.7.18</spring-boot.version>
</properties>

<dependencyManagement>
<dependencies>
<!-- SpringBoot -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring-boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<!--Web-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--RabbitMQ 核心依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--RabbitMQ 测试依赖-->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
<!--Lombok-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!--Test-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
  • 配置文件(application.yml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
server:
port: 8080

spring:
application:
name: rabbit-application
rabbitmq:
host: 192.168.2.127
port: 5672
password: admin
username: admin
virtual-host: /
publisher-confirm-type: correlated
publisher-returns: true

参数 publisher-confirm-type 的使用说明

上面的 publisher-confirm-type 参数用于确认消息是否成功到达交换机(Exchange),也叫做发布确认机制(Publisher Confirms)。该参数的可选值及其含义如下:

可选值含义
none禁用发布确认机制,是默认值。
correlated启用基于 CorrelationData 的发布确认机制,会调用自定义实现的 RabbitTemplate.ConfirmCallback 接口中的 confirm() 方法。
simple启用简单模式的发布确认机制。Spring 会使用内部的发布确认逻辑(不是基于回调的),但不会像 correlated 那样提供精细的回调。

特别注意

publisher-confirm-type 的值设置为 simple,经测试有两种效果。其一效果和 correlated 值一样会触发 RabbitTemplate.ConfirmCallback 接口的回调方法;其二效果在发布消息成功后,当使用 RabbitTemplate 调用 waitForConfirms()waitForConfirmsOrDie() 方法等待 Broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,此时需要注意的是 waitForConfirmsOrDie() 方法如果返回 false,则会关闭 Channel,导致接下来无法再发送消息到 Broker。

参数 publisher-returns 的使用说明

上面的 publisher-returns 参数用于启用 RabbitMQ 的消息退回机制。通过 publisher-returns: true 启用消息退回机制后,再配合 rabbitTemplate.setMandatory(true);,就可以让自定义实现的 ReturnsCallback 回调方法被触发,这样就能知道哪条消息发送失败了。

特别注意

  • publisher-returns: true 启用的是 RabbitMQ 的消息退回机制。
  • rabbitTemplate.setMandatory(true); 是告诉 RabbitMQ:"如果消息不能路由成功,就把消息退回给生产者,不要直接丢弃"。
  • 两者必须搭配使用,publisher-returns 控制消息退回功能是否启用,mandatory 控制消息是否可退回,然后才能在 RabbitTemplate.ReturnsCallback 中接收到消息退回的通知。

  • 消息发布确认的回调实现类(实现了 RabbitTemplate.ConfirmCallback 接口)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class CustomConfirmCallback implements RabbitTemplate.ConfirmCallback {

/**
* 消息发送到交换机后的回调函数,用于确认消息是否成功到达交换机
* <p> 特别注意:如果消息成功发送到交换机,但是交换机根据 RoutingKey 无法将消息路由到匹配的队列,消息会被丢弃,但 ack 参数的值仍然为 true
*
* @param correlationData 关联数据,用于唯一标识发送的消息,可以在发送消息时设置,用于跟踪消息的状态
* @param ack 表示消息是否成功到达交换机。true 表示成功,false 表示失败
* @param cause 如果 ack 为 false,该字段表示失败的原因;如果 ack 为 true,该字段通常为 null
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String msgId = correlationData != null ? correlationData.getId() : "";
if (ack) {
log.info("ID 为 {} 的消息成功发送到交换机", msgId);
} else {
log.error("ID 为 {} 的消息发送到交换机失败,原因:{}", msgId, cause);
}
}

}

特别注意

实现 RabbitTemplate.ConfirmCallback 接口后,如果消息成功发送到交换机,但是交换机根据 RoutingKey 无法将消息路由到匹配的队列,消息会被丢弃,但 ack 参数的值仍然为 true。换句话说,在仅启用发布确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息;如果交换机在投递消息时,发现消息不可路由(比如路由键写错、没有匹配的队列等),那么消息会被直接丢弃,此时消息生产者是不知道消息被丢弃这个事件的。如果需要解决这个问题,则需要使用到 RabbitMQ 的消息退回机制或者备份交换机。

  • 消息退回的回调实现类(实现了 RabbitTemplate.ReturnsCallback 接口)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;

@Slf4j
@Component
public class CustomReturnsCallback implements RabbitTemplate.ReturnsCallback {

/**
* 当消息成功发送到交换机,但因为路由失败(比如没有匹配的队列)而被退回时,会触发该回调方法
*
* @param returned 包含被退回消息的详细信息,如消息内容、路由键、交换机、退回原因等
*/
@Override
public void returnedMessage(ReturnedMessage returned) {
String cause = returned.getReplyText();
String exchagne = returned.getExchange();
String routingKey = returned.getRoutingKey();
String message = new String(returned.getMessage().getBody(), StandardCharsets.UTF_8);
log.error("消息 {} 被交换机 {} 退回, 路由键: {}, 退回原因: {}", message, exchagne, routingKey, cause);
}

}
  • RabbitTemplate 的配置类(设置 RabbitTemplate 消息发布确认和消息退回的回调实现类)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

@Autowired
private RabbitTemplate.ConfirmCallback confirmCallback;

@Autowired
private RabbitTemplate.ReturnsCallback returnsCallback;

/**
* 配置 RabbitTemplate Bean,用于自定义消息发送行为,比如发布确认和消息退回处理
*/
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);

// 设置发布确认回调,用于确认消息是否成功到达交换机,需要在配置中启用 spring.rabbitmq.publisher-confirm-type=correlated
rabbitTemplate.setConfirmCallback(confirmCallback);

// 启用强制消息投递(mandatory=true),如果消息无法被路由到队列(即找不到匹配队列),则会触发 ReturnsCallback 回调
rabbitTemplate.setMandatory(true);

// 设置消息退回回调,用于处理消息未被队列接收的情况(比如路由键错误),需要在配置中启用 spring.rabbitmq.publisher-returns=true
rabbitTemplate.setReturnsCallback(returnsCallback);

return rabbitTemplate;
}

}
  • Java 配置类(负责队列、交换机的声明和绑定)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class QueueConfig {

// 交换机的名称
public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";

// 队列的名称
public static final String CONFIRM_QUEUE_NAME = "confirm.queue";

// 队列的路由键(绑定键)
public static final String CONFIRM_QUEUE_ROUTING_KEY = "key1";

// 声明交换机
@Bean("confirmExchange")
public DirectExchange confirmExchange() {
return new DirectExchange(CONFIRM_EXCHANGE_NAME);
}

// 声明队列
@Bean("confirmQueue")
public Queue confirmQueue() {
return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
}

// 绑定交换机和队列
@Bean
public Binding bindingQueue(@Qualifier("confirmQueue") Queue confirmQueue, @Qualifier("confirmExchange") DirectExchange confirmExchange) {
return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_QUEUE_ROUTING_KEY);
}

}
  • 生产者代码(负责发送消息)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import com.clay.rabbitmq.config.QueueConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.util.Date;
import java.util.UUID;

@Slf4j
@RestController
@RequestMapping("/confirm")
public class ProducerController {

@Resource
private RabbitTemplate rabbitTemplate;

@GetMapping("/sendMsg/{msg}")
public String sendMsg(@PathVariable("msg") String message) {
log.info("当前时间: {}, 发送一条信息给队列: {}", new Date(), message);
// 指定消息 ID
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 发送消息
rabbitTemplate.convertAndSend(QueueConfig.CONFIRM_EXCHANGE_NAME, QueueConfig.CONFIRM_QUEUE_ROUTING_KEY, message, correlationData);
return "success";
}

@GetMapping("/sendNoRouteMsg/{msg}")
public String sendNoRouteMsg(@PathVariable("msg") String message) {
log.info("当前时间: {}, 发送一条信息给队列: {}", new Date(), message);
// 指定消息 ID
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 发送消息(特意不指定 RoutingKey,让交换机无法将消息路由到任何队列,从而验证消息是否会被退回)
rabbitTemplate.convertAndSend(QueueConfig.CONFIRM_EXCHANGE_NAME, "", message, correlationData);
return "success";
}

}
  • 消费者代码(负责消费消息)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import com.clay.rabbitmq.config.QueueConfig;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;
import java.util.Date;

@Slf4j
@Component
public class QueueConsumer {

@RabbitListener(queues = QueueConfig.CONFIRM_QUEUE_NAME)
public void receiveMsg(Message message, Channel channel) {
String msg = new String(message.getBody(), StandardCharsets.UTF_8);
log.info("当前时间: {}, 接收到信息: {}", new Date(), msg);
}

}
  • 主启动类
1
2
3
4
5
6
7
8
9
10
11
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class RabbitApplication {

public static void main(String[] args) {
SpringApplication.run(RabbitApplication.class, args);
}

}

特别注意

Spring Boot 2.x 之前,需要在主启动类上添加 @EnableRabbit 注解。

代码测试

  • (1) 启动 SpringBoot 应用

  • (2) 通过浏览器调用接口 http://127.0.0.1:8080/confirm/sendMsg/hello,SpringBoot 应用会在控制台打印以下日志信息:

1
2
3
INFO 51355 --- [nio-8080-exec-1] c.c.r.controller.ProducerController      : 当前时间: Wed Apr 22 22:07:01 CST 2021, 发送一条信息给队列: hello
INFO 51355 --- [ntContainer#0-1] c.clay.rabbitmq.consumer.QueueConsumer : 当前时间: Wed Apr 22 22:07:01 CST 2021, 接收到信息: hello
INFO 51355 --- [nectionFactory1] c.c.r.callback.CustomConfirmCallback : ID 为 4520dc03-0c8a-425d-8230-8440cffa3d12 的消息成功发送到交换机
  • (3) 通过浏览器调用接口 http://127.0.0.1:8080/confirm/sendNoRouteMsg/hello,SpringBoot 应用会在控制台打印以下日志信息:
1
2
3
INFO 71199 --- [nio-8080-exec-1] c.c.r.controller.ProducerController      : 当前时间: Wed Apr 22 22:27:23 CST 2021, 发送一条信息给队列: hello
INFO 71199 --- [nectionFactory1] c.c.r.callback.CustomConfirmCallback : ID 为 a08f5856-e75e-496b-b95b-a1cb6220b954 的消息成功发送到交换机
ERROR 71199 --- [nectionFactory1] c.c.r.callback.CustomReturnsCallback : 消息 hello 被交换机 confirm.exchange 退回, 路由键: xxxx, 退回原因: NO_ROUTE
  • (4) 在第二次调用发送消息的接口时,由于路由键(RoutingKey)不正确,交换机无法将消息路由到任何队列,导致消息投递失败,最终 Broker 会将消息退回给生产者。

RabbitMQ 备份交换机

背景介绍

在 RabbitMQ 中,开启 mandatory 参数并结合消息退回机制,确实可以帮助我们感知那些无法被正常路由的消息,从而在消息无法投递到队列时及时发现并进行处理。然而,现实中的处理过程往往并不理想。多数情况下,我们只能简单地打印一条日志,或触发一个告警,然后手动介入处理。这种方式不仅效率低下,而且当生产者服务以集群方式部署在多台机器上时,手动收集日志的成本较高、容易出错,处理体验非常不优雅。更重要的是,一旦启用消息退回机制,就意味着生产者需要额外编写代码逻辑来处理这些被退回的消息,增加了系统的复杂度。此时我们面临一个两难选择:要么丢弃消息,要么增加额外的开发和维护负担。那么,有没有一种方式,既能防止消息丢失,又能不增加生产者的复杂性呢?前面我们提到过死信队列,它可以用来保存那些被消费者拒绝或处理失败的消息。但死信队列只适用于进入队列之后的异常处理,而无法覆盖根本未进入队列的情况 —— 也就是说,那些因为找不到匹配队列而直接被交换机退回的消息,是无法被死信机制捕获的。此时,就需要使用 RabbitMQ 提供的备份交换机(Alternate Exchange)机制来解决这个问题。

概念介绍

备份交换机(Alternate Exchange)可以理解为一个交换机的 “兜底方案” 或 “备用出口”。当某个交换机收到一条消息,而这条消息没有匹配任何已绑定的队列时,RabbitMQ 会自动将这条消息转发给该交换机所指定的备份交换机,由备份交换机来处理这类 “不可路由的消息”。通常,备份交换机会设置为 Fanout 类型,以便将消息广播投递到其下绑定的所有队列中。我们只需在备份交换机上绑定一个(或多个)” 回收队列 / 备份队列”,即可集中存储这些不可路由的消息。同时,还可以在备份交换机上额外绑定一个报警队列,由专门的消费者监听该队列,用于实时触发监控和告警。这样一来,不仅避免了消息丢失,也无需在生产者端增加任何额外的处理逻辑,实现了消息可靠性的同时保持了系统简洁性。

特别注意

当 RabbitMQ 的消息退回机制和备份交换机同时启用的时候,备份交换机的优先级更高;也就是说,无法路由的消息会被转发给备份交换机处理,而不会直接退回给消息生产者。

使用案例

这里将演示 SpringBoot 项目如何使用 RabbitMQ 的发布确认机制 + 消息退回机制 + 备份交换机,队列与交换机的绑定关系如下图所示:

代码下载

本节所需的完整案例代码,可以直接从 GitHub 下载对应章节 rabbitmq-lesson-18

版本说明

本案例使用的各软件版本如下表所示:

组件版本说明
RabbitMQ Server3.8.26RabbitMQ 服务器
SpringBoot2.7.18
Erlang24.2
Java11

案例代码

  • Maven 依赖
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
<properties>
<spring-boot.version>2.7.18</spring-boot.version>
</properties>

<dependencyManagement>
<dependencies>
<!-- SpringBoot -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring-boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<!--Web-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--RabbitMQ 核心依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--RabbitMQ 测试依赖-->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
<!--Lombok-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!--Test-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
  • 配置文件(application.yml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
server:
port: 8080

spring:
application:
name: rabbit-application
rabbitmq:
host: 192.168.2.127
port: 5672
password: admin
username: admin
virtual-host: /
publisher-confirm-type: correlated
publisher-returns: true
  • 消息发布确认的回调实现类(实现了 RabbitTemplate.ConfirmCallback 接口)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class CustomConfirmCallback implements RabbitTemplate.ConfirmCallback {

/**
* 消息发送到交换机后的回调函数,用于确认消息是否成功到达交换机
* <p> 特别注意:如果消息成功发送到交换机,但是交换机根据 RoutingKey 无法将消息路由到匹配的队列,消息会被丢弃,但 ack 参数的值仍然为 true
*
* @param correlationData 关联数据,用于唯一标识发送的消息,可以在发送消息时设置,用于跟踪消息的状态
* @param ack 表示消息是否成功到达交换机。true 表示成功,false 表示失败
* @param cause 如果 ack 为 false,该字段表示失败的原因;如果 ack 为 true,该字段通常为 null
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String msgId = correlationData != null ? correlationData.getId() : "";
if (ack) {
log.info("ID 为 {} 的消息成功发送到交换机", msgId);
} else {
log.error("ID 为 {} 的消息发送到交换机失败,原因:{}", msgId, cause);
}
}

}

特别注意

实现 RabbitTemplate.ConfirmCallback 接口后,如果消息成功发送到交换机,但是交换机根据 RoutingKey 无法将消息路由到匹配的队列,消息会被丢弃,但 ack 参数的值仍然为 true。换句话说,在仅启用发布确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息;如果交换机在投递消息时,发现消息不可路由(比如路由键写错、没有匹配的队列等),那么消息会被直接丢弃,此时消息生产者是不知道消息被丢弃这个事件的。如果需要解决这个问题,则需要使用到 RabbitMQ 的消息退回机制或者备份交换机。

  • 消息退回的回调实现类(实现了 RabbitTemplate.ReturnsCallback 接口)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;

@Slf4j
@Component
public class CustomReturnsCallback implements RabbitTemplate.ReturnsCallback {

/**
* 当消息成功发送到交换机,但因为路由失败(比如没有匹配的队列)而被退回时,会触发该回调方法
*
* @param returned 包含被退回消息的详细信息,如消息内容、路由键、交换机、退回原因等
*/
@Override
public void returnedMessage(ReturnedMessage returned) {
String cause = returned.getReplyText();
String exchagne = returned.getExchange();
String routingKey = returned.getRoutingKey();
String message = new String(returned.getMessage().getBody(), StandardCharsets.UTF_8);
log.error("消息 {} 被交换机 {} 退回, 路由键: {}, 退回原因: {}", message, exchagne, routingKey, cause);
}

}
  • RabbitTemplate 的配置类(设置 RabbitTemplate 消息发布确认和消息退回的回调实现类)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

@Autowired
private RabbitTemplate.ConfirmCallback confirmCallback;

@Autowired
private RabbitTemplate.ReturnsCallback returnsCallback;

/**
* 配置 RabbitTemplate Bean,用于自定义消息发送行为,比如发布确认和消息退回处理
*/
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);

// 设置发布确认回调,用于确认消息是否成功到达交换机,需要在配置中启用 spring.rabbitmq.publisher-confirm-type=correlated
rabbitTemplate.setConfirmCallback(confirmCallback);

// 启用强制消息投递(mandatory=true),如果消息无法被路由到队列(即找不到匹配队列),则会触发 ReturnsCallback 回调
rabbitTemplate.setMandatory(true);

// 设置消息退回回调,用于处理消息未被队列接收的情况(比如路由键错误),需要在配置中启用 spring.rabbitmq.publisher-returns=true
rabbitTemplate.setReturnsCallback(returnsCallback);

return rabbitTemplate;
}

}
  • Java 配置类(负责队列、交换机的声明和绑定)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class QueueConfig {

// 交换机的名称
public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";

// 队列的名称
public static final String CONFIRM_QUEUE_NAME = "confirm.queue";

// 队列的路由键(绑定键)
public static final String CONFIRM_QUEUE_ROUTING_KEY = "key1";

// 备份交换机的名称
public static final String BACKUP_EXCHANGE_NAME = "backup.exchange";

// 备份队列的名称
public static final String BACKUP_QUEUE_NAME = "backup.queue";

// 报警队列的名称
public static final String WARNING_QUEUE_NAME = "warning.queue";

// 声明交换机
@Bean("confirmExchange")
public DirectExchange confirmExchange() {
Map<String, Object> arguments = new HashMap<>();
// 设置当前交换机的备份交换机
arguments.put("alternate-exchange", BACKUP_EXCHANGE_NAME);
return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME).durable(true).withArguments(arguments).build();
}

// 声明队列
@Bean("confirmQueue")
public Queue confirmQueue() {
return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
}

// 绑定交换机和队列
@Bean
public Binding bindingQueue(@Qualifier("confirmQueue") Queue confirmQueue, @Qualifier("confirmExchange") DirectExchange confirmExchange) {
return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_QUEUE_ROUTING_KEY);
}

/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

// 声明备份交换机
@Bean("backupExchange")
public FanoutExchange backupExchange() {
return new FanoutExchange(BACKUP_EXCHANGE_NAME);
}

// 声明备份队列
@Bean("backupQueue")
public Queue backupQueue() {
return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();
}

// 声明报警队列
@Bean("warningQueue")
public Queue warningQueue() {
return QueueBuilder.durable(WARNING_QUEUE_NAME).build();
}

// 绑定备份交换机和备份队列
@Bean
public Binding bindingBackupQueue(@Qualifier("backupQueue") Queue backupQueue, @Qualifier("backupExchange") FanoutExchange backupExchange) {
return BindingBuilder.bind(backupQueue).to(backupExchange);
}

// 绑定备份交换机和报警队列
@Bean
public Binding bindingWarningQueue(@Qualifier("warningQueue") Queue warningQueue, @Qualifier("backupExchange") FanoutExchange backupExchange) {
return BindingBuilder.bind(warningQueue).to(backupExchange);
}

}
  • 生产者代码(负责发送消息)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import com.clay.rabbitmq.config.QueueConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.util.Date;
import java.util.UUID;

@Slf4j
@RestController
@RequestMapping("/confirm")
public class ProducerController {

@Resource
private RabbitTemplate rabbitTemplate;

@GetMapping("/sendMsg/{msg}")
public String sendMsg(@PathVariable("msg") String message) {
log.info("当前时间: {}, 发送一条信息给队列: {}", new Date(), message);
// 指定消息 ID
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 发送消息
rabbitTemplate.convertAndSend(QueueConfig.CONFIRM_EXCHANGE_NAME, QueueConfig.CONFIRM_QUEUE_ROUTING_KEY, message, correlationData);
return "success";
}

@GetMapping("/sendNoRouteMsg/{msg}")
public String sendNoRouteMsg(@PathVariable("msg") String message) {
log.info("当前时间: {}, 发送一条信息给队列: {}", new Date(), message);
// 指定消息 ID
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 发送消息(特意不指定 RoutingKey,让交换机无法将消息路由到任何队列,从而验证消息是否会被备份交换机处理)
rabbitTemplate.convertAndSend(QueueConfig.CONFIRM_EXCHANGE_NAME, "", message, correlationData);
return "success";
}

}
  • 消费者一代码(负责从普通队列消费消息)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import com.clay.rabbitmq.config.QueueConfig;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;
import java.util.Date;

@Slf4j
@Component
public class CommonQueueConsumer {

@RabbitListener(queues = QueueConfig.CONFIRM_QUEUE_NAME)
public void receiveMsg(Message message, Channel channel) {
String msg = new String(message.getBody(), StandardCharsets.UTF_8);
log.info("当前时间: {}, 接收到信息: {}", new Date(), msg);
}

}
  • 消费者二的代码(负责从报警队列消费消息)
1
2
3
4
5
6
7
8
9
10
11
@Slf4j
@Component
public class WarningQueueConsumer {

@RabbitListener(queues = QueueConfig.WARNING_QUEUE_NAME)
public void receiveMsg(Message message, Channel channel) {
String msg = new String(message.getBody(), StandardCharsets.UTF_8);
log.error("当前时间: {}, 报警发现不可路由消息: {}", new Date(), msg);
}

}
  • 主启动类
1
2
3
4
5
6
7
8
9
10
11
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class RabbitApplication {

public static void main(String[] args) {
SpringApplication.run(RabbitApplication.class, args);
}

}

特别注意

Spring Boot 2.x 之前,需要在主启动类上添加 @EnableRabbit 注解。

代码测试

  • (1) 启动 SpringBoot 应用

  • (2) 通过浏览器调用接口 http://127.0.0.1:8080/confirm/sendMsg/hello,SpringBoot 应用会在控制台打印以下日志信息:

1
2
3
INFO 51355 --- [nio-8080-exec-1] c.c.r.controller.ProducerController      : 当前时间: Wed Apr 22 22:07:01 CST 2021, 发送一条信息给队列: hello
INFO 51355 --- [ntContainer#0-1] c.clay.rabbitmq.consumer.QueueConsumer : 当前时间: Wed Apr 22 22:07:01 CST 2021, 接收到信息: hello
INFO 51355 --- [nectionFactory1] c.c.r.callback.CustomConfirmCallback : ID 为 4520dc03-0c8a-425d-8230-8440cffa3d12 的消息成功发送到交换机
  • (3) 通过浏览器调用接口 http://127.0.0.1:8080/confirm/sendNoRouteMsg/hello,SpringBoot 应用会在控制台打印以下日志信息:
1
2
3
INFO 71199 --- [nio-8080-exec-1] c.c.r.controller.ProducerController      : 当前时间: Wed Apr 22 22:27:23 CST 2021, 发送一条信息给队列: hello
ERROR 44139 --- [ntContainer#1-1] c.c.r.consumer.WarningQueueConsumer : 当前时间: Wed Apr 22 22:27:23 CST 2021, 报警发现不可路由消息: hello
INFO 71199 --- [nectionFactory1] c.c.r.callback.CustomConfirmCallback : ID 为 a08f5856-e75e-496b-b95b-a1cb6220b954 的消息成功发送到交换机
  • (4) 在第二次调用发送消息的接口时,由于路由键(RoutingKey)不正确,交换机无法将消息路由到任何队列,消息转而发送到备份交换机,接着消息被投递到备份队列和报警队列,最终监听报警队列的消费者会消费掉消息,而备份队列同时会将消息存储起来(如下图所示)。

RabbitMQ 其他知识

幂等性

幂等性的核心概念

幂等性(Idempotency)指的是一个操作无论执行多少次,产生的副作用和结果都是一样的,就像只执行了一次一样。幂等性可以保证在网络异常、重复请求等情况下,多次执行操作不会导致数据错误或系统不一致。举个最简单的例子,那就是支付,用户购买商品后支付,支付扣款成功,但是返回结果的时候网络异常,此时钱已经扣了;用户再次点击按钮,此时会进行第二次扣款,返回结果成功,用户查询余额发现多扣钱了,流水记录也变成了两条。在以前的单体应用系统中,通常只需要把数据操作放入同一个事务中即可,发生错误立即回滚,但是在响应客户端的时候也有可能出现网络中断等意外。这时候就需要保证支付操作的幂等性,否则就会出现前面提到的重复扣款情况。

幂等性的实现方案

  • 业务层设计时避免副作用,如重复扣款、重复下单
  • 使用唯一请求 ID / 幂等键(Idempotency Key)
  • 利用数据库的唯一约束(如唯一索引)
  • 在处理前判断操作是否已经执行

幂等性的使用场景

- HTTP 接口
    - `GET`:天然幂等,获取资源,不会改变状态。
    - `PUT`:幂等,多次更新资源内容,结果一致。
    - `POST`:非幂等,可能每次都创建新资源。但是,可以人为让 `POST` 变成幂等,比如通过幂等键(Idempotency Key) 控制重复提交。
- 消息系统
    - 消息可能重复投递,消费端需要确保幂等消费,比如处理订单时避免重复扣库存。
- 数据库操作
    - 更新操作(如设置状态为 "已完成")可设计为幂等。
    - 插入操作可通过 "唯一约束" 或 "先查后插" 实现幂等。

消息重复消费

消息重复消费的概念

当消息队列(MQ)将一条消息发送给消费者后,消费者在成功处理完该消息并准备向 MQ 返回确认(ACK)时,恰好发生了网络中断,导致 MQ 并未收到这条 ACK 确认信息。此时,MQ 会认为该消息未被成功消费,因此会将该消息重新分发给其他消费者,或者在网络恢复后再次发送给原消费者。实际上,该消费者可能已经完成了这条消息的处理,这就会导致同一条消息被重复消费的问题。因此,为了确保系统的正确性,消费者在处理消息时必须具备幂等性,以避免因消息重复投递而产生的业务逻辑错误。

消息重复消费的解决

为了解决 MQ 消息重复消费的问题,消费者通常需要实现幂等性机制。常见的做法是为每条消息分配一个全局唯一标识符,例如使用时间戳、UUID,或者由业务系统按特定规则生成的唯一 ID。在订单类等业务场景中,也可以直接使用 MQ 消息自带的消息 ID 作为唯一标识。消费者在接收到消息时,首先根据该 ID 判断该消息是否已经被消费过,如果是,则跳过处理;如果没有,则继续执行业务逻辑,并记录该 ID,防止重复消费。这种方式可以有效保证在网络异常、消息重试等情况下,系统依然保持数据一致性和业务正确性。目前业界主流的幂等性实现方式主要有两种,第一种是使用 Reids 的原子性操作,第二种是使用唯一 ID + 指纹码机制。

Reids 的原子性操作

利用 Redis 的原子性(如 SETNX 命令或 Lua 脚本)实现对消息 ID 的唯一性判断,天生具有幂等性,从而快速、高效地实现幂等校验,适用于高并发场景。

唯一 ID + 指纹码机制

指纹码是指根据一定的业务规则生成的唯一标识,一般由时间戳、业务字段、或其他服务提供的唯一信息拼接而成。这种标识不一定是 MQ 系统自动生成的,而是由具体的业务逻辑组合而来,核心要求是保证其全局唯一性。在幂等性实现中,可以通过查询数据库,判断该指纹码是否已存在来决定是否执行后续操作。这种方式的优势在于实现简单:只需拼接生成唯一标识并进行一次查询判断即可;劣势则是在高并发场景下,如果依赖单个数据库,容易出现写入性能瓶颈。当然,也可以通过分库分表等方式来提升系统的并发能力,但这种实现方式相对复杂,并不是最推荐的解决方案。因此,指纹码机制适用于中小规模系统或并发量不高的业务场景,若要应用于高并发系统,则需要搭配更高效的存储和查询策略。