SpringBoot Integration with Kafka
This section describes how to integrate Kafka into a SpringBoot project, covering both sending messages to Kafka and consuming messages from Kafka. Notably, the configuration and code given in this section also apply to SpringCloud projects that integrate Kafka; see here for a concrete example.
Code Download
The sample code for this section can be downloaded directly from GitHub under the corresponding chapter kafka-lesson-19.
Version Information
| Component | Version | Notes |
| --- | --- | --- |
| Kafka Server | 3.9.0 | Kafka server |
| SpringBoot | 2.7.18 | The sample code in this section is also compatible with SpringBoot 3.2.0 |
Adding the Dependencies
- When integrating Kafka with SpringBoot, the key step is adding the `spring-kafka` dependency, which transitively pulls in the `kafka-clients` dependency.
```xml
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.7.18</version>
    <relativePath/>
</parent>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
</dependencies>
```
Configuration File
```yaml
server:
  port: 9999

spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9092
    producer:
      acks: all
      batch-size: 16384
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      retries: 3
      compression-type: gzip
      properties:
        linger.ms: 100
    consumer:
      group-id: test
      auto-offset-reset: earliest
      enable-auto-commit: false
      auto-commit-interval: 1s
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      max-poll-records: 100
      properties:
        session.timeout.ms: 120000
        max.poll.interval.ms: 300000
        request.timeout.ms: 60000
        allow.auto.create.topics: false
        heartbeat.interval.ms: 40000
    listener:
      ack-mode: manual_immediate
      missing-topics-fatal: true
      type: batch
      concurrency: 3
    template:
      default-topic: "test"
```
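Note that `listener.missing-topics-fatal: true` makes startup fail if the `test` topic does not exist. If you want the application to create the topic itself, one option is to declare a `NewTopic` bean; SpringBoot auto-configures a `KafkaAdmin` from `bootstrap-servers` that applies it on startup. A minimal sketch (the partition and replica counts here are illustrative assumptions, with partitions chosen to match `listener.concurrency: 3`):

```java
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class KafkaTopicConfig {

    // Declaring a NewTopic bean lets the auto-configured KafkaAdmin
    // create the topic on startup if it does not already exist
    @Bean
    public NewTopic testTopic() {
        return TopicBuilder.name("test")
                .partitions(3)   // matches listener.concurrency: 3
                .replicas(1)     // single-broker dev setup
                .build();
    }

}
```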
Core Parameters Affecting Producer Throughput
- `spring.kafka.producer.batch-size`: the size of a single send batch, in bytes (default 16 KB). Once enough messages accumulate to reach this size, the producer sends them as one batch to improve efficiency.
- `spring.kafka.producer.buffer-memory`: the size of the producer's memory buffer, in bytes (default 32 MB), used to hold messages waiting to be sent.
- `spring.kafka.producer.compression-type`: the compression applied to all data the producer sends. The default is `none`, i.e. no compression. Supported types: `none`, `gzip`, `snappy`, `lz4`, and `zstd`.
- `spring.kafka.producer.properties.linger.ms`: if the accumulated data never reaches `batch.size`, the Sender thread sends whatever it has after waiting `linger.ms`. The unit is milliseconds and the default is 0, meaning no delay. In production, a value between 5 and 100 ms is recommended. (See the sketch after this list for how these settings map onto the raw kafka-clients API.)
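These Spring properties are a convenience layer over the raw kafka-clients producer settings. A minimal sketch of the same tuning expressed directly against kafka-clients (broker address and topic name taken from the config above; the standalone `main` is for illustration only, since SpringBoot normally builds the producer for you):

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ThroughputTunedProducer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // The same tuning values as the Spring properties above
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);        // batch-size (16 KB)
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432L); // buffer-memory (32 MB)
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip"); // compression-type
        props.put(ProducerConfig.LINGER_MS_CONFIG, 100);           // properties.linger.ms
        props.put(ProducerConfig.RETRIES_CONFIG, 3);               // retries

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("test", "hello"));
        }
    }

}
```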
Core Parameters Affecting Consumer Throughput
- `spring.kafka.consumer.max-poll-records`: the maximum number of records the consumer can pull in a single `poll()` call; the default is 500.
- `spring.kafka.listener.concurrency`: the number of concurrent listener threads, which determines how many consumer instances are created (one per thread). It is recommended to set this to a value less than or equal to the number of partitions of the Topic.
- `spring.kafka.listener.type`: the consumption mode, either `single` (one record per invocation) or `batch`. When batch consumption is enabled, use `consumer.max-poll-records` to cap how many records are pulled at once. (A Java-config sketch of these listener settings follows this list.)
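If you prefer Java configuration over properties, the same listener tuning can be expressed by defining the container factory bean yourself. A minimal sketch, assuming SpringBoot has already built a `ConsumerFactory` from the properties above (defining this bean replaces the auto-configured, properties-driven factory):

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;

@Configuration
public class KafkaListenerConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // Equivalent to spring.kafka.listener.concurrency: 3
        factory.setConcurrency(3);
        // Equivalent to spring.kafka.listener.type: batch
        factory.setBatchListener(true);
        // Equivalent to spring.kafka.listener.ack-mode: manual_immediate
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }

}
```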
Utility Class Code
```java
public class KafkaConstants {

    public static final String TOPIC_TEST = "test";

    public static final String GROUP_ID = "test";

}
```
Producer Code
```java
import com.clay.kafka.config.KafkaConstants;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/kafka")
public class KafkaProducerController {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @GetMapping("/produce")
    public String produce(String msg) {
        try {
            kafkaTemplate.send(KafkaConstants.TOPIC_TEST, msg);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return "success";
    }

}
```
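One caveat about the controller above: `kafkaTemplate.send(...)` is asynchronous, so the try/catch only catches errors raised before the record is handed to the producer, not broker-side failures. A minimal sketch of an alternative endpoint (the `/produceAsync` mapping is a hypothetical addition) that reacts to the actual send result; in spring-kafka 2.x `send()` returns a `ListenableFuture` with `addCallback`, while in 3.x it returns a `CompletableFuture`, where `whenComplete` plays the same role:

```java
@GetMapping("/produceAsync")
public String produceAsync(String msg) {
    // spring-kafka 2.x: register callbacks that fire when the broker acks or the send fails
    kafkaTemplate.send(KafkaConstants.TOPIC_TEST, msg).addCallback(
            result -> System.out.println("Sent to partition " + result.getRecordMetadata().partition()
                    + " at offset " + result.getRecordMetadata().offset()),
            ex -> System.err.println("Send failed: " + ex.getMessage())
    );
    return "submitted";
}
```

Either endpoint can be exercised with a plain GET request such as `http://localhost:9999/kafka/produce?msg=hello`.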
Consumer Code
Important Notes
- (1) With auto-commit of Offsets enabled, the consumer still has to run business logic after pulling messages, and the offset may be committed before that processing finishes, so messages can be lost. It is therefore recommended to disable auto-commit and commit Offsets manually to avoid message loss.
- (2) To improve consumer throughput (consumption rate), batch consumption is generally recommended over single-record consumption.
Auto Commit + Single-Record Consumption
```yaml
spring:
  kafka:
    consumer:
      enable-auto-commit: true
    listener:
      type: single
```
```java
import com.clay.kafka.config.KafkaConstants;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaMsgConsumer {

    @KafkaListener(topics = KafkaConstants.TOPIC_TEST, groupId = KafkaConstants.GROUP_ID)
    public void receive(ConsumerRecord<String, String> record) {
        try {
            System.out.println("Group: " + KafkaConstants.GROUP_ID + ", Topic: " + record.topic()
                    + ", Msg: " + record.value());
        } catch (Exception e) {
            System.err.println("Error processing message: " + e.getMessage());
        }
    }

}
```
Auto Commit + Batch Consumption
```yaml
spring:
  kafka:
    consumer:
      enable-auto-commit: true
    listener:
      type: batch
```
```java
import com.clay.kafka.config.KafkaConstants;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.List;

@Component
public class KafkaMsgConsumer {

    @KafkaListener(topics = KafkaConstants.TOPIC_TEST, groupId = KafkaConstants.GROUP_ID)
    public void receive(List<ConsumerRecord<String, String>> records) {
        try {
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Receive Msg ==> Group: " + KafkaConstants.GROUP_ID
                        + ", Topic: " + record.topic() + ", Msg: " + record.value());
            }
        } catch (Exception e) {
            System.err.println("Error processing message: " + e.getMessage());
        }
    }

}
```
Manual Commit + Single-Record Consumption
```yaml
spring:
  kafka:
    consumer:
      enable-auto-commit: false
    listener:
      ack-mode: manual_immediate
      type: single
```
```java
import com.clay.kafka.config.KafkaConstants;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
public class KafkaMsgConsumer {

    @KafkaListener(topics = KafkaConstants.TOPIC_TEST, groupId = KafkaConstants.GROUP_ID)
    public void receive(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
        try {
            System.out.println("Group: " + KafkaConstants.GROUP_ID + ", Topic: " + record.topic()
                    + ", Msg: " + record.value());
            acknowledgment.acknowledge();
        } catch (Exception e) {
            System.err.println("Error processing message: " + e.getMessage());
        }
    }

}
```
Manual Commit + Batch Consumption
```yaml
spring:
  kafka:
    consumer:
      enable-auto-commit: false
    listener:
      ack-mode: manual_immediate
      type: batch
```
```java
import com.clay.kafka.config.KafkaConstants;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

import java.util.List;

@Component
public class KafkaMsgConsumer {

    @KafkaListener(topics = KafkaConstants.TOPIC_TEST, groupId = KafkaConstants.GROUP_ID)
    public void receive(List<ConsumerRecord<String, String>> records, Acknowledgment acknowledgment) {
        try {
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Receive Msg ==> Group: " + KafkaConstants.GROUP_ID
                        + ", Topic: " + record.topic() + ", Msg: " + record.value());
            }
            acknowledgment.acknowledge();
        } catch (Exception e) {
            System.err.println("Error processing message: " + e.getMessage());
        }
    }

}
```
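A final caveat on the manual-commit samples: when processing throws, the catch block logs the error without acknowledging, so the unacknowledged records are only redelivered after a rebalance or restart. If you want prompt redelivery, `Acknowledgment.nack(...)` asks the container to seek back and re-poll. The sketch below is a hypothetical variant of the batch listener (`process(...)` stands in for your business logic); note that spring-kafka 2.x passes the pause as milliseconds, while 3.x uses a `java.time.Duration`:

```java
import com.clay.kafka.config.KafkaConstants;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

import java.util.List;

@Component
public class KafkaNackConsumer {

    @KafkaListener(topics = KafkaConstants.TOPIC_TEST, groupId = KafkaConstants.GROUP_ID)
    public void receive(List<ConsumerRecord<String, String>> records, Acknowledgment acknowledgment) {
        int processed = 0;
        try {
            for (ConsumerRecord<String, String> record : records) {
                process(record);
                processed++;
            }
            acknowledgment.acknowledge();
        } catch (Exception e) {
            // Commits the offsets of the records processed so far, then seeks back so the
            // failed record is re-polled after a 1-second pause (2.x millis signature)
            acknowledgment.nack(processed, 1000);
        }
    }

    // Hypothetical business-logic helper; assumed to throw on failure
    private void process(ConsumerRecord<String, String> record) {
        System.out.println("Processing: " + record.value());
    }

}
```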