SpringBoot3 基础教程之八场景整合
大纲
- SpringBoot3 基础教程之一快速入门
- SpringBoot3 基础教程之二常规配置
- SpringBoot3 基础教程之三 Web 开发
- SpringBoot3 基础教程之四 Web 开发
- SpringBoot3 基础教程之五基础特性
- SpringBoot3 基础教程之六场景整合
- SpringBoot3 基础教程之七场景整合
- SpringBoot3 基础教程之八场景整合
- SpringBoot3 基础教程之九核心原理
消息队列整合
消息队列介绍
使用场景
异步- 很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们

解耦- 允许独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束

削峰- 在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。如果为以能处理这类峰值访问为标准来投入资源并随时待命,这无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃

缓冲- 有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况

Kafka 使用介绍
基本概念
总体架构

Producer:消息生产者,就是向 Kafka Broker 发消息的客户端。Consumer:消息消费者,就是向 Kafka Broker 取消息的客户端。Consumer Group (CG):消费者组,由多个consumer组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费,消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。Broker:一台 Kafka 服务器就是一个broker。一个 Kafka 集群由多个broker组成。一个broker可以容纳多个topic。Topic:主题,可以理解为一个队列,生产者和消费者面向的都是一个topic。Partition:分区,为了实现扩展性,一个非常大的topic可以分布到多个broker(即 Kafka 服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列。Replica:副本,为保证集群中的某个节点发生故障时,该节点上的partition数据不丢失,且让 Kafka 仍然能够继续工作,Kafka 为此提供了副本机制。一个topic的每个分区都有若干个副本,包括一个leader和若干个follower。Leader:每个分区多个副本的主,生产者发送数据的对象,以及消费者消费数据的对象都是leader。Follower:每个分区多个副本的从,实时从leader中同步数据,保持和leader数据的同步。leader发生故障时,某个follower会成为新的leader。
工作原理
Kafka 的 Producer 发送消息采用的是异步发送的方式。在消息发送的过程中,涉及到了两个线程 — main 线程和 Sender 线程。在 main 线程中,会创建一个双端队列 RecordAccumulator。值得一提的是,main 线程将消息发送给 RecordAccumulator 时,Sender 线程会不断从 RecordAccumulator 中拉取消息并发送到 Kafka Broker。

消息模式
点对点模式
点对点模式 就是一对一,消费者主动拉取数据,消息收到后消息会被清除。消息生产者将消息发送到 Queue 中,然后消息消费者从 Queue 中取出并消费消息。消息被消费以后,Queue 中不再存储它,所以消息消费者不可能消费到已经被消费的消息。Queue 支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费。

发布 / 订阅模式
发布/订阅模式 就是一对多,消息产生后主动推送给订阅者,消费者消费消息之后不会清除消息。消息生产者(发布)将消息发布到 topic 主题(如浏览、点赞、收藏、评论等)中,同时有多个消息消费者(订阅)消费该消息。这和点对点模式不同,每个消费者互相独立,发布到 topic 的消息会被所有订阅者消费。

Kafka 整合案例
本章节所需的案例代码,可以直接从 GitHub 下载对应章节 spring-boot3-15。更多关于 Spring 整合 Kafka 的使用说明,请阅读 官方文档。
准备工作
首先在 Kafka 中创建 Topic (主题),建议使用 Kafka-UI 这样的 GUI 工具进行操作,如下所示:

引入依赖
1 | <dependency> |
添加配置
1 | # 指定Kafka的服务器地址 |
消息发送
发送普通消息
1 |
|
发送对象消息
- Kafka 客户端默认是以字符串的形式发送消息,如果发送的是 POJO 对象,则需要指定 Value 的序列化器(如使用 JSON 序列化器)
1 | # 指定Kafka的服务器地址 |
- 发送对象消息
1 |
|
1 |
|
- 消息内容最终会以 JSON 字符串的形式存储在 Kafka 中

消息订阅
订阅最新消息
1 |
|
订阅所有消息
1 |
|
创建主题
自动创建主题
若希望 SpringBoot 应用在启动的时候,自动创建 Topic (主题),则可以参考以下代码。
- 创建配置类
1 |
|
- 在主启动类上添加
@EnableKafka注解,开启 Kafka 的注解驱动功能
1 |
|
自动配置原理
- kafka 的自动配置由
KafkaAutoConfiguration实现- 往容器中放注入了
KafkaTemplate,可以进行消息的接收和发送 - 往容器中放了
KafkaAdmin,可以进行 Kafka 的管理,比如创建 Topic 等 - Kafka 的配置内容都在
KafkaProperties中 @EnableKafka注解可以开启 Kafka 基于注解的模式
- 往容器中放注入了
