RabbitMQ 入门教程之三

大纲

前言

学习资源

版本说明

本文所有案例代码使用的各软件版本如下表所示:

组件版本说明
RabbitMQ Server3.8.26
RabbitMQ Client5.10.0
Erlang24.2
Java11

RabbitMQ 消费确认

RabbitMQ 支持两种消费确认机制(即消息应答机制),包括自动确认机制(默认)和手动确认机制。

自动确认机制

概念介绍

  • 自动确认机制(AutoAck)就是消息投递给消费者后立即被认为已经投递(处理)成功,这种模式需要在高吞吐量和数据传输安全性方面做权衡。因为在这种模式下,如果消息在消费者接收到之后,消费者突然宕机了,没来得及处理消息,这就会造成消息丢失。特别注意,RabbitMQ 默认使用了自动确认机制,不需要手动开启。
  • 另一方面,在这种模式下消费者那边可以接收过量的消息,没有对投递的消息数量进行限制,这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,使得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用在消费者可以高效并以某种速率处理这些消息的情况下使用。

手动确认机制

概念介绍

  • 消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个耗时长的任务并仅只完成了部分工作突然它宕机了,那么会发生什么情况呢?
  • 由于 RabbitMQ 有自动确认机制(AutoAck),一旦向消费者投递了一条消息,就会立即将该消息标记为删除。在这种情况下,突然有个消费者宕机了,将会丢失正该消费者正在处理的消息。
  • 为了保证消息在消费过程中不丢失,Rabbitmq 引入消息的手动确认机制,即消费者在接收到消息并且处理完该消息之后,主动告诉 RabbitMQ 该消息已经处理了,RabbitMQ 可以把该消息删除了。
  • 值得一提的是,手动确认的好处是可以批量确认,并且可以减少网络拥堵;但是批量确认使用得较少,为了数据传输安全性建议选择单个确认。
API 使用说明
  • Channel.basicAck(long deliveryTag, boolean multiple)

    • 用于肯定确认消息,通知 RabbitMQ 该消息成功被处理了,可以将其删除了
  • Channel.basicNack(long deliveryTag, boolean multiple, boolean requeue)

    • 用于否定确认消息,通知 RabbitMQ 该消息处理失败了
  • Channel.basicReject(long deliveryTag, boolean requeue)

    • 用于否定确认消息,通知 RabbitMQ 拒绝接收该消息,与 Channel.basicNack() 相比少了一个 multiple 参数
消息批量确认

消息批量确认使用的参数是 multiple,具体解释如下:

  • multiple = true

    • 表示批量确认 Channel 上所有未确认的消息
    • 比如:Channel 上有传送 Tag 的消息 5、6、7、8,当前 Tag 是 8,那么此时 5 ~ 8 的这些还未确认的消息都会被确认收到消费确认,如下图所示:
  • multiple = false

    • 表示单个确认消息
    • 比如:Channel 上有传送 Tag 的消息 5、6、7、8,当前 Tag 是 8,那么此时只有 8 这个消息会被确认收到消费确认,5 ~ 6 这三个消息依然不会被确认收到消费确认,如下图所示:
消息自动重新入队

消息自动重新入队使用的参数是 requeue,具体解释如下:

  • requeue = false

    • 表示丢弃 / 死信该消息
  • requeue = true

    • 表示重新将该消息重新放入队列中
    • 如果消费者由于某些原因失去连接(比如 Channel 意外关闭),导致消息未发送 ACK 确认,RabbitMQ 将了解到该消息未完全处理,并将该消息重新放入队列中。
    • 如果此时其他消费者可以处理消息,该消息将很快被重新投递给另一个消费者。这样,即使某个消费者在处理消息时突然宕机了,也可以确保不会丢失任何消息,如图所示

案例代码

由于 RabbitMQ 默认使用的是自动确认机制(AutoAck),因此本节将介绍如何使用 RabbitMQ 的手动确认机制,并且使用的是工作队列模式。值得一提的,消费者开启手动确认机制(与生产者没有任何关系),只需要执行以下两个步骤:

  • (1) 在客户端订阅消息时,关闭自动确认机制,比如:channel.basicConsume(QUEUE_NAME, false, deliverCallback, cancelCallback);
  • (2) 在客户端消费到消息的回调(DeliverCallback)里面,手动确认消息,比如:channel.basicAck(message.getEnvelope().getDeliveryTag(), false);

代码下载

本节所需的完整案例代码,可以直接从 GitHub 下载对应章节 rabbitmq-lesson-03

  • Maven 依赖
1
2
3
4
5
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.10.0</version>
</dependency>
  • 工具类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class RabbitMQUtils {

public static ConnectionFactory connectionFactory;

static {
// 创建连接工厂
connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.2.127");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
}

/**
* 创建信道
*/
public static Channel createChannel() throws Exception {
// 创建连接
Connection connection = connectionFactory.newConnection();

// 创建信道
Channel channel = connection.createChannel();

return channel;
}

}
  • 生产者的代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import com.clay.rabbitmq.utils.RabbitMQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;

import java.nio.charset.StandardCharsets;
import java.util.Scanner;

public class MQProducer {

// 队列名称
public static final String QUEUE_NAME = "test";

public static void main(String[] args) throws Exception {
// 创建信道
Channel channel = RabbitMQUtils.createChannel();

// 声明队列
// 参数说明:
// queue – 队列的名称
// durable – 如果需要声明一个持久队列,则为 true(队列将在服务器重启后继续存在)
// exclusive – 如果需要声明一个独占队列(仅限于此连接使用,连接关闭后队列自动删除),则为 true。
// autoDelete – 如果需要声明 autoDelete 队列,则为 true(服务器将在最后一个消费者断开连接以后,自动删除该队列)
// arguments – 队列的其他属性(构造参数)
channel.queueDeclare(QUEUE_NAME, true, false, false, null);

// 从控制台获取需要发送的消息
Scanner scanner = new Scanner(System.in);
// 循环发送消息
while (scanner.hasNext()) {
// 消息内容
String message = scanner.nextLine();
System.out.println("发送消息:" + message);

// 发送消息
// 参数说明:
// exchange – 要将消息发布到的交换机,空字符串表示默认交换机
// routingKey – 路由 Key
// props – 消息的其他属性,比如:使用 MessageProperties.PERSISTENT_TEXT_PLAIN 属性确保消息持久化,配合持久化队列可避免服务器重启导致消息丢失
// body – 消息内容
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));
}

// 关闭信道
channel.close();
}

}
  • 消费者一的代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
import com.clay.rabbitmq.utils.RabbitMQUtils;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;

public class MQConsumer01 {

// 队列名称
public static final String QUEUE_NAME = "test";

public static void main(String[] args) throws Exception {
// 创建信道
Channel channel = RabbitMQUtils.createChannel();

// 声明队列
// 参数说明:
// queue – 队列的名称
// durable – 如果需要声明一个持久队列,则为 true(队列将在服务器重启后继续存在)
// exclusive – 如果需要声明一个独占队列(仅限于此连接使用,连接关闭后队列自动删除),则为 true。
// autoDelete – 如果需要声明 autoDelete 队列,则为 true(服务器将在最后一个消费者断开连接以后,自动删除该队列)
// arguments – 队列的其他属性(构造参数)
// 特别注意:如果确定队列已存在,消费者可以不声明队列。但是,强烈建议无论生产者还是消费者,都应该声明队列,确保参数可控
channel.queueDeclare(QUEUE_NAME, true, false, false, null);

// 消费消息时的回调
DeliverCallback deliverCallback = (consumerTag, message) -> {
String msg = new String(message.getBody(), StandardCharsets.UTF_8);

// 模拟消息的耗时处理
try {
Thread.sleep(5000);
System.out.println("Successed to consume message : " + msg);
} catch (Exception e) {
e.printStackTrace();
}

// 手动确认消息
// 参数说明:
// deliveryTag – 消息的标记
// multiple – true 表示确认所有消息,包括提供的送达标签为止的所有消息;false 仅确认提供的投放标记
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
};

// 取消消费时的回调(比如,在消费的时候队列已被删除掉)
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("Failed to consume message : " + consumerTag);
};

System.out.println("消费者一等待接收消息,处理消息较快...");

// 关闭自动确认机制
boolean autoAck = false;

// 消费消息
// 参数说明:
// queue – 队列的名称
// autoAck – 如果需要服务器在消息投递后自动确认消息,则为 true;如果需要客户端手动确认消息,则为 false
// deliverCallback – 消费消息时的回调
// cancelCallback – 取消消费时的回调
channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, cancelCallback);

// 让消费者持续运行
System.out.println("按回车键退出程序:");
new Scanner(System.in).nextLine();

// 关闭信道
channel.close();
}

}
  • 消费者二的代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
import com.clay.rabbitmq.utils.RabbitMQUtils;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;

public class MQConsumer02 {

// 队列名称
public static final String QUEUE_NAME = "test";

public static void main(String[] args) throws Exception {
// 创建信道
Channel channel = RabbitMQUtils.createChannel();

// 声明队列
// 参数说明:
// queue – 队列的名称
// durable – 如果需要声明一个持久队列,则为 true(队列将在服务器重启后继续存在)
// exclusive – 如果需要声明一个独占队列(仅限于此连接使用,连接关闭后队列自动删除),则为 true。
// autoDelete – 如果需要声明 autoDelete 队列,则为 true(服务器将在最后一个消费者断开连接以后,自动删除该队列)
// arguments – 队列的其他属性(构造参数)
// 特别注意:如果确定队列已存在,消费者可以不声明队列。但是,强烈建议无论生产者还是消费者,都应该声明队列,确保参数可控
channel.queueDeclare(QUEUE_NAME, true, false, false, null);

// 消费消息时的回调
DeliverCallback deliverCallback = (consumerTag, message) -> {
String msg = new String(message.getBody(), StandardCharsets.UTF_8);

// 模拟消息的耗时处理
try {
Thread.sleep(30000);
System.out.println("Successed to consume message : " + msg);
} catch (Exception e) {
e.printStackTrace();
}

// 手动确认消息
// 参数说明:
// deliveryTag – 消息的标记
// multiple – true 表示确认所有消息,包括提供的送达标签为止的所有消息;false 仅确认提供的投放标记
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
};

// 取消消费时的回调(比如,在消费的时候队列已被删除掉)
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("Failed to consume message : " + consumerTag);
};

System.out.println("消费者二等待接收消息,处理消息较慢...");

// 关闭自动确认机制
boolean autoAck = false;

// 消费消息
// 参数说明:
// queue – 队列的名称
// autoAck – 如果需要服务器在消息投递后自动确认消息,则为 true;如果需要客户端手动确认消息,则为 false
// deliverCallback – 消费消息时的回调
// cancelCallback – 取消消费时的回调
channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, cancelCallback);

// 让消费者持续运行
System.out.println("按回车键退出程序:");
new Scanner(System.in).nextLine();

// 关闭信道
channel.close();
}

}

代码测试

  • (1) 分别启动生产者和两个消费者应用

  • (2) 在生产者的控制台中,依次输入以下内容:

1
2
3
4
AA
发送消息:AA
BB
发送消息:BB
  • (3) 由于消费者一的消息处理速度较快,在消费者一的控制台中,会输出以下内容
1
2
3
消费者一等待接收消息...
按回车键退出程序:
Successed to consume message : AA
  • (4) 在生产者发送消息 BB 后,手动将消费者二的进程杀死。按理来说 RabbitMQ 默认会轮询分发消息,即消费者二负责处理消息 BB,由于它处理消息的时间较长,在它还未处理完消息,也就是消费者二还没有执行手动 ACK 代码的时候,消费者二的进程就被杀死了。此时,RabbitMQ 控制台显示的内容如下:

  • (5) 等待一段时间后,会看到消息 BB 会被消息者一接收到了,说明消息 BB 被重新放入队列中,然后投递给能处理消息的消费者一处理了。最终,在消费者一的控制台中,会输出以下内容。这也说明使用手动确认机制后,即使消费者二在处理消息期间宕机了,也不会造成消息丢失。
1
2
3
4
消费者一等待接收消息...
按回车键退出程序:
Successed to consume message : AA
Successed to consume message : BB

RabbitMQ 非公平分发

概念介绍

RabbitMQ 默认采用轮询机制(Round-Robin)进行消息分发 —— 轮询分发,即在多个消费者之间按固定顺序逐一分配消息。这种机制在某些业务场景中可能产生效率问题,例如:当存在两个处理能力差异显著的消费者时(假设消费者 A 的处理效率是消费者 B 的三倍),轮询分发会导致系统资源利用率严重失衡。具体表现为:消费者 A 每完成三次任务仅需消耗消费者 B 处理单个任务的时间,但受限于均等分配规则,消费者 A 在完成既定任务后被迫进入等待状态,而消费者 B 却持续积压未处理消息。这种 “能者等待,弱者过载” 的分配模式本质上源于 RabbitMQ 的设计机制 —— 消息代理系统无法主动感知消费者节点的处理能力差异,仅会机械执行预设的公平分发策略。

  • 为了避免这种情况,可以消费者中设置以下参数:
1
2
3
4
5
6
// 设置预取计数值(当值为 1,其运行效果就是非公平分发,即能者多劳)
int prefetchCount = 1;

// 参数说明:
// prefetchCount – 服务器将传送的最大消息数,如果无限制,则为 0
channel.basicQos(prefetchCount);
  • 启动消费者应用后,在 RabbitMQ 控制台中可以看到以下内容:

  • 此时,多个消费者工作的流程图如下所示。大概的意思就是:如果消费者一还没有处理完消息,或者消费者一还没有确认消息,那么 RabbitMQ 先别将新消息分配给消费者一;消费者一目前只能处理一个消息,然后 RabbitMQ 就会将新消息分配给没有那么忙的其他空闲消费者。当然,如果所有的消费者都还没有处理完手上的消息,且队列还在不停地添加新消息,那么队列就有可能会出现被撑满的情况,这个时候就只能添加新的 Worker(消费者)或者改变消息的存储策略。

案例代码

本节将演示如何使用手动确认机制(ACK)+ QoS 预取计数值来实现 RabbitMQ 消息的非公平分发,并且使用的是工作队列模式。值得一提的是,RabbitMQ 的非公平分发通常是配合手动确认机制(ACK)一起使用。

代码下载

本节所需的完整案例代码,可以直接从 GitHub 下载对应章节 rabbitmq-lesson-04

  • Maven 依赖
1
2
3
4
5
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.10.0</version>
</dependency>
  • 工具类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class RabbitMQUtils {

public static ConnectionFactory connectionFactory;

static {
// 创建连接工厂
connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.2.127");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
}

/**
* 创建信道
*/
public static Channel createChannel() throws Exception {
// 创建连接
Connection connection = connectionFactory.newConnection();

// 创建信道
Channel channel = connection.createChannel();

return channel;
}

}
  • 生产者的代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import com.clay.rabbitmq.utils.RabbitMQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;

import java.nio.charset.StandardCharsets;
import java.util.Scanner;

public class MQProducer {

// 队列名称
public static final String QUEUE_NAME = "test";

public static void main(String[] args) throws Exception {
// 创建信道
Channel channel = RabbitMQUtils.createChannel();

// 声明队列
// 参数说明:
// queue – 队列的名称
// durable – 如果需要声明一个持久队列,则为 true(队列将在服务器重启后继续存在)
// exclusive – 如果需要声明一个独占队列(仅限于此连接使用,连接关闭后队列自动删除),则为 true。
// autoDelete – 如果需要声明 autoDelete 队列,则为 true(服务器将在最后一个消费者断开连接以后,自动删除该队列)
// arguments – 队列的其他属性(构造参数)
channel.queueDeclare(QUEUE_NAME, true, false, false, null);

// 从控制台获取需要发送的消息
Scanner scanner = new Scanner(System.in);
// 循环发送消息
while (scanner.hasNext()) {
// 消息内容
String message = scanner.nextLine();
System.out.println("发送消息:" + message);

// 发送消息
// 参数说明:
// exchange – 要将消息发布到的交换机,空字符串表示默认交换机
// routingKey – 路由 Key
// props – 消息的其他属性,比如:使用 MessageProperties.PERSISTENT_TEXT_PLAIN 属性确保消息持久化,配合持久化队列可避免服务器重启导致消息丢失
// body – 消息内容
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));
}

// 关闭信道
channel.close();
}

}
  • 消费者一的代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
import com.clay.rabbitmq.utils.RabbitMQUtils;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;

public class MQConsumer01 {

// 队列名称
public static final String QUEUE_NAME = "test";

public static void main(String[] args) throws Exception {
// 创建信道
Channel channel = RabbitMQUtils.createChannel();

// 声明队列
// 参数说明:
// queue – 队列的名称
// durable – 如果需要声明一个持久队列,则为 true(队列将在服务器重启后继续存在)
// exclusive – 如果需要声明一个独占队列(仅限于此连接使用,连接关闭后队列自动删除),则为 true。
// autoDelete – 如果需要声明 autoDelete 队列,则为 true(服务器将在最后一个消费者断开连接以后,自动删除该队列)
// arguments – 队列的其他属性(构造参数)
// 特别注意:如果确定队列已存在,消费者可以不声明队列。但是,强烈建议无论生产者还是消费者,都应该声明队列,确保参数可控
channel.queueDeclare(QUEUE_NAME, true, false, false, null);

// 消费消息时的回调
DeliverCallback deliverCallback = (consumerTag, message) -> {
String msg = new String(message.getBody(), StandardCharsets.UTF_8);

// 模拟消息的耗时处理
try {
Thread.sleep(3000);
System.out.println("Successed to consume message : " + msg);
} catch (Exception e) {
e.printStackTrace();
}

// 手动确认消息
// 参数说明:
// deliveryTag – 消息的标记
// multiple – true 表示确认所有消息,包括提供的送达标签为止的所有消息;false 仅确认提供的投放标记
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
};

// 取消消费时的回调(比如,在消费的时候队列已被删除掉)
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("Failed to consume message : " + consumerTag);
};

System.out.println("消费者一等待接收消息,处理消息较快...");

// 设置预取计数值(当值为 1,其运行效果就是非公平分发,即能者多劳)
int prefetchCount = 1;
channel.basicQos(prefetchCount);

// 关闭自动确认机制
boolean autoAck = false;

// 消费消息
// 参数说明:
// queue – 队列的名称
// autoAck – 如果需要服务器在消息投递后自动确认消息,则为 true;如果需要客户端手动确认消息,则为 false
// deliverCallback – 消费消息时的回调
// cancelCallback – 取消消费时的回调
channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, cancelCallback);

// 让消费者持续运行
System.out.println("按回车键退出程序:");
new Scanner(System.in).nextLine();

// 关闭信道
channel.close();
}

}
  • 消费者二的代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
import com.clay.rabbitmq.utils.RabbitMQUtils;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;

public class MQConsumer02 {

// 队列名称
public static final String QUEUE_NAME = "test";

public static void main(String[] args) throws Exception {
// 创建信道
Channel channel = RabbitMQUtils.createChannel();

// 声明队列
// 参数说明:
// queue – 队列的名称
// durable – 如果需要声明一个持久队列,则为 true(队列将在服务器重启后继续存在)
// exclusive – 如果需要声明一个独占队列(仅限于此连接使用,连接关闭后队列自动删除),则为 true。
// autoDelete – 如果需要声明 autoDelete 队列,则为 true(服务器将在最后一个消费者断开连接以后,自动删除该队列)
// arguments – 队列的其他属性(构造参数)
// 特别注意:如果确定队列已存在,消费者可以不声明队列。但是,强烈建议无论生产者还是消费者,都应该声明队列,确保参数可控
channel.queueDeclare(QUEUE_NAME, true, false, false, null);

// 消费消息时的回调
DeliverCallback deliverCallback = (consumerTag, message) -> {
String msg = new String(message.getBody(), StandardCharsets.UTF_8);

// 模拟消息的耗时处理
try {
Thread.sleep(8000);
System.out.println("Successed to consume message : " + msg);
} catch (Exception e) {
e.printStackTrace();
}

// 手动确认消息
// 参数说明:
// deliveryTag – 消息的标记
// multiple – true 表示确认所有消息,包括提供的送达标签为止的所有消息;false 仅确认提供的投放标记
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
};

// 取消消费时的回调(比如,在消费的时候队列已被删除掉)
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("Failed to consume message : " + consumerTag);
};

System.out.println("消费者二等待接收消息,处理消息较慢...");

// 设置预取计数值(当值为 1,其运行效果就是非公平分发,即能者多劳)
int prefetchCount = 1;
channel.basicQos(prefetchCount);

// 关闭自动确认机制
boolean autoAck = false;

// 消费消息
// 参数说明:
// queue – 队列的名称
// autoAck – 如果需要服务器在消息投递后自动确认消息,则为 true;如果需要客户端手动确认消息,则为 false
// deliverCallback – 消费消息时的回调
// cancelCallback – 取消消费时的回调
channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, cancelCallback);

// 让消费者持续运行
System.out.println("按回车键退出程序:");
new Scanner(System.in).nextLine();

// 关闭信道
channel.close();
}

}

代码测试

  • (1) 分别启动生产者和两个消费者应用

  • (2) 在生产者的控制台中,依次输入以下内容:

1
2
3
4
5
6
7
8
9
10
11
12
AA
发送消息:AA
BB
发送消息:BB
CC
发送消息:CC
DD
发送消息:DD
EE
发送消息:EE
FF
发送消息:FF
  • (3) 由于消费者一的消息处理速度较快,在消费者一的控制台中,会输出以下内容
1
2
3
4
5
6
消费者一等待接收消息,处理消息较快...
按回车键退出程序:
Successed to consume message : BB
Successed to consume message : CC
Successed to consume message : DD
Successed to consume message : FF
  • (4) 由于消费者二的消息处理速度较慢,在消费者二的控制台中,会输出以下内容
1
2
3
4
消费者二等待接收消息,处理消息较慢...
按回车键退出程序:
Successed to consume message : AA
Successed to consume message : EE
  • (5) 观察消费者一和消费者二的控制台输出结果,可以发现符合 “能者多劳” 的特点,也就是在此案例中 RabbitMQ 使用了非公平分发。

RabbitMQ 预取计数值

概念介绍

预取计数值的概念

在上面介绍的 RabbitMQ 非公平分发中,主要是通过 basicQos(int prefetchCount) 方法来实现,而 prefetchCount 就是 “预取计数值”,也被称为 “预取值”。

  • prefetchCount = 0,表示 RabbitMQ 服务器将不会限制往消费者投递的最大消息数量(未确认消息数量)。
  • prefetchCount = 1,表示 RabbitMQ 服务器将会限制往消费者投递的最大消息数量(未确认消息数量)为 1,其最终效果就是非公平分发(能者多劳)。
  • prefetchCount = x,表示 RabbitMQ 服务器将会限制往消费者投递的最大消息数量(未确认消息数量)为 x,其中 x 的值大于 1,即消费者在任意时刻最多可以同时获取到 x 条消息,这有区别于非公平分发实现的效果(能者多劳)。

预取计数值的作用

消息的发送本质上是异步的,因此在任何时刻,Channel 上可能存在多个未处理的消息。同时,消费者的手动确认机制(ACK)也是异步进行的,这就导致存在一个未确认消息的缓冲区。为了避免该缓冲区无限增长,开发人员应当限制其大小,否则会导致资源占用过多,影响系统稳定性。可以通过 basicQos() 方法设置 “预取计数值” 来控制缓冲区的大小,该参数定义了 Channel 上允许的最大未确认消息数量。当未确认的消息数量达到该上限时,RabbitMQ 将暂停向该 Channel 继续投递新消息,直到至少有一条未确认的消息被确认(ACK)。例如,假设 Channel 上当前有 5、6、7、8 四条未确认消息,而预取计数值设置为 4,那么 RabbitMQ 不会再发送更多消息,除非其中至少有一条消息被确认(ACK)。若 tag = 6 的消息被确认,RabbitMQ 便会感知到并立即投递一条新消息到 Channel 上。

消费确认机制与 QoS 预取计数值对吞吐量的影响

通常,适当增加预取计数值可以提高 RabbitMQ 向消费者投递消息的速度。虽然自动确认机制能够实现最快的消息处理速度,但此时 RabbitMQ 可能会向消费者无限制地发送消息,在这种情况下已投递但尚未处理的消息会不断积压,从而导致消费者 RAM(物理内存)占用激增。因此,在使用自动确认机制或手动确认机制时,应避免设置无限预取,以免出现内存耗尽的风险。当消费者需要处理大量消息时,若未及时确认(ACK),会导致消费者连接节点的内存消耗变大。因此,选择合适的预取计数值是一个需要反复试验的过程,不同的业务负载对应不同的最佳取值。一般而言,预取计数值设在 100 ~ 300 之间可以在吞吐量和资源占用之间取得较好的平衡,不会给消费者带来太大的风险。若预取计数值为 1,虽然是最安全的,但吞吐量会显著降低,尤其是在消费者连接存在较大延迟的情况下。对于大多数应用而言,稍微提高预取计数值可以获得更高的性能。

案例代码

本节将演示如何使用手动确认机制(ACK)+ QoS 预取计数值,并且使用的是工作队列模式。

代码下载

本节所需的完整案例代码,可以直接从 GitHub 下载对应章节 rabbitmq-lesson-05

  • Maven 依赖
1
2
3
4
5
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.10.0</version>
</dependency>
  • 工具类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class RabbitMQUtils {

public static ConnectionFactory connectionFactory;

static {
// 创建连接工厂
connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.2.127");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
}

/**
* 创建信道
*/
public static Channel createChannel() throws Exception {
// 创建连接
Connection connection = connectionFactory.newConnection();

// 创建信道
Channel channel = connection.createChannel();

return channel;
}

}
  • 生产者的代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import com.clay.rabbitmq.utils.RabbitMQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;

import java.nio.charset.StandardCharsets;
import java.util.Scanner;

public class MQProducer {

// 队列名称
public static final String QUEUE_NAME = "test";

public static void main(String[] args) throws Exception {
// 创建信道
Channel channel = RabbitMQUtils.createChannel();

// 声明队列
// 参数说明:
// queue – 队列的名称
// durable – 如果需要声明一个持久队列,则为 true(队列将在服务器重启后继续存在)
// exclusive – 如果需要声明一个独占队列(仅限于此连接使用,连接关闭后队列自动删除),则为 true。
// autoDelete – 如果需要声明 autoDelete 队列,则为 true(服务器将在最后一个消费者断开连接以后,自动删除该队列)
// arguments – 队列的其他属性(构造参数)
channel.queueDeclare(QUEUE_NAME, true, false, false, null);

// 从控制台获取需要发送的消息
Scanner scanner = new Scanner(System.in);
// 循环发送消息
while (scanner.hasNext()) {
// 消息内容
String message = scanner.nextLine();
System.out.println("发送消息:" + message);

// 发送消息
// 参数说明:
// exchange – 要将消息发布到的交换机,空字符串表示默认交换机
// routingKey – 路由 Key
// props – 消息的其他属性,比如:使用 MessageProperties.PERSISTENT_TEXT_PLAIN 属性确保消息持久化,配合持久化队列可避免服务器重启导致消息丢失
// body – 消息内容
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));
}

// 关闭信道
channel.close();
}

}
  • 消费者一的代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
import com.clay.rabbitmq.utils.RabbitMQUtils;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;

public class MQConsumer01 {

// 队列名称
public static final String QUEUE_NAME = "test";

public static void main(String[] args) throws Exception {
// 创建信道
Channel channel = RabbitMQUtils.createChannel();

// 声明队列
// 参数说明:
// queue – 队列的名称
// durable – 如果需要声明一个持久队列,则为 true(队列将在服务器重启后继续存在)
// exclusive – 如果需要声明一个独占队列(仅限于此连接使用,连接关闭后队列自动删除),则为 true。
// autoDelete – 如果需要声明 autoDelete 队列,则为 true(服务器将在最后一个消费者断开连接以后,自动删除该队列)
// arguments – 队列的其他属性(构造参数)
// 特别注意:如果确定队列已存在,消费者可以不声明队列。但是,强烈建议无论生产者还是消费者,都应该声明队列,确保参数可控
channel.queueDeclare(QUEUE_NAME, true, false, false, null);

// 消费消息时的回调
DeliverCallback deliverCallback = (consumerTag, message) -> {
String msg = new String(message.getBody(), StandardCharsets.UTF_8);

// 模拟消息的耗时处理
try {
Thread.sleep(5000);
System.out.println("Successed to consume message : " + msg);
} catch (Exception e) {
e.printStackTrace();
}

// 手动确认消息
// 参数说明:
// deliveryTag – 消息的标记
// multiple – true 表示确认所有消息,包括提供的送达标签为止的所有消息;false 仅确认提供的投放标记
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
};

// 取消消费时的回调(比如,在消费的时候队列已被删除掉)
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("Failed to consume message : " + consumerTag);
};

System.out.println("消费者一等待接收消息,处理消息较快...");

// 设置预取计数值
int prefetchCount = 2;
channel.basicQos(prefetchCount);

// 关闭自动确认机制
boolean autoAck = false;

// 消费消息
// 参数说明:
// queue – 队列的名称
// autoAck – 如果需要服务器在消息投递后自动确认消息,则为 true;如果需要客户端手动确认消息,则为 false
// deliverCallback – 消费消息时的回调
// cancelCallback – 取消消费时的回调
channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, cancelCallback);

// 让消费者持续运行
System.out.println("按回车键退出程序:");
new Scanner(System.in).nextLine();

// 关闭信道
channel.close();
}

}
  • 消费者二的代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
import com.clay.rabbitmq.utils.RabbitMQUtils;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;

public class MQConsumer02 {

// 队列名称
public static final String QUEUE_NAME = "test";

public static void main(String[] args) throws Exception {
// 创建信道
Channel channel = RabbitMQUtils.createChannel();

// 声明队列
// 参数说明:
// queue – 队列的名称
// durable – 如果需要声明一个持久队列,则为 true(队列将在服务器重启后继续存在)
// exclusive – 如果需要声明一个独占队列(仅限于此连接使用,连接关闭后队列自动删除),则为 true。
// autoDelete – 如果需要声明 autoDelete 队列,则为 true(服务器将在最后一个消费者断开连接以后,自动删除该队列)
// arguments – 队列的其他属性(构造参数)
// 特别注意:如果确定队列已存在,消费者可以不声明队列。但是,强烈建议无论生产者还是消费者,都应该声明队列,确保参数可控
channel.queueDeclare(QUEUE_NAME, true, false, false, null);

// 消费消息时的回调
DeliverCallback deliverCallback = (consumerTag, message) -> {
String msg = new String(message.getBody(), StandardCharsets.UTF_8);

// 模拟消息的耗时处理
try {
Thread.sleep(10000);
System.out.println("Successed to consume message : " + msg);
} catch (Exception e) {
e.printStackTrace();
}

// 手动确认消息
// 参数说明:
// deliveryTag – 消息的标记
// multiple – true 表示确认所有消息,包括提供的送达标签为止的所有消息;false 仅确认提供的投放标记
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
};

// 取消消费时的回调(比如,在消费的时候队列已被删除掉)
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("Failed to consume message : " + consumerTag);
};

System.out.println("消费者二等待接收消息,处理消息较慢...");

// 设置预取计数值
int prefetchCount = 4;
channel.basicQos(prefetchCount);

// 关闭自动确认机制
boolean autoAck = false;

// 消费消息
// 参数说明:
// queue – 队列的名称
// autoAck – 如果需要服务器在消息投递后自动确认消息,则为 true;如果需要客户端手动确认消息,则为 false
// deliverCallback – 消费消息时的回调
// cancelCallback – 取消消费时的回调
channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, cancelCallback);

// 让消费者持续运行
System.out.println("按回车键退出程序:");
new Scanner(System.in).nextLine();

// 关闭信道
channel.close();
}

}

代码测试

  • (1) 分别启动生产者和两个消费者应用

  • (2) 在生产者的控制台中,依次输入以下内容:

1
2
3
4
5
6
7
8
9
10
11
12
AA
发送消息:AA
BB
发送消息:BB
CC
发送消息:CC
DD
发送消息:DD
EE
发送消息:EE
FF
发送消息:FF
  • (3) 由于消费者一的消息处理速度较快,并且预取计数值设置为 2,在消费者一的控制台中,会输出以下内容
1
2
3
4
消费者一等待接收消息,处理消息较快...
按回车键退出程序:
Successed to consume message : AA
Successed to consume message : CC
  • (4) 由于消费者二的消息处理速度较慢,并且预取计数值设置为 4,在消费者二的控制台中,会输出以下内容
1
2
3
4
5
6
消费者二等待接收消息,处理消息较慢...
按回车键退出程序:
Successed to consume message : BB
Successed to consume message : DD
Successed to consume message : EE
Successed to consume message : FF
  • (5) 观察消费者一和消费者二的控制台输出结果,可以发现每个消费者都会获取到与预取计数值对应的消息数量,也就是不符合 “能者多劳” 的特点,这有区别于非公平分发实现的效果。