Clay 的技术空间

用进废退 | 艺不压身

大纲

前言

本文将介绍在三台服务器上,手动部署 Kafka-Eagle(EFAK)的集群服务,实现对 Kafka 集群的管理和监控,适用于 CentOS/Debian/Ubuntu 等发行版。

官方资源

阅读全文 »

编译与链接原理

提示

强烈建议在阅读完本节的内容后,多花点时间深入读一遍《深入理解计算机系统 - 第三版》的第 7 章 "链接"。

编译命令

步骤命令
1. 预处理 gcc -E hello.c -o hello.i
2. 编译到汇编代码 gcc -S hello.i -o hello.s
3. 汇编到目标代码(机器语言)gcc -c hello.s -o hello.o
4. 链接,生成可执行文件 gcc hello.o -o hello
以上四个步骤,可以合成一个步骤,直接编译链接成可执行的目标文件 gcc hello.c -o hello
阅读全文 »

前言

本教程将使用 Docker 安装单机版的 Seata Server、Nacos、MySQL,并实现以下配置目标:

  • (1) 配置 Seata Server 使用 Nacos 作为注册中心
  • (2) 配置 Seata Server 使用 Nacos 作为配置中心
  • (3) 配置 Nacos 将配置信息存储(持久化)到 MySQL 中
  • (4) 基于 Nacos 配置中心,配置 Seata Server 使用 MySQL 数据库来存储全局事务会话信息
阅读全文 »

大纲

前言

本文将介绍在 SpringBoot 项目中,如何基于 Redis + Lua 脚本 + AOP + 反射 + 自定义注解自研分布式限流组件,且支持拔插式使用。由于篇幅有限,下面使用的是 Redis 单机服务,若是在生产环境,为了保证系统的可用性,建议部署 Redis 集群,并使用 Redisson 作为 Redis 的客户端,这里不再累述。

阅读全文 »

大纲

MySQL 锁的介绍

锁的类型

在 MySQL 中,锁机制可以用来解决事务并发问题。MySQL 的锁有以下几种类型:

  • 按锁的粒度可以分为:
    • 行锁:锁住某行数据,锁粒度最小,并发度高。开销大,加锁慢,会出现死锁。
    • 表锁:锁住整张表,锁粒度较大,并发度低。开销小,加锁快,不会出现死锁。
    • 间隙锁:锁住的是一个区间。
    • 全局锁:锁住整个数据库,锁粒度最大。
阅读全文 »

大纲

最左前缀原则的介绍

最左前缀原则指的是:使用组合索引(联合索引)时,查询条件需要从索引的最左列开始匹配,并且不跳过索引中的列。如果跳跃某一列,索引将会部分失效 (这一列后面的字段的索引会失效)。比如,针对 A、B、C 三个字段建立了一个组合索引,那么在写一个 SQL 时就一定要提供 A 字段的查询条件,这样才能让组合索引生效。这是由于在建立 A、B、C 三个字段的组合索引时,MySQL 底层的 B+ 树是按照 A、B、C 三个字段从左往右去比较大小进行排序的,也就是说 B+ 树的索引结构依赖于从左到右逐层递进地有序搜索路径。如果查询条件中缺少了最左边的列,B+ 树将无法确定初始的搜索路径,从而无法利用索引进行高效查询。

阅读全文 »

大纲

索引的介绍

索引概述

MySQL 官方对索引的定义为:索引(Index)是帮助 MySQL 高效获取数据的数据结构。在数据之外,数据库系统还维护着满足特定查找算法的数据结构,这些数据结构以某种方式引用(指向)数据,这样就可以在这些数据结构上实现高级查找算法,这种数据结构就是索引。简而言之,索引的本质是数据结构

阅读全文 »

查看整机系统性能

top

使用 top 命令,可以查看系统的负载情况,重点关注的是 %CPU%MEMload average 三个性能指标。

1
top
阅读全文 »

前言

官方文档

SpringBoot 整合 Kafka

本节将介绍 SpringBoot 项目如何整合 Kafka,包括发送消息到 Kafka 和从 Kafka 消费消息。值得一提的是,本节给出的配置和代码同样适用于 SpringCloud 项目整合 Kafka。

提示

本节所需的案例代码,可以直接从 GitHub 下载对应章节 kafka-lesson-19

引入依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.18</version>
<relativePath/>
</parent>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
</dependencies>

配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
server:
port: 9999

spring:
kafka:
# Kafka 集群的地址
bootstrap-servers: localhost:9092
producer:
# 设置生产者需要等待多少个分区副本收到消息的确认,可选值: 0 | 1 | all,其中 all 表示所有分区副本都需要确认,确保消息不丢失
acks: all
# 单次发送消息的批量大小(以字节为单位,默认16K),当多个消息累积到指定大小时,生产者会批量发送以提升效率
batch-size: 16384
# 生产者内存缓冲区的大小(以字节为单位,默认32M),用于存储等待发送的消息
buffer-memory: 33554432
# Key 的序列化器
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# Value 的序列化器
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# 设置生产者在遇到发送失败时的重试次数0 表示不进行重试
retries: 3
consumer:
# 消费者组 ID
group-id: test
# 从哪个偏移量 offset 开始消费,可选值:earliest | latest
auto-offset-reset: earliest
# 是否自动提交偏移量 offset
enable-auto-commit: false
# 自动提交的频率,生效的前提是 enable-auto-commit=true
auto-commit-interval: 1s
# Key 的反序列化器
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# Value 的反序列化器
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 消费者每次调用 poll() 方法时,一次最多能拉取的消息数量,默认值为 500
max-poll-records: 100
# 消费者的其他核心配置
properties:
# 如果在这个时间内(默认45秒),协调器没有收到心跳,该消费者会被踢出消费者组并触发分区再平衡
session.timeout.ms: 120000
# 最大消费时间,该参数决定了获取消息后提交偏移量的最长时间,超过设定的时间(默认5分钟),服务端会认为该消费者失效,然后将其踢出消费者组并触发分区再平衡
max.poll.interval.ms: 300000
# 客户端等待请求响应的最长时间如果在超时之前没有收到响应,客户端将在必要时重新发送请求,或者如果重试次数用尽,则请求失败
request.timeout.ms: 60000
# 订阅或分配主题时,是否允许自动创建主题(生产环境建议设置为 false)
allow.auto.create.topics: false
# poll() 方法向协调器发送心跳的频率(默认每隔3秒发送一次),建议设置为 session.timeout.ms 的三分之一
heartbeat.interval.ms: 40000
# 指定每个分区里返回的记录最多不超的字节数
# max.partition.fetch.bytes=1048576
listener:
# 消费者的消息确认模式
# 当 enable.auto.commit 设置为 false 时,此配置才会生效
# 当设置为 manual_immediate,则表示手动确认消息,即调用 Acknowledgment.acknowledge() 后立即提交偏移量
ack-mode: manual_immediate
# 是否严格检查 topic 的存在性
# true: 如果配置的 topic 不存在,则启动失败
# false: 忽略不存在的 topic,继续启动
missing-topics-fatal: true
# 消费者的消费模式
# single: 单条消费,每次拉取一条消息
# batch: 批量消费,每次拉取多条消息,需要配合 consumer.max-poll-records 参数设置拉取的消息数量
type: batch
# 并发线程数,决定了创建多少个消费者实例(等于消费线程数)
# 建议设置为小于或等于 Topic 的分区数
# 每个线程可消费一个分区,线程数多于分区时,多余的线程将处于空闲状态
concurrency: 3
template:
default-topic: "test"
  • 影响生产速度的核心参数

    • spring.kafka.producer.batch-size:单次发送消息的批量大小(以字节为单位,默认 16K),当多个消息累积到指定大小时,生产者会批量发送以提升效率。
    • spring.kafka.producer.buffer-memory:生产者内存缓冲区的大小(以字节为单位,默认 32M),用于存储等待发送的消息。
  • 影响消费速度的核心参数

    • spring.kafka.consumer.max-poll-records:消费者每次调用 poll() 方法时,一次最多能拉取的消息数量,默认值为 500
    • spring.kafka.listener.concurrency:并发线程数,决定了创建多少个消费者实例(等于消费线程数),建议设置为小于或等于 Topic 的分区数
    • spring.kafka.listener.type:消费者的消费模式,包括 single 单条消费和 batch 批量消费;当设置为批量消费时,需要配合 consumer.max-poll-records 参数设置一次最多能拉取的消息数量。

配置代码

1
2
3
4
5
6
7
8
9
10
11
12
13
public class KafkaConstants {

/**
* 主题
*/
public static final String TOPIC_TEST = "test";

/*
* 消费者组 ID
*/
public static final String GROUP_ID = "test";

}

生产者的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import com.clay.kafka.config.KafkaConstants;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/kafka")
public class KafkaProducerController {

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

/**
* 发送消息
*/
@GetMapping("/produce")
public String produce(String msg) {
try {
kafkaTemplate.send(KafkaConstants.TOPIC_TEST, msg);
} catch (Exception e) {
e.printStackTrace();
}
return "success";
}

}

消费者的代码

批量消费的代码

  • 批量消费的核心配置(手动提交 Offset + 批量消费)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
spring:
kafka:
consumer:
# 是否自动提交偏移量 offset
enable-auto-commit: false
listener:
# 消费者的消息确认模式
# 当 enable.auto.commit 设置为 false 时,此配置才会生效
# 当设置为 manual_immediate,则表示手动确认消息,即调用 Acknowledgment.acknowledge() 后立即提交偏移量
ack-mode: manual_immediate
# 消费者的消费模式
# single: 单条消费,每次拉取一条消息
# batch: 批量消费,每次拉取多条消息,需要配合 consumer.max-poll-records 参数设置拉取的消息数量
type: batch
  • 批量消费的核心代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import com.clay.kafka.config.KafkaConstants;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

import java.util.List;

@Component
public class KafkaMsgConsumer {

/**
* 消费消息(批量消费,手动提交 Offset)
*/
@KafkaListener(topics = KafkaConstants.TOPIC_TEST, groupId = KafkaConstants.GROUP_ID)
public void receive(List<ConsumerRecord<String, String>> records, Acknowledgment acknowledgment) {
try {
for (ConsumerRecord<String, String> record : records) {
// 处理消息
System.out.println("Receive Msg ==> Group: " + KafkaConstants.GROUP_ID + ", Topic: " + record.topic() + ", Msg: " + record.value());
}
// 手动提交偏移量
acknowledgment.acknowledge();
} catch (Exception e) {
// 消息处理失败,选择不提交偏移量,保证消息再次消费
System.err.println("Error processing message: " + e.getMessage());
}
}

}

单条消费的代码

  • 单条消费的核心配置(手动提交 Offset + 单条消费)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
spring:
kafka:
consumer:
# 是否自动提交偏移量 offset
enable-auto-commit: false
listener:
# 消费者的消息确认模式
# 当 enable.auto.commit 设置为 false 时,此配置才会生效
# 当设置为 manual_immediate,则表示手动确认消息,即调用 Acknowledgment.acknowledge() 后立即提交偏移量
ack-mode: manual_immediate
# 消费者的消费模式
# single: 单条消费,每次拉取一条消息
# batch: 批量消费,每次拉取多条消息,需要配合 consumer.max-poll-records 参数设置拉取的消息数量
type: single
  • 单条消费的核心代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import com.clay.kafka.config.KafkaConstants;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
public class KafkaMsgConsumer {

/**
* 消费消息(单条消费,手动提交 Offset)
*/
@KafkaListener(topics = KafkaConstants.TOPIC_TEST, groupId = KafkaConstants.GROUP_ID)
public void receive(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
try {
// 处理消息
System.out.println("Group: " + KafkaConstants.GROUP_ID + ", Topic: " + record.topic() + ", Msg: " + record.value());
// 手动提交偏移量
acknowledgment.acknowledge();
} catch (Exception e) {
// 消息处理失败,选择不提交偏移量,保证消息再次消费
System.err.println("Error processing message: " + e.getMessage());
}
}

}

SpringCloud 整合 Kafka

SpringBoot 整合方式

这种整合方式其实就是上面介绍的 SpringBoot 整合 Kafka,依赖于 SpringBoot 和 Spring Kafka 提供的功能。SpringCloud 本身并没有直接对 Kafka 提供的特殊支持,而是通过 Spring Kafka 提供对 Kafka 的集成。这种方式只需要引入 SpringBoot 与 spring-kafka 的 Maven 坐标,由于上面的案例已经给出了详细的代码和配置,这里不再累述。

SpringCloud Stream 整合方式

SpringCloud Stream 的概述

SpringCloud Stream 是一个构建消息驱动微服务的框架,抽象了 MQ 的使用方式,提供统一的 API 操作。SpringCloud Stream 通过 Binder、Inputs/Outputs Channel 完成应用程序和 MQ 的解耦。SpringCloud Stream 的模型如下图:

  • Binder:负责绑定应用程序和 MQ 中间件,即指定应用程序是和 KafKa 交互还是和 RabbitMQ 交互,又或者和其他的 MQ 中间件交互。
  • Inputs/Outputs Channel:抽象发布 / 订阅消息的方式,即无论是什么类型的 MQ 应用程序都通过统一的方式发布 / 订阅消息。

SpringCloud Stream 的配置

  • binder:绑定 MQ 中间件及配置
  • bindings:管理所有的 Topic
  • destination:指定发布 / 订阅的 Topic
  • contentType:指定发布 / 订阅消息的格式
  • group:指定消费者组(一条消息只能被一组消息者中的一个消息者消费)

SpringCloud Stream 的使用

在 SpringCloud Stream 3.X 之后,官方不建议使用 @Binding(Source.class)@StreamListener(Sink.class),改成推荐使用函数式编程的方式实现。高版本的 SpringCloud Stream 提供两种使用方式,一种是使用 YML 配置的方式绑定生产 / 消费者,另一种是通过 Function 的方式绑定生产 / 消费者。

第一种使用方式
  • 引入依赖
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.2.0</version>
<relativePath/>
</parent>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>2023.0.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
</dependencies>

提示

根据 SpringCloud 官方文档,引入 spring-cloud-starter-stream-kafka 或者引入 spring-cloud-stream-binder-kafka 都是可以的。

  • 配置文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
server:
port: 9999

spring:
application:
name: kafka-test
cloud:
stream:
kafka:
binder:
# Kafka 集群的地址
brokers: 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094
# 是否允许自动创建主题
auto-create-topics: false
bindings:
# 对应 MessageConsumer 中的 receiveMsg 函数,格式为:xxx-in/out-xx
receiveMsg-in-0:
# 指定 Topic
destination: test
# 指定消息的格式
content-type: application/json

配置参数说明

  • spring.cloud.stream.kafka.binder 里面有很多配置,比如:默认自动提交偏移量、默认的分区和副本数量等配置,对于当前项目来说,默认值即可满足。
  • spring.cloud.stream.kafka.bindings 主要是为了配置消费者以及生产者信息的,其中 xxx-in/out-0 这个配置主要是为了声明当前是生产者还是消费者的,而 destination 用于定义 Topic 的名称。
  • 实体类
1
2
3
4
5
6
7
8
9
10
11
12
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Person implements Serializable {

private Long id;

private String name;

private int age;

}
  • Kafka 生产者类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* 消息生产者
*/
@Component
public class MessageProducer {

@Autowired
private StreamBridge streamBridge;

/**
* 发送消息
*
* @param topic 主题
* @param data 消息
*/
public void sendMessage(String topic, Object data) {
streamBridge.send(topic, data);
}

}
  • Kafka 消费者类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* 消息消费者
*/
@Component
public class MessageConsumer {

/**
* 消费消息
* <p> 对应 YML 配置文件中的 receiveMsg-in-0
*/
@Bean
public Consumer<Person> receiveMsg() {
return person -> {
if (person != null) {
System.out.println("name: " + person.getName() + ", age: " + person.getAge());
}
};
}

}
  • 测试代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@RestController
@RequestMapping("/kafka")
public class KafkaProducerController {

private static final String TOPIC = "test";

@Autowired
private MessageProducer messageProducer;

/**
* 发送消息
*/
@PostMapping("/produce")
public String produce(@RequestBody Person person) {
try {
messageProducer.sendMessage(TOPIC, person);
} catch (Exception e) {
e.printStackTrace();
}
return "success";
}

}
第二种使用方式
  • 引入依赖
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.2.0</version>
<relativePath/>
</parent>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>2023.0.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
</dependencies>

提示

根据 SpringCloud 官方文档,引入 spring-cloud-starter-stream-kafka 或者引入 spring-cloud-stream-binder-kafka 都是可以的。

  • 配置文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
server:
port: 9999

spring:
application:
name: kafka-test
cloud:
stream:
kafka:
binder:
# Kafka 集群的地址
brokers: 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094
# 是否允许自动创建主题
auto-create-topics: false
bindings:
receiveMsg-in-0:
# 指定 Binder
binder: kafka
# Topic 的名称
destination: test
# 消息的格式
content-type: application/json
# 消费组的名称
group: test
sendMsg-out-0:
# 指定 Binder
binder: kafka
# Topic 的名称
destination: test
# 消息的格式
content-type: application/json
# 消费组的名称
group: test
function:
# 动态指定要绑定的函数
definition: receiveMsg;sendMsg

YML 配置文件是否需要指定 binder 属性

Spring Cloud Stream 默认支持 Kafka 和 RabbitMQ 两种 Binder,如果在项目中只引入了 Kafka 的依赖,比如 spring-cloud-starter-stream-kafka,那么 Spring Cloud Stream 会自动将 Kafka 作为默认的 Binder,即不需要在 spring.cloud.stream.bindings 下显式配置 binder: kafka。如果在项目中同时引入了多个 Binder(例如 Kafka 和 RabbitMQ),则必须明确指定每个绑定对应的 Binder,即需要显式配置 binder 属性,比如 binder: kafka

YML 配置文件是否需要指定 function.definition 属性

  • 如果在项目中只有一个函数(如 Supplier 或 Consumer),Spring Cloud Stream 会自动发现它,并将其绑定到配置的通道,在这种情况下,不需要配置 function.definition 属性。
  • 如果在项目中有多个函数(Supplier、Consumer 或 Function),Spring Cloud Stream 无法确定要绑定哪一个函数,在这种情况下,必须通过 function.definition 属性指定要绑定的函数。
  • 实体类
1
2
3
4
5
6
7
8
9
10
11
12
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Person implements Serializable {

private Long id;

private String name;

private int age;

}
  • Kafka 生产者类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@Component
public class MessageProducer {

private final BlockingQueue<Person> messageQueue = new LinkedBlockingQueue<>(1000);

/**
* 提供外部方法,将 Person 对象加入队列
*/
public void sendPersonMessage(Person person) {
try {
// 将消息放入到队列(阻塞操作)
messageQueue.put(person);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Failed to enqueue message", e);
}
}

/**
* 发送消息
* <p> 对应 YML 配置文件中的 sendMsg-out-0
*/
@Bean
public Supplier<Message<Person>> sendMsg() {
return () -> {
// 从队列中取出消息(非阻塞操作)
Person person = messageQueue.poll();
if (person != null) {
// 构建消息
return MessageBuilder.withPayload(person).build();
}
return null;
};
}

}
  • Kafka 消费者类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@Component
public class MessageProducer {

private final BlockingQueue<Person> messageQueue = new LinkedBlockingQueue<>(1000);

/**
* 提供外部方法,将 Person 对象加入队列
*/
public void sendPersonMessage(Person person) {
try {
// 将消息放入到队列(阻塞操作)
messageQueue.put(person);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Failed to enqueue message", e);
}
}

/**
* 发送消息
* <p> 对应 YML 配置文件中的 sendMsg-out-0
*/
@Bean
public Supplier<Message<Person>> sendMsg() {
return () -> {
// 从队列中取出消息(非阻塞操作)
Person person = messageQueue.poll();
if (person != null) {
// 构建消息
return MessageBuilder.withPayload(person).build();
}
return null;
};
}

}
  • 测试代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@RestController
@RequestMapping("/kafka")
public class KafkaProducerController {

@Autowired
private StreamBridge streamBridge;

/**
* 发送消息
*/
@PostMapping("/produce")
public String produce(@RequestBody Person person) {
try {
streamBridge.send("sendMsg-out-0", person);
} catch (Exception e) {
e.printStackTrace();
}
return "success";
}

}

参考资料