RabbitMQ 入门教程之七

大纲

前言

学习资源

RabbitMQ 延迟队列实现

概念介绍

延迟队列(Delayed Queue)是指:消息被发送到队列中,并不会立刻被消费者消费,而是延迟一段时间后才会被消费。延迟队列的内部是有序的,最重要的特性就体现在它的延迟属性上,延迟队列中的消息是希望在指定时间到了以后取出和处理。在 RabbitMQ 中,并不是原生直接支持延迟队列功能,但可以通过以下两种方式来实现。

  • 延迟队列的实现方式

    • (1) 基于 TTL(生存时间)+ DLX(死信交换机)实现延迟队列
      • 设置队列或者消息的 TTL(生存时间)。
      • 消息在 TTL(生存时间)到期后不会被消费,而是被投递到 DLX(死信交换机)。
      • DLX(死信交换机)再将消息转发到死信队列,供消费者实时消费。
    • (2) 基于 RabbitMQ 插件实现真正的延迟队列
      • RabbitMQ 插件 rabbitmq_delayed_message_exchange 实现了消息的 “延迟投递” 功能。
      • 使用了自定义的交换机类型 x-delayed-message
      • 每条消息可单独指定延迟时间(毫秒级)。
      • 插件实现方式比 TTL(生存时间)+ DLX(死信交换机)实现方式更加可靠、更加灵活高效。
  • 延迟队列的核心特性

    • 延迟投递:支持在特定的延迟时间后将消息投递给消费者。
    • 简单配置:通过设置参数即可创建延迟队列,无需修改业务逻辑。
    • 兼容性强:与 RabbitMQ 其它功能(如消息确认、持久化等)兼容。
    • 灵活性:可以根据需求灵活设置延迟时间,支持不同的时间值。
  • 延迟队列的使用场景

    • 订单支付超时处理
      • 用户下单后,延迟(如 30 分钟)检查订单是否已支付,未支付则取消订单。
    • 商品库存释放
      • 用户下单未完成支付时,延迟释放预占库存。
    • 定时任务
      • 定时发送通知、邮件或短信。
      • 延迟执行某些业务逻辑,如提醒、推送等。
    • 重试机制
      • 某些任务执行失败后,延迟一段时间再次尝试执行。
    • 流量控制
      • 在高峰期平滑处理请求,通过延迟处理来降低瞬时负载,避免瞬时流量冲击。
    • 用户行为跟踪
      • 根据用户活动延迟执行某些操作,如发送建议或提醒。

延迟队列在企业项目中的实战应用

延迟队列在企业项目中的真实应用场景如下:

  • 订单在三十分钟之内未支付,则自动取消订单。
  • 新创建的店铺,如果在十天内都没有上架过商品,则自动发送消息提醒。
  • 用户注册成功后,如果三天内没有登陆,则发送短信提醒。
  • 用户发起退款后,如果三天内没有得到处理,则通知相关运营人员。
  • 预定会议室后,需要在预定的时间点前十分钟通知各个与会人员参加会议。

这些场景都有一个特点,需要在某个事件发生之后或者之前的指定时间点完成某一项任务,如:发生订单生成事件,在三十分钟之后检查该订单支付状态,然后将未支付的订单进行关闭。这看起来似乎可以使用定时任务来实现,一直轮询订单数据,每秒查询一次,取出需要被处理的数据,然后处理不就完事了吗?如果数据量比较少,确实可以这样做,比如:对于 “如果账单一周内未支付则进行自动结算” 这样的需求;如果对于时间不是严格限制,而是宽松意义上的一周,那么每天晚上跑个定时任务检查一下所有未支付的订单,确实也是一个可行的方案。但对于数据量比较大,并且时效性要求较强的业务场景,比如:对于 “订单三十分钟内未支付则关闭”,短期内未支付的订单数据可能会有很多,活动期间甚至会达到百万甚至千万级别,对这么庞大的数据量仍旧使用定时任务的轮询方式显然是不可取的,很可能在一秒内无法完成所有订单的支付状态检查,同时会给数据库带来很大压力,无法满足业务要求而且性能低下。在这种业务场景下,应该使用延迟队列来实现,如图所示

使用案例

本节将演示 RabbitMQ 如何基于 TTL(生存时间)+ DLX(死信交换机)实现延迟队列。这里将创建两个普通队列 QA 和 QB,两个队列的 TTL 属性分别设置为 10s 和 40s;然后,再创建一个交换机 X 和死信交换机 Y,它们的交换机类型都是 direct;最后,创建一个死信队列 QD,它们的绑定关系如下图所示:

代码下载

本节所需的完整案例代码,可以直接从 GitHub 下载对应章节 rabbitmq-lesson-13

版本说明

本案例使用的各软件版本如下表所示:

组件版本说明
RabbitMQ Server3.8.26RabbitMQ 服务器
SpringBoot2.7.18
Erlang24.2
Java11

案例代码

  • Maven 依赖
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
<properties>
<spring-boot.version>2.7.18</spring-boot.version>
</properties>

<dependencyManagement>
<dependencies>
<!-- SpringBoot -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring-boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<!--Web-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--RabbitMQ 核心依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--RabbitMQ 测试依赖-->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
<!--Lombok-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!--Test-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
  • 配置文件(application.yml
1
2
3
4
5
6
7
8
9
10
11
12
server:
port: 8080

spring:
application:
name: rabbit-application
rabbitmq:
host: 192.168.2.127
port: 5672
password: admin
username: admin
virtual-host: /
  • Java 配置类(负责队列、交换机的声明和绑定)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class QueueConfig {

// 死信交换机的名称
public static final String DEAD_LETTER_EXCHANGE = "Y";

// 死信队列的名称
public static final String DEAD_LETER_QUEUE = "QD";

// 死信队列的路由键(绑定键)
public static final String DEAD_LETTER_ROUTING_KEY = "YD";

// 普通交换机的名称
public static final String NORMAL_EXCHANGE = "X";

// 普通队列的名称
public static final String QUEUE_A = "QA";
public static final String QUEUE_B = "QB";

// 普通队列的路由键(绑定键)
public static final String ROUTING_KEY_QUEUE_A = "XA";
public static final String ROUTING_KEY_QUEUE_B = "XB";

/**
* 声明死信交换机
*/
@Bean("yExchange")
public DirectExchange yExchange() {
return new DirectExchange(DEAD_LETTER_EXCHANGE);
}

/**
* 声明死信队列
*/
@Bean("queueD")
public Queue queueD() {
return new Queue(DEAD_LETER_QUEUE);
}

/**
* 声明普通交换机
*/
@Bean("xExchange")
public DirectExchange xExchange() {
return new DirectExchange(NORMAL_EXCHANGE);
}

/**
* 声明普通队列 QA
*/
@Bean("queueA")
public Queue queueA() {
Map<String, Object> args = new HashMap<>(3);
// 声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
// 声明当前队列的死信队列的路由键(RoutingKey)
args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY);
// 声明当前队列的 TTL(单位毫秒)
args.put("x-message-ttl", 10000);
return QueueBuilder.durable(QUEUE_A).withArguments(args).build();
}

/**
* 声明普通队列 QB
*/
@Bean("queueB")
public Queue queueB() {
Map<String, Object> args = new HashMap<>(3);
// 声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
// 声明当前队列的死信队列的路由键(RoutingKey)
args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY);
// 声明当前队列的 TTL(单位毫秒)
args.put("x-message-ttl", 40000);
return QueueBuilder.durable(QUEUE_B).withArguments(args).build();
}

// 绑定死信交换机和死信队列
@Bean
public Binding bindingDeadLetter(@Qualifier("queueD") Queue queueD, @Qualifier("yExchange") DirectExchange yExchange) {
return BindingBuilder.bind(queueD).to(yExchange).with(DEAD_LETTER_ROUTING_KEY);
}

// 绑定普通交换机和普通队列 QA
@Bean
public Binding BindingQueueA(@Qualifier("queueA") Queue queueA, @Qualifier("xExchange") DirectExchange xExchange) {
return BindingBuilder.bind(queueA).to(xExchange).with(ROUTING_KEY_QUEUE_A);
}

// 绑定普通交换机和普通队列 QB
@Bean
public Binding BindingQueueB(@Qualifier("queueB") Queue queueB, @Qualifier("xExchange") DirectExchange xExchange) {
return BindingBuilder.bind(queueB).to(xExchange).with(ROUTING_KEY_QUEUE_B);
}

}
  • 生产者代码(负责发送消息)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import com.clay.rabbitmq.config.QueueConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.util.Date;

@Slf4j
@RestController
@RequestMapping("/delay")
public class SendMsgController {

@Resource
private RabbitTemplate rabbitTemplate;

/**
* 发送消息
*/
@GetMapping("/sendMsg/{msg}")
public String sendMsg(@PathVariable("msg") String message) {
log.info("当前时间: {}, 发送一条信息给两个 TTL 队列: {}", new Date(), message);
rabbitTemplate.convertAndSend(QueueConfig.NORMAL_EXCHANGE, QueueConfig.ROUTING_KEY_QUEUE_A, "消息来自 TTL 为 10s 的队列: " + message);
rabbitTemplate.convertAndSend(QueueConfig.NORMAL_EXCHANGE, QueueConfig.ROUTING_KEY_QUEUE_B, "消息来自 TTL 为 40s 的队列: " + message);
return "success";
}

}
  • 消费者代码(负责接收消息)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import com.clay.rabbitmq.config.QueueConfig;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;
import java.util.Date;

@Slf4j
@Component
public class DeadLetterQueueConsumer {

/**
* 消费消息
*/
@RabbitListener(queues = QueueConfig.DEAD_LETER_QUEUE)
public void receiveDeadLetter(Message message, Channel channel) {
String msg = new String(message.getBody(), StandardCharsets.UTF_8);
log.info("当前时间: {}, 接收到死信队列信息: {}", new Date(), msg);
}

}
  • 主启动类
1
2
3
4
5
6
7
8
9
10
11
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class RabbitApplication {

public static void main(String[] args) {
SpringApplication.run(RabbitApplication.class, args);
}

}

特别注意

Spring Boot 2.x 之前,需要在主启动类上添加 @EnableRabbit 注解。

代码测试

  • (1) 启动 SpringBoot 应用,然后通过浏览器调用接口 http://127.0.0.1:8080/delay/sendMsg/hello 来发送消息

  • (2) SpringBoot 应用会在控制台打印以下日志信息:

1
2
3
INFO 2164 --- [nio-8080-exec-1] c.c.r.controller.SendMsgController       : 当前时间: Sat Apr 06 21:24:05 CST 2021, 发送一条信息给两个 TTL 队列: hello
INFO 2164 --- [ntContainer#0-1] c.c.r.consumer.DeadLetterQueueConsumer : 当前时间: Sat Apr 06 21:25:15 CST 2021, 接收到死信队列信息: 消息来自 TTL 为 10s 的队列: hello
INFO 2164 --- [ntContainer#0-1] c.c.r.consumer.DeadLetterQueueConsumer : 当前时间: Sat Apr 06 21:25:45 CST 2021, 接收到死信队列信息: 消息来自 TTL 为 40s 的队列: hello
  • (3) 从打印的日志信息可以发现:队列 QA 里的消息会在 10s 后变成死信消息,然后经过死信交换机被投递到死信队列中,最后由消费者消费掉。

  • (4) 从打印的日志信息可以发现:队列 QB 里的消息会在 40s 后变成死信消息,然后经过死信交换机被投递到死信队列中,最后由消费者消费掉。

RabbitMQ 延迟队列优化

背景介绍

如果按照上面案例这样使用延迟队列的话,岂不是每增加一个新的时间需求,就要新增一个队列。上面只有 10s 和 40s 两个时间选项,如果需要一个小时后处理,那么就需要增加 TTL 为一个小时的队列。如果是预定会议室,然后提前通知这样的业务场景,岂不是要增加无数个队列才能满足需求?当然不是了,可以新增了一个队列 QC,该队列不设置 TTL 时间,而是在生产者发送消息到该队列的时候,才指定消息的 TTL(生存时间),更改后的绑定关系如下图所示:

特别注意

  • 当设置了消息或者队列的 TTL,即使消息过期了,也不一定会被立刻丢弃(如果配置了死信队列,会被转发到死信队列)。因为消息是否过期是在即将投递给消费者之前判定的,如果队列有严重的消息积压情况,则已过期的消息可能还会存活较长一段时间。
  • 无论是设置消息的 TTL,还是设置队列的 TTL(x-message-ttl),RabbitMQ 的 TTL 判定是惰性的,不是主动的。死信机制依赖于 "消息被投递或检查时",才会判定是否需要丢进死信队列。如果需要更实时的 TTL 处理机制,可以考虑使用延迟队列插件。
  • 如果不设置 TTL,表示消息永远不会过期;如果将 TTL 设置为 0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃(如果配置了死信队列,会被转发到死信队列)。

使用案例

本节将演示 RabbitMQ 如何基于 TTL(生存时间)+ DLX(死信交换机)实现延迟队列。这里在上面案例的基础上,新增了一个队列 QC,该队列不设置 TTL 时间,而是在生产者发送消息到该队列的时候,才指定消息的 TTL(生存时间),更改后的绑定关系如下图所示:

代码下载

本节所需的完整案例代码,可以直接从 GitHub 下载对应章节 rabbitmq-lesson-14

版本说明

本案例使用的各软件版本如下表所示:

组件版本说明
RabbitMQ Server3.8.26RabbitMQ 服务器
SpringBoot2.7.18
Erlang24.2
Java11

案例代码

  • Maven 依赖
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
<properties>
<spring-boot.version>2.7.18</spring-boot.version>
</properties>

<dependencyManagement>
<dependencies>
<!-- SpringBoot -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring-boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<!--Web-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--RabbitMQ 核心依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--RabbitMQ 测试依赖-->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
<!--Lombok-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!--Test-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
  • 配置文件(application.yml
1
2
3
4
5
6
7
8
9
10
11
12
server:
port: 8080

spring:
application:
name: rabbit-application
rabbitmq:
host: 192.168.2.127
port: 5672
password: admin
username: admin
virtual-host: /
  • Java 配置类(负责队列、交换机的声明和绑定)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class QueueConfig {

// 死信交换机的名称
public static final String DEAD_LETTER_EXCHANGE = "Y";

// 死信队列的名称
public static final String DEAD_LETER_QUEUE = "QD";

// 死信队列的路由键(绑定键)
public static final String DEAD_LETTER_ROUTING_KEY = "YD";

// 普通交换机的名称
public static final String NORMAL_EXCHANGE = "X";

// 普通队列的名称
public static final String QUEUE_A = "QA";
public static final String QUEUE_B = "QB";
public static final String QUEUE_C = "QC";

// 普通队列的路由键(绑定键)
public static final String ROUTING_KEY_QUEUE_A = "XA";
public static final String ROUTING_KEY_QUEUE_B = "XB";
public static final String ROUTING_KEY_QUEUE_C = "XC";

/**
* 声明死信交换机
*/
@Bean("yExchange")
public DirectExchange yExchange() {
return new DirectExchange(DEAD_LETTER_EXCHANGE);
}

/**
* 声明死信队列
*/
@Bean("queueD")
public Queue queueD() {
return new Queue(DEAD_LETER_QUEUE);
}

/**
* 声明普通交换机
*/
@Bean("xExchange")
public DirectExchange xExchange() {
return new DirectExchange(NORMAL_EXCHANGE);
}

/**
* 声明普通队列 QA
*/
@Bean("queueA")
public Queue queueA() {
Map<String, Object> args = new HashMap<>(3);
// 声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
// 声明当前队列的死信队列的路由键(RoutingKey)
args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY);
// 声明当前队列的 TTL(单位毫秒)
args.put("x-message-ttl", 10000);
return QueueBuilder.durable(QUEUE_A).withArguments(args).build();
}

/**
* 声明普通队列 QB
*/
@Bean("queueB")
public Queue queueB() {
Map<String, Object> args = new HashMap<>(3);
// 声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
// 声明当前队列的死信队列的路由键(RoutingKey)
args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY);
// 声明当前队列的 TTL(单位毫秒)
args.put("x-message-ttl", 40000);
return QueueBuilder.durable(QUEUE_B).withArguments(args).build();
}

/**
* 声明普通队列 QC
*/
@Bean("queueC")
public Queue queueC() {
Map<String, Object> args = new HashMap<>(3);
// 声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
// 声明当前队列的死信队列的路由键(RoutingKey)
args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY);
// 不声明当前队列的 TTL
return QueueBuilder.durable(QUEUE_C).withArguments(args).build();
}

// 绑定死信交换机和死信队列
@Bean
public Binding bindingDeadLetter(@Qualifier("queueD") Queue queueD, @Qualifier("yExchange") DirectExchange yExchange) {
return BindingBuilder.bind(queueD).to(yExchange).with(DEAD_LETTER_ROUTING_KEY);
}

// 绑定普通交换机和普通队列 QA
@Bean
public Binding BindingQueueA(@Qualifier("queueA") Queue queueA, @Qualifier("xExchange") DirectExchange xExchange) {
return BindingBuilder.bind(queueA).to(xExchange).with(ROUTING_KEY_QUEUE_A);
}

// 绑定普通交换机和普通队列 QB
@Bean
public Binding BindingQueueB(@Qualifier("queueB") Queue queueB, @Qualifier("xExchange") DirectExchange xExchange) {
return BindingBuilder.bind(queueB).to(xExchange).with(ROUTING_KEY_QUEUE_B);
}

// 绑定普通交换机和普通队列 QC
@Bean
public Binding BindingQueueC(@Qualifier("queueC") Queue queueC, @Qualifier("xExchange") DirectExchange xExchange) {
return BindingBuilder.bind(queueC).to(xExchange).with(ROUTING_KEY_QUEUE_C);
}

}
  • 生产者代码(负责发送消息)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import com.clay.rabbitmq.config.QueueConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.util.Date;

@Slf4j
@RestController
@RequestMapping("/delay")
public class SendMsgController {

@Resource
private RabbitTemplate rabbitTemplate;

@GetMapping("/sendExpireMsg/{msg}/{ttl}")
public String sendExpireMsg(@PathVariable("msg") String message, @PathVariable("ttl") String ttl) {
log.info("当前时间: {}, 发送一条时长 {} 毫秒 的 TTL 信息给队列: {}", new Date(), ttl, message);
rabbitTemplate.convertAndSend(QueueConfig.NORMAL_EXCHANGE, QueueConfig.ROUTING_KEY_QUEUE_C, message, msg -> {
// 指定消息的 TTL(生存时间)
msg.getMessageProperties().setExpiration(ttl);
return msg;
});
return "success";
}

}
  • 消费者代码(负责接收消息)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import com.clay.rabbitmq.config.QueueConfig;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;
import java.util.Date;

@Slf4j
@Component
public class DeadLetterQueueConsumer {

/**
* 消费消息
*/
@RabbitListener(queues = QueueConfig.DEAD_LETER_QUEUE)
public void receiveDeadLetter(Message message, Channel channel) {
String msg = new String(message.getBody(), StandardCharsets.UTF_8);
log.info("当前时间: {}, 接收到死信队列信息: {}", new Date(), msg);
}

}
  • 主启动类
1
2
3
4
5
6
7
8
9
10
11
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class RabbitApplication {

public static void main(String[] args) {
SpringApplication.run(RabbitApplication.class, args);
}

}

特别注意

Spring Boot 2.x 之前,需要在主启动类上添加 @EnableRabbit 注解。

代码测试

  • (1) 启动 SpringBoot 应用,然后通过浏览器调用以下接口:

    • http://127.0.0.1:8080/delay/sendExpireMsg/hello1/20000
    • http://127.0.0.1:8080/delay/sendExpireMsg/hello2/10000
  • (2) SpringBoot 应用会在控制台打印以下日志信息:

1
2
3
4
INFO 61839 --- [nio-8080-exec-3] c.c.r.controller.SendMsgController       : 当前时间: Mon Apr 06 22:21:21 CST 2021, 发送一条时长 20000 毫秒 的 TTL 信息给队列: hello1
INFO 61839 --- [nio-8080-exec-4] c.c.r.controller.SendMsgController : 当前时间: Mon Apr 06 22:21:25 CST 2021, 发送一条时长 10000 毫秒 的 TTL 信息给队列: hello2
INFO 61839 --- [ntContainer#0-1] c.c.r.consumer.DeadLetterQueueConsumer : 当前时间: Mon Apr 06 22:21:51 CST 2021, 接收到死信队列信息: hello1
INFO 61839 --- [ntContainer#0-1] c.c.r.consumer.DeadLetterQueueConsumer : 当前时间: Mon Apr 06 22:22:51 CST 2021, 接收到死信队列信息: hello2
  • (3) 从打印的日志信息可以发现:队列 QC 里的第一条消息会在 20s 后变成死信消息,然后经过死信交换机被投递到死信队列中,最后由消费者消费掉。

  • (4) 从打印的日志信息可以发现:队列 QC 里的第二条消息虽然设置了 TTL 为 10s,但它也是在 20s 后才变成死信消息,然后经过死信交换机被投递到死信队列中,最后由消费者消费掉。

  • (5) 第二条消息之所以没有比第一条消息先被消费,是因为消息是否过期是在即将投递给消费者之前判定的,如果队列有严重的消息积压情况,则已过期的消息可能还会存活较长一段时间。也就是说,如果使用在消息或者队列属性上设置 TTL 的方式,消息过期后可能并不会立刻被丢弃;因为 RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列;如果第一个消息的 TTL(生存时间)很长,即使第二个消息的 TTL(生存时间)很短,第二个消息也不会优先得到处理(比如丢到死信队列)。为了使用更实时的 TTL 处理机制来实现延迟队列,需要使用到延迟队列插件。

RabbitMQ 延迟队列插件

延迟队列插件的背景

当设置了消息或者队列的 TTL,即使消息过期了,也不一定会被立刻丢弃(如果配置了死信队列,会被转发到死信队列)。因为消息是否过期是在即将投递给消费者之前判定的,如果队列有严重的消息积压情况,则已过期的消息可能还会存活较长一段时间。简而言之,RabbitMQ 的 TTL 判定是惰性的,不是主动的。死信机制依赖于 “消息被投递或检查时”,才会判定是否需要丢进死信队列。为了彻底解决这个问题,需要使用到 RabbitMQ 的延迟队列插件。下图是延迟队列两种实现方式的对比:

延迟队列插件的安装

官网 下载延迟队列插件(rabbitmq_delayed_message_exchange),然后将文件拷贝到 RabbitMQ 安装目录下的插件目录(plgins)。最后,进入 RabbitMQ 安装目录下的插件目录(plgins),执行以下命令启用延迟队列插件,然后重启 RabbitMQ 服务。

  • 下载延迟队列插件(特别注意:延迟队列插件的版本必须与 RabbitMQ 版本兼容
1
2
3
4
5
# 下载延迟队列插件
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.8.17/rabbitmq_delayed_message_exchange-3.8.17.8f537ac.ez

# 拷贝文件到插件目录
cp rabbitmq_delayed_message_exchange-3.8.17.8f537ac.ez /opt/rabbitmq-3.8.26/plugins/
  • 启用延迟队列插件
1
2
3
4
5
6
7
8
# 进入插件目录
cd /opt/rabbitmq-3.8.26/plugins/

# 启用延迟队列插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

# 查看延迟队列插件是否启用
rabbitmq-plugins list
  • 重启 RabbitMQ 服务
1
systemctl restart rabbitmq-server
  • 延迟队列插件安装成功后,在 RabbitMQ 的 Web 控制台中,打开 Exchanges 标签页,点击 Add a new exchange,就可以看到新的交换机类型(x-delayed_message),如下图所示:

延迟队列插件的使用

本节将演示如何使用 RabbitMQ 的延迟队列插件来实现延迟队列。这里定义了一个队列 delayed.queue 和一个自定义交换机 delayed.exchange在自定义的交换机中,使用了一种新的交换机类型 - 延迟交换机(x-delayed-message),由延迟队列插件提供,该交换机支持消息延迟投递机制;当消息投递到延迟交换机后,消息并不会立即被投递到目标队列中,而是存储在 mnesia(一个分布式数据系统)表中;当消息到达投递时间时,延迟交换机才会将消息真正投递到目标队列中。队列与交换机的绑定关系如下图所示:

代码下载

本节所需的完整案例代码,可以直接从 GitHub 下载对应章节 rabbitmq-lesson-15

版本说明

本案例使用的各软件版本如下表所示:

组件版本说明
RabbitMQ Server3.8.26RabbitMQ 服务器
SpringBoot2.7.18
Erlang24.2
Java11

案例代码

  • Maven 依赖
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
<properties>
<spring-boot.version>2.7.18</spring-boot.version>
</properties>

<dependencyManagement>
<dependencies>
<!-- SpringBoot -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring-boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<!--Web-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--RabbitMQ 核心依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--RabbitMQ 测试依赖-->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
<!--Lombok-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!--Test-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
  • 配置文件(application.yml
1
2
3
4
5
6
7
8
9
10
11
12
server:
port: 8080

spring:
application:
name: rabbit-application
rabbitmq:
host: 192.168.2.127
port: 5672
password: admin
username: admin
virtual-host: /
  • Java 配置类(负责队列、交换机的声明和绑定)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class QueueConfig {

// 自定义交换机的类型
public static final String CUSTOM_EXCHANGE_TYPE = "x-delayed-message";

// 延迟交换机的名称
public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";

// 延迟队列的名称
public static final String DELAYED_QUEUE_NAME = "delayed.queue";

// 延迟队列的路由键(绑定键)
public static final String DELAYED_QUEUE_ROUTING_KEY = "delayed.routing.key";

// 声明延迟交换机
@Bean("delayedExchange")
public CustomExchange delayedExchange() {
Map<String, Object> arguments = new HashMap<>(1);
// 当消息的延迟时间到达之后,使用哪种类型的交换机逻辑来处理消息路由,例如 direct、fanout、topic、headers
arguments.put("x-delayed-type", "direct");

// 自定义交换机
// 参数说明:
// name 交换机的名称,用于在 RabbitMQ 中标识该交换机
// type 交换机的类型,例如 direct、fanout、topic、headers、x-delayed-message
// durable 是否持久化,如果为 true,则在 RabbitMQ 服务重启后该交换机仍然存在
// autoDelete 是否自动删除,如果为 true,则在没有队列绑定到该交换机时会被自动删除
// arguments 额外的配置参数
return new CustomExchange(DELAYED_EXCHANGE_NAME, CUSTOM_EXCHANGE_TYPE, true, false, arguments);
}

// 声明延迟队列
@Bean("delayedQueue")
public Queue delayedQueue() {
return new Queue(DELAYED_QUEUE_NAME);
}

// 绑定延迟交换机和延迟队列
@Bean
public Binding delayedBinding(@Qualifier("delayedQueue") Queue delayedQueue, @Qualifier("delayedExchange") CustomExchange delayedExchange) {
return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_QUEUE_ROUTING_KEY).noargs();
}

}
  • 生产者代码(负责发送消息)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import com.clay.rabbitmq.config.QueueConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.util.Date;

@Slf4j
@RestController
@RequestMapping("/delay")
public class SendMsgController {

@Resource
private RabbitTemplate rabbitTemplate;

@GetMapping("/sendDelayMsg/{msg}/{delayTime}")
public String sendDelayMsg(@PathVariable("msg") String message, @PathVariable("delayTime") Integer delayTime) {
log.info("当前时间: {}, 发送一条时长 {} 毫秒的信息给延迟队列: {}", new Date(), delayTime, message);
rabbitTemplate.convertAndSend(QueueConfig.DELAYED_EXCHANGE_NAME, QueueConfig.DELAYED_QUEUE_ROUTING_KEY, message, msg -> {
// 指定消息的延迟时间(单位毫秒)
msg.getMessageProperties().setDelay(delayTime);
return msg;
});
return "success";
}

}
  • 消费者代码(负责接收消息)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import com.clay.rabbitmq.config.QueueConfig;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;
import java.util.Date;

@Slf4j
@Component
public class DelayedQueueConsumer {

@RabbitListener(queues = QueueConfig.DELAYED_QUEUE_NAME)
public void receiveDelayedQueue(Message message, Channel channel) {
String msg = new String(message.getBody(), StandardCharsets.UTF_8);
log.info("当前时间: {}, 接收到延迟队列信息: {}", new Date(), msg);
}

}
  • 主启动类
1
2
3
4
5
6
7
8
9
10
11
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class RabbitApplication {

public static void main(String[] args) {
SpringApplication.run(RabbitApplication.class, args);
}

}

特别注意

Spring Boot 2.x 之前,需要在主启动类上添加 @EnableRabbit 注解。

代码测试

  • (1) 启动 SpringBoot 应用,然后通过浏览器调用以下接口:

    • http://127.0.0.1:8080/delay/sendDelayMsg/hello1/20000
    • http://127.0.0.1:8080/delay/sendDelayMsg/hello2/10000
  • (2) SpringBoot 应用会在控制台打印以下日志信息:

1
2
3
4
INFO 64113 --- [nio-8080-exec-2] c.c.r.controller.SendMsgController       : 当前时间: Mon Apr 06 22:41:35 CST 2021, 发送一条时长 20000 毫秒的信息给延迟队列: hello1
INFO 64113 --- [nio-8080-exec-4] c.c.r.controller.SendMsgController : 当前时间: Mon Apr 06 22:41:40 CST 2021, 发送一条时长 10000 毫秒的信息给延迟队列: hello2
INFO 64113 --- [ntContainer#0-1] c.c.r.consumer.DelayedQueueConsumer : 当前时间: Mon Apr 06 22:41:50 CST 2021, 接收到延迟队列信息: hello2
INFO 64113 --- [ntContainer#0-1] c.c.r.consumer.DelayedQueueConsumer : 当前时间: Mon Apr 06 22:41:55 CST 2021, 接收到延迟队列信息: hello1
  • (3) 从打印的日志信息可以发现:第二条发送的消息会在 10s 后经过延迟交换机被投递到延迟队列中,最后由消费者消费掉。

  • (4) 从打印的日志信息可以发现:第一条发送的消息会在 20s 后经过延迟交换机被投递到延迟队列中,最后由消费者消费掉。

  • (5) 也就是说,在使用延迟队列插件后,延迟时间较短的消息(第二条消息)会被先消费掉,符合预期结果。

RabbitMQ 延迟队列总结

延迟队列在需要延迟处理的业务场景下是非常有用的,使用 RabbitMQ 来实现延迟队列可以很好的利用 RabbitMQ 的特性,比如:消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正确处理的消息不会被丢弃。另外,通过 RabbitMQ 集群的特性,可以很好地解决单点故障问题,不会因为单个节点挂掉而导致延迟队列不可用或者消息丢失。当然,延迟队列还有很多其它实现方案,比如利用 Java 的 DelayQueue、利用 Redis 的 zset,利用 Quartz、利用 Kafka 的时间轮,这些方案各有各的特点,需要根据不同的业务场景进行选择。