RocketMQ 4.x 入门教程之一

大纲

前言

学习资源

版本说明

本文所使用的各软件版本如下表所示:

组件版本说明
JDK11Java、SpringBoot、SpringCloud 项目案例使用的 JDK 版本
RocketMQ Server4.9.0RocketMQ 服务端(包括 NameServer 和 Broker),运行时依赖 JDK 1.8+

消息队列的概述

MQ 的概述

MQ(Message Queue)是一种提供消息队列服务的中间件,也称为消息中间件,是一套提供了消息生产、存储、消费全过程 API 的软件系统。消息即数据,一般消息的体积不会很大。

MQ 的用途

流量削峰

MQ 可以将系统的超量请求暂存其中,以便系统后期可以慢慢进行处理,从而避免了请求的丢失或系统被压垮。

异步解耦

上游系统对下游系统的调用若为同步调用,则会大大降低系统的吞吐量与并发度,且系统耦合度太高。而异步调用则可以解决这些问题,所以两层之间若要实现由同步到异步的转化,一般性做法就是在这两层间添加一个 MQ 层。

数据收集

分布式系统会产生海量级数据流,如:业务日志、监控数据、用户行为等。针对这些数据流进行实时或批量采集汇总,然后对这些数据流进行大数据分析,这是当前互联网平台的必备技术。通过 MQ 完成此类数据收集是最好的选择。

MQ 协议有哪些

一般情况下 MQ 的实现是要遵循一些常规性协议的。常见的 MQ 协议如下:

  • JMS

    • JMS,Java Messaging Service(Java 消息服务)。
    • JMS 是 Java 平台上有关 MOM(Message Oriented Middleware,面向消息的中间件 PO/OO/AO)的技术规范。
    • JMS 用于消息系统中的 Java 应用程序进行消息交换,并且通过提供标准的生产、发送、接收消息的接口,简化企业应用的开发。
    • ActiveMQ 是 JMS 协议的典型实现。
  • STOMP

    • STOMP,Streaming Text Orientated Message Protocol(面向流文本的消息协议),是一种 MOM 设计的简单文本协议。
    • STOMP 提供一个可互操作的连接格式,允许客户端与任意 STOMP 消息代理(Broker)进行交互。
    • ActiveMQ 是 STOMP 协议的典型实现,RabbitMQ 可以通过插件支持该协议。
  • AMQP

    • AMQP,Advanced Message Queuing Protocol(高级消息队列协议)。
    • AMQP 是一个提供统一消息服务的应用层标准,是应用层协议的一个开放标准,是一种 MOM 设计。
    • 基于 AMQP 协议的客户端与消息中间件可传递消息,并不受客户端 / 中间件不同产品,不同开发语言等条件的限制。
    • RabbitMQ 是 AMQP 协议的典型实现。
  • MQTT

    • MQTT,Message Queuing Telemetry Transport(消息队列遥测传输)。
    • MQTT 是 IBM 开发的一个即时通讯协议,是一种二进制协议,主要用于服务器和低功耗 IoT(物联网)设备间的通信。
    • MQTT 协议支持所有平台,几乎可以把所有联网物品和外部连接起来,被用来当做传感器和致动器的通信协议。
    • RabbitMQ 可以通过插件支持 MQTT 协议。

MQ 产品有哪些

  • ActiveMQ

    • ActiveMQ 是使用 Java 语言开发一款 MQ 产品。
    • 早期很多公司与项目中都在使用,但现在的社区活跃度已经很低,目前在项目中已经很少使用了。
  • RabbitMQ

    • RabbitMQ 是使用 ErLang 语言开发的一款 MQ 产品。
    • 其吞吐量较 Kafka 与 RocketMQ 要低,且由于其不是 Java 语言开发,所以大多数公司内部对其实现定制化开发难度较大。
  • Kafka

    • Kafka 是使用 Scala / Java 语言开发的一款 MQ 产品。
    • 其最大的特点就是高吞吐率,常用于大数据领域的实时计算、日志采集等场景。
    • 其没有遵循任何常见的 MQ 协议(比如 AMQP),而是使用自研协议。对于 Spring Cloud Netflix,其仅支持 RabbitMQ 与 Kafka。
  • RocketMQ

    • RocketMQ 是使用 Java 语言开发的一款 MQ 产品。
    • 经过多年阿里双 11 的考验,性能与稳定性都非常高。
    • 其没有遵循任何常见的 MQ 协议(比如 AMQP),而是使用自研协议。对于 Spring Cloud Alibaba,其支持 RabbitMQ、Kafka,但推荐使用 RocketMQ。

MQ 选型的建议

不同 MQ 的适用场景

  • Kafka

    • Kafka 主要特点是基于 Pull 的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输,适合产生大量数据的互联网服务的数据收集业务。
    • 大型公司建议可以选用,如果有日志采集功能,肯定是首选 Kafka。
  • RocketMQ

    • 天生为金融互联网领域而生,对于可靠性要求很高的场景,尤其是电商里面的订单扣款,以及业务削峰,在大量交易涌入时,后端可能无法及时处理的情况。
    • RoketMQ 在稳定性上可能更值得信赖,这些业务场景在阿里双 11 已经经历了多次考验。如果业务有上述并发场景,建议可以选择 RocketMQ。
  • RabbitMQ

    • 结合 Erlang 语言本身的并发优势,性能较好,时效性微秒级,社区活跃度也比较高,管理界面用起来十分方便。
    • 如果数据量并不是很大,中小型公司优先选择功能比较完备的 RabbitMQ。

不同 MQ 产品的对比

关键词 ActiveMQRabbitMQKafkaRocketMQ
开发语言 JavaErlangJavaJava
单机吞吐量万级万级十万级十万级
时效性毫秒级微秒级(低延迟)毫秒级(高吞吐优先)毫秒级(延迟与吞吐平衡较好)
Topic-- 百级 Topic 时会影响吞吐量千级 Topic 时会影响吞吐量
社区活跃度

RocketMQ 的概述

RocketMQ 是⼀款阿⾥巴巴开源的消息中间件,是一个统一消息引擎、轻量级数据处理平台。2016 年 11 ⽉ 28 ⽇,阿⾥巴巴向 Apache 软件基⾦会捐赠 RocketMQ,成为 Apache 孵化项⽬。2017 年 9 ⽉ 25 ⽇,Apache 宣布 RocketMQ 孵化成为 Apache 顶级项⽬(TLP ),成为国内⾸个互联⽹中间件在 Apache 上的顶级项⽬。

发展历程

  • 2007 年,阿里开始五彩石项目,Notify 作为项目中交易核心消息流转系统,应运而生。Notify 系统是 RocketMQ 的雏形。
  • 2010 年,B2B 大规模使用 ActiveMQ 作为阿里的消息内核。阿里急需一个具有海量堆积能力的消息系统。
  • 2011 年初,Kafka 开源。淘宝中间件团队在对 Kafka 进行了深入研究后,开发了一款新的 MetaQ。
  • 2012 年,MetaQ 发展到了 v3.0 版本,在它基础上进行了进一步的抽象,形成了 RocketMQ,然后就将其进行了开源。
  • 2015 年,阿里在 RocketMQ 的基础上,又推出了一款专门针对阿里云上用户的消息系统 Aliware MQ。
  • 2016 年双十一,RocketMQ 承载了万亿级消息的流转,跨越了一个新的里程碑。同年 11 ⽉ 28 ⽇,阿⾥巴巴向 Apache 软件基⾦会捐赠 RocketMQ,成为 Apache 孵化项⽬。
  • 2017 年 9 ⽉ 25 ⽇,Apache 宣布 RocketMQ 孵化成为 Apache 顶级项⽬(TLP ),成为国内⾸个互联⽹中间件在 Apache 上的顶级项⽬。

基础概念

消息(Message)

  • 消息是指,消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题。

主题(Topic)

  • Topic 表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是 RocketMQ 进行消息订阅的基本单位。
  • 一个生产者可以同时发送多种 Topic 的消息;而一个消费者只对某种特定的 Topic 感兴趣,即只可以订阅和消费一种 Topic 的消息。

提示

  • ActiveMQ:基于 JMS,明确支持 Topic。
  • Kafka、RocketMQ:原生以 Topic 为核心抽象。
  • RabbitMQ:没有直接的 Topic 实体,但通过 Topic Exchange 实现类似功能。

标签(Tag)

  • 为消息设置的标签,用于同一主题(Topic)下区分不同类型的消息。
  • 来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。
  • 标签能够有效地保持代码的清晰度和连贯性,并优化 RocketMQ 提供的查询系统。
  • 消费者可以根据标签实现对不同子主题的不同消费逻辑,实现更好的扩展性。
  • 可以理解为:Topic 是消息的一级分类,Tag 是消息的二级分类。

队列(Queue)

  • 队列是存储消息的物理实体。
  • 一个 Topic 中可以包含多个 Queue,每个 Queue 中存储的就是该 Topic 的消息。
  • 一个 Topic 的 Queue 也被称为一个 Topic 中消息的分区(Partition)。
  • 一个 Topic 的某一个 Queue 中的消息,只能被一个消费者组中的一个消费者消费。
  • 一个 Queue 中的消息,不允许同一个消费者组中的多个消费者同时消费。

  • 在学习参考其它相关资料时,可能还会看到一个非官方的概念:分片(Sharding)。
  • 分片不同于分区,在 RocketMQ 中,分片指的是存储相应 Topic 的 Broker。
  • 每个分片(即 Broker)中会创建出相应数量的分区(即 Queue),每个 Queue 的大小都是相同的。

消息标识(MessageId / Key)

RocketMQ 中每个消息都拥有唯一的 MessageId,且可以携带具有业务标识的 Key,以方便对消息的查询。不过需要注意的是,MessageId 有两个:在生产者发送消息时会自动生成一个 MessageId(msgId),当消息到达 Broker 后,Broker 也会自动生成一个 MessageId(offsetMsgId)。这里的 msgIdoffsetMsgId 与 Key 都称为消息标识。

  • msgId:

    • 由 Producer 端生成,其生成规则为:producerIp + 进程 PID + MessageClientIDSetter 类的 ClassLoader 的 hashCode + 当前时间 + AutomicInteger 自增计数器
  • offsetMsgId:

    • 由 Broker 端生成,其生成规则为:brokerIp + 物理分区的 offset(Queue 中的偏移量)
  • key:

    • 由用户指定的业务相关的唯一标识

系统架构

RocketMQ 整体的系统架构图

Producer

消息生产者(Producer),负责生产消息。Producer 通过 MQ 的负载均衡模块选择相应的 Broker 集群队列进行消息投递,投递的过程支持快速失败(Failfast)并且低延迟。

  • 例如,业务系统产生的日志写入到 MQ 的过程,就是消息生产的过程。
  • 再如,电商平台中用户提交的秒杀请求写入到 MQ 的过程,就是消息生产的过程。

RocketMQ 中的消息生产者都是以生产者组(Producer Group)的形式出现的。在 RocketMQ 中,生产者组是具有相同业务语义的一组生产者实例的逻辑集合。组内多个生产者发送的消息类型一致(业务上),但可以将消息发送到多个不同的 Topic。

Kafka 不存在生产组的概念

Kafka 没有生产者组的概念,Producer 是独立的、无状态的,不需要分组管理。Kafka 只在消费者侧提供 Consumer Group(消费者组)概念,用于实现负载均衡和消费进度管理。而像 RocketMQ 中的 Producer Group,主要用于事务消息回查等机制,在 Kafka 中并不存在类似设计。

Consumer

消息消费者(Consumer),负责消费消息。一个消息消费者会从 Broker 服务器中获取到消息,并对消息进行相关业务处理。

  • 例如,QoS 系统从 MQ 中读取日志,并对日志进行解析处理的过程就是消息消费的过程。
  • 再如,电商平台的业务系统从 MQ 中读取到秒杀请求,并对请求进行处理的过程就是消息消费的过程。

RocketMQ 中的消息消费者都是以消费者组(Consumer Group)的形式出现的。在 RocketMQ 中,消费者组(Consumer Group)是同一类消费者的集合,这些 Consumer 通常订阅并消费相同 Topic 的消息。消费者组的设计使得消息消费能够方便地实现负载均衡和容错:在负载均衡方面,是将一个 Topic 下的多个 Queue 分配给同一 Consumer Group 内的不同 Consumer 进行消费(而不是对单条消息进行负载均衡);在容错方面,当某个 Consumer 宕机后,其负责的 Queue 会被重新分配给组内其他 Consumer,从而保证消息能够继续被消费。

消费者组中 Consumer 的数量应该小于等于订阅 Topic 的 Queue 数量。如果超出 Queue 数量,则多出的 Consumer 将不能消费消息。

不过,同一个 Topic 的消息可以被多个消费者组同时消费,但需要注意:

  • (1) 一个消费者组只能消费一个 Topic 的消息,不能同时消费多个 Topic 消息。
  • (2) 同一个消费者组中的消费者必须订阅完全相同的 Topic。

Name Server

功能介绍

NameServer 是 RocketMQ 中用于维护 Topic 与 Broker 路由关系的轻量级注册中心,主要负责 Broker 的注册与发现,以及路由信息的管理。RocketMQ 的设计思想参考了 Kafka。早期的 Kafka 依赖于 Zookeeper 进行元数据管理,因此在 RocketMQ 的前身 MetaQ 的早期版本(v1.0v2.0)中,也曾依赖 Zookeeper。从 MetaQ v3.0(即 RocketMQ)开始,去除了对 Zookeeper 的依赖,转而采用自研的 NameServer 组件来实现服务发现与路由管理。NameServer 的核心功能如下:

  • Broker 管理:

    • 接受 Broker 集群的注册信息,并且保存下来作为路由信息的基本数据。
    • 提供心跳检测机制,检查 Broker 是否还存活。
  • 路由信息管理:

    • 每个 NameServer 中都保存着 Broker 集群的整个路由信息和用于客户端查询的队列信息。
    • Producer 和 Conumser 通过 NameServer 可以获取整个 Broker 集群的路由信息,从而进行消息的投递和消费。

路由注册

NameServer 通常是以集群的方式部署,不过,NameServer 集群是无状态的,即 NameServer 集群中的各个节点间是没有地位差异的,各节点之间相互不进行信息通讯。那各节点中的数据是如何进行数据同步的呢?在 Broker 节点启动时,会轮询 NameServer 列表,与每个 NameServer 节点建立长连接,发起注册请求。在每个 NameServer 的内部维护着⼀个完整的 Broker 列表,用来动态存储 Broker 的信息。特别注意,这与其它 Zookeeper、Eureka、Nacos 等注册中心不同的地方。这种 NameServer 的无状态方式,其优缺点如下:

  • 优点:
    • NameServer 集群搭建简单,扩容简单。
  • 缺点:
    • 对于 Broker 来说,必须明确指出所有 NameServer 地址,否则未指出的 NameServer 将不会去注册;也正因为如此,NameServer 并不能随便扩容。
    • 因为,如果 Broker 不重新配置,新增的 NameServer 节点对于 Broker 来说是不可见的,其不会向这个 NameServer 进行注册。

Broker 节点为了维护与 NameServer 之间的长连接,证明自己是活着的,会将最新的信息以心跳包的方式上报给 NameServer,每隔 30 秒发送一次心跳包。心跳包中包含 BrokerId、Broker 地址 (IP + Port)、Broker 名称、Broker 所属集群名称等等。NameServer 在接收到心跳包后,会更新 Broker 的心跳时间戳,记录这个 Broker 的最新存活时间。

路由剔除

由于 Broker 关机、宕机或网络抖动等原因,导致 NameServer 没有收到 Broker 的心跳包,NameServer 可能会将其从 Broker 列表中剔除。NameServer 中有⼀个定时任务,每隔 10 秒就会扫描⼀次 Broker 列表,检查每一个 Broker 的最新心跳时间戳距离当前时间是否超过 120 秒,如果超过则会判定 Broker 失效,然后将其从 Broker 列表中剔除

  • 对于 RocketMQ 的日常运维工作,例如 Broker 升级版本,需要停掉 Broker 的运行,此时 OP(运维工程师)需要怎么做呢?
  • OP 需要将 Broker 的读写权限禁止掉,一旦 Client(Producer 和 Consumer)向 Broker 发送请求,都会收到 Broker 的 NO_PERMISSION 响应,然后 Client 会进行对其它 Broker 的重试。
  • 当 OP 观察到这个 Broker 没有流量后,再关闭它,从而实现 Broker 从 NameServer 的移除。

专业术语

  • OP:Operations Engineer,运维工程师,偏传统运维(人工 + 工具)
  • SRE:Site Reliability Engineer,站点可靠性工程师,偏工程化运维(代码 + 自动化 + 平台化)

路由发现

RocketMQ 的路由发现机制采用的是 Pull 模型。当 Topic 路由信息出现变化时,NameServer 不会主动推送给客户端,而是客户端定时拉取 Topic 最新的路由信息。客户端默认每隔 30 秒会拉取一次最新的路由信息。常见的客户端与服务端通信模型有以下几种:

  • Push 模型(推送模型)

    • 实时性最好,是典型的 “发布 - 订阅(Pub / Sub)” 模型(如 WebSocket、MQ 推送)。
    • 需要维护长连接(如 TCP / WebSocket),服务器需持续占用连接资源。
    • 适用场景:
      • 实时性要求极高(行情、IM、通知推送)
      • 客户端数量可控
      • 服务端数据变化频繁
  • Pull 模型(拉取模型)

    • 客户端主动定时请求服务端获取数据(轮询)。
    • 优点:
      • 实现简单
      • 服务端无连接压力(无状态)
    • 缺点:
      • 实时性较差(取决于轮询间隔)
      • 可能产生大量无效请求(数据没变也请求)
    • 适用场景:
      • 实时性要求不高
      • 客户端数量较多(容易横向扩展)
  • Long Polling 模型(长轮询模型)

    • 本质是对 Pull 的优化:客户端发起请求后,当服务端发现没有数据,不会立即返回响应结果,而是 “挂起等待”
    • 当数据发生变化或超时后服务端再返回响应结果,客户端再发起下一次请求(伪实时 + HTTP 兼容)
    • 优点:
      • 实时性接近 Push
      • 无需维护真正的长连接(比 WebSocket 简单)
      • 缺点:
      • 服务端需要维护大量挂起请求(线程 / 连接压力)
      • 实现复杂度高于普通轮询
      • 适用场景:
      • 需要较高实时性,但不方便使用 WebSocket
      • 兼容性要求高(HTTP 场景)
    • 实现方式:
      • Long Polling 本身并不依赖 epoll 等 I/O 多路复用技术,它只是应用层的一种通信模型
      • 但在高并发场景下,由于 Long Polling 会产生大量挂起的线程 / 连接,服务端通常会结合 epoll 等 I/O 多路复用技术来提高连接处理效率,避免线程资源被耗尽

客户端选择 NameServer 节点的策略

客户端(Producer 与 Consumer)在配置时必须要写上 NameServer 集群的地址,那么客户端到底连接的是哪个 NameServer 节点呢?客户端首先会生产一个随机数,然后再与 NameServer 节点数量取模,此时得到的就是所要连接的 NameServer 节点索引,然后就会进行连接。如果 NameServer 连接失败,则会采用 Round-Robin 策略(轮询),逐个尝试着去连接其它 NameServer 节点。简而言之,首先采用的是随机策略选择 NameServer 节点,一旦连接失败后,再采用轮询策略选择其它 NameServer 节点。

Zookeeper Client 是如何选择 Server 节点的呢?

Zookeeper Client 在连接 Zookeeper Server 集群时,会先对配置的 Server 列表进行一次随机打乱(Shuffle),然后按顺序尝试连接(通常从第一个开始);如果选中的是域名,则解析出多个 IP 后再进行一次随机打乱(Shuffle),最终选择第一个 IP 地址进行连接,从而实现简单的负载均衡和容灾。

Broker

Broker 的功能介绍

Broker 充当着消息中转角色,负责存储消息、转发消息。Broker 在 RocketMQ 系统中负责接收并存储从生产者发送过来的消息,同时为消费者的消息拉取请求作准备。Broker 同时也存储着与消息相关的元数据,包括消费者组消费进度偏移 Offset、Topic、Queue 等。

Kafka 存储 Offset 信息的位置

Kafka 在 0.9 及以后的版本中,Offset 信息是存储在 Broker 中的,之前的版本是存储在 Zookeeper 中的。

Broker 的模块组成

  • Remoting Module:

    • 整个 Broker 的实体,负责处理来自客户端(Producer 和 Consumer)的请求,而这个 Broker 实体则由以下子模块构成。
  • Client Manager:

    • 客户端管理器,负责接收和解析客户端(Producer 和 Consumer)请求、管理客户端。
    • 例如,维护 Consumer 的 Topic 订阅信息。
  • Store Service:

    • 存储服务,提供方便简单的 API 接口,将消息存储到物理硬盘和提供消息查询功能。
  • HA Service:

    • 高可用服务,提供 Master Broker 和 Slave Broker 之间的数据同步功能。
  • Index Service:

    • 索引服务,根据特定的 Message key,对投递到 Broker 的消息进行索引服务,同时也提供根据 Message Key 对消息进行快速查询的功能。

Broker 集群部署架构

  • (1) 为了增强 Broker 的性能与吞吐量,Broker 通常以集群形式部署。各个 Broker 集群节点中可能存放着相同 Topic 的不同 Queue(即分区)。不过,这里存在一个问题:如果某个 Broker 节点宕机,如何保证数据不丢失?解决方案是对 Broker 节点进行横向扩展,即为每个 Broker 节点构建一个高可用(HA)集群,从而避免单点故障。

  • (2) Broker 集群本质上是一个主从集群,集群节点分为 Master 节点 与 Slave 节点两种角色,其中 Master 节点负责处理读写操作请求,Slave 节点负责对 Master 节点中的数据进行同步备份。当 Master 节点宕机后,在具备自动切换能力(例如 DLedger 模式或者 Controller 模式)的情况下,Slave 节点可以被提升为 Master 节点继续对外提供服务,因此整体上可以看作主备架构。特别注意,在 RocketMQ 4.5 版本之前,传统的主从架构并不支持自动主从切换;RocketMQ 从 4.5 版本开始引入 DLedger 模式后,才开始支持自动主从切换;5.x 版本又引入了 Controller 模式,但只有手动开启 Controller 才支持自动切换,否则仍然不具备自动主从切换能力。

  • (3) 在主从关系上,一个 Master 节点可以对应多个 Slave 节点,但一个 Slave 节点只能隶属于一个 Master 节点。Master 节点 与 Slave 节点的对应关系是通过指定相同的 BrokerName、不同的 BrokerId 来确定的,其中 BrokerId 为 0 表示 Master 节点,非 0 表示 Slave 节点。每个 Broker 节点都会与 NameServer 集群中的所有节点建立长连接,并定时向所有 NameServer 节点注册 Topic 路由信息,以保证路由信息的一致性与可用性。

RocketMQ 不同版本对自动主从切换的支持

  • RocketMQ 4.5 版本之前:采用传统 Master-Slave 架构,不支持自动主从切换,一旦 Master 节点宕机,Slave 节点不会自动升级为 Master 节点,需要人工干预恢复服务。
  • RocketMQ 4.5 版本及之后:引入基于 Raft 协议的 DLedger 模式,支持自动 Leader 选举(相当于自动 Master)、节点故障自动切换以及强一致性复制,可以实现高可用。
  • RocketMQ 5.x 版本:引入 Controller 模式(官方推荐),支持主从自动切换,可独立部署 Controller 组件,具备更高灵活性,逐步替代 DLedger 模式。手动开启 Controller 才支持自动切换,否则仍不具备自动主从切换能力。

工作流程

核心工作流程

RocketMQ 的核心工作流程如下:

  • (1) 启动 NameServer 后,其开始监听端口,等待 Broker、Producer 和 Consumer 的连接。随后启动 Broker,Broker 会与 NameServer 集群中的所有节点建立并保持长连接,并通过定时任务周期性(默认每隔 30 秒)向 NameServer 发送心跳包,同时上报自身的 Topic、Queue 等路由信息。

  • (2) 在 Producer 发送消息之前,可以先手动创建 Topic,并指定该 Topic 存储在哪些 Broker 节点上。Topic 创建时会将自己与 Broker 的映射关系写入到 NameServer 中。不过这一步是可选的,在默认配置下,也可以在 Producer 首次发送消息时由 Broker 自动创建 Topic(每个 Broker 默认创建 4 个 Queue)。

  • (3) Producer 在启动时会先与 NameServer 集群中的任意一个节点建立长连接,并从 NameServer 获取 Topic 的路由信息,即 Topic 下各个 Queue 与 Broker 地址(IP + Port)之间的映射关系。随后,Producer 会根据负载均衡策略从这些 Queue 中选择一个,并与该 Queue 所在的 Broker 建立长连接以发送消息。Producer 获取到的路由信息会被缓存到本地,并每隔 30 秒从 NameServer 拉取一次最新路由信息进行更新。

  • (4) Consumer 的工作流程与 Producer 类似,它也会先与 NameServer 集群中的任意一个节点建立长连接,获取所订阅 Topic 的路由信息,并根据负载均衡策略确定需要消费的 Queue,然后直接与该 Queue 所在的 Broker 建立长连接进行消息消费。Consumer 同样会缓存路由信息到本地并定期更新(每隔 30 秒),并与 Broker 保持长连接。与 Producer 不同的是,Consumer 会通过客户端实例向 Broker 发送心跳包,以维持消费关系(如消费者组、订阅信息等),并确保 Broker 能感知其在线状态。

Topic 的创建模式

在 RocketMQ 中,支持自动创建和手动创建 Topic 两种模式。

  • 自动创建 Topic:

    • 自动创建 Topic 时,默认是基于 Broker 端的配置信息来创建的。
    • 通常情况下,每个 Broker 会按照其默认配置(比如 defaultTopicQueueNums 参数,默认值一般为 4)为该 Topic 创建对应数量的 Queue。
    • 因此,从效果上看类似于 “每个 Broker 默认创建 4 个 Queue”,但本质上是由各个 Broker 独立决定的,而不是一个全局统一的配置策略。
  • 手动创建 Topic:

    • 可以为不同的 Broker 节点指定不同的 Queue 数量,也可以为所有 Broker 节点统一指定相同的 Queue 数量。
    • 如果为所有 Broker 节点指定相同的 Queue 数量,那么该 Topic 在整个 Broker 集群中的各个节点上具有一致的队列结构。
    • 如果为不同的 Broker 节点分别指定不同的 Queue 数量,则该 Topic 在各个 Broker 节点上的 Queue 数量可以不同,从而实现更灵活的资源分配。

特别注意

每个 Broker 节点上的 Queue 数量如果不一致,会影响系统的整体负载均衡效果,不仅会影响 Producer 发送消息的均匀性,还会影响 Consumer 消费消息的均衡性。具体来说,Producer 在发送消息时是基于 Queue 进行负载均衡(如轮询或基于权重的选择),如果某些 Broker 节点上的 Queue 数量更多,那么这些 Broker 被选中的概率就更高,从而导致流量倾斜;而在 Consumer 侧,消息消费是以 Queue 为单位进行分配的(一个 Queue 同一时刻只能被一个 Consumer 实例消费),当 Queue 分布不均时,就可能出现部分 Consumer 负载较高,而部分 Consumer 较为空闲的情况,进而影响整体消费吞吐与资源利用率。

读 / 写队列的概念

  • 从物理实现上来看,RocketMQ 中并不存在真正独立的 “读队列” 和 “写队列”,二者本质上是同一批 Queue,只是在逻辑层面通过 “读队列数(readQueueNums)” 和 “写队列数(writeQueueNums)” 进行区分。因此,不存在读写队列之间的数据同步问题。一般情况下,读队列数量与写队列数量是相同的,以保证生产消息和消费消息的均衡性。

  • 例如,在创建 Topic 时如果设置写队列数量为 8 和读队列数量为 4,那么系统实际会创建 8 个 Queue(编号为 0~7)。Producer 可以向这 8 个 Queue 写入消息,但 Consumer 只会从前 4 个 Queue(0~3)中拉取消息,后 4 个 Queue(4~7)中的消息将暂时不会被消费。相反,如果设置写队列数量为 4 和读队列数量为 8,那么系统同样会创建 8 个 Queue(0~7),但 Producer 只会向前 4 个 Queue(0~3)写入消息,而 Consumer 会尝试从全部 8 个 Queue 拉取消息,其中 4~7 号 Queue 实际上是没有消息的。在这种情况下,如果一个 Consumer Group(消费者组)中有两个 Consumer 实例,可能会出现一个 Consumer 实例消费 0~3 队列而负载较高,另一个 Consumer 实例分配到 4~7 队列却无消息可消费的情况,从而导致消费不均衡。

  • 也就是说,当读队列数与写队列数不一致时,往往会带来负载不均或消息不可见的问题,因此在常规场景下不建议这样配置。但这种设计并不是没有意义,其主要目的是为了支持 Topic 的 Queue 平滑缩容。例如,一个 Topic 最初创建时包含 16 个 Queue,如果希望将其缩容为 8 个且不丢失消息,可以先动态将写队列数量调整为 8,而保持读队列数量仍为 16。此时新消息只会写入前 8 个 Queue,而 Consumer 仍然会消费全部 16 个 Queue 中的消息。当后 8 个 Queue(8~15)中的历史消息被完全消费后,再将读队列数量调整为 8,即可完成整个缩容过程。在这个过程中既不会丢失数据,也不会影响正在进行的消费。

  • 此外,在 RocketMQ 的控制台管理界面中(如下图所示),还可以配置 Topic 的 perm 参数,其作用是设置 Topic 的操作权限,其中 2 表示只写,4 表示只读,6 表示可读可写。