RocketMQ 4.x 入门教程之三

大纲

前言

学习资源

版本说明

本文所使用的各软件版本如下表所示:

组件版本说明
JDK11Java、SpringBoot、SpringCloud 项目案例使用的 JDK 版本
RocketMQ Server4.9.0RocketMQ 服务端(包括 NameServer 和 Broker),运行时依赖 JDK 1.8+

RocketMQ 的工作原理

消息的生产

消息的生产过程

Producer 可以将消息写入到某个 Broker 中的某个 Queue 中,其经历了如下过程:

  • Producer 在发送消息之前,会先向 NameServer 请求获取指定 Topic 的路由表
  • NameServer 返回该 Topic 的路由表以及对应的 Broker 列表
  • Producer 根据代码中指定的 Queue 选择策略,从 Queue 列表中选出一个 Queue,用于存储消息
  • Producer 会对消息进行必要的预处理,例如当消息体超过 4MB 时会进行压缩
  • Producer 向所选 Queue 所在的 Broker 发起 RPC 请求,将消息发送到对应的 Queue

路由表的介绍

路由表本质是一个 Map 结构,key 为 Topic 名称,value 为 QueueData 实例列表。需要注意的是,并不是一个 Queue 对应一个 QueueData,而是一个 Broker 中该 Topic 的所有 Queue 共同对应一个 QueueData。也就是说,只要某个 Broker 涉及该 Topic,就会对应一个 QueueData。QueueData 中包含 brokerName。简单理解,路由表可以看作是:key 为 Topic,value 为涉及该 Topic 的所有 brokerName 列表

Broker 列表的介绍

Broker 列表本质也是一个 Map 结构,key 为 brokerName,value 为 BrokerData。需要注意的是,并不是一个 Broker 对应一个 BrokerData,而是一组 brokerName 相同的 Master-Slave 小集群(如图所示)对应一个 BrokerData。BrokerData 中包含 brokerName 以及一个 Map,该 Map 的 key 为 brokerId,而 value 为对应 Broker 的地址,其中 brokerId 为 0 表示 Master 节点,非 0 表示 Slave 节点。

队列的选择算法

对于无序消息,Producer 在发送消息时,会根据 Queue(队列)选择算法(也称为消息投递算法)来选择目标 Queue,然后将消息发送出去。常见的 Queue 选择算法主要有两种:

  • 轮询算法:

    • 默认的 Queue 选择算法,能够保证消息在各个 Queue 之间均匀分布
    • 存在的问题:在某些情况下,部分 Broker 上的 Queue 投递延迟较高,可能导致 Producer 缓存队列中出现较大的消息积压,从而影响消息的投递性能
  • 最小投递延迟算法:

    • 通过统计每次消息投递的延迟时间,将消息优先投递到延迟最小的 Queue
    • 当多个 Queue 的投递延迟时间相同时,采用轮询算法进行投递,从而提升整体投递性能
    • 存在的问题:会导致消息在各个 Queue 上分配不均(数据倾斜),延迟较小的 Queue 可能积累大量消息,增加对应消费者的压力,降低消费能力,甚至可能引发消息堆积

消息的存储

Broker 中的消息存储在本地文件系统中,这些相关文件默认存放在当前用户主目录下的 store 目录中,如下图所示:

  • lock 文件

    • Broker 运行期间使用的全局资源锁文件
  • abort 文件

    • 该文件在 Broker 启动后会自动创建
    • 正常关闭 Broker 时,该文件会自动删除
    • 如果在未启动 Broker 的情况下发现该文件仍然存在,说明上一次 Broker 是非正常关闭
  • checkpoint 文件

    • 记录 commitlog、consumequeue、index 文件的最后刷盘时间戳
  • commitlog 目录

    • 存放 commitlog 文件
    • Broker 的所有消息数据都会写入到 commitlog 文件中
  • config 目录

    • 存放 Broker 运行期间的一些配置数据
  • consumequeue 目录

    • 存放 consumequeue 文件
    • consumequeue 文件用于存储消息在 commitlog 中的索引信息,从而可以根据 Topic + QueueId 定位到具体的消息
  • index 目录

    • 存放消息索引文件(indexFile)
    • 消息索引文件用于根据 key 或时间范围快速检索并定位到 commitlog 中的消息

commitlog

在很多网络资料中,commitlog 目录里面的文件通常被简单称为 “commitlog 文件”,但在 RocketMQ 源码中,这些文件被统一抽象并命名为 MappedFile 对象。简而言之,commitlog 文件等价于 MappedFile 文件

目录与文件

在 commitlog 目录里面,存放着很多个 MappedFile 文件,当前 Broker 中的所有消息都是落盘到这些 MappedFile 文件中的。MappedFile 文件的大小为 1G(小于等于 1G),文件名由 20 位十进制数构成,表示当前文件的第一条消息的起始偏移量(offset),如下表所示:

MappedFile 文件文件大小(字节)起始 offset 结束 offset 文件名
第 1 个文件 10737418240107374182300000000000000000000
第 2 个文件 107374182410737418242147483647000000001073741824
第 3 个文件 107374182421474836483221225471000000002147483648
  • 第一个 MappedFile 文件

    • 文件名由 20 位的 0 组成,即 00000000000000000000
    • 原因是:第一个 MappedFile 文件中第一条消息的偏移量(offset)为 0
  • MappedFile 文件的读写性能

    • MappedFile 文件是按顺序读写的,所有其读写效率很高
    • 无论是 SSD 磁盘还是 SATA 磁盘,通常情况下,顺序读写效率都会高于随机读写
  • MappedFile 文件的滚动机制

    • 当第一个 MappedFile 文件写满后,会自动创建下一个 MappedFile 文件继续存储消息
  • MappedFile 文件的命名规则

    • 每个 MappedFile 文件名表示该文件的起始偏移量(offset)
    • 假设第一个文件大小为 1073741824 字节(1G),则第二个 MappedFile 文件名为 00000000001073741824
    • 以此类推,第 n 个 MappedFile 文件名等于前 n-1 个 MappedFile 文件的大小之和(即其起始偏移量)
  • MappedFile 文件的连续性特点

    • 在同一个 Broker 中,所有 MappedFile 对应的起始偏移量(offset)是连续递增的

需要注意的是,一个 Broker 中仅包含一个 commitlog 目录,所有的 MappedFile 文件都是存放在该目录中的。即无论当前 Broker 中存储着多少个 Topic 的消息,这些消息都是被顺序写入到了 MappedFile 文件中的。也就是说,这些消息在 Broker 中存储时并没有被按照 Topic 进行分类存储

MappedFile 文件总结说明

  • 每个 MappedFile 文件 = 一段连续的物理文件区间
  • MappedFile 的文件名 = 这段区间的起始 offset
  • 每个 MappedFile 文件的起始 offset 是按顺序递增的
  • 下一个 MappedFile 文件的起始 offset = 上一个 MappedFile 文件的起始 offset + 固定文件大小(默认 1G)
消息单元

MappedFile 的文件内容由一个个消息单元(Message Unit)顺序组成。每个消息单元中包含丰富的消息元数据,主要包括:

  • 消息总长度(MsgLen)
  • 消息物理偏移量(PhysicalOffset)
  • 消息体长度(BodyLength)
  • 消息体内容(Body)
  • 消息主题(Topic)
  • 消息主题的长度(TopicLength)
  • 消息生产者地址(BornHost)
  • 消息发送时间戳(BornTimestamp)
  • 消息所在队列(QueueId)
  • 消息在 Queue 中的偏移量(QueueOffset)

此外,还包含其他多种扩展属性(整体约 20 余项),用于支持消息存储、索引构建以及查询等功能。


  • 消息单元中包含了与 Queue 相关的属性,因此在后续学习中,需要重点理解 commitlog 与 Queue 之间的关系
  • 在一个 MappedFile 文件中,第 m + 1 个消息单元在 commitlog 中的起始偏移量(offset)的计算方式如下:
  • L(m + 1) = L(m) + MsgLen(m),其中 m ≥ 0
    • m 表示消息的序号(第 m 条消息)
    • L(m) 表示第 m 条消息在 commitlog 中的起始偏移量(offset)
    • L(m + 1) 表示第 m + 1 条消息在 commitlog 中的起始偏移量(offset)
    • 这条公式代表的意思:下一条消息的起始 offset = 当前消息的起始 offset + 当前消息的长度

总结

每条消息都是顺序写入 MappedFile 的。第 m 条消息写完之后,第 m + 1 条消息会紧跟着它的末尾开始写入,所以当前消息的起始 offset 等于前一条消息的 offset 加上前一条消息的长度。

consumequeue

目录与文件

为了提高消息的消费效率,Broker 会为每个 Topic 在 ~/store/consumequeue 目录下创建对应的存储目录。

  • consumequeue 目录下,会按 Topic 名称创建子目录

    • 子目录名称 = Topic 名称
  • 在每个 Topic 目录下,会为该 Topic 的每个 Queue 创建子目录

    • 子目录名称 = queueId
  • 在每个 Queue 目录中,会创建若干个 consumequeue 文件

    • consumequeue 文件是 MappedFile 文件的索引结构
    • 通过 Queue 维度可以快速定位消息在 MappedFile 文件中的物理位置

~/store/consumequeue 目录下,对应的存储目录结构如下所示:

1
2
3
4
5
6
7
8
9
10
11
~/store/
└── consumequeue/
└── TopicA/
├── 0/
│ └── 00000000000000000000
├── 1/
│ └── 00000000000000000000
├── 2/
│ └── 00000000000000000000
└── 3/
└── 00000000000000000000

  • consumequeue 文件的核心作用

    • consumequeue 文件作为 MappedFile 文件的索引结构,可以快速定位到 MappedFile 文件中的具体消息
    • consumequeue 文件用于存储消息在 MappedFile 文件中的索引信息(如物理偏移量、消息大小等)
  • consumequeue 文件的命名规则

    • consumequeue 文件名由 20 位十进制数构成
    • 文件名表示当前 consumequeue 文件中第一个索引条目的起始偏移量(offset)
  • consumequeue 文件与 MappedFile 文件的区别

    • consumequeue 文件:
      • 文件大小固定
      • 因此后续文件名是按固定步长递增的
      • 文件名的命名规则相对简单且规律稳定
    • MappedFile 文件:
      • 文件名由实际物理偏移量(offset)决定
      • 随着消息不断写入,消息占用空间增加,导致当前 MappedFile 文件写满后,offset 按顺序递增,从而生成新的 MappedFile 文件
  • consumequeue 文件与 indexFile 文件的区别

    • 二者是针对 MappedFile 文件不同维度的索引
      • consumequeue 文件是按队列顺序消费的索引(即消费索引)
      • indexFile 文件是按 key 或时间范围查询的索引(即查询索引)
    • consumequeue(消费队列索引)
      • 维度:Topic + QueueId
      • 作用:支持顺序消费
      • 特点:
        • 按消息写入顺序递增
        • 通过逻辑 offset(QueueOffset)查找
        • 每条记录固定大小(20 字节)
      • 使用场景:
        • Consumer 拉取消息
        • 顺序 / 普通消费
    • indexFile(索引文件)
      • 维度:消息 key / 时间
      • 作用:支持快速查询消息
      • 特点:
        • 类似 Hash 索引结构
        • 支持根据 key 查询
        • 支持时间范围过滤
      • 使用场景:
        • 根据业务 key 查询消息
索引条目

  • 每个 consumequeue 文件最多包含约 30 万个索引条目(默认 300000 条)
  • 每个索引条目由固定长度的三部分组成:
    • 消息在 MappedFile 文件中的物理偏移量(CommitLog Offset,8 字节)
    • 消息长度(Size,4 字节)
    • 消息 Tag 的 Hash 值(Tag HashCode,8 字节)
  • 因此,每个索引条目的长度为 20 字节(8 + 4 + 8)
  • 每个 consumequeue 文件的大小固定为:
    • 300000 × 20 字节 = 6000000 字节(约 5.72 MB)
  • 在同一个 consumequeue 文件中:
    • 所有消息都属于同一个 Topic 和同一个 Queue(由目录结构决定)
    • 但每条消息的 Tag 可以不同

对文件的读写

消息写入

一条消息进入 Broker 后,通常需要经过以下几个步骤,最终完成持久化:

  • (1) Broker 根据 QueueId,计算并获取该消息在 consumequeue 文件中对应的写入位置(即 QueueOffset);
  • (2) 将 QueueId、QueueOffset 等元数据与消息体一起封装为一个完整的消息单元;
  • (3) 将该消息单元顺序写入 commitlog 文件(消息真正的持久化位置);
  • (4) 同时,根据消息内容生成对应的消息索引条目(如物理偏移量、消息大小、Tag Hash 等);
  • (5) 最后,将生成的索引条目分发并写入对应的 consumequeue 文件中,用于后续消费定位。
消息拉取

当 Consumer 从 Broker 拉取消息时,通常会经历以下几个步骤:

  • (1) Consumer 获取其要消费的 Queue 的消费偏移量(消费 offset),用于表示当前消费进度(即已经消费到该 Queue 的第几条消息)
    • 消费 offset 本质上是一个逻辑位置
    • 下一条待消费的消息位置通常为:消息 offset = 消费 offset + 1
  • (2) Consumer 向 Broker 发起拉取消息的请求,请求中会携带:
    • 目标 Queue(Topic + QueueId)
    • 要拉取的起始消息 offset
    • 订阅的 Tag 过滤条件
  • (3) Broker 根据请求中的消息 offset,计算其在 consumequeue 文件中对应的物理位置(queueOffset):
    • queueOffset = 消息 offset × 单个索引条目大小(20 字节)
  • (4) Broker 从该 queueOffset 位置开始,顺序查找符合指定 Tag 的索引条目
  • (5) 找到匹配的索引条目后,解析其前 8 个字节,得到该消息在 commitlog 文件中的物理偏移量(commitlog offset)
  • (6) Broker 根据该 commitlog offset,从 commitlog 文件中读取完整的消息单元,并返回给 Consumer

consumequeue 文件的作用

consumequeue 文件本质上是一个索引结构,通过固定大小的索引条目(20 字节),用于将逻辑上的消息 offset 映射到 commitlog 文件中的物理偏移量,从而避免 Broker 顺序扫描 commitlog 文件,实现消息的高效随机访问。

性能提升
高性能原理

在 RocketMQ 中,无论是消息本身还是消息索引,最终都是持久化存储在磁盘上的。那么,这是否会影响消息的读写性能?答案是:基本不会。实际上,RocketMQ 在主流消息队列产品中具有非常高的性能,这得益于其一系列优化机制。

  • 首先,RocketMQ 在文件读写上大量使用 mmap(内存映射机制),将对磁盘的操作转化为对内存地址的访问,并结合使用操作系统的 PageCache(页缓存),从而减少数据拷贝开销(” 接近零拷贝” 的效果),显著提升了读写效率。

  • 其次,consumequeue 中的索引数据是按顺序存储的,且每个索引条目都是固定大小(20 字节)。同时配合 PageCache(页缓存)的预读机制,使得对 consumequeue 的访问通常可以命中内存缓存,其读取性能接近于内存访问。即使在存在消息堆积的情况下,也不会成为性能瓶颈。


相较之下,RocketMQ 中可能影响性能的主要是对 commitlog 文件的读取。这是因为:

  • consumequeue 文件只负责索引定位消息的物理位置;
  • 真正的消息数据存储在 commitlog 文件中;
  • 根据索引从 commitlog 文件读取消息时,往往需要进行随机磁盘访问。

而随机 I/O 相比顺序 I/O 的性能要低得多。不过,这一问题可以通过多种方案来缓解,例如:

  • 使用 SSD 固态硬盘提升随机读性能;
  • 选择合适的操作系统 I/O 调度算法(如 Deadline);
  • 利用操作系统 PageCache(页缓存)提高缓存命中率。

总结

RocketMQ 通过 "顺序写 + 索引结构加速 + PageCache",将磁盘 I/O 的影响降到最低,从而实现高吞吐的读写性能。

mmap 机制

RocketMQ 没有直接使用 sendfile 这种严格意义上的零拷贝技术,而是通过 mmap + PageCache,将磁盘操作转化为内存操作,减少了用户态与内核态之间的数据拷贝,可以认为是一种 “接近零拷贝” 的高性能读写实现。

  • mmap(内存映射机制)

    • RocketMQ 主要通过 mmap 将文件映射到用户进程的虚拟内存(底层对应内核的 PageCache);
    • 应用程序对文件的读写,转化为对这块虚拟内存的操作;
    • 数据实际存储在 PageCache 中,并由操作系统负责刷盘。
    • 优点:
      • 减少用户态 / 内核态之间的数据拷贝
      • 减少系统调用开销
  • PageCache(页缓存)

    • 文件写入先进入 PageCache,再异步刷盘;
    • 文件读取优先命中 PageCache,避免磁盘 I/O。
  • 与真正零拷贝(sendfile)的区别

    • Kafka:
      • 大量使用 sendfile(零拷贝技术)
      • 数据可以直接从 PageCache 传输到网卡
      • 避免了数据从内核态拷贝到用户态、再从用户态拷贝回内核态的过程,从而减少了 CPU 和内存拷贝开销(真正的零拷贝实现)
    • RocketMQ:
      • 主要是使用 mmap + PageCache,将磁盘操作转化为内存操作,减少数据拷贝次数
      • mmap 可以将磁盘数据映射到用户进程的虚拟内存,避免传统读 / 写模型导致的用户态与内核态之间的显式数据拷贝,从而实现 “接近零拷贝” 的高性能读写

mmap 的本质作用

mmap 的作用是将文件映射到用户进程的虚拟内存,使应用程序可以通过操作内存地址来读写文件。但需要注意,这块 "内存" 并不是用户进程独立持有的数据副本,其底层实际对应的是操作系统内核的 PageCache。因此,应用对内存的修改本质上是在修改 PageCache,最终由操作系统负责将数据刷盘。

PageCache 机制

PageCache(页缓存)机制是操作系统对文件进行缓存的一种机制,用于加速文件的读写操作。通常情况下,程序对文件进行顺序读写时,其性能可以接近内存读写,主要原因在于操作系统利用 PageCache 对 I/O 进行了优化,将一部分内存用作文件缓存。

  • 写操作:

    • 操作系统会先将数据写入 PageCache,然后由内核线程(如 pdflush / flush / writeback)以异步方式将脏页(Dirty Page)刷写到物理磁盘。
  • 读操作:

    • 当程序读取数据时,优先从 PageCache 中读取;若未命中,则操作系统会从磁盘加载数据到 PageCache。
    • 同时,操作系统通常会进行预读取(readahead),将相邻的数据块一并加载到 PageCache,从而提升后续顺序读取的性能。

总结

PageCache 通过 "优先写入缓存、读取优先命中缓存、顺序访问时预读" 的机制,大幅提升了磁盘 I/O 性能。

与 Kafka 的对比

  • 存储模型的差异:

    • 存储模型的关系
      • RocketMQ 的很多设计思想来源于 Kafka,例如 commitlog 和 consumequeue 的存储模型就与 Kafka 有一定的相似性
      • 在 RocketMQ 中,commitlog + consumequeue 的组合,在整体设计上类似于 Kafka 中的 Partition(分区);而 commitlog 文件则类似于 Kafka 中的 Segment(段文件)
    • 在 Kafka 中:
      • 一个 Topic 会被划分为多个 Partition;
      • Partition 是物理概念,在磁盘上对应为 Topic 目录下的多个分区目录;
      • 每个 Partition 内包含多个 Segment 文件,用于实际存储消息数据;
      • Kafka 的消息存储目录结构如下:
        1
        2
        3
        4
        5
        6
        7
        Topic/
        ├── Partition-0/
        │ ├── Segment-0.log
        │ ├── Segment-1.log
        ├── Partition-1/
        ├── Segment-0.log
        ├── Segment-1.log
    • 在 RocketMQ 中:
      • 一个 Topic 会被划分为多个 Queue(即分区);
      • Queue 是逻辑概念,在 Broker 端对应的是 commitlog + consumequeue 的组合;
      • 消息实际存储在 commitlog 中,所有 Topic 的消息都会顺序写入 commitlog 文件;
      • consumequeue 用于按 Topic + QueueId 建立索引,将逻辑 offset 映射到 commitlog 的物理位置,从而加速消息定位与读取;
      • RocketMQ 的消息存储目录结构如下:
        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        ~/store/
        ├── commitlog/
        │ ├── 00000000000000000000
        │ ├── 00000000000001024000
        ├── consumequeue/
        │ ├── TopicA/
        │ │ ├── 0/
        │ │ │ ├── 00000000000000000000
        │ │ │ ├── 00000000000000000020
        │ │ ├── 1/
        │ │ ├── 00000000000000000000
        │ │ ├── 00000000000000000020
  • 消息索引机制的差异:

    • Kafka 没有 Tag(标签)机制,消息过滤能力相对较弱(通常依赖 key 或上层处理)
    • Kafka 不使用类似 RocketMQ 的 consumequeue 索引结构:
      • 但 Kafka 并不是完全没有使用索引,而是没有像 RocketMQ 那样的 “每条消息对应一条索引(consumequeue)”
      • Kafka 中每个 Segment 文件都有对应的稀疏索引文件:
        • .index(offset → 物理位置)
        • .timeindex(时间 → offset)
      • Kafka 中索引使用的特点:
        • 使用稀疏索引(不是每条消息都有索引)
        • 先定位消息的大致位置,再顺序扫描一小段数据
        • 索引的磁盘空间占用小,但查找仍然很快
    • Kafka 与 RocketMQ 的索引机制对比
      • RocketMQ:随机读 + 完整索引(consumequeue),用空间换时间,实现快速随机定位每条消息的具体位置
      • Kafka:顺序读 + 稀疏索引,用扫描少量数据换取更简单的索引结构(用时间换空间),保证高吞吐的同时避免了额外索引结构的开销
  • 零拷贝实现方式的差异:

    • Kafka 基于 sendfile 实现真正的零拷贝能力,避免了用户态与内核态之间的数据拷贝,将数据传输尽量限制在内核态中完成,从而大幅减少 CPU 开销(严格来说是减少用户态参与的数据拷贝)
    • RocketMQ 基于 mmap 实现 “接近零拷贝” 的性能,通过将文件映射到用户进程的虚拟内存(底层对应内核的 PageCache),使应用程序对文件的读写转化为对内存的操作,从而避免了 PageCache 到用户态缓冲区的一次数据拷贝,实现性能优化

消息的索引

indexFile

除了可以通过指定 Topic 进行消息消费外,RocketMQ 还提供了根据 key 进行消息查询的功能。该查询是通过 ~/store 目录中的 index 子目录中的 indexFile 文件(如下所示)进行索引实现的快速查询。当然,这个 indexFile 文件中的索引数据是在包含了 key 的消息被发送到 Broker 时写入的;如果消息中没有包含 Key,则不会将对应的索引数据写入 indexFile 文件。

1
2
3
4
5
6
~/store/
└── index/
├── 20210811211858915
├── 20210811213000102
├── 20210811214533788
└── 20210811223012045
索引条目的结构

每个 Broker 中都会包含一组 indexFile 文件,每个 indexFile 文件通常以创建时的时间戳命名。每个 indexFile 由三部分组成:indexHeader、slots(哈希槽)、indexes(索引数据),如下图所示。其中,每个 indexFile 文件通常包含约 500 万个 slot 槽位,每个 slot 槽位通过哈希映射可能会挂载多个 index 索引条目,用于解决哈希冲突并实现消息索引的快速查找。


在 indexHeader 中,其固定大小是 40 个字节,存放着如下数据:

  • beginTimestamp:该 indexFile 文件中第一条消息的存储时间戳
  • endTimestamp:该 indexFile 文件中最后一条消息的存储时间戳
  • beginPhyOffset:该 indexFile 文件中第一条消息在 commitlog 文件中的物理偏移量(commitlog offset)
  • endPhyOffset:该 indexFile 文件中最后一条消息在 commitlog 文件中的物理偏移量(commitlog offset)
  • hashSlotCount:已被使用(已挂载索引单元)的 slot 数量,并不是所有 slot 都一定有索引数据,这里统计的是 “实际被占用的 slot 数量”
  • indexCount:该 indexFile 文件中索引单元的总数量,即所有 slot 下挂载的 index 索引单元数量之和(可能有多个 index 链挂在同一个 slot 上)

在 indexFile 文件中,最复杂的是 slots(哈希槽)与 indexes(索引数据)的关系。在实际存储时,indexes 是存放在 slots 后面的,但为了便于理解,将它们的关系展示为如下形式(类似 Java 中的 HashMap 存储结构):

  • key 的 hash 值对固定槽位数取模:slot = hash(key) % 500w,该结果即为对应的 slot 槽位。slot 中存储的是该 slot 当前最新的 indexNo,通过这个 indexNo 可以计算出该 index 索引单元在 indexFile 文件中的物理位置。

  • 由于 Hash 取模存在较高的冲突概率(即多个 key 会落在同一个 slot),RocketMQ 采用链表结构来解决该问题。每个 index 索引单元中增加了 preIndexNo 字段,用于指向该 slot 中 “前一个 index 索引单元”。这样多个冲突的 index 会通过 preIndexNo 串联起来。同时,slot 中始终保存的是当前最新的 indexNo,因此只要定位到 slot,就可以找到最新的 index 索引单元,再通过 preIndexNo 逐步回溯历史索引。

  • 此外,indexNo 是 indexFile 文件内部的递增流水号,从 0 开始依次增长,在整个 indexFile 文件中保持单调递增。需要注意的是,indexNo 本身并不直接存储在 index 索引单元中,而是通过遍历 indexes 区域,按顺序隐式计算得到的。


index 索引单元的默认大小是 20 个字节,其中存放着以下四个属性:

  • keyHash:消息中业务 key 的 hash 值,用于定位 slot
  • phyOffset:当前 key 对应消息在 commitlog 文件中的物理偏移量(commitlog offset)
  • timeDiff:当前 key 对应消息的存储时间,与 indexFile 文件创建时间之间的时间差
  • preIndexNo:当前 slot 下该 index 索引单元的前一个索引单元的 indexNo(用于链表回溯)
indexFile 的创建

indexFile 的文件名为其创建时的时间戳,该时间戳主要用于辅助查询。在根据业务 key 进行查询时,除了 key 之外,还需要指定一个时间条件(时间戳),表示查询 “不大于该时间点的最新消息”。通过 indexFile 文件的时间戳命名,可以快速定位到目标时间范围内的文件,从而减少扫描范围,提升查询效率。

  • indexFile 文件的创建时机

    • indexFile 文件的创建主要有两个触发条件:
    • (1) 当第一条带 key 的消息到达时,如果当前不存在 indexFile 文件,则会创建第一个 indexFile 文件
    • (2) 当当前 indexFile 文件中的索引单元数量达到最大上限(2000 万)时,会创建新的 indexFile 文件
      • 当新的带 key 消息到来时,系统会定位到最新的 indexFile 文件
      • 从其 indexHeader 中读取 indexCount(位于 indexHeader 的最后 4 字节)
      • 如果 indexCount 大于等于 2000 万,则会创建新的 indexFile 文件
  • indexFile 文件的大小计算

    • 一个 indexFile 文件的结构包括:
      • indexHeader:40 字节
      • slot 区域:500 万个 slot,每个 slot 占 4 字节
      • index 区域:2000 万个 index 索引单元,每个占 20 字节
    • 因此,一个 indexFile 文件的最大大小为:
      • maxSize = 40 + 500w * 4 + 2000w * 20,单位字节,约等于 400MB 左右
消息的查询流程

当消费者通过业务 key 来查询相应的消息时,Broker 需要经过一个相对较复杂的查询流程。不过,在分析查询流程之前,首先要清楚几个定位计算公式:

  • 计算指定消息 key 的 slot 槽位序号: slot 槽位序号 = key 的 hash 值 % 500w
  • 计算槽位序号为 n 的 slot 在 indexFile 文件中的起始位置: slot(n) = 40 + (n - 1) * 4
  • 计算 indexNo 为 m 的 index 在 indexFile 中的位置:index(m) = 40 + 500w * 4 + (m - 1) * 20

提示

  • 一个 indexFile 文件最多可以有 500w 个 slot,每个 slot 占 4 个字节
  • 一个 indexFile 文件最多可以有 2000w 个索引单元,每个索引单元占 20 个字节
  • 在 indexFile 文件中,indexHeader 占 4 个字节
  • 所有 slot(哈希槽)所占的字节数是 2000w = 500w * 4

根据 indexFile 文件 + 业务 key 查询消息的具体流程如下: