SpringBoot 整合 Kafka

大纲

SpringBoot 整合 Kafka

本节将介绍 SpringBoot 项目如何整合 Kafka,包括发送消息到 Kafka 和从 Kafka 消费消息。值得一提的是,本节给出的配置和代码同样适用于 SpringCloud 项目整合 Kafka,具体案例请看 这里

代码下载

本节所需的案例代码,可以直接从 GitHub 下载对应章节 kafka-lesson-19

版本说明

组件版本说明
Kafka Server3.9.0Kafka 服务器
SpringBoot2.7.18本节的案例代码兼容 SpringBoot 3.2.0 版本

引入依赖

  • SpringBoot 整合 Kafka 时,最关键的是引入 spring-kafka 依赖,而 spring-kafka 又会将 kafka-clients 依赖引入进来
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.18</version>
<relativePath/>
</parent>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
</dependencies>

配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
server:
port: 9999

spring:
kafka:
# Kafka 集群的地址
bootstrap-servers: 127.0.0.1:9092
producer:
# 设置生产者需要等待多少个分区副本收到消息的确认,可选值: 0 | 1 | all,其中 all 表示所有分区副本都需要确认,确保消息不丢失
acks: all
# 单次发送消息的批量大小(以字节为单位,默认16K),当多个消息累积到指定大小时,生产者会批量发送以提升效率
batch-size: 16384
# 生产者内存缓冲区的大小(以字节为单位,默认32M),用于存储等待发送的消息
buffer-memory: 33554432
# Key 的序列化器
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# Value 的序列化器
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# 设置生产者在遇到发送失败时的重试次数0 表示不进行重试
retries: 3
# 压缩类型,支持的压缩类型:none、gzip、snappy、lz4、zstd
compression-type: gzip
# 生产者的其他核心配置
properties:
# 如果数据量迟迟未达到 batch.size 大小,Sender 线程等待 linger.ms 之后就会发送数据。单位是 ms,默认值为 0ms,表示没有延迟。生产环境建议该值大小为 5 ~ 100ms 之间
linger.ms: 100
consumer:
# 消费者组 ID
group-id: test
# 从哪个偏移量 offset 开始消费,可选值:earliest | latest
auto-offset-reset: earliest
# 是否自动提交偏移量 offset
enable-auto-commit: false
# 自动提交的频率,生效的前提是 enable-auto-commit=true
auto-commit-interval: 1s
# Key 的反序列化器
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# Value 的反序列化器
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 消费者每次调用 poll() 方法时,一次最多能拉取的消息数量,默认值为 500
max-poll-records: 100
# 消费者的其他核心配置
properties:
# 如果在这个时间内(默认45秒),协调器没有收到心跳,该消费者会被踢出消费者组并触发分区再平衡
session.timeout.ms: 120000
# 最大消费时间,该参数决定了获取消息后提交偏移量的最长时间,超过设定的时间(默认5分钟),服务端会认为该消费者失效,然后将其踢出消费者组并触发分区再平衡
max.poll.interval.ms: 300000
# 客户端等待请求响应的最长时间如果在超时之前没有收到响应,客户端将在必要时重新发送请求,或者如果重试次数用尽,则请求失败
request.timeout.ms: 60000
# 订阅或分配主题时,是否允许自动创建主题(生产环境建议设置为 false)
allow.auto.create.topics: false
# poll() 方法向协调器发送心跳的频率(默认每隔3秒发送一次),建议设置为 session.timeout.ms 的三分之一
heartbeat.interval.ms: 40000
# 指定每个分区里返回的记录最多不超的字节数
# max.partition.fetch.bytes=1048576
listener:
# 消费者的消息确认模式
# 当 enable.auto.commit 设置为 false 时,此配置才会生效
# 当设置为 manual_immediate,则表示手动确认消息,即调用 Acknowledgment.acknowledge() 后立即提交偏移量
ack-mode: manual_immediate
# 是否严格检查 topic 的存在性
# true: 如果配置的 topic 不存在,则启动失败
# false: 忽略不存在的 topic,继续启动
missing-topics-fatal: true
# 消费者的消费模式
# single: 单条消费,每次拉取一条消息
# batch: 批量消费,每次拉取多条消息,需要配合 consumer.max-poll-records 参数设置拉取的消息数量
type: batch
# 并发线程数,决定了创建多少个消费者实例(等于消费线程数)
# 建议设置为小于或等于 Topic 的分区数
# 每个线程可消费一个分区,线程数多于分区时,多余的线程将处于空闲状态
concurrency: 3
template:
default-topic: "test"
  • 影响生产吞吐量(速度)的核心参数

    • spring.kafka.producer.batch-size:单次发送消息的批量大小(以字节为单位,默认 16K),当多个消息累积到指定大小时,生产者会批量发送以提升效率。
    • spring.kafka.producer.buffer-memory:生产者内存缓冲区的大小(以字节为单位,默认 32M),用于存储等待发送的消息。
    • spring.kafka.producer.compression-type:生产者发送的所有数据的压缩方式。默认值为 none,也就是不压缩。支持压缩类型:nonegzipsnappylz4zstd
    • spring.kafka.producer.properties.linger.ms:如果数据量迟迟未达到 batch.size 大小,Sender 线程等待 linger.ms 之后就会发送数据。单位是 ms,默认值为 0ms,表示没有延迟。生产环境建议该值大小为 5 ~ 100ms 之间。
  • 影响消费吞吐量(速度)的核心参数

    • spring.kafka.consumer.max-poll-records:消费者每次调用 poll() 方法时,一次最多能拉取的消息数量,默认值为 500
    • spring.kafka.listener.concurrency:并发线程数,决定了创建多少个消费者实例(等于消费线程数),建议设置为小于或等于 Topic 的分区数
    • spring.kafka.listener.type:消费者的消费模式,包括 single 单条消费和 batch 批量消费;当设置为批量消费时,需要配合 consumer.max-poll-records 参数设置一次最多能拉取的消息数量。

工具类的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
public class KafkaConstants {

/**
* 主题
*/
public static final String TOPIC_TEST = "test";

/*
* 消费者组 ID
*/
public static final String GROUP_ID = "test";

}

生产者的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import com.clay.kafka.config.KafkaConstants;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/kafka")
public class KafkaProducerController {

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

/**
* 发送消息
*/
@GetMapping("/produce")
public String produce(String msg) {
try {
kafkaTemplate.send(KafkaConstants.TOPIC_TEST, msg);
} catch (Exception e) {
e.printStackTrace();
}
return "success";
}

}

消费者的代码

特别注意

  • (1) 当启用自动提交 Offest,消费者拉取消息后,需要处理业务,这期间可能会发生消息丢失。因此,建议关闭自动提交 Offset,使用手动提交 Offset 来避免消息丢失的问题。
  • (2) 为了提高消费者的吞掉量(消费速率),一般情况下,建议使用批量消费,而不是使用单条消费。

自动提交 + 单条消费

  • 核心配置(自动提交 Offset + 单条消费)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
spring:
kafka:
consumer:
# 是否自动提交偏移量 offset
enable-auto-commit: true
listener:
# 消费者的消息确认模式
# 当 enable.auto.commit 设置为 false 时,此配置才会生效
# 当设置为 manual_immediate,则表示手动确认消息,即调用 Acknowledgment.acknowledge() 后立即提交偏移量
# ack-mode: manual_immediate

# 消费者的消费模式
# single: 单条消费,每次拉取一条消息
# batch: 批量消费,每次拉取多条消息,需要配合 consumer.max-poll-records 参数设置拉取的消息数量
type: single
  • 核心代码(自动提交 Offset + 单条消费)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import com.clay.kafka.config.KafkaConstants;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaMsgConsumer {

/**
* 消费消息(自动提交 Offset + 单条消费)
*/
@KafkaListener(topics = KafkaConstants.TOPIC_TEST, groupId = KafkaConstants.GROUP_ID)
public void receive(ConsumerRecord<String, String> record) {
try {
// 处理消息
System.out.println("Group: " + KafkaConstants.GROUP_ID + ", Topic: " + record.topic() + ", Msg: " + record.value());
} catch (Exception e) {
// 消息处理失败
System.err.println("Error processing message: " + e.getMessage());
}
}

}

自动提交 + 批量消费

  • 核心配置(自动提交 Offset + 批量消费)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
spring:
kafka:
consumer:
# 是否自动提交偏移量 offset
enable-auto-commit: true
listener:
# 消费者的消息确认模式
# 当 enable.auto.commit 设置为 false 时,此配置才会生效
# 当设置为 manual_immediate,则表示手动确认消息,即调用 Acknowledgment.acknowledge() 后立即提交偏移量
# ack-mode: manual_immediate

# 消费者的消费模式
# single: 单条消费,每次拉取一条消息
# batch: 批量消费,每次拉取多条消息,需要配合 consumer.max-poll-records 参数设置拉取的消息数量
type: batch
  • 核心代码(自动提交 Offset + 批量消费)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import com.clay.kafka.config.KafkaConstants;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.util.List;

@Component
public class KafkaMsgConsumer {

/**
* 消费消息(自动提交 Offset + 批量消费)
*/
@KafkaListener(topics = KafkaConstants.TOPIC_TEST, groupId = KafkaConstants.GROUP_ID)
public void receive(List<ConsumerRecord<String, String>> records) {
try {
for (ConsumerRecord<String, String> record : records) {
// 处理消息
System.out.println("Receive Msg ==> Group: " + KafkaConstants.GROUP_ID + ", Topic: " + record.topic() + ", Msg: " + record.value());
}
} catch (Exception e) {
// 消息处理失败
System.err.println("Error processing message: " + e.getMessage());
}
}

}

手动提交 + 单条消费

  • 核心配置(手动提交 Offset + 单条消费)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
spring:
kafka:
consumer:
# 是否自动提交偏移量 offset
enable-auto-commit: false
listener:
# 消费者的消息确认模式
# 当 enable.auto.commit 设置为 false 时,此配置才会生效
# 当设置为 manual_immediate,则表示手动确认消息,即调用 Acknowledgment.acknowledge() 后立即提交偏移量
ack-mode: manual_immediate
# 消费者的消费模式
# single: 单条消费,每次拉取一条消息
# batch: 批量消费,每次拉取多条消息,需要配合 consumer.max-poll-records 参数设置拉取的消息数量
type: single
  • 核心代码(手动提交 Offset + 单条消费)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import com.clay.kafka.config.KafkaConstants;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
public class KafkaMsgConsumer {

/**
* 消费消息(手动提交 Offset + 单条消费)
*/
@KafkaListener(topics = KafkaConstants.TOPIC_TEST, groupId = KafkaConstants.GROUP_ID)
public void receive(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
try {
// 处理消息
System.out.println("Group: " + KafkaConstants.GROUP_ID + ", Topic: " + record.topic() + ", Msg: " + record.value());
// 手动提交偏移量
acknowledgment.acknowledge();
} catch (Exception e) {
// 消息处理失败,选择不提交偏移量,保证消息再次消费
System.err.println("Error processing message: " + e.getMessage());
}
}

}

手动提交 + 批量消费

  • 核心配置(手动提交 Offset + 批量消费)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
spring:
kafka:
consumer:
# 是否自动提交偏移量 offset
enable-auto-commit: false
listener:
# 消费者的消息确认模式
# 当 enable.auto.commit 设置为 false 时,此配置才会生效
# 当设置为 manual_immediate,则表示手动确认消息,即调用 Acknowledgment.acknowledge() 后立即提交偏移量
ack-mode: manual_immediate
# 消费者的消费模式
# single: 单条消费,每次拉取一条消息
# batch: 批量消费,每次拉取多条消息,需要配合 consumer.max-poll-records 参数设置拉取的消息数量
type: batch
  • 核心代码(手动提交 Offset + 批量消费)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import com.clay.kafka.config.KafkaConstants;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

import java.util.List;

@Component
public class KafkaMsgConsumer {

/**
* 消费消息(手动提交 Offset + 批量消费)
*/
@KafkaListener(topics = KafkaConstants.TOPIC_TEST, groupId = KafkaConstants.GROUP_ID)
public void receive(List<ConsumerRecord<String, String>> records, Acknowledgment acknowledgment) {
try {
for (ConsumerRecord<String, String> record : records) {
// 处理消息
System.out.println("Receive Msg ==> Group: " + KafkaConstants.GROUP_ID + ", Topic: " + record.topic() + ", Msg: " + record.value());
}
// 手动提交偏移量
acknowledgment.acknowledge();
} catch (Exception e) {
// 消息处理失败,选择不提交偏移量,保证消息再次消费
System.err.println("Error processing message: " + e.getMessage());
}
}

}

参考资料