前言
本文将介绍 SpringBoot 如何整合 RabbitMQ,并使用 RabbitMQ 的核心功能,比如消息持久化、发布确认机制、消息退回机制、消费确认机制等。
学习资源
整合案例
代码下载
本节所需的完整案例代码,可以直接从 GitHub 下载对应章节 rabbitmq-lesson-12。
案例介绍
本案例将整合 SpringBoot 和 RabbitMQ,并演示如何使用 RabbitMQ 的以下功能:
版本说明
本案例使用的各软件版本如下表所示:
| 组件 | 版本 | 说明 |
|---|
| SpringBoot | 2.7.18 | |
| RabbitMQ Server | 3.8.26 | |
| Erlang | 24.2 | |
| Java | 11 | |
案例代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| <properties> <spring-boot.version>2.7.18</spring-boot.version> </properties>
<dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-dependencies</artifactId> <version>${spring-boot.version}</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies>
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| server: port: 8080
spring: application: name: rabbit-application rabbitmq: host: 192.168.2.127 port: 5672 password: admin username: admin virtual-host: / publisher-confirm-type: correlated publisher-returns: true
|
参数 publisher-confirm-type 的使用说明
上面的 publisher-confirm-type 参数用于确认消息是否成功到达交换机(Exchange),也叫做发布确认机制(Publisher Confirms)。该参数的可选值及其含义如下:
| 可选值 | 含义 |
|---|
none | 禁用发布确认机制,是默认值。 |
correlated | 启用基于 CorrelationData 的发布确认机制,会调用自定义实现的 RabbitTemplate.ConfirmCallback 接口中的 confirm() 方法。 |
simple | 启用简单模式的发布确认机制。Spring 会使用内部的发布确认逻辑(不是基于回调的),但不会像 correlated 那样提供精细的回调。 |
特别注意
当 publisher-confirm-type 的值设置为 simple,经测试有两种效果。其一效果和 correlated 值一样会触发 RabbitTemplate.ConfirmCallback 接口的回调方法;其二效果在发布消息成功后,当使用 RabbitTemplate 调用 waitForConfirms() 或 waitForConfirmsOrDie() 方法等待 Broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,此时需要注意的是 waitForConfirmsOrDie() 方法如果返回 false,则会关闭 Channel,导致接下来无法再发送消息到 Broker。
参数 publisher-returns 的使用说明
上面的 publisher-returns 参数用于启用 RabbitMQ 的消息退回机制。通过 publisher-returns: true 启用消息退回机制后,再配合 rabbitTemplate.setMandatory(true);,就可以让自定义实现的 ReturnsCallback 回调方法被触发,这样就能知道哪条消息发送失败了。
特别注意
publisher-returns: true 启用的是 RabbitMQ 的消息退回机制。rabbitTemplate.setMandatory(true); 是告诉 RabbitMQ:"如果消息不能路由成功,就把消息退回给生产者,不要直接丢弃"。- 两者必须搭配使用,
publisher-returns 控制消息退回功能是否启用,mandatory 控制消息是否可退回,然后才能在 RabbitTemplate.ReturnsCallback 中接收到消息退回的通知。
- 消息发布确认的回调实现类(实现了
RabbitTemplate.ConfirmCallback 接口)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Component;
@Slf4j @Component public class CustomConfirmCallback implements RabbitTemplate.ConfirmCallback {
@Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { String msgId = correlationData != null ? correlationData.getId() : ""; if (ack) { log.info("ID 为 {} 的消息成功发送到交换机", msgId); } else { log.error("ID 为 {} 的消息发送到交换机失败,原因:{}", msgId, cause); } }
}
|
特别注意
实现 RabbitTemplate.ConfirmCallback 接口后,如果消息成功发送到交换机,但是交换机根据 RoutingKey 无法将消息路由到匹配的队列,消息会被丢弃,但 ack 参数的值仍然为 true。换句话说,在仅启用发布确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息;如果交换机在投递消息时,发现消息不可路由(比如路由键写错、没有匹配的队列等),那么消息会被直接丢弃,此时消息生产者是不知道消息被丢弃这个事件的。如果需要解决这个问题,则需要使用到 RabbitMQ 的消息退回机制或者备份交换机。
- 消息退回的回调实现类(实现了
RabbitTemplate.ReturnsCallback 接口)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.ReturnedMessage; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Component; import java.nio.charset.StandardCharsets;
@Slf4j @Component public class CustomReturnsCallback implements RabbitTemplate.ReturnsCallback {
@Override public void returnedMessage(ReturnedMessage returned) { String cause = returned.getReplyText(); String exchagne = returned.getExchange(); String routingKey = returned.getRoutingKey(); String message = new String(returned.getMessage().getBody(), StandardCharsets.UTF_8); log.error("消息 {} 被交换机 {} 退回, 路由键: {}, 退回原因: {}", message, exchagne, routingKey, cause); }
}
|
- RabbitTemplate 的配置类(设置 RabbitTemplate 消息发布确认和消息退回的回调实现类)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
@Configuration public class RabbitConfig {
@Autowired private RabbitTemplate.ConfirmCallback confirmCallback;
@Autowired private RabbitTemplate.ReturnsCallback returnsCallback;
@Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setConfirmCallback(confirmCallback);
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnsCallback(returnsCallback);
return rabbitTemplate; }
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.ExchangeBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.QueueBuilder; import org.springframework.amqp.core.TopicExchange; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
@Configuration public class QueueConfig {
public static final String ORDER_EXCHANGE_NAME = "order.exchange";
public static final String ORDER_QUEUE_NAME = "order.queue";
public static final String ORDER_QUEUE_ROUTING_KEY = "order";
@Bean("orderExchange") public TopicExchange orderExchange() { return ExchangeBuilder.topicExchange(ORDER_EXCHANGE_NAME).build(); }
@Bean("orderQueue") public Queue orderQueue() { return QueueBuilder.durable(ORDER_QUEUE_NAME).build(); }
@Bean public Binding bindingQueue(@Qualifier("orderQueue") Queue orderQueue, @Qualifier("orderExchange") TopicExchange orderExchange) { return BindingBuilder.bind(orderQueue).to(orderExchange).with(ORDER_QUEUE_ROUTING_KEY); }
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82
| import com.clay.rabbitmq.config.QueueConfig; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageDeliveryMode; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; import java.nio.charset.StandardCharsets; import java.util.Date; import java.util.UUID;
@Slf4j @RestController public class ProducerController {
@Resource private RabbitTemplate rabbitTemplate;
@GetMapping("/sendMsg/{message}") public String sendMsg(@PathVariable("message") String message) { log.info("当前时间: {}, 发送一条信息给队列: {}", new Date(), message);
MessageProperties properties = new MessageProperties(); properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT); properties.setMessageId(UUID.randomUUID().toString());
Message msg = new Message(message.getBytes(StandardCharsets.UTF_8), properties);
rabbitTemplate.convertAndSend(QueueConfig.ORDER_EXCHANGE_NAME, QueueConfig.ORDER_QUEUE_ROUTING_KEY, msg);
return "success"; }
@GetMapping("/sendErrorExchangeMsg/{message}") public String sendErrorExchangeMsg(@PathVariable("message") String message) { log.info("当前时间: {}, 发送一条信息给队列: {}", new Date(), message);
rabbitTemplate.convertAndSend("xxxx", QueueConfig.ORDER_QUEUE_ROUTING_KEY, message, msg -> { msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); msg.getMessageProperties().setMessageId(UUID.randomUUID().toString()); return msg; });
return "success"; }
@GetMapping("/sendErrorRoutingKeyMsg/{message}") public String sendErrorRoutingKeyMsg(@PathVariable("message") String message) { log.info("当前时间: {}, 发送一条信息给队列: {}", new Date(), message);
rabbitTemplate.convertAndSend(QueueConfig.ORDER_EXCHANGE_NAME, "xxxx", message, msg -> { msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); msg.getMessageProperties().setMessageId(UUID.randomUUID().toString()); return msg; });
return "success"; }
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| import com.clay.rabbitmq.config.QueueConfig; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.nio.charset.StandardCharsets; import java.util.Date;
@Slf4j @Component public class QueueConsumer {
@RabbitListener(queues = QueueConfig.ORDER_QUEUE_NAME, ackMode = "MANUAL") public void receiveMsg(Message message, Channel channel) throws Exception { String content = new String(message.getBody(), StandardCharsets.UTF_8);
Long deliveryTag = message.getMessageProperties().getDeliveryTag();
try { Thread.sleep(5000); log.info("当前时间: {}, 接收到信息: {}", new Date(), content);
channel.basicAck(deliveryTag, false); } catch (Exception e) { channel.basicNack(deliveryTag, false, true); log.error("Consume message failed: {}", e.getMessage()); } }
}
|
1 2 3 4 5 6 7 8 9 10 11
| import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication public class RabbitApplication {
public static void main(String[] args) { SpringApplication.run(RabbitApplication.class, args); }
}
|
特别注意
Spring Boot 2.x 之前,需要在主启动类上添加 @EnableRabbit 注解。
代码测试
测试用例一
- (1) 启动 SpringBoot 应用后,通过浏览器调用接口
http://127.0.0.1:8080/sendMsg/hello,SpringBoot 应用会在控制台打印以下日志信息:
1 2 3
| INFO 51355 --- [nio-8080-exec-1] c.c.r.controller.ProducerController : 当前时间: Wed Apr 22 22:07:01 CST 2022, 发送一条信息给队列: hello INFO 51355 --- [ntContainer#0-1] c.clay.rabbitmq.consumer.QueueConsumer : 当前时间: Wed Apr 22 22:07:01 CST 2022, 接收到信息: hello INFO 51355 --- [nectionFactory1] c.c.r.callback.CustomConfirmCallback : ID 为 4520dc03-0c8a-425d-8230-8440cffa3d12 的消息成功发送到交换机
|
- (2) 在调用发送消息的接口后,当看到消费者可以正常消费消息,则说明交换机、队列、生产者、消费者的配置都正确。
测试用例二
- (1) 启动 SpringBoot 应用后,通过浏览器调用接口
http://127.0.0.1:8080/sendErrorExchangeMsg/hello,SpringBoot 应用会在控制台打印以下日志信息:
1 2
| INFO 51355 --- [nio-8080-exec-1] c.c.r.controller.ProducerController : 当前时间: Wed Apr 22 21:08:15 CST 2022, 发送一条信息给队列: hello INFO 51355 --- [nectionFactory1] c.c.r.callback.CustomConfirmCallback : ID 为 56a86b63-2641-4f04-b96b-ef8e64ac20ce 的消息发送到交换机失败,原因:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'xxxx' in vhost '/', class-id=60, method-id=40)
|
- (2) 在调用发送消息的接口后,由于交换机名称不正确,消息无法发送到任何交换机,导致消息投递失败,最终 Broker 会发送消息投递失败的通知(NACK)给生产者,即 RabbitMQ 的发布确认机制生效了。
测试用例三
- (1) 启动 SpringBoot 应用后,通过浏览器调用接口
http://127.0.0.1:8080/sendErrorRoutingKeyMsg/hello,SpringBoot 应用会在控制台打印以下日志信息:
1 2 3
| INFO 71199 --- [nio-8080-exec-1] c.c.r.controller.ProducerController : 当前时间: Wed Apr 22 22:27:23 CST 2022, 发送一条信息给队列: hello INFO 71199 --- [nectionFactory1] c.c.r.callback.CustomConfirmCallback : ID 为 a08f5856-e75e-496b-b95b-a1cb6220b954 的消息成功发送到交换机 ERROR 71199 --- [nectionFactory1] c.c.r.callback.CustomReturnsCallback : 消息 hello 被交换机 confirm.exchange 退回, 路由键: xxxx, 退回原因: NO_ROUTE
|
- (2) 在调用发送消息的接口后,由于路由键(RoutingKey)不正确,交换机无法将消息路由到任何队列,导致消息投递失败,最终 Broker 会将消息退回给生产者,即 RabbitMQ 的消息退回机制生效了。
测试用例四
![]()
- (3) SpringBoot 应用的进程被杀死后,等待一段时间,可以看到之前未被 ACK 的消息会重新入队,此时 RabbitMQ 控制台显示的队列内容如下:
![]()
- (4) 当在 RabbitMQ 控制台中看到未被 ACK 的消息重新入队,这就说明了 RabbitMQ 的消费确认机制(手动确认)生效了。