RocketMQ 开发随笔

消息批量发送和批量消费

批量发送消息

消息发送大小

在 RocketMQ 中,单条消息的最大默认大小为 4MB。不过,这里有几种不同类型消息的具体限制,需要留意一下:

消息类型最大的大小限制补充说明
普通消息、顺序消息 4 MB 这是最通用的限制,无论是开源版本还是云厂商服务都遵循此默认值。
事务消息、定时 / 延时消息 64 KB 这类消息的大小限制要严格得多,设计上就要求其体量更轻量。
消息自定义属性 16 KB 无论消息本身多大,其携带的自定义属性(properties)总和不能超过此限制。

生产者通过 send() 方法发送的 Message,并不会被直接序列化后发送到网络,而是会基于该 Message 生成一个字符串再发送出去。这个字符串由四部分组成:Topic、消息 Body、消息日志(占用 20 字节),以及一组用于描述消息的键值对属性(Key-Value)。这些属性中包含了如生产者地址、生产时间、目标 QueueId 等信息。最终写入 Broker 的消息单元中的数据,全部来源于这些属性。

批量发送限制

生产者进行消息发送时可以一次发送多条消息,这可以大大提升生产者的发送效率。不过需要注意以下几点:

  • 批量发送的消息必须具有相同的 Topic
  • 批量发送的消息必须具有相同的刷盘策略
  • 批量发送的消息不能是延时消息与事务消息

批量发送大小

在默认情况下,批量发送消息的总大小不能超过 4MB。如果想超出该值,有两种解决方案:

  • 方案一:

    • 将批量消息进行拆分,拆分为若干不大于 4MB 的消息集合分多次批量发送
  • 方案二:在 Broker 端与 Producer 端修改属性

    • Broker 端需要修改其加载的配置文件中的 maxMessageSize 属性
    • Producer 端需要在发送消息之前,设置 Producer 的 maxMessageSize 属性
    • 特别注意,Broker 端和 Producer 端都需要更改 maxMessageSize 属性的值,缺一不可

特别注意

尽管技术上可以修改批量发送消息的总大小,但非常不推荐超过 1 MB。因为过大的消息会成为系统的性能瓶颈,导致网络带宽紧张、Broker 内存和磁盘压力增大,从而拖垮整个系统的吞吐能力。

批量消费消息

批量消费相关的属性

在 Consumer 的 MessageListenerConcurrently 监听接口中,consumeMessage() 方法的第一个参数是消息列表,但默认情况下每次只能消费一条消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {

// 一旦 Broker 中有了其订阅的消息就会触发该方法的执行,其返回值为当前 Consumer 消费的状态
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
// 遍历消息
for (MessageExt msg : msgs) {
System.out.println(msg);
}
// 返回消费状态:消费成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}

});

若希望一次消费多条消息,可以通过修改 Consumer 的 consumeMessageBatchMaxSize 属性来指定一次消费的最大消息数量。不过,该属性的值默认不能超过 32,因为默认情况下消费者每次最多可以拉取 32 条消息。若要修改一次拉取的最大消息数量,则可以通过修改 Consumer 的 pullBatchSize 属性来实现。

1
2
3
4
5
// 指定消费者每次从 Broker 拉取的最大消息数量,默认值是 32
consumer.setPullBatchSize(32);

// 指定消费者每次消费的最大消息数量,默认值是 1,不能超过 pullBatchSize 属性的值
consumer.setConsumeMessageBatchMaxSize(5);

批量消费存在的问题

Consumer 的 pullBatchSizeconsumeMessageBatchMaxSize 属性是否设置得越大越好?答案:当然不是。

  • pullBatchSize 值越大,Consumer 每次拉取消息所需的时间就越长,且在网络上传输时出现问题的可能性也越高。如果在拉取过程中出现问题,则该批次的所有消息都需要全部重新拉取。
  • consumeMessageBatchMaxSize 值越大,Consumer 的消息并发消费能力就越低,因为这一批次的消息只会被一个线程处理。而且,这批消息会共享相同的消费结果:只要其中有一条消息处理异常,整批消息都需要全部重新消费(可能会出现重复消费问题)。

事务消息相关的常见配置参数

在 RocketMQ 中,三个常见事务属性的配置如下:

  • transactionTimeout=20:指定 TM 应在 20 秒内将最终确认状态发送给 TC,否则会引发事务状态回查。默认值为 60 秒。
  • transactionCheckMax=5:指定最多回查 5 次,超过此次数后将丢弃消息并记录错误日志。默认值为 15 次。
  • transactionCheckInterval=10:指定多次事务状态回查之间的时间间隔为 10 秒。默认值为 60 秒。

消费者在 Push 方式下控制消费速率

RocketMQ 的 Push 消费方式本质上仍然是 “拉取(Pull)+ 封装”,只是由客户端代替用户自动完成拉取调度。因此,客户端并不能像 Pull 消费方式那样完全手动控制消息的拉取速度,而是通过一些参数和机制来进行间接调节拉取速度。常见的控制参数包括:

  • (1) 控制消费并发度(最直接的手段)

    • 参数:
      • consumeThreadMin,表示最小消费线程数,默认值是 20
      • consumeThreadMax,表示最大消费线程数,默认值是 20
    • 本质:控制消费线程池大小
    • 影响:
      • 并发越高 → 消费能力越强 → 本地堆积减少 → 拉取可以更快进行
      • 并发越低 → 消费变慢 → 本地堆积增加 → 反向抑制拉取速度(形成背压)
  • (2) 控制单次拉取的消息数量

    • 参数:pullBatchSize,默认值是 32
    • 本质:控制每次从 Broker 批量拉取的消息条数
    • 影响:
      • 数量越大 → 单次网络吞吐更高,但可能导致本地短时间堆积
      • 数量越小 → 拉取更平滑、延迟更低,但整体吞吐下降
  • (3) 控制批量消费的消息数量

    • 参数:consumeMessageBatchMaxSize,默认值是 1
    • 本质:控制每次批量消费的最大消息数量,其值不允许超过 pullBatchSize
    • 影响:
      • 数量越大 → 消息的并发消费能力就越低,因为这一批次的消息只会被一个线程处理,并且只要其中有一条消息处理异常,整批消息都需要全部重新消费
      • 数量越小 → 整体的消费吞吐下降
  • (4) 流控机制(核心:消费端背压)

    • RocketMQ 内置了基于本地缓存的流控机制:
      • pullThresholdForQueue(按消息条数限制),默认值是 1000。表示当本地队列缓存的消息数量超过此阀值时,Consumer 将暂停从 Broker 拉取新消息,以避免内存压力过大
      • pullThresholdSizeForQueue(按消息大小限制),默认值是 100MB。表示当本地缓存消息的总大小超过此阈值时,Consumer 也会触发流控,暂停拉取新消息
    • 工作机制:
      • 消息会先进入本地缓存(ProcessQueue
      • ProcessQueue 中的消息堆积超过阈值时:
      • 客户端会延迟或暂停拉取请求(而不是完全停止)
      • 等待消费线程处理一部分消息后,再恢复拉取