SpringBoot 整合 RabbitMQ

前言

本文将介绍 SpringBoot 如何整合 RabbitMQ,并使用 RabbitMQ 的核心功能,比如消息持久化、发布确认机制、消息退回机制、消费确认机制等。

学习资源

整合案例

代码下载

本节所需的完整案例代码,可以直接从 GitHub 下载对应章节 rabbitmq-lesson-12

案例介绍

本案例将整合 SpringBoot 和 RabbitMQ,并演示如何使用 RabbitMQ 的以下功能:

版本说明

本案例使用的各软件版本如下表所示:

组件版本说明
SpringBoot2.7.18
RabbitMQ Server3.8.26
Erlang24.2
Java11

案例代码

  • Maven 依赖
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
<properties>
<spring-boot.version>2.7.18</spring-boot.version>
</properties>

<dependencyManagement>
<dependencies>
<!-- SpringBoot -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring-boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<!--Web-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--RabbitMQ 核心依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--RabbitMQ 测试依赖-->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
<!--Lombok-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!--Test-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
  • 配置文件(application.yml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
server:
port: 8080

spring:
application:
name: rabbit-application
rabbitmq:
host: 192.168.2.127
port: 5672
password: admin
username: admin
virtual-host: /
# 启用发布确认机制
publisher-confirm-type: correlated
# 启用消息退回机制
publisher-returns: true

参数 publisher-confirm-type 的使用说明

上面的 publisher-confirm-type 参数用于确认消息是否成功到达交换机(Exchange),也叫做发布确认机制(Publisher Confirms)。该参数的可选值及其含义如下:

可选值含义
none禁用发布确认机制,是默认值。
correlated启用基于 CorrelationData 的发布确认机制,会调用自定义实现的 RabbitTemplate.ConfirmCallback 接口中的 confirm() 方法。
simple启用简单模式的发布确认机制。Spring 会使用内部的发布确认逻辑(不是基于回调的),但不会像 correlated 那样提供精细的回调。

特别注意

publisher-confirm-type 的值设置为 simple,经测试有两种效果。其一效果和 correlated 值一样会触发 RabbitTemplate.ConfirmCallback 接口的回调方法;其二效果在发布消息成功后,当使用 RabbitTemplate 调用 waitForConfirms()waitForConfirmsOrDie() 方法等待 Broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,此时需要注意的是 waitForConfirmsOrDie() 方法如果返回 false,则会关闭 Channel,导致接下来无法再发送消息到 Broker。

参数 publisher-returns 的使用说明

上面的 publisher-returns 参数用于启用 RabbitMQ 的消息退回机制。通过 publisher-returns: true 启用消息退回机制后,再配合 rabbitTemplate.setMandatory(true);,就可以让自定义实现的 ReturnsCallback 回调方法被触发,这样就能知道哪条消息发送失败了。

特别注意

  • publisher-returns: true 启用的是 RabbitMQ 的消息退回机制。
  • rabbitTemplate.setMandatory(true); 是告诉 RabbitMQ:"如果消息不能路由成功,就把消息退回给生产者,不要直接丢弃"。
  • 两者必须搭配使用,publisher-returns 控制消息退回功能是否启用,mandatory 控制消息是否可退回,然后才能在 RabbitTemplate.ReturnsCallback 中接收到消息退回的通知。

  • 消息发布确认的回调实现类(实现了 RabbitTemplate.ConfirmCallback 接口)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class CustomConfirmCallback implements RabbitTemplate.ConfirmCallback {

/**
* 消息发送到交换机后的回调函数,用于确认消息是否成功到达交换机
* <p> 特别注意:如果消息成功发送到交换机,但是交换机根据 RoutingKey 无法将消息路由到匹配的队列,消息会被丢弃,但 ack 参数的值仍然为 true
*
* @param correlationData 关联数据,用于唯一标识发送的消息,可以在发送消息时设置,用于跟踪消息的状态
* @param ack 表示消息是否成功到达交换机。true 表示成功,false 表示失败
* @param cause 如果 ack 为 false,该字段表示失败的原因;如果 ack 为 true,该字段通常为 null
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String msgId = correlationData != null ? correlationData.getId() : "";
if (ack) {
log.info("ID 为 {} 的消息成功发送到交换机", msgId);
} else {
log.error("ID 为 {} 的消息发送到交换机失败,原因:{}", msgId, cause);
}
}

}

特别注意

实现 RabbitTemplate.ConfirmCallback 接口后,如果消息成功发送到交换机,但是交换机根据 RoutingKey 无法将消息路由到匹配的队列,消息会被丢弃,但 ack 参数的值仍然为 true。换句话说,在仅启用发布确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息;如果交换机在投递消息时,发现消息不可路由(比如路由键写错、没有匹配的队列等),那么消息会被直接丢弃,此时消息生产者是不知道消息被丢弃这个事件的。如果需要解决这个问题,则需要使用到 RabbitMQ 的消息退回机制或者备份交换机。

  • 消息退回的回调实现类(实现了 RabbitTemplate.ReturnsCallback 接口)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;

@Slf4j
@Component
public class CustomReturnsCallback implements RabbitTemplate.ReturnsCallback {

/**
* 当消息成功发送到交换机,但因为路由失败(比如没有匹配的队列)而被退回时,会触发该回调方法
*
* @param returned 包含被退回消息的详细信息,如消息内容、路由键、交换机、退回原因等
*/
@Override
public void returnedMessage(ReturnedMessage returned) {
String cause = returned.getReplyText();
String exchagne = returned.getExchange();
String routingKey = returned.getRoutingKey();
String message = new String(returned.getMessage().getBody(), StandardCharsets.UTF_8);
log.error("消息 {} 被交换机 {} 退回, 路由键: {}, 退回原因: {}", message, exchagne, routingKey, cause);
}

}
  • RabbitTemplate 的配置类(设置 RabbitTemplate 消息发布确认和消息退回的回调实现类)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

@Autowired
private RabbitTemplate.ConfirmCallback confirmCallback;

@Autowired
private RabbitTemplate.ReturnsCallback returnsCallback;

/**
* 配置 RabbitTemplate Bean,用于自定义消息发送行为,比如发布确认处理
*/
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);

// 设置发布确认回调,用于确认消息是否成功到达交换机,需要在配置中启用 spring.rabbitmq.publisher-confirm-type=correlated
rabbitTemplate.setConfirmCallback(confirmCallback);

// 启用强制消息投递(mandatory=true),如果消息无法被路由到队列(即找不到匹配队列),则会触发 ReturnsCallback 回调
rabbitTemplate.setMandatory(true);

// 设置消息退回回调,用于处理消息未被队列接收的情况(比如路由键错误),需要在配置中启用 spring.rabbitmq.publisher-returns=true
rabbitTemplate.setReturnsCallback(returnsCallback);

return rabbitTemplate;
}

}
  • Java 配置类(负责队列、交换机的声明和绑定)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class QueueConfig {

// 交换机的名称
public static final String ORDER_EXCHANGE_NAME = "order.exchange";

// 队列的名称
public static final String ORDER_QUEUE_NAME = "order.queue";

// 队列的路由键(绑定键)
public static final String ORDER_QUEUE_ROUTING_KEY = "order";

// 声明交换机
@Bean("orderExchange")
public TopicExchange orderExchange() {
// 定义主题交换机
return ExchangeBuilder.topicExchange(ORDER_EXCHANGE_NAME).build();
}

// 声明队列
@Bean("orderQueue")
public Queue orderQueue() {
// 定义持久化队列
return QueueBuilder.durable(ORDER_QUEUE_NAME).build();
}

// 绑定交换机和队列
@Bean
public Binding bindingQueue(@Qualifier("orderQueue") Queue orderQueue, @Qualifier("orderExchange") TopicExchange orderExchange) {
return BindingBuilder.bind(orderQueue).to(orderExchange).with(ORDER_QUEUE_ROUTING_KEY);
}

}
  • 生产者代码(负责发送消息)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
import com.clay.rabbitmq.config.QueueConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.UUID;

@Slf4j
@RestController
public class ProducerController {

@Resource
private RabbitTemplate rabbitTemplate;

/**
* 正常发送消息
*/
@GetMapping("/sendMsg/{message}")
public String sendMsg(@PathVariable("message") String message) {
log.info("当前时间: {}, 发送一条信息给队列: {}", new Date(), message);

MessageProperties properties = new MessageProperties();
// 设置消息持久化
properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
// 设置消息 ID
properties.setMessageId(UUID.randomUUID().toString());

// 构建消息
Message msg = new Message(message.getBytes(StandardCharsets.UTF_8), properties);

// 发送消息(第一种写法)
rabbitTemplate.convertAndSend(QueueConfig.ORDER_EXCHANGE_NAME, QueueConfig.ORDER_QUEUE_ROUTING_KEY, msg);

return "success";
}

/**
* 发送消息到错误的交换机
*/
@GetMapping("/sendErrorExchangeMsg/{message}")
public String sendErrorExchangeMsg(@PathVariable("message") String message) {
log.info("当前时间: {}, 发送一条信息给队列: {}", new Date(), message);

// 发送消息(第二种写法),特意指定错误的交换机,从而验证消息是否会被确认发布
rabbitTemplate.convertAndSend("xxxx", QueueConfig.ORDER_QUEUE_ROUTING_KEY, message, msg -> {
// 设置消息持久化
msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
// 设置消息 ID
msg.getMessageProperties().setMessageId(UUID.randomUUID().toString());
return msg;
});

return "success";
}

/**
* 发送消息到错误的队列
*/
@GetMapping("/sendErrorRoutingKeyMsg/{message}")
public String sendErrorRoutingKeyMsg(@PathVariable("message") String message) {
log.info("当前时间: {}, 发送一条信息给队列: {}", new Date(), message);

// 发送消息(第二种写法),特意指定错误的路由键,从而验证消息是否会被退回
rabbitTemplate.convertAndSend(QueueConfig.ORDER_EXCHANGE_NAME, "xxxx", message, msg -> {
// 设置消息持久化
msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
// 设置消息 ID
msg.getMessageProperties().setMessageId(UUID.randomUUID().toString());
return msg;
});

return "success";
}

}
  • 消费者代码(负责消费消息)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import com.clay.rabbitmq.config.QueueConfig;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
import java.util.Date;

@Slf4j
@Component
public class QueueConsumer {

/**
* 监听队列
*
* <p> 参数 ackMode = "MANUAL" 表示使用手动确认消费机制
*/
@RabbitListener(queues = QueueConfig.ORDER_QUEUE_NAME, ackMode = "MANUAL")
public void receiveMsg(Message message, Channel channel) throws Exception {
// 消息内容
String content = new String(message.getBody(), StandardCharsets.UTF_8);

// 消息唯一标记
Long deliveryTag = message.getMessageProperties().getDeliveryTag();

try {
// 模拟消息处理耗时
Thread.sleep(5000);
log.info("当前时间: {}, 接收到信息: {}", new Date(), content);

// 手动确认消息
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
// 消息处理失败时,可选择拒绝消息并决定是否重新入队
channel.basicNack(deliveryTag, false, true);
// 记录错误日志信息
log.error("Consume message failed: {}", e.getMessage());
}
}

}
  • 主启动类
1
2
3
4
5
6
7
8
9
10
11
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class RabbitApplication {

public static void main(String[] args) {
SpringApplication.run(RabbitApplication.class, args);
}

}

特别注意

Spring Boot 2.x 之前,需要在主启动类上添加 @EnableRabbit 注解。

代码测试

测试用例一

  • (1) 启动 SpringBoot 应用后,通过浏览器调用接口 http://127.0.0.1:8080/sendMsg/hello,SpringBoot 应用会在控制台打印以下日志信息:
1
2
3
INFO 51355 --- [nio-8080-exec-1] c.c.r.controller.ProducerController      : 当前时间: Wed Apr 22 22:07:01 CST 2022, 发送一条信息给队列: hello
INFO 51355 --- [ntContainer#0-1] c.clay.rabbitmq.consumer.QueueConsumer : 当前时间: Wed Apr 22 22:07:01 CST 2022, 接收到信息: hello
INFO 51355 --- [nectionFactory1] c.c.r.callback.CustomConfirmCallback : ID 为 4520dc03-0c8a-425d-8230-8440cffa3d12 的消息成功发送到交换机
  • (2) 在调用发送消息的接口后,当看到消费者可以正常消费消息,则说明交换机、队列、生产者、消费者的配置都正确。

测试用例二

  • (1) 启动 SpringBoot 应用后,通过浏览器调用接口 http://127.0.0.1:8080/sendErrorExchangeMsg/hello,SpringBoot 应用会在控制台打印以下日志信息:
1
2
INFO 51355 --- [nio-8080-exec-1] c.c.r.controller.ProducerController      : 当前时间: Wed Apr 22 21:08:15 CST 2022, 发送一条信息给队列: hello
INFO 51355 --- [nectionFactory1] c.c.r.callback.CustomConfirmCallback : ID 为 56a86b63-2641-4f04-b96b-ef8e64ac20ce 的消息发送到交换机失败,原因:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'xxxx' in vhost '/', class-id=60, method-id=40)
  • (2) 在调用发送消息的接口后,由于交换机名称不正确,消息无法发送到任何交换机,导致消息投递失败,最终 Broker 会发送消息投递失败的通知(NACK)给生产者,即 RabbitMQ 的发布确认机制生效了。

测试用例三

  • (1) 启动 SpringBoot 应用后,通过浏览器调用接口 http://127.0.0.1:8080/sendErrorRoutingKeyMsg/hello,SpringBoot 应用会在控制台打印以下日志信息:
1
2
3
INFO 71199 --- [nio-8080-exec-1] c.c.r.controller.ProducerController      : 当前时间: Wed Apr 22 22:27:23 CST 2022, 发送一条信息给队列: hello
INFO 71199 --- [nectionFactory1] c.c.r.callback.CustomConfirmCallback : ID 为 a08f5856-e75e-496b-b95b-a1cb6220b954 的消息成功发送到交换机
ERROR 71199 --- [nectionFactory1] c.c.r.callback.CustomReturnsCallback : 消息 hello 被交换机 confirm.exchange 退回, 路由键: xxxx, 退回原因: NO_ROUTE
  • (2) 在调用发送消息的接口后,由于路由键(RoutingKey)不正确,交换机无法将消息路由到任何队列,导致消息投递失败,最终 Broker 会将消息退回给生产者,即 RabbitMQ 的消息退回机制生效了。

测试用例四

  • (1) 启动 SpringBoot 应用后,通过浏览器调用接口 http://127.0.0.1:8080/sendMsg/hello

  • (2) 在消费者手动确认消费(ACK)之前,立刻杀死 SpringBoot 应用的进程,此时 RabbitMQ 控制台显示的队列内容如下:

  • (3) SpringBoot 应用的进程被杀死后,等待一段时间,可以看到之前未被 ACK 的消息会重新入队,此时 RabbitMQ 控制台显示的队列内容如下:

  • (4) 当在 RabbitMQ 控制台中看到未被 ACK 的消息重新入队,这就说明了 RabbitMQ 的消费确认机制(手动确认)生效了。