前言
官方文档
SpringBoot 整合 Kafka
本节将介绍 SpringBoot 项目如何整合 Kafka,包括发送消息到 Kafka 和从 Kafka 消费消息。值得一提的是,本节给出的配置和代码同样适用于 SpringCloud 项目整合 Kafka。
提示
本节所需的案例代码,可以直接从 GitHub 下载对应章节 kafka-lesson-19
。
引入依赖
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.7.18</version> <relativePath/> </parent>
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> </dependencies>
|
配置文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
| server: port: 9999
spring: kafka: bootstrap-servers: localhost:9092 producer: acks: all batch-size: 16384 buffer-memory: 33554432 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer retries: 3 consumer: group-id: test auto-offset-reset: earliest enable-auto-commit: false auto-commit-interval: 1s key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer max-poll-records: 100 properties: session.timeout.ms: 120000 max.poll.interval.ms: 300000 request.timeout.ms: 60000 allow.auto.create.topics: false heartbeat.interval.ms: 40000 listener: ack-mode: manual_immediate missing-topics-fatal: true type: batch concurrency: 3 template: default-topic: "test"
|
影响生产速度的核心参数
spring.kafka.producer.batch-size
:单次发送消息的批量大小(以字节为单位,默认 16K),当多个消息累积到指定大小时,生产者会批量发送以提升效率。spring.kafka.producer.buffer-memory
:生产者内存缓冲区的大小(以字节为单位,默认 32M),用于存储等待发送的消息。
影响消费速度的核心参数
spring.kafka.consumer.max-poll-records
:消费者每次调用 poll()
方法时,一次最多能拉取的消息数量,默认值为 500spring.kafka.listener.concurrency
:并发线程数,决定了创建多少个消费者实例(等于消费线程数),建议设置为小于或等于 Topic 的分区数spring.kafka.listener.type
:消费者的消费模式,包括 single
单条消费和 batch
批量消费;当设置为批量消费时,需要配合 consumer.max-poll-records
参数设置一次最多能拉取的消息数量。
配置代码
1 2 3 4 5 6 7 8 9 10 11 12 13
| public class KafkaConstants {
public static final String TOPIC_TEST = "test";
public static final String GROUP_ID = "test";
}
|
生产者的代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| import com.clay.kafka.config.KafkaConstants; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController;
@RestController @RequestMapping("/kafka") public class KafkaProducerController {
@Autowired private KafkaTemplate<String, String> kafkaTemplate;
@GetMapping("/produce") public String produce(String msg) { try { kafkaTemplate.send(KafkaConstants.TOPIC_TEST, msg); } catch (Exception e) { e.printStackTrace(); } return "success"; }
}
|
消费者的代码
批量消费的代码
- 批量消费的核心配置(手动提交 Offset + 批量消费)
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| spring: kafka: consumer: enable-auto-commit: false listener: ack-mode: manual_immediate type: batch
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| import com.clay.kafka.config.KafkaConstants; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.Acknowledgment; import org.springframework.stereotype.Component;
import java.util.List;
@Component public class KafkaMsgConsumer {
@KafkaListener(topics = KafkaConstants.TOPIC_TEST, groupId = KafkaConstants.GROUP_ID) public void receive(List<ConsumerRecord<String, String>> records, Acknowledgment acknowledgment) { try { for (ConsumerRecord<String, String> record : records) { System.out.println("Receive Msg ==> Group: " + KafkaConstants.GROUP_ID + ", Topic: " + record.topic() + ", Msg: " + record.value()); } acknowledgment.acknowledge(); } catch (Exception e) { System.err.println("Error processing message: " + e.getMessage()); } }
}
|
单条消费的代码
- 单条消费的核心配置(手动提交 Offset + 单条消费)
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| spring: kafka: consumer: enable-auto-commit: false listener: ack-mode: manual_immediate type: single
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| import com.clay.kafka.config.KafkaConstants; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.Acknowledgment; import org.springframework.stereotype.Component;
@Component public class KafkaMsgConsumer {
@KafkaListener(topics = KafkaConstants.TOPIC_TEST, groupId = KafkaConstants.GROUP_ID) public void receive(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) { try { System.out.println("Group: " + KafkaConstants.GROUP_ID + ", Topic: " + record.topic() + ", Msg: " + record.value()); acknowledgment.acknowledge(); } catch (Exception e) { System.err.println("Error processing message: " + e.getMessage()); } }
}
|
SpringCloud 整合 Kafka
SpringBoot 整合方式
这种整合方式其实就是上面介绍的 SpringBoot 整合 Kafka,依赖于 SpringBoot 和 Spring Kafka 提供的功能。SpringCloud 本身并没有直接对 Kafka 提供的特殊支持,而是通过 Spring Kafka 提供对 Kafka 的集成。这种方式只需要引入 SpringBoot 与 spring-kafka
的 Maven 坐标,由于上面的案例已经给出了详细的代码和配置,这里不再累述。
SpringCloud Stream 整合方式
SpringCloud Stream 的概述
SpringCloud Stream 是一个构建消息驱动微服务的框架,抽象了 MQ 的使用方式,提供统一的 API 操作。SpringCloud Stream 通过 Binder、Inputs/Outputs Channel 完成应用程序和 MQ 的解耦。SpringCloud Stream 的模型如下图:
- Binder:负责绑定应用程序和 MQ 中间件,即指定应用程序是和 KafKa 交互还是和 RabbitMQ 交互,又或者和其他的 MQ 中间件交互。
- Inputs/Outputs Channel:抽象发布 / 订阅消息的方式,即无论是什么类型的 MQ 应用程序都通过统一的方式发布 / 订阅消息。
SpringCloud Stream 的配置
binder
:绑定 MQ 中间件及配置bindings
:管理所有的 Topicdestination
:指定发布 / 订阅的 TopiccontentType
:指定发布 / 订阅消息的格式group
:指定消费者组(一条消息只能被一组消息者中的一个消息者消费)
SpringCloud Stream 的使用
在 SpringCloud Stream 3.X 之后,官方不建议使用 @Binding(Source.class)
、@StreamListener(Sink.class)
,改成推荐使用函数式编程的方式实现。高版本的 SpringCloud Stream 提供两种使用方式,一种是使用 YML 配置的方式绑定生产 / 消费者,另一种是通过 Function 的方式绑定生产 / 消费者。
第一种使用方式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>3.2.0</version> <relativePath/> </parent>
<dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>2023.0.0</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-kafka</artifactId> </dependency> </dependencies>
|
提示
根据 SpringCloud 官方文档,引入 spring-cloud-starter-stream-kafka
或者引入 spring-cloud-stream-binder-kafka
都是可以的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| server: port: 9999
spring: application: name: kafka-test cloud: stream: kafka: binder: brokers: 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094 auto-create-topics: false bindings: receiveMsg-in-0: destination: test content-type: application/json
|
配置参数说明
spring.cloud.stream.kafka.binder
里面有很多配置,比如:默认自动提交偏移量、默认的分区和副本数量等配置,对于当前项目来说,默认值即可满足。spring.cloud.stream.kafka.bindings
主要是为了配置消费者以及生产者信息的,其中 xxx-in/out-0
这个配置主要是为了声明当前是生产者还是消费者的,而 destination
用于定义 Topic 的名称。
1 2 3 4 5 6 7 8 9 10 11 12
| @Data @NoArgsConstructor @AllArgsConstructor public class Person implements Serializable {
private Long id;
private String name;
private int age;
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
|
@Component public class MessageProducer {
@Autowired private StreamBridge streamBridge;
public void sendMessage(String topic, Object data) { streamBridge.send(topic, data); }
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
|
@Component public class MessageConsumer {
@Bean public Consumer<Person> receiveMsg() { return person -> { if (person != null) { System.out.println("name: " + person.getName() + ", age: " + person.getAge()); } }; }
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| @RestController @RequestMapping("/kafka") public class KafkaProducerController {
private static final String TOPIC = "test";
@Autowired private MessageProducer messageProducer;
@PostMapping("/produce") public String produce(@RequestBody Person person) { try { messageProducer.sendMessage(TOPIC, person); } catch (Exception e) { e.printStackTrace(); } return "success"; }
}
|
第二种使用方式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>3.2.0</version> <relativePath/> </parent>
<dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>2023.0.0</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-kafka</artifactId> </dependency> </dependencies>
|
提示
根据 SpringCloud 官方文档,引入 spring-cloud-starter-stream-kafka
或者引入 spring-cloud-stream-binder-kafka
都是可以的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| server: port: 9999
spring: application: name: kafka-test cloud: stream: kafka: binder: brokers: 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094 auto-create-topics: false bindings: receiveMsg-in-0: binder: kafka destination: test content-type: application/json group: test sendMsg-out-0: binder: kafka destination: test content-type: application/json group: test function: definition: receiveMsg;sendMsg
|
YML 配置文件是否需要指定 binder 属性
Spring Cloud Stream 默认支持 Kafka 和 RabbitMQ 两种 Binder,如果在项目中只引入了 Kafka 的依赖,比如 spring-cloud-starter-stream-kafka
,那么 Spring Cloud Stream 会自动将 Kafka 作为默认的 Binder,即不需要在 spring.cloud.stream.bindings
下显式配置 binder: kafka
。如果在项目中同时引入了多个 Binder(例如 Kafka 和 RabbitMQ),则必须明确指定每个绑定对应的 Binder,即需要显式配置 binder
属性,比如 binder: kafka
。
YML 配置文件是否需要指定 function.definition 属性
- 如果在项目中只有一个函数(如 Supplier 或 Consumer),Spring Cloud Stream 会自动发现它,并将其绑定到配置的通道,在这种情况下,不需要配置
function.definition
属性。 - 如果在项目中有多个函数(Supplier、Consumer 或 Function),Spring Cloud Stream 无法确定要绑定哪一个函数,在这种情况下,必须通过
function.definition
属性指定要绑定的函数。
1 2 3 4 5 6 7 8 9 10 11 12
| @Data @NoArgsConstructor @AllArgsConstructor public class Person implements Serializable {
private Long id;
private String name;
private int age;
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| @Component public class MessageProducer {
private final BlockingQueue<Person> messageQueue = new LinkedBlockingQueue<>(1000);
public void sendPersonMessage(Person person) { try { messageQueue.put(person); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException("Failed to enqueue message", e); } }
@Bean public Supplier<Message<Person>> sendMsg() { return () -> { Person person = messageQueue.poll(); if (person != null) { return MessageBuilder.withPayload(person).build(); } return null; }; }
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| @Component public class MessageProducer {
private final BlockingQueue<Person> messageQueue = new LinkedBlockingQueue<>(1000);
public void sendPersonMessage(Person person) { try { messageQueue.put(person); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException("Failed to enqueue message", e); } }
@Bean public Supplier<Message<Person>> sendMsg() { return () -> { Person person = messageQueue.poll(); if (person != null) { return MessageBuilder.withPayload(person).build(); } return null; }; }
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| @RestController @RequestMapping("/kafka") public class KafkaProducerController {
@Autowired private StreamBridge streamBridge;
@PostMapping("/produce") public String produce(@RequestBody Person person) { try { streamBridge.send("sendMsg-out-0", person); } catch (Exception e) { e.printStackTrace(); } return "success"; }
}
|
参考资料