RocketMQ 开发随笔
消息批量发送和批量消费
批量发送消息
消息发送大小
在 RocketMQ 中,单条消息的最大默认大小为 4MB。不过,这里有几种不同类型消息的具体限制,需要留意一下:
| 消息类型 | 最大的大小限制 | 补充说明 |
|---|---|---|
| 普通消息、顺序消息 | 4 MB | 这是最通用的限制,无论是开源版本还是云厂商服务都遵循此默认值。 |
| 事务消息、定时 / 延时消息 | 64 KB | 这类消息的大小限制要严格得多,设计上就要求其体量更轻量。 |
| 消息自定义属性 | 16 KB | 无论消息本身多大,其携带的自定义属性(properties)总和不能超过此限制。 |

生产者通过 send() 方法发送的 Message,并不会被直接序列化后发送到网络,而是会基于该 Message 生成一个字符串再发送出去。这个字符串由四部分组成:Topic、消息 Body、消息日志(占用 20 字节),以及一组用于描述消息的键值对属性(Key-Value)。这些属性中包含了如生产者地址、生产时间、目标 QueueId 等信息。最终写入 Broker 的消息单元中的数据,全部来源于这些属性。
批量发送限制
生产者进行消息发送时可以一次发送多条消息,这可以大大提升生产者的发送效率。不过需要注意以下几点:
- 批量发送的消息必须具有相同的 Topic
- 批量发送的消息必须具有相同的刷盘策略
- 批量发送的消息不能是延时消息与事务消息
批量发送大小
在默认情况下,批量发送消息的总大小不能超过 4MB。如果想超出该值,有两种解决方案:
方案一:
- 将批量消息进行拆分,拆分为若干不大于 4MB 的消息集合分多次批量发送
方案二:在 Broker 端与 Producer 端修改属性
- Broker 端需要修改其加载的配置文件中的
maxMessageSize属性 - Producer 端需要在发送消息之前,设置 Producer 的
maxMessageSize属性 - 特别注意,Broker 端和 Producer 端都需要更改
maxMessageSize属性的值,缺一不可
- Broker 端需要修改其加载的配置文件中的
特别注意
尽管技术上可以修改批量发送消息的总大小,但非常不推荐超过 1 MB。因为过大的消息会成为系统的性能瓶颈,导致网络带宽紧张、Broker 内存和磁盘压力增大,从而拖垮整个系统的吞吐能力。
批量消费消息
批量消费相关的属性
在 Consumer 的 MessageListenerConcurrently 监听接口中,consumeMessage() 方法的第一个参数是消息列表,但默认情况下每次只能消费一条消息。
1 | // 注册消息监听器 |
若希望一次消费多条消息,可以通过修改 Consumer 的 consumeMessageBatchMaxSize 属性来指定一次消费的最大消息数量。不过,该属性的值默认不能超过 32,因为默认情况下消费者每次最多可以拉取 32 条消息。若要修改一次拉取的最大消息数量,则可以通过修改 Consumer 的 pullBatchSize 属性来实现。
1 | // 指定消费者每次从 Broker 拉取的最大消息数量,默认值是 32 |
批量消费存在的问题
Consumer 的 pullBatchSize 与 consumeMessageBatchMaxSize 属性是否设置得越大越好?答案:当然不是。
pullBatchSize值越大,Consumer 每次拉取消息所需的时间就越长,且在网络上传输时出现问题的可能性也越高。如果在拉取过程中出现问题,则该批次的所有消息都需要全部重新拉取。consumeMessageBatchMaxSize值越大,Consumer 的消息并发消费能力就越低,因为这一批次的消息只会被一个线程处理。而且,这批消息会共享相同的消费结果:只要其中有一条消息处理异常,整批消息都需要全部重新消费(可能会出现重复消费问题)。
事务消息相关的常见配置参数
在 RocketMQ 中,三个常见事务属性的配置如下:
transactionTimeout=20:指定 TM 应在 20 秒内将最终确认状态发送给 TC,否则会引发事务状态回查。默认值为 60 秒。transactionCheckMax=5:指定最多回查 5 次,超过此次数后将丢弃消息并记录错误日志。默认值为 15 次。transactionCheckInterval=10:指定多次事务状态回查之间的时间间隔为 10 秒。默认值为 60 秒。
消费者在 Push 方式下控制消费速率
RocketMQ 的 Push 消费方式本质上仍然是 “拉取(Pull)+ 封装”,只是由客户端代替用户自动完成拉取调度。因此,客户端并不能像 Pull 消费方式那样完全手动控制消息的拉取速度,而是通过一些参数和机制来进行间接调节拉取速度。常见的控制参数包括:
(1) 控制消费并发度(最直接的手段)
- 参数:
consumeThreadMin,表示最小消费线程数,默认值是 20-
consumeThreadMax,表示最大消费线程数,默认值是 20
- 本质:控制消费线程池大小
- 影响:
- 并发越高 → 消费能力越强 → 本地堆积减少 → 拉取可以更快进行
- 并发越低 → 消费变慢 → 本地堆积增加 → 反向抑制拉取速度(形成背压)
- 参数:
(2) 控制单次拉取的消息数量
- 参数:
pullBatchSize,默认值是 32 - 本质:控制每次从 Broker 批量拉取的消息条数
- 影响:
- 数量越大 → 单次网络吞吐更高,但可能导致本地短时间堆积
- 数量越小 → 拉取更平滑、延迟更低,但整体吞吐下降
- 参数:
(3) 控制批量消费的消息数量
- 参数:
consumeMessageBatchMaxSize,默认值是 1 - 本质:控制每次批量消费的最大消息数量,其值不允许超过
pullBatchSize - 影响:
- 数量越大 → 消息的并发消费能力就越低,因为这一批次的消息只会被一个线程处理,并且只要其中有一条消息处理异常,整批消息都需要全部重新消费
- 数量越小 → 整体的消费吞吐下降
- 参数:
(4) 流控机制(核心:消费端背压)
- RocketMQ 内置了基于本地缓存的流控机制:
pullThresholdForQueue(按消息条数限制),默认值是 1000。表示当本地队列缓存的消息数量超过此阀值时,Consumer 将暂停从 Broker 拉取新消息,以避免内存压力过大pullThresholdSizeForQueue(按消息大小限制),默认值是 100MB。表示当本地缓存消息的总大小超过此阈值时,Consumer 也会触发流控,暂停拉取新消息
- 工作机制:
- 消息会先进入本地缓存(
ProcessQueue) - 当
ProcessQueue中的消息堆积超过阈值时: - 客户端会延迟或暂停拉取请求(而不是完全停止)
- 等待消费线程处理一部分消息后,再恢复拉取
- 消息会先进入本地缓存(
- RocketMQ 内置了基于本地缓存的流控机制:
