大纲 前言 学习资源 版本说明 本文所有案例代码使用的各软件版本如下表所示:
组件 版本 说明 RabbitMQ Server 3.8.26RabbitMQ Client 5.10.0Erlang 24.2Java 11
RabbitMQ 消息持久化 概念介绍 RabbitMQ 的手动确认机制可以保证消费者在处理消息时不会出现丢失消息的情况,但是如何保障当 RabbitMQ 服务器宕机,之前消息生产者发送过来的消息不丢失呢? 默认情况下 RabbitMQ 退出或由于某种原因宕机时,它会忽视队列和消息,除非告知它不要这样做。 RabbitMQ 确保消息不会丢失需要做两件事,分别是需要将队列和消息都标记为持久化,后者只需要由生产者来完成即可。 特别注意:即使同时将队列和消息都标记为持久化,RabbitMQ 仍然无法完全保证消息不丢失,需要配合其他机制才能实现(比如 RabbitMQ 提供的发布确认机制)。 队列实现持久化 当创建的队列是非持久化的,如果 RabbitMQ 服务器重启,该队列就会被删除掉。如果要队列实现持久化,需要在生产者 / 消费者声明队列的时候把 durable 参数设置为 true。
1 2 3 4 5 6 7 8 9 10 11 boolean durable = true ;channel.queueDeclare(QUEUE_NAME, durable, false , false , null );
特别注意
如果之前声明的队列不是持久化的,则必须将原来的队列先删除,或者创建一个新的持久化队列,不然 RabbitMQ 的客户端会出现错误,如图所示 。
下图是在 RabbitMQ 控制台中非持久化队列与持久化队列的 UI 显示区别,持久化队列的 Feature 属性为 D,这个时候即使重启 RabbitMQ 服务器,队列也依然存在。
消息实现持久化 若希望让 RabbitMQ 消息实现持久化,除了需要声明持久化队列之外,还需要将消息标记为持久化,也就是需要在消息生产者发送消息时,添加 MessageProperties.PERSISTENT_TEXT_PLAIN 这个属性。
1 2 3 4 5 6 7 8 9 10 AMQP.BasicProperties msgProperties = MessageProperties.PERSISTENT_TEXT_PLAIN; channel.basicPublish("" , QUEUE_NAME, msgProperties, "hello" .getBytes(StandardCharsets.UTF_8));
特别注意
在使用 RabbitMQ 的时候,即使同时将队列和消息都标记为持久化,但这仍然无法完全避免由于缓存机制带来的数据丢失问题。因为从消息发送到真正写入磁盘并完成刷盘(Flush)之间存在一个短暂的时间窗口,在此期间如果节点宕机,消息依然可能丢失。例如,当消息刚进入缓存,尚未完全写入磁盘时,如果 RabbitMQ 服务器突然宕机,该消息可能无法恢复导致丢失。因此,RabbitMQ 的持久化机制并不能提供绝对可靠的保障,但对于一些简单的任务队列应用而言,已足够满足需求。在高可靠性要求的场景中,应该结合使用队列持久化、消息持久化、发布确认机制、镜像队列(或者 Quorum Queue)等机制,才能构建更稳健的消息投递保障体系。
案例代码 本节将演示如何将队列和消息标记为持久化,以此避免 RabbitMQ 服务器重启后导致消息丢失的问题。
代码下载
本节所需的完整案例代码,可以直接从 GitHub 下载对应章节 rabbitmq-lesson-06。
1 2 3 4 5 <dependency > <groupId > com.rabbitmq</groupId > <artifactId > amqp-client</artifactId > <version > 5.10.0</version > </dependency >
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 import com.rabbitmq.client.AMQP;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.MessageProperties;import java.nio.charset.StandardCharsets;public class MQProducer { public static final String QUEUE_NAME = "test" ; public static void main (String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.2.127" ); factory.setPort(5672 ); factory.setUsername("admin" ); factory.setPassword("admin" ); try ( Connection connection = factory.newConnection(); Channel channel = connection.createChannel() ) { boolean durable = true ; channel.queueDeclare(QUEUE_NAME, durable, false , false , null ); AMQP.BasicProperties msgProperties = MessageProperties.PERSISTENT_TEXT_PLAIN; channel.basicPublish("" , QUEUE_NAME, msgProperties, "hello" .getBytes(StandardCharsets.UTF_8)); } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 import com.rabbitmq.client.CancelCallback;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.DeliverCallback;import java.nio.charset.StandardCharsets;import java.util.Scanner;public class MQConsumer { public static final String QUEUE_NAME = "test" ; public static void main (String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.2.127" ); factory.setPort(5672 ); factory.setUsername("admin" ); factory.setPassword("admin" ); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); boolean durable = true ; channel.queueDeclare(QUEUE_NAME, durable, false , false , null ); DeliverCallback deliverCallback = (consumerTag, message) -> { String msg = new String(message.getBody(), StandardCharsets.UTF_8); try { Thread.sleep(60000 ); System.out.println("Successed to consume message : " + msg); } catch (Exception e) { e.printStackTrace(); } channel.basicAck(message.getEnvelope().getDeliveryTag(), false ); }; CancelCallback cancelCallback = (consumerTag) -> { System.out.println("Failed to consume message : " + consumerTag); }; boolean autoAck = false ; channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, cancelCallback); System.out.println("按回车键退出程序:" ); new Scanner(System.in).nextLine(); channel.close(); connection.close(); } }
代码测试
(3) 重启 RabbitMQ 服务器,等重启完成后,如果在 RabbitMQ 的控制台,仍然可以看到有未被消费(Ready)或者正在消费(Unacked)的消息,则说明队列和消息的持久化都生效了,如下图所示:
(4) 最终消息会被消费者消费(注意:这里可能会出现重复消费的现象),消费者的控制台输出结果如下: 1 2 按回车键退出程序: Successed to consume message : hello
RabbitMQ 发布确认 扩展阅读
基于 SpringBoot 使用 RabbitMQ 的发布确认机制、消息退回机制和备份交换机,可以参考这里 的教程。
发布确认的概念 当生产者将信道设置为 Confirm 模式后,在该信道上发布的每条消息都会被分配一个从 1 开始递增的唯一 ID。 一旦消息成功投递到所有匹配的队列,Broker 便会发布确认消息给生产者,其中包含该消息的唯一 ID,从而让生产者确认消息已正确到达目标队列。如果消息和队列是持久化的,Broker 会在将消息写入磁盘后再执行发布确认(如下图所示)。 确认消息的 delivery-tag 字段表示被确认的消息序列号。此外,Broker 还可以在 basic.ack() 中设置 multiple 标志,表示该序列号及其之前的所有消息均已确认(即批量确认)。Confirm 模式的最大优势在于其异步特性。生产者发布消息后,无需等待确认即可继续发送下一条消息,而信道会在消息最终被确认时,通过回调方法通知生产者应用进行处理。如果 RabbitMQ 由于内部错误导致消息丢失,它会发送一条 nack(否定确认)消息,生产者应用同样可以在回调方法中处理该 nack,以便采取相应措施(比如重新发布消息)。
发布确认机制的缺陷
发布确认机制是 RabbitMQ 提供的一种用于确认生产者是否将消息成功发送到交换机的机制 。它允许生产者在发送消息后收到 Broker 的确认(ACK)或否认(NACK)反馈,从而确认消息是否真正被 RabbitMQ 接收和处理。特别注意,在仅启用 RabbitMQ 发布确认机制的情况下,交换机接收到消息后,会直接给生产者发送确认消息;如果交换机在投递消息时,发现无法将消息路由到任何队列(比如路由键写错、没有匹配的队列等),那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的 。为了解决这个问题,需要使用到 RabbitMQ 的消息退回机制 或者备份交换机 。
为什么需要发布确认机制
在使用 RabbitMQ 的时候,即使同时将队列和消息都标记为持久化,但这仍然无法完全避免由于缓存机制带来的数据丢失问题。因为从消息发送到真正写入磁盘并完成刷盘(Flush)之间存在一个短暂的时间窗口,在此期间如果节点宕机,消息依然可能丢失。例如,当消息刚进入缓存,尚未完全写入磁盘时,如果 RabbitMQ 服务器突然宕机,该消息可能无法恢复导致丢失。因此,RabbitMQ 的持久化机制并不能提供绝对可靠的保障,但对于一些简单的任务队列应用而言,已足够满足需求。在高可靠性要求的场景中,应该结合使用队列持久化、消息持久化、发布确认机制、镜像队列(或者 Quorum Queue)等机制,才能构建更稳健的消息投递保障体系。
发布确认的开启 RabbitMQ 的发布确认机制默认是没有开启的,如果要开启需要调用方法 channel.confirmSelect(),每当要想使用发布确认机制,都需要在 Channel 上调用该方法。
1 2 3 4 5 Channel channel = connection.createChannel() channel.confirmSelect();
在生产者发送消息后,可以使用以下 API 让生产者等待 Broker 的发布确认响应:
方法 返回值 超时时间 失败时行为 适用场景 waitForConfirms()boolean无 有未确认消息时返回 false,不会关闭 Channel 需要手动处理失败的情况 waitForConfirms(long timeout)boolean有 超时或有未确认消息时返回 false,不会关闭 Channel 限制等待时间,避免长时间阻塞 waitForConfirmsOrDie()void无 有未确认消息时抛出 IOException 并关闭 Channel 适用于希望在失败时终止 Channel 的情况 waitForConfirmsOrDie(long timeout)void有 超时或有未确认消息时抛出 IOException 并关闭 Channel 需要严格保证消息成功且限制等待时间
发布确认的策略 RabbitMQ 提供了三种消息发布确认策略,如下:
单个确认发布(同步单个确认)
原理:发送一条消息后,阻塞等待服务器返回确认消息。 优点: 缺点:吞吐量低,每次发送消息都需要等待确认。 延迟较高,不适用于高并发场景。 批量确认发布(同步批量确认)
原理:一次性发送多条消息,然后等待服务器返回批量确认。 优点:比单个确认发布性能更高,减少了等待时间,提高了吞吐量。 缺点:本质上还是同步确认操作,不适用于高并发场景。 如果出现问题,无法确定具体是哪条消息发布失败,可能导致部分消息丢失或重复发送。 异步确认发布(异步回调确认)
原理:消息发送后不阻塞,RabbitMQ 通过回调机制(ConfirmCallback)异步通知哪些消息被确认或丢失。 优点:异常处理,拥有最高的性能,支持高并发和高吞吐量。 可以在回调中维护消息状态,准确跟踪失败消息。 资源占用更低。 缺点:实现较复杂,需要自己管理未确认的消息队列(如使用 ConcurrentSkipListMap)。 代码逻辑相对较难调试。 单个确认发布 这种发布方式是一种同步确认机制,即每发布一条消息,必须等待其确认后才能继续发布下一条消息。 方法 waitForConfirmsOrDie(long) 仅在消息被确认时才会返回,如果在指定时间内未收到确认,则会抛出异常。这种确认方式有一个最大的缺点就是:发布速度特别的慢。由于未确认的消息会阻塞后续消息的发布,其吞吐量通常仅能达到每秒数百条消息。尽管如此,对于某些应用场景而言,这样的性能可能已经足够。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.MessageProperties;import java.nio.charset.StandardCharsets;import java.util.UUID;public class MQProducer1 { public static void publishMessageIndividually () throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.2.127" ); factory.setPort(5672 ); factory.setUsername("admin" ); factory.setPassword("admin" ); try ( Connection connection = factory.newConnection(); Channel channel = connection.createChannel() ) { String queueName = UUID.randomUUID().toString(); channel.queueDeclare(queueName, true , false , false , null ); channel.confirmSelect(); int total = 1000 ; long begin = System.currentTimeMillis(); for (int i = 0 ; i < total; i++) { String message = "消息" + i; channel.basicPublish("" , queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8)); boolean flag = channel.waitForConfirms(); if (flag) { System.out.println("消息发送成功" ); } else { System.out.println("消息发送失败" ); } } long end = System.currentTimeMillis(); System.out.println("发布" + total + "个单独确认消息,耗时" + (end - begin) + "ms" ); } } }
批量确认发布 相较于逐条发送并确认消息的机制,批量发布消息后统一确认的方式能够显著提升系统吞吐量。 然而这种批量处理策略存在两点主要限制:首先在消息发布异常的场景中,由于采用批量确认机制,系统无法快速定位具体故障消息,需要将整个消息批次持久化存储,以便后续故障诊断和批量重发;其次,虽然采用了批量处理技术,但消息发布过程本质上仍属于同步操作模式,发布线程在等待确认期间仍会处于阻塞状态。 这种设计在提升吞吐量的同时,需要权衡额外的内存资源消耗和异常处理复杂度。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.MessageProperties;import java.nio.charset.StandardCharsets;import java.util.UUID;public class MQProducer2 { public static void publishMessageBatch () throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.2.127" ); factory.setPort(5672 ); factory.setUsername("admin" ); factory.setPassword("admin" ); try ( Connection connection = factory.newConnection(); Channel channel = connection.createChannel() ) { String queueName = UUID.randomUUID().toString(); channel.queueDeclare(queueName, true , false , false , null ); channel.confirmSelect(); int batchSize = 100 ; int outstandingMessageCount = 0 ; int total = 1000 ; long begin = System.currentTimeMillis(); for (int i = 0 ; i < total; i++) { String message = "消息" + i; channel.basicPublish("" , queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8)); outstandingMessageCount++; if (outstandingMessageCount == batchSize) { channel.waitForConfirms(); outstandingMessageCount = 0 ; } } if (outstandingMessageCount > 0 ) { channel.waitForConfirms(); } long end = System.currentTimeMillis(); System.out.println("发布" + total + "个批量确认消息,耗时" + (end - begin) + "ms" ); } } }
异步确认发布 异步确认虽然在编程逻辑上比前两种方式更复杂,但在可靠性和效率方面表现最佳,性价比最高。它通过回调函数实现消息的可靠传递,而消息中间件也利用回调机制来确认消息是否成功投递。异步确认发布的实现方案如下图所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 import com.rabbitmq.client.Channel;import com.rabbitmq.client.ConfirmCallback;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.MessageProperties;import java.nio.charset.StandardCharsets;import java.util.UUID;import java.util.concurrent.ConcurrentNavigableMap;import java.util.concurrent.ConcurrentSkipListMap;public class MQProducer3 { public static void publishMessageAsync () throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.2.127" ); factory.setPort(5672 ); factory.setUsername("admin" ); factory.setPassword("admin" ); try ( Connection connection = factory.newConnection(); Channel channel = connection.createChannel() ) { String queueName = UUID.randomUUID().toString(); channel.queueDeclare(queueName, true , false , false , null ); channel.confirmSelect(); ConcurrentSkipListMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>(); ConfirmCallback ackCallback = (deliveryTag, multiple) -> { if (multiple) { ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(deliveryTag, true ); confirmed.clear(); } else { outstandingConfirms.remove(deliveryTag); } }; ConfirmCallback nackCallback = (deliveryTag, multiple) -> { String message = outstandingConfirms.get(deliveryTag); System.out.println("发布的消息" + message + "未被确认,消息序列号" + deliveryTag); }; channel.addConfirmListener(ackCallback, nackCallback); int total = 1000 ; long begin = System.currentTimeMillis(); for (int i = 0 ; i < total; i++) { String message = "消息" + i; outstandingConfirms.put(channel.getNextPublishSeqNo(), message); channel.basicPublish("" , queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8)); } long end = System.currentTimeMillis(); System.out.println("发布" + total + "个异步确认消息,耗时" + (end - begin) + "ms" ); } } }
如何处理异步未确认的消息
最好的解决方案就是把未确认的消息存储到一个基于内存的能被消息发布线程访问的队列,比如使用 ConcurrentLinkedQueue 队列,这个队列在 ConfirmCallback(负责专门处理 NACK 的情况)与消息发布线程之间进行消息的传递,以此实现未确认消息的重新发送。值得一提的是,重新发送消息后,为了避免消费者在消费消息时,可能出现重复消费的问题,消费者需要实现幂等消费。
性能压测结果 RabbitMQ 发布确认策略的压测结果如下表所示:
发布确认策略 消息发送数量 消息发送耗时 单个确认发布 1000 50278 ms 批量确认发布 1000 635 ms 异步确认发布 1000 92 ms