SpringBoot3 基础教程之八场景整合

大纲

消息队列整合

消息队列介绍

使用场景

  • 异步 - 很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们

  • 解耦 - 允许独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束

  • 削峰 - 在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。如果为以能处理这类峰值访问为标准来投入资源并随时待命,这无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃

  • 缓冲 - 有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况

Kafka 使用介绍

基本概念

总体架构

  • Producer:消息生产者,就是向 Kafka Broker 发消息的客户端。
  • Consumer:消息消费者,就是向 Kafka Broker 取消息的客户端。
  • Consumer Group (CG):消费者组,由多个 consumer 组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费,消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
  • Broker:一台 Kafka 服务器就是一个 broker。一个 Kafka 集群由多个 broker 组成。一个 broker 可以容纳多个 topic
  • Topic:主题,可以理解为一个队列,生产者和消费者面向的都是一个 topic
  • Partition:分区,为了实现扩展性,一个非常大的 topic 可以分布到多个 broker(即 Kafka 服务器)上,一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列。
  • Replica:副本,为保证集群中的某个节点发生故障时,该节点上的 partition 数据不丢失,且让 Kafka 仍然能够继续工作,Kafka 为此提供了副本机制。一个 topic 的每个分区都有若干个副本,包括一个 leader 和若干个 follower
  • Leader:每个分区多个副本的 ,生产者发送数据的对象,以及消费者消费数据的对象都是 leader
  • Follower:每个分区多个副本的 ,实时从 leader 中同步数据,保持和 leader 数据的同步。leader 发生故障时,某个 follower 会成为新的 leader
工作原理

Kafka 的 Producer 发送消息采用的是异步发送的方式。在消息发送的过程中,涉及到了两个线程 — main 线程和 Sender 线程。在 main 线程中,会创建一个双端队列 RecordAccumulator。值得一提的是,main 线程将消息发送给 RecordAccumulator 时,Sender 线程会不断从 RecordAccumulator 中拉取消息并发送到 Kafka Broker。

消息模式

点对点模式

点对点模式 就是一对一,消费者主动拉取数据,消息收到后消息会被清除。消息生产者将消息发送到 Queue 中,然后消息消费者从 Queue 中取出并消费消息。消息被消费以后,Queue 中不再存储它,所以消息消费者不可能消费到已经被消费的消息。Queue 支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费。

发布 / 订阅模式

发布/订阅模式 就是一对多,消息产生后主动推送给订阅者,消费者消费消息之后不会清除消息。消息生产者(发布)将消息发布到 topic 主题(如浏览、点赞、收藏、评论等)中,同时有多个消息消费者(订阅)消费该消息。这和点对点模式不同,每个消费者互相独立,发布到 topic 的消息会被所有订阅者消费。

Kafka 整合案例

本章节所需的案例代码,可以直接从 GitHub 下载对应章节 spring-boot3-15。更多关于 Spring 整合 Kafka 的使用说明,请阅读 官方文档

准备工作

首先在 Kafka 中创建 Topic (主题),建议使用 Kafka-UI 这样的 GUI 工具进行操作,如下所示:

引入依赖

1
2
3
4
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>

添加配置

1
2
# 指定Kafka的服务器地址
spring.kafka.bootstrap-servers=127.0.0.1:9092

消息发送

发送普通消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@SpringBootTest
public class KafkaSendStringMessageTest {

@Autowired
private KafkaTemplate kafkaTemplate;

/**
* 发送普通消息
*/
@Test
public void sendSimpleMessage() {
CompletableFuture[] futures = new CompletableFuture[5];
StopWatch stopWatch = new StopWatch();
stopWatch.start();

for (int i = 0; i < 5; i++) {
CompletableFuture future = kafkaTemplate.send("news", "hello", "world");
futures[i] = future;
}

CompletableFuture.allOf(futures).join();
stopWatch.stop();

System.out.println("take " + stopWatch.getTotalTimeMillis() + " millis to send message");
}

}
发送对象消息
  • Kafka 客户端默认是以字符串的形式发送消息,如果发送的是 POJO 对象,则需要指定 Value 的序列化器(如使用 JSON 序列化器)
1
2
3
4
5
6
7
8
# 指定Kafka的服务器地址
spring.kafka.bootstrap-servers=127.0.0.1:9092

# 指定发送消息时,Key使用的序列化器
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer

# 指定发送消息时,Value使用的序列化器
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
  • 发送对象消息
1
2
3
4
5
6
7
8
9
10
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Person {

private Long id;
private String name;
private Integer age;

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@SpringBootTest
public class KafkaSendObjectMessageTest {

@Autowired
private KafkaTemplate kafkaTemplate;

/**
* 发送对象消息
*/
@Test
public void sendObjectMessage() {
Person person = new Person(1L, "Jim", 18);
CompletableFuture future = kafkaTemplate.send("news", "person", person);
future.join();
System.out.println("success to send message");
}

}
  • 消息内容最终会以 JSON 字符串的形式存储在 Kafka 中

消息订阅

订阅最新消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Slf4j
@Component
public class SportTopicListener {

/**
* 订阅消息(最新的)
*
* @param record
*/
@KafkaListener(topics = "sport", groupId = "sport-group-1")
public void subscribeNewest(ConsumerRecord record) {
Object key = record.key();
Object value = record.value();
String topic = record.topic();
log.info("receive message from topic {}, key: {}, value: {}", topic, key, value);
}

}
订阅所有消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Slf4j
@Component
public class SportTopicListener {

/**
* 订阅消息((所有消息,包括历史消息))
*
* @param record
*/
@KafkaListener(groupId = "sport-group-2", topicPartitions = {@TopicPartition(topic = "sport", partitionOffsets = {@PartitionOffset(partition = "0", initialOffset = "0")})})
public void subscribeAll(ConsumerRecord record) {
Object key = record.key();
Object value = record.value();
String topic = record.topic();
log.info("receive message from topic [{}], key: {}, value: {}", topic, key, value);
}

}

创建主题

自动创建主题

若希望 SpringBoot 应用在启动的时候,自动创建 Topic (主题),则可以参考以下代码。

  • 创建配置类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Configuration
public class KafkaConfiguration {

/**
* 创建 Topic(主题)
*
* @return
*/
@Bean
public NewTopic sportTopic() {
return TopicBuilder.name("sport")
.partitions(1)
.compact()
.build();
}

}
  • 在主启动类上添加 @EnableKafka 注解,开启 Kafka 的注解驱动功能
1
2
3
4
5
6
7
8
9
@EnableKafka
@SpringBootApplication
public class MainApplication {

public static void main(String[] args) {
SpringApplication.run(MainApplication.class, args);
}

}

自动配置原理

  • kafka 的自动配置由 KafkaAutoConfiguration 实现
    • 往容器中放注入了 KafkaTemplate,可以进行消息的接收和发送
    • 往容器中放了 KafkaAdmin,可以进行 Kafka 的管理,比如创建 Topic 等
    • Kafka 的配置内容都在 KafkaProperties
    • @EnableKafka 注解可以开启 Kafka 基于注解的模式