RocketMQ 4.x 入门教程之一
大纲
前言
学习资源
版本说明
本文所使用的各软件版本如下表所示:
| 组件 | 版本 | 说明 |
|---|---|---|
| JDK | 11 | Java、SpringBoot、SpringCloud 项目案例使用的 JDK 版本 |
| RocketMQ Server | 4.9.0 | RocketMQ 服务端(包括 NameServer 和 Broker),运行时依赖 JDK 1.8+ |
消息队列的概述
MQ 的概述
MQ(Message Queue)是一种提供消息队列服务的中间件,也称为消息中间件,是一套提供了消息生产、存储、消费全过程 API 的软件系统。消息即数据,一般消息的体积不会很大。
MQ 的用途
流量削峰
MQ 可以将系统的超量请求暂存其中,以便系统后期可以慢慢进行处理,从而避免了请求的丢失或系统被压垮。

异步解耦
上游系统对下游系统的调用若为同步调用,则会大大降低系统的吞吐量与并发度,且系统耦合度太高。而异步调用则可以解决这些问题,所以两层之间若要实现由同步到异步的转化,一般性做法就是在这两层间添加一个 MQ 层。

数据收集
分布式系统会产生海量级数据流,如:业务日志、监控数据、用户行为等。针对这些数据流进行实时或批量采集汇总,然后对这些数据流进行大数据分析,这是当前互联网平台的必备技术。通过 MQ 完成此类数据收集是最好的选择。

MQ 协议有哪些
一般情况下 MQ 的实现是要遵循一些常规性协议的。常见的 MQ 协议如下:
JMS
- JMS,Java Messaging Service(Java 消息服务)。
- JMS 是 Java 平台上有关 MOM(Message Oriented Middleware,面向消息的中间件 PO/OO/AO)的技术规范。
- JMS 用于消息系统中的 Java 应用程序进行消息交换,并且通过提供标准的生产、发送、接收消息的接口,简化企业应用的开发。
- ActiveMQ 是 JMS 协议的典型实现。
STOMP
- STOMP,Streaming Text Orientated Message Protocol(面向流文本的消息协议),是一种 MOM 设计的简单文本协议。
- STOMP 提供一个可互操作的连接格式,允许客户端与任意 STOMP 消息代理(Broker)进行交互。
- ActiveMQ 是 STOMP 协议的典型实现,RabbitMQ 可以通过插件支持该协议。
AMQP
- AMQP,Advanced Message Queuing Protocol(高级消息队列协议)。
- AMQP 是一个提供统一消息服务的应用层标准,是应用层协议的一个开放标准,是一种 MOM 设计。
- 基于 AMQP 协议的客户端与消息中间件可传递消息,并不受客户端 / 中间件不同产品,不同开发语言等条件的限制。
- RabbitMQ 是 AMQP 协议的典型实现。
MQTT
- MQTT,Message Queuing Telemetry Transport(消息队列遥测传输)。
- MQTT 是 IBM 开发的一个即时通讯协议,是一种二进制协议,主要用于服务器和低功耗 IoT(物联网)设备间的通信。
- MQTT 协议支持所有平台,几乎可以把所有联网物品和外部连接起来,被用来当做传感器和致动器的通信协议。
- RabbitMQ 可以通过插件支持 MQTT 协议。
MQ 产品有哪些
ActiveMQ
- ActiveMQ 是使用 Java 语言开发一款 MQ 产品。
- 早期很多公司与项目中都在使用,但现在的社区活跃度已经很低,目前在项目中已经很少使用了。
RabbitMQ
- RabbitMQ 是使用 ErLang 语言开发的一款 MQ 产品。
- 其吞吐量较 Kafka 与 RocketMQ 要低,且由于其不是 Java 语言开发,所以大多数公司内部对其实现定制化开发难度较大。
Kafka
- Kafka 是使用 Scala / Java 语言开发的一款 MQ 产品。
- 其最大的特点就是高吞吐率,常用于大数据领域的实时计算、日志采集等场景。
- 其没有遵循任何常见的 MQ 协议(比如 AMQP),而是使用自研协议。对于 Spring Cloud Netflix,其仅支持 RabbitMQ 与 Kafka。
RocketMQ
- RocketMQ 是使用 Java 语言开发的一款 MQ 产品。
- 经过多年阿里双 11 的考验,性能与稳定性都非常高。
- 其没有遵循任何常见的 MQ 协议(比如 AMQP),而是使用自研协议。对于 Spring Cloud Alibaba,其支持 RabbitMQ、Kafka,但推荐使用 RocketMQ。
MQ 选型的建议
不同 MQ 的适用场景
Kafka
- Kafka 主要特点是基于 Pull 的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输,适合产生大量数据的互联网服务的数据收集业务。
- 大型公司建议可以选用,如果有日志采集功能,肯定是首选 Kafka。
RocketMQ
- 天生为金融互联网领域而生,对于可靠性要求很高的场景,尤其是电商里面的订单扣款,以及业务削峰,在大量交易涌入时,后端可能无法及时处理的情况。
- RoketMQ 在稳定性上可能更值得信赖,这些业务场景在阿里双 11 已经经历了多次考验。如果业务有上述并发场景,建议可以选择 RocketMQ。
RabbitMQ
- 结合 Erlang 语言本身的并发优势,性能较好,时效性微秒级,社区活跃度也比较高,管理界面用起来十分方便。
- 如果数据量并不是很大,中小型公司优先选择功能比较完备的 RabbitMQ。
不同 MQ 产品的对比
| 关键词 | ActiveMQ | RabbitMQ | Kafka | RocketMQ |
|---|---|---|---|---|
| 开发语言 | Java | Erlang | Java | Java |
| 单机吞吐量 | 万级 | 万级 | 十万级 | 十万级 |
| 时效性 | 毫秒级 | 微秒级(低延迟) | 毫秒级(高吞吐优先) | 毫秒级(延迟与吞吐平衡较好) |
| Topic | - | - | 百级 Topic 时会影响吞吐量 | 千级 Topic 时会影响吞吐量 |
| 社区活跃度 | 低 | 高 | 高 | 高 |
扩展阅读
RocketMQ 的概述
RocketMQ 是⼀款阿⾥巴巴开源的消息中间件,是一个统一消息引擎、轻量级数据处理平台。2016 年 11 ⽉ 28 ⽇,阿⾥巴巴向 Apache 软件基⾦会捐赠 RocketMQ,成为 Apache 孵化项⽬。2017 年 9 ⽉ 25 ⽇,Apache 宣布 RocketMQ 孵化成为 Apache 顶级项⽬(TLP ),成为国内⾸个互联⽹中间件在 Apache 上的顶级项⽬。
发展历程

- 2007 年,阿里开始五彩石项目,Notify 作为项目中交易核心消息流转系统,应运而生。Notify 系统是 RocketMQ 的雏形。
- 2010 年,B2B 大规模使用 ActiveMQ 作为阿里的消息内核。阿里急需一个具有海量堆积能力的消息系统。
- 2011 年初,Kafka 开源。淘宝中间件团队在对 Kafka 进行了深入研究后,开发了一款新的 MetaQ。
- 2012 年,MetaQ 发展到了 v3.0 版本,在它基础上进行了进一步的抽象,形成了 RocketMQ,然后就将其进行了开源。
- 2015 年,阿里在 RocketMQ 的基础上,又推出了一款专门针对阿里云上用户的消息系统 Aliware MQ。
- 2016 年双十一,RocketMQ 承载了万亿级消息的流转,跨越了一个新的里程碑。同年 11 ⽉ 28 ⽇,阿⾥巴巴向 Apache 软件基⾦会捐赠 RocketMQ,成为 Apache 孵化项⽬。
- 2017 年 9 ⽉ 25 ⽇,Apache 宣布 RocketMQ 孵化成为 Apache 顶级项⽬(TLP ),成为国内⾸个互联⽹中间件在 Apache 上的顶级项⽬。
基础概念
消息(Message)
- 消息是指,消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题。
主题(Topic)

- Topic 表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是 RocketMQ 进行消息订阅的基本单位。
- 一个生产者可以同时发送多种 Topic 的消息;而一个消费者只对某种特定的 Topic 感兴趣,即只可以订阅和消费一种 Topic 的消息。
提示
- ActiveMQ:基于 JMS,明确支持 Topic。
- Kafka、RocketMQ:原生以 Topic 为核心抽象。
- RabbitMQ:没有直接的 Topic 实体,但通过 Topic Exchange 实现类似功能。
标签(Tag)
- 为消息设置的标签,用于同一主题(Topic)下区分不同类型的消息。
- 来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。
- 标签能够有效地保持代码的清晰度和连贯性,并优化 RocketMQ 提供的查询系统。
- 消费者可以根据标签实现对不同子主题的不同消费逻辑,实现更好的扩展性。
- 可以理解为:Topic 是消息的一级分类,Tag 是消息的二级分类。
队列(Queue)
- 队列是存储消息的物理实体。
- 一个 Topic 中可以包含多个 Queue,每个 Queue 中存储的就是该 Topic 的消息。
- 一个 Topic 的 Queue 也被称为一个 Topic 中消息的分区(Partition)。
- 一个 Topic 的某一个 Queue 中的消息,只能被一个消费者组中的一个消费者消费。
- 一个 Queue 中的消息,不允许同一个消费者组中的多个消费者同时消费。

- 在学习参考其它相关资料时,可能还会看到一个非官方的概念:分片(Sharding)。
- 分片不同于分区,在 RocketMQ 中,分片指的是存储相应 Topic 的 Broker。
- 每个分片(即 Broker)中会创建出相应数量的分区(即 Queue),每个 Queue 的大小都是相同的。

消息标识(MessageId / Key)
RocketMQ 中每个消息都拥有唯一的 MessageId,且可以携带具有业务标识的 Key,以方便对消息的查询。不过需要注意的是,MessageId 有两个:在生产者发送消息时会自动生成一个 MessageId(msgId),当消息到达 Broker 后,Broker 也会自动生成一个 MessageId(offsetMsgId)。这里的 msgId、offsetMsgId 与 Key 都称为消息标识。
msgId:
- 由 Producer 端生成,其生成规则为:producerIp + 进程 PID + MessageClientIDSetter 类的 ClassLoader 的 hashCode + 当前时间 + AutomicInteger 自增计数器
offsetMsgId:
- 由 Broker 端生成,其生成规则为:brokerIp + 物理分区的 offset(Queue 中的偏移量)
key:
- 由用户指定的业务相关的唯一标识
系统架构
RocketMQ 整体的系统架构图

Producer
消息生产者(Producer),负责生产消息。Producer 通过 MQ 的负载均衡模块选择相应的 Broker 集群队列进行消息投递,投递的过程支持快速失败(Failfast)并且低延迟。
- 例如,业务系统产生的日志写入到 MQ 的过程,就是消息生产的过程。
- 再如,电商平台中用户提交的秒杀请求写入到 MQ 的过程,就是消息生产的过程。
RocketMQ 中的消息生产者都是以生产者组(Producer Group)的形式出现的。在 RocketMQ 中,生产者组是具有相同业务语义的一组生产者实例的逻辑集合。组内多个生产者发送的消息类型一致(业务上),但可以将消息发送到多个不同的 Topic。
Kafka 不存在生产组的概念
Kafka 没有生产者组的概念,Producer 是独立的、无状态的,不需要分组管理。Kafka 只在消费者侧提供 Consumer Group(消费者组)概念,用于实现负载均衡和消费进度管理。而像 RocketMQ 中的 Producer Group,主要用于事务消息回查等机制,在 Kafka 中并不存在类似设计。
Consumer
消息消费者(Consumer),负责消费消息。一个消息消费者会从 Broker 服务器中获取到消息,并对消息进行相关业务处理。
- 例如,QoS 系统从 MQ 中读取日志,并对日志进行解析处理的过程就是消息消费的过程。
- 再如,电商平台的业务系统从 MQ 中读取到秒杀请求,并对请求进行处理的过程就是消息消费的过程。
RocketMQ 中的消息消费者都是以消费者组(Consumer Group)的形式出现的。在 RocketMQ 中,消费者组(Consumer Group)是同一类消费者的集合,这些 Consumer 通常订阅并消费相同 Topic 的消息。消费者组的设计使得消息消费能够方便地实现负载均衡和容错:在负载均衡方面,是将一个 Topic 下的多个 Queue 分配给同一 Consumer Group 内的不同 Consumer 进行消费(而不是对单条消息进行负载均衡);在容错方面,当某个 Consumer 宕机后,其负责的 Queue 会被重新分配给组内其他 Consumer,从而保证消息能够继续被消费。

消费者组中 Consumer 的数量应该小于等于订阅 Topic 的 Queue 数量。如果超出 Queue 数量,则多出的 Consumer 将不能消费消息。

不过,同一个 Topic 的消息可以被多个消费者组同时消费,但需要注意:
- (1) 一个消费者组只能消费一个 Topic 的消息,不能同时消费多个 Topic 消息。
- (2) 同一个消费者组中的消费者必须订阅完全相同的 Topic。
Name Server
功能介绍
NameServer 是 RocketMQ 中用于维护 Topic 与 Broker 路由关系的轻量级注册中心,主要负责 Broker 的注册与发现,以及路由信息的管理。RocketMQ 的设计思想参考了 Kafka。早期的 Kafka 依赖于 Zookeeper 进行元数据管理,因此在 RocketMQ 的前身 MetaQ 的早期版本(v1.0、v2.0)中,也曾依赖 Zookeeper。从 MetaQ v3.0(即 RocketMQ)开始,去除了对 Zookeeper 的依赖,转而采用自研的 NameServer 组件来实现服务发现与路由管理。NameServer 的核心功能如下:
Broker 管理:
- 接受 Broker 集群的注册信息,并且保存下来作为路由信息的基本数据。
- 提供心跳检测机制,检查 Broker 是否还存活。
路由信息管理:
- 每个 NameServer 中都保存着 Broker 集群的整个路由信息和用于客户端查询的队列信息。
- Producer 和 Conumser 通过 NameServer 可以获取整个 Broker 集群的路由信息,从而进行消息的投递和消费。
路由注册
NameServer 通常是以集群的方式部署,不过,NameServer 集群是无状态的,即 NameServer 集群中的各个节点间是没有地位差异的,各节点之间相互不进行信息通讯。那各节点中的数据是如何进行数据同步的呢?在 Broker 节点启动时,会轮询 NameServer 列表,与每个 NameServer 节点建立长连接,发起注册请求。在每个 NameServer 的内部维护着⼀个完整的 Broker 列表,用来动态存储 Broker 的信息。特别注意,这与其它 Zookeeper、Eureka、Nacos 等注册中心不同的地方。这种 NameServer 的无状态方式,其优缺点如下:
- 优点:
- NameServer 集群搭建简单,扩容简单。
- 缺点:
- 对于 Broker 来说,必须明确指出所有 NameServer 地址,否则未指出的 NameServer 将不会去注册;也正因为如此,NameServer 并不能随便扩容。
- 因为,如果 Broker 不重新配置,新增的 NameServer 节点对于 Broker 来说是不可见的,其不会向这个 NameServer 进行注册。
Broker 节点为了维护与 NameServer 之间的长连接,证明自己是活着的,会将最新的信息以心跳包的方式上报给 NameServer,每隔 30 秒发送一次心跳包。心跳包中包含 BrokerId、Broker 地址 (IP + Port)、Broker 名称、Broker 所属集群名称等等。NameServer 在接收到心跳包后,会更新 Broker 的心跳时间戳,记录这个 Broker 的最新存活时间。
路由剔除
由于 Broker 关机、宕机或网络抖动等原因,导致 NameServer 没有收到 Broker 的心跳包,NameServer 可能会将其从 Broker 列表中剔除。NameServer 中有⼀个定时任务,每隔 10 秒就会扫描⼀次 Broker 列表,检查每一个 Broker 的最新心跳时间戳距离当前时间是否超过 120 秒,如果超过则会判定 Broker 失效,然后将其从 Broker 列表中剔除。
- 对于 RocketMQ 的日常运维工作,例如 Broker 升级版本,需要停掉 Broker 的运行,此时 OP(运维工程师)需要怎么做呢?
- OP 需要将 Broker 的读写权限禁止掉,一旦 Client(Producer 和 Consumer)向 Broker 发送请求,都会收到 Broker 的
NO_PERMISSION响应,然后 Client 会进行对其它 Broker 的重试。 - 当 OP 观察到这个 Broker 没有流量后,再关闭它,从而实现 Broker 从 NameServer 的移除。
专业术语
- OP:Operations Engineer,运维工程师,偏传统运维(人工 + 工具)
- SRE:Site Reliability Engineer,站点可靠性工程师,偏工程化运维(代码 + 自动化 + 平台化)
路由发现
RocketMQ 的路由发现机制采用的是 Pull 模型。当 Topic 路由信息出现变化时,NameServer 不会主动推送给客户端,而是客户端定时拉取 Topic 最新的路由信息。客户端默认每隔 30 秒会拉取一次最新的路由信息。常见的客户端与服务端通信模型有以下几种:
Push 模型(推送模型)
- 实时性最好,是典型的 “发布 - 订阅(Pub / Sub)” 模型(如 WebSocket、MQ 推送)。
- 需要维护长连接(如 TCP / WebSocket),服务器需持续占用连接资源。
- 适用场景:
- 实时性要求极高(行情、IM、通知推送)
- 客户端数量可控
- 服务端数据变化频繁
Pull 模型(拉取模型)
- 客户端主动定时请求服务端获取数据(轮询)。
- 优点:
- 实现简单
- 服务端无连接压力(无状态)
- 缺点:
- 实时性较差(取决于轮询间隔)
- 可能产生大量无效请求(数据没变也请求)
- 适用场景:
- 实时性要求不高
- 客户端数量较多(容易横向扩展)
Long Polling 模型(长轮询模型)
- 本质是对 Pull 的优化:客户端发起请求后,当服务端发现没有数据,不会立即返回响应结果,而是 “挂起等待”
- 当数据发生变化或超时后服务端再返回响应结果,客户端再发起下一次请求(伪实时 + HTTP 兼容)
- 优点:
- 实时性接近 Push
- 无需维护真正的长连接(比 WebSocket 简单)
- 缺点:
- 服务端需要维护大量挂起请求(线程 / 连接压力)
- 实现复杂度高于普通轮询
- 适用场景:
- 需要较高实时性,但不方便使用 WebSocket
- 兼容性要求高(HTTP 场景)
- 实现方式:
- Long Polling 本身并不依赖 epoll 等 I/O 多路复用技术,它只是应用层的一种通信模型
- 但在高并发场景下,由于 Long Polling 会产生大量挂起的线程 / 连接,服务端通常会结合 epoll 等 I/O 多路复用技术来提高连接处理效率,避免线程资源被耗尽
客户端选择 NameServer 节点的策略
客户端(Producer 与 Consumer)在配置时必须要写上 NameServer 集群的地址,那么客户端到底连接的是哪个 NameServer 节点呢?客户端首先会生产一个随机数,然后再与 NameServer 节点数量取模,此时得到的就是所要连接的 NameServer 节点索引,然后就会进行连接。如果 NameServer 连接失败,则会采用 Round-Robin 策略(轮询),逐个尝试着去连接其它 NameServer 节点。简而言之,首先采用的是随机策略选择 NameServer 节点,一旦连接失败后,再采用轮询策略选择其它 NameServer 节点。
Zookeeper Client 是如何选择 Server 节点的呢?
Zookeeper Client 在连接 Zookeeper Server 集群时,会先对配置的 Server 列表进行一次随机打乱(Shuffle),然后按顺序尝试连接(通常从第一个开始);如果选中的是域名,则解析出多个 IP 后再进行一次随机打乱(Shuffle),最终选择第一个 IP 地址进行连接,从而实现简单的负载均衡和容灾。
Broker
Broker 的功能介绍
Broker 充当着消息中转角色,负责存储消息、转发消息。Broker 在 RocketMQ 系统中负责接收并存储从生产者发送过来的消息,同时为消费者的消息拉取请求作准备。Broker 同时也存储着与消息相关的元数据,包括消费者组消费进度偏移 Offset、Topic、Queue 等。
Kafka 存储 Offset 信息的位置
Kafka 在 0.9 及以后的版本中,Offset 信息是存储在 Broker 中的,之前的版本是存储在 Zookeeper 中的。
Broker 的模块组成

Remoting Module:
- 整个 Broker 的实体,负责处理来自客户端(Producer 和 Consumer)的请求,而这个 Broker 实体则由以下子模块构成。
Client Manager:
- 客户端管理器,负责接收和解析客户端(Producer 和 Consumer)请求、管理客户端。
- 例如,维护 Consumer 的 Topic 订阅信息。
Store Service:
- 存储服务,提供方便简单的 API 接口,将消息存储到物理硬盘和提供消息查询功能。
HA Service:
- 高可用服务,提供 Master Broker 和 Slave Broker 之间的数据同步功能。
Index Service:
- 索引服务,根据特定的 Message key,对投递到 Broker 的消息进行索引服务,同时也提供根据 Message Key 对消息进行快速查询的功能。
Broker 集群部署架构

(1) 为了增强 Broker 的性能与吞吐量,Broker 通常以集群形式部署。各个 Broker 集群节点中可能存放着相同 Topic 的不同 Queue(即分区)。不过,这里存在一个问题:如果某个 Broker 节点宕机,如何保证数据不丢失?解决方案是对 Broker 节点进行横向扩展,即为每个 Broker 节点构建一个高可用(HA)集群,从而避免单点故障。
(2) Broker 集群本质上是一个主从集群,集群节点分为 Master 节点 与 Slave 节点两种角色,其中 Master 节点负责处理读写操作请求,Slave 节点负责对 Master 节点中的数据进行同步备份。当 Master 节点宕机后,在具备自动切换能力(例如 DLedger 模式或者 Controller 模式)的情况下,Slave 节点可以被提升为 Master 节点继续对外提供服务,因此整体上可以看作主备架构。特别注意,在 RocketMQ
4.5版本之前,传统的主从架构并不支持自动主从切换;RocketMQ 从4.5版本开始引入 DLedger 模式后,才开始支持自动主从切换;5.x版本又引入了 Controller 模式,但只有手动开启 Controller 才支持自动切换,否则仍然不具备自动主从切换能力。(3) 在主从关系上,一个 Master 节点可以对应多个 Slave 节点,但一个 Slave 节点只能隶属于一个 Master 节点。Master 节点 与 Slave 节点的对应关系是通过指定相同的 BrokerName、不同的 BrokerId 来确定的,其中 BrokerId 为 0 表示 Master 节点,非 0 表示 Slave 节点。每个 Broker 节点都会与 NameServer 集群中的所有节点建立长连接,并定时向所有 NameServer 节点注册 Topic 路由信息,以保证路由信息的一致性与可用性。
RocketMQ 不同版本对自动主从切换的支持
- RocketMQ
4.5版本之前:采用传统 Master-Slave 架构,不支持自动主从切换,一旦 Master 节点宕机,Slave 节点不会自动升级为 Master 节点,需要人工干预恢复服务。 - RocketMQ
4.5版本及之后:引入基于 Raft 协议的 DLedger 模式,支持自动 Leader 选举(相当于自动 Master)、节点故障自动切换以及强一致性复制,可以实现高可用。 - RocketMQ
5.x版本:引入 Controller 模式(官方推荐),支持主从自动切换,可独立部署 Controller 组件,具备更高灵活性,逐步替代 DLedger 模式。手动开启 Controller 才支持自动切换,否则仍不具备自动主从切换能力。
工作流程
核心工作流程
RocketMQ 的核心工作流程如下:
(1) 启动 NameServer 后,其开始监听端口,等待 Broker、Producer 和 Consumer 的连接。随后启动 Broker,Broker 会与 NameServer 集群中的所有节点建立并保持长连接,并通过定时任务周期性(默认每隔 30 秒)向 NameServer 发送心跳包,同时上报自身的 Topic、Queue 等路由信息。
(2) 在 Producer 发送消息之前,可以先手动创建 Topic,并指定该 Topic 存储在哪些 Broker 节点上。Topic 创建时会将自己与 Broker 的映射关系写入到 NameServer 中。不过这一步是可选的,在默认配置下,也可以在 Producer 首次发送消息时由 Broker 自动创建 Topic(每个 Broker 默认创建 4 个 Queue)。
(3) Producer 在启动时会先与 NameServer 集群中的任意一个节点建立长连接,并从 NameServer 获取 Topic 的路由信息,即 Topic 下各个 Queue 与 Broker 地址(IP + Port)之间的映射关系。随后,Producer 会根据负载均衡策略从这些 Queue 中选择一个,并与该 Queue 所在的 Broker 建立长连接以发送消息。Producer 获取到的路由信息会被缓存到本地,并每隔 30 秒从 NameServer 拉取一次最新路由信息进行更新。
(4) Consumer 的工作流程与 Producer 类似,它也会先与 NameServer 集群中的任意一个节点建立长连接,获取所订阅 Topic 的路由信息,并根据负载均衡策略确定需要消费的 Queue,然后直接与该 Queue 所在的 Broker 建立长连接进行消息消费。Consumer 同样会缓存路由信息到本地并定期更新(每隔 30 秒),并与 Broker 保持长连接。与 Producer 不同的是,Consumer 会通过客户端实例向 Broker 发送心跳包,以维持消费关系(如消费者组、订阅信息等),并确保 Broker 能感知其在线状态。
Topic 的创建模式
在 RocketMQ 中,支持自动创建和手动创建 Topic 两种模式。
自动创建 Topic:
- 自动创建 Topic 时,默认是基于 Broker 端的配置信息来创建的。
- 通常情况下,每个 Broker 会按照其默认配置(比如
defaultTopicQueueNums参数,默认值一般为 4)为该 Topic 创建对应数量的 Queue。 - 因此,从效果上看类似于 “每个 Broker 默认创建 4 个 Queue”,但本质上是由各个 Broker 独立决定的,而不是一个全局统一的配置策略。
手动创建 Topic:
- 可以为不同的 Broker 节点指定不同的 Queue 数量,也可以为所有 Broker 节点统一指定相同的 Queue 数量。
- 如果为所有 Broker 节点指定相同的 Queue 数量,那么该 Topic 在整个 Broker 集群中的各个节点上具有一致的队列结构。
- 如果为不同的 Broker 节点分别指定不同的 Queue 数量,则该 Topic 在各个 Broker 节点上的 Queue 数量可以不同,从而实现更灵活的资源分配。
特别注意
每个 Broker 节点上的 Queue 数量如果不一致,会影响系统的整体负载均衡效果,不仅会影响 Producer 发送消息的均匀性,还会影响 Consumer 消费消息的均衡性。具体来说,Producer 在发送消息时是基于 Queue 进行负载均衡(如轮询或基于权重的选择),如果某些 Broker 节点上的 Queue 数量更多,那么这些 Broker 被选中的概率就更高,从而导致流量倾斜;而在 Consumer 侧,消息消费是以 Queue 为单位进行分配的(一个 Queue 同一时刻只能被一个 Consumer 实例消费),当 Queue 分布不均时,就可能出现部分 Consumer 负载较高,而部分 Consumer 较为空闲的情况,进而影响整体消费吞吐与资源利用率。
读 / 写队列的概念
从物理实现上来看,RocketMQ 中并不存在真正独立的 “读队列” 和 “写队列”,二者本质上是同一批 Queue,只是在逻辑层面通过 “读队列数(
readQueueNums)” 和 “写队列数(writeQueueNums)” 进行区分。因此,不存在读写队列之间的数据同步问题。一般情况下,读队列数量与写队列数量是相同的,以保证生产消息和消费消息的均衡性。例如,在创建 Topic 时如果设置写队列数量为 8 和读队列数量为 4,那么系统实际会创建 8 个 Queue(编号为 0~7)。Producer 可以向这 8 个 Queue 写入消息,但 Consumer 只会从前 4 个 Queue(0~3)中拉取消息,后 4 个 Queue(4~7)中的消息将暂时不会被消费。相反,如果设置写队列数量为 4 和读队列数量为 8,那么系统同样会创建 8 个 Queue(0~7),但 Producer 只会向前 4 个 Queue(0~3)写入消息,而 Consumer 会尝试从全部 8 个 Queue 拉取消息,其中 4~7 号 Queue 实际上是没有消息的。在这种情况下,如果一个 Consumer Group(消费者组)中有两个 Consumer 实例,可能会出现一个 Consumer 实例消费 0~3 队列而负载较高,另一个 Consumer 实例分配到 4~7 队列却无消息可消费的情况,从而导致消费不均衡。
也就是说,当读队列数与写队列数不一致时,往往会带来负载不均或消息不可见的问题,因此在常规场景下不建议这样配置。但这种设计并不是没有意义,其主要目的是为了支持 Topic 的 Queue 平滑缩容。例如,一个 Topic 最初创建时包含 16 个 Queue,如果希望将其缩容为 8 个且不丢失消息,可以先动态将写队列数量调整为 8,而保持读队列数量仍为 16。此时新消息只会写入前 8 个 Queue,而 Consumer 仍然会消费全部 16 个 Queue 中的消息。当后 8 个 Queue(8~15)中的历史消息被完全消费后,再将读队列数量调整为 8,即可完成整个缩容过程。在这个过程中既不会丢失数据,也不会影响正在进行的消费。
此外,在 RocketMQ 的控制台管理界面中(如下图所示),还可以配置 Topic 的
perm参数,其作用是设置 Topic 的操作权限,其中 2 表示只写,4 表示只读,6 表示可读可写。

