大纲
前言
学习资源
Kafka 消费者的概念
消费者的消费方式
消息队列通常有以下两种消费模式:
Push(推)模式 :
- 概述:MQ 服务器主动推送数据给消费者。
- 优点:低延迟,消息实时推送,适合高吞吐场景。
- 缺点:消费者可能因自身处理能力不足被压垮(消息速率不可控)。
Pull(拉)模式 :
- 概述:消费者从 MQ 服务器中主动拉取数据。
- 优点:消费者主动拉取,消费速率可控,适合消费处理能力不均衡的场景。
- 缺点:消息的消费可能会有延迟(需要消费者轮询)。如果 MQ 服务器没有数据,消费者可能会陷入循环中,因为 MQ 服务器一直返回空数据。
Kafka 并没有采用 Push(推)这种消费模式,而是采用 Pull(拉)的消费模式。因为由 Broker 决定消息发送速率,将很难适应所有消费者的消费速度。比如,当推送的速度是 50m/s,那么 Consumer1、Consumer2 就可能来不及处理消息(如下图所示)。
消费者的工作流程
提示
- Kafka 在
0.9
版本之前,Consumer 默认将 offset
存储在 Zookeeper 中。 - Kafka 从
0.9
版本开始,Consumer 默认将 offset
保存在 Kafka 一个内置的 Topic 中,该 Topic 为 __consumer_offsets
,且默认拥有 50 个分区。
消费者组的工作原理
消费者组的概述
Consumer Group(CG): 消费者组,由多个消费者组成。Kafka 形成一个消费者组的条件,是所有消费者的 groupid
相同。值得一提的是,在使用 Kafka 的命令行消费者时,即使只有一个消费者,也会形成一个消费者组;之所以不用专门为该消费者设置 groupid
,这是因为 Kafka 默认给该消费者设置了一个随机的 groupid
。
- 消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费,如下图所示:
- 向消费组中添加更多的消费者时,如果消费者数量超过了主题分区数量,则有一部分消费者会闲置(即不会接收任何消息),如下图所示:
- 消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者(消费者),如下图所示:
消费者组的初始化流程
- coordinator 节点的作用
- coordinator 节点的选择
- 节点选择算法:
groupid 的哈希值 % 50
,这里的 50
是 Kafka 内置主题 __consumer_offsets
的默认分区数量,可以通过 Kafka 的 offsets.topic.num.partitions
参数进行配置。 - 例如,
groupid 的哈希值 = 1; 1 % 50 = 1;
,那么就选择 __consumer_offsets
主题的 1 号分区;该分区在哪个 Broker 上,就选择这个 Brokder 的 coordinator
作为这个消费者组的老大(负责消费者组的初始化和分区的分配)。在该消费者组下的所有消费者提交 offset 的时候,就往这个分区去提交 offset。
- coordinator 相关的配置参数
参数名称 | 参数描述 |
---|
heartbeat.interval.ms | 消费者向 Broker(Coordinator)发送心跳的时间间隔,该参数的值必须小于 session.timeout.ms ,也不应该高于 session.timeout.ms 的 1/3,默认值为 3秒 |
session.timeout.ms | 消费者与 Broker(Coordinator)之间心跳超时的阈值,如果消费者在该时间内未向 Broker(Coordinator) 发送心跳,则会被认为消费者已失效,其分区会被重新分配给组内的其他消费者(即触发分区再平衡),默认值为 45秒 |
max.poll.interval.ms | 消费者处理消息的最长时间,如果消费者在该时间内未处理完消息(即未调用 poll() 方法),则会被认为消费者已失效,其分区会被重新分配给组内的其他消费者(即触发分区再平衡),默认值为 5分钟 |
消费者组的完整消费流程
消费者的核心配置参数
参数名称 | 参数描述 |
---|
bootstrap.servers | 向 Kafka 集群建立初始连接用到的 host:port 列表。 |
key.deserializer | 指定接收消息的 key 的反序列化器。 |
value.deserializer | 指定接收消息的 value 的反序列化器。 |
group.id | 消费者组 ID,用于标记消费者所属的消费者组。 |
enable.auto.commit | 指定消费者是否自动周期性地向服务器提交 offset,默认值为 true 。 |
auto.commit.interval.ms | 如果设置了 enable.auto.commit 的值为 true ,则该参数定义了消费者自动提交 offset 的时间间隔(频率),默认值是 5 秒 。 |
auto.offset.reset | 当 Kafka 中没有初始 offset 或当前 offset 在服务器中不存在时(比如,数据被删除了),该如何处理? - earliest :自动重置 offset 到最早的 offset。 - latest :自动重置 offset 为最晚的 offset,这是默认值。 - none :如果消费组原来的 offset 不存在,则向消费者抛出异常。 - anything :向消费者抛异常。 |
offsets.topic.num.partitions | Kafka 内置主题 __consumer_offsets 的分区数数量,默认值为 50 。 |
heartbeat.interval.ms | 消费者向 Broker(Coordinator)发送心跳的时间间隔,该参数的值必须小于 session.timeout.ms ,也不应该高于 session.timeout.ms 的 1/3,默认值为 3秒 。 |
session.timeout.ms | 消费者与 Broker(Coordinator)之间心跳超时的阈值,如果消费者在该时间内未向 Broker(Coordinator) 发送心跳,则会被认为消费者已失效,其分区会被重新分配给组内的其他消费者(即触发分区再平衡),默认值为 45秒 。 |
max.poll.interval.ms | 消费者处理消息的最长时间,如果消费者在该时间内未处理完消息(即未调用 poll() 方法),则会被认为消费者已失效,其分区会被重新分配给组内的其他消费者(即触发分区再平衡),默认值为 5分钟 。 |
fetch.min.bytes | 消费者获取服务器端一批消息的最小字节数,默认值为 1字节 。 |
fetch.max.wait.ms | 如果消费者没有从服务器端获取到一批消息的最小字节数,等过了该参数指定的等待时间,仍然会去服务器端拉取消息,默认值为 500 毫秒 。 |
fetch.max.bytes | 消费者获取服务器端一批消息的最大字节数,默认值为 50M 。如果服务器端一批次的消息大于该值,仍然可以将这批消息拉取回来,所以这不是一个绝对最大值。消费者拉取一批次消息的大小受 message.max.bytes (Broker 配置)或者 max.message.bytes (Topic 配置)影响。 |
max.poll.records | 消费者每次调用 poll() 方法时,最多能拉取的消息数量,默认值为 500 。 |
Kafka 消费者的使用
消费一个主题
提示
本节所需的案例代码,可以直接从 GitHub 下载对应章节 kafka-lesson-08
。
- 案例目标:创建一个独立消费者,消费
first
主题中数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| public class CustomerConsumer {
public static void main(String[] args) { Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
List<String> topics = new ArrayList<>(); topics.add("first"); consumer.subscribe(topics);
while (true) { ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1)); for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { System.out.println(consumerRecord); } } }
}
|
第一步:启动 Kafa 的命令行生产者,并手动往 first
主题发送消息
1
| $ ./kafka-console-producer.sh --topic first --broker-list 127.0.0.1:9092
|
第二步:在 IDE 工具中执行消费者的代码,观察控制台中是否打印接收到的消息,如下所示:
1 2 3
| ConsumerRecord(topic = first, partition = 0, leaderEpoch = 0, offset = 11, CreateTime = 1732780407396, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello kafka1) ConsumerRecord(topic = first, partition = 0, leaderEpoch = 0, offset = 12, CreateTime = 1732780413888, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello kafka2) ConsumerRecord(topic = first, partition = 0, leaderEpoch = 0, offset = 13, CreateTime = 1732780417120, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello kafka3)
|
特别注意
在消费者 API 代码中必须配置消费者组 ID,即使只有一个消费者存在。在使用命令行启动消费者时,之所以不用专门为该消费者设置 groupid
,这是因为 Kafka 默认给该消费者设置了一个随机的 groupid
。
消费一个分区
提示
本节所需的案例代码,可以直接从 GitHub 下载对应章节 kafka-lesson-09
。
- 案例目标:创建一个独立消费者,消费
first
主题 0
号分区的数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| public class CustomerConsumer {
public static void main(String[] args) { Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
List<TopicPartition> topicPartitions = new ArrayList<>(); topicPartitions.add(new TopicPartition("first", 0)); consumer.assign(topicPartitions);
while (true) { ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1)); for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { System.out.println(consumerRecord); } } }
}
|
第一步:执行以下的生产者代码,创建一个生产者往 first
主题的 0
号分区发送消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| public class CustomerProducer {
public static void main(String[] args) { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(properties); for (int i = 0; i < 5; i++) { producer.send(new ProducerRecord<>("first", 0, "", "hello kafka " + i)); } producer.close(); }
}
|
第二步:在 IDE 工具中执行消费者的代码,观察控制台中是否打印接收到的消息,如下所示:
1 2 3
| ConsumerRecord(topic = first, partition = 0, leaderEpoch = 0, offset = 14, CreateTime = 1732779753896, serialized key size = 0, serialized value size = 13, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = hello kafka 0) ConsumerRecord(topic = first, partition = 0, leaderEpoch = 0, offset = 15, CreateTime = 1732779753922, serialized key size = 0, serialized value size = 13, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = hello kafka 1) ConsumerRecord(topic = first, partition = 0, leaderEpoch = 0, offset = 16, CreateTime = 1732779753922, serialized key size = 0, serialized value size = 13, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = hello kafka 2)
|
消费者组消费多个分区
提示
本节所需的案例代码,可以直接从 GitHub 下载对应章节 kafka-lesson-10
。
- 案例目标:消费者组内每个消费者负责消费
first
主题的不同分区的数据,一个分区只能由一个组内消费者消费
1 2 3
| Topic: first TopicId: bvg8QtrBR6yX6kO_pmbp2A PartitionCount: 2 ReplicationFactor: 2 Configs: segment.bytes=1073741824 Topic: first Partition: 0 Leader: 1 Replicas: 1,0 Isr: 1,0 Topic: first Partition: 1 Leader: 0 Replicas: 0,2 Isr: 0,2
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| public class CustomerConsumer1 {
public static void main(String[] args) { Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
List<String> topics = new ArrayList<>(); topics.add("first"); consumer.subscribe(topics);
while (true) { ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1)); for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { System.out.println(consumerRecord); } } }
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| public class CustomerConsumer1 {
public static void main(String[] args) { Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
List<String> topics = new ArrayList<>(); topics.add("first"); consumer.subscribe(topics);
while (true) { ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1)); for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { System.out.println(consumerRecord); } } }
}
|
第一步:在 IDE 工具中执行上面的消费者代码,分别启动同一个消费者组中的两个消费者
第一步:执行以下的生产者代码,创建一个生产者往 first
主题的两个分区发送消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| public class CustomerProducer2 {
public static void main(String[] args) { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(properties); for (int i = 0; i < 200; i++) { int partition = i % 2; producer.send(new ProducerRecord<>("first", partition, "", "hello kafka " + i), new Callback() { @Override public void onCompletion(RecordMetadata recordMetadata, Exception exception) { if (exception != null) { exception.printStackTrace(); } else { System.out.println("topic: " + recordMetadata.topic() + ", partition: " + recordMetadata.partition()); } } }); } producer.close(); }
}
|
第三步:在 IDE 工具控制台中,会输出以下日志信息,可以看到两个消费者各自消费不同分区的消息,也就是说同一个分区不会被组内两个消费者同时消费
1 2 3
| ConsumerRecord(topic = first, partition = 0, leaderEpoch = 0, offset = 100, CreateTime = 1732797538636, serialized key size = 0, serialized value size = 13, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = hello kafka 0) ConsumerRecord(topic = first, partition = 0, leaderEpoch = 0, offset = 101, CreateTime = 1732797538664, serialized key size = 0, serialized value size = 13, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = hello kafka 2) ConsumerRecord(topic = first, partition = 0, leaderEpoch = 0, offset = 102, CreateTime = 1732797538665, serialized key size = 0, serialized value size = 13, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = hello kafka 4)
|
1 2 3
| ConsumerRecord(topic = first, partition = 1, leaderEpoch = 0, offset = 100, CreateTime = 1732797538663, serialized key size = 0, serialized value size = 13, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = hello kafka 1) ConsumerRecord(topic = first, partition = 1, leaderEpoch = 0, offset = 101, CreateTime = 1732797538664, serialized key size = 0, serialized value size = 13, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = hello kafka 3) ConsumerRecord(topic = first, partition = 1, leaderEpoch = 0, offset = 102, CreateTime = 1732797538665, serialized key size = 0, serialized value size = 13, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = hello kafka 5)
|
Kafka 消费者组的使用
四大分区分配策略
参数名称 | 参数描述 |
---|
heartbeat.interval.ms | 消费者向 Broker(Coordinator)发送心跳的时间间隔,该参数的值必须小于 session.timeout.ms ,也不应该高于 session.timeout.ms 的 1/3,默认值为 3秒 |
session.timeout.ms | 消费者与 Broker(Coordinator)之间心跳超时的阈值,如果消费者在该时间内未向 Broker(Coordinator) 发送心跳,则会被认为消费者已失效,其分区会被重新分配给组内的其他消费者(即触发分区再平衡),默认值为 45秒 |
max.poll.interval.ms | 消费者处理消息的最长时间,如果消费者在该时间内未处理完消息(即未调用 poll() 方法),则会被认为消费者已失效,其分区会被重新分配给组内的其他消费者(即触发分区再平衡),默认值为 5分钟 |
partition.assignment.strategy | 消费者的分区分配策略,默认策略是 Range + CooperativeSticky 。Kafka 支持同时使用多个分区分配策略,可以选择的策略包括:Range 、RoundRobin 、Sticky 、CooperativeSticky |
Range 分区分配策略
Range 策略的概述
- Range 分区分配策略是针对每个 Topic 而言的。
- 首先对同一个 Topic 里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。
- 假如现在有 7 个分区,3 个消费者,排序后的分区将会是
0,1,2,3,4,5,6
,消费者排序之后将会是 C0,C1,C2
。 - 通过
Partitions 数量 / Consumer 数量
来决定每个消费者应该消费几个分区。如果除不尽,那么最前面的几个消费者将会多消费 1 个分区。 - 例如,
7 / 3 = 2
,余 1,除不尽,那么消费者 C0 将会多消费 1 个分区。8 / 3 = 2
,余 2,除不尽,那么 C0 和 C1 将会分别多消费一个分区。 - 特别注意:如果只是针对 1 个 Topic 而言,C0 消费者多消费 1 个分区的影响并不是很大。但是,如果有 N 个 Topic,那么针对每个 Topic,消费者 C0 都将多消费 1 个分区;随着 Topic 增多,C0 消费的分区会比其他消费者多消费 N 个分区,这导致容易产生数据倾斜。
Range 策略的使用
本节将创建主题 first
,拥有 7 个分区和 3 个副本。准备 3 个消费者(同一消费组)和 1 个 生产者,采用 Range 分区分配策略并进行消费,观察分区的消费分配情况。然后,再停止其中一个消费者,再次观察分区的消费分配情况。
提示
本节所需的案例代码,可以直接从 GitHub 下载对应章节 kafka-lesson-11
。
Range 策略的案例
- 创建
first
主题,包含 7 个分区和 3 个副本
1 2 3 4 5
| $ ./kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --topic first --partitions 7 --replication-factor 3
$ ./kafka-topics.sh --describe --bootstrap-server 127.0.0.1:9092 --topic first
|
1 2 3 4 5 6 7 8
| Topic: first TopicId: s4_N3GKgQAul_LtfVc_AoQ PartitionCount: 7 ReplicationFactor: 3 Configs: segment.bytes=1073741824 Topic: first Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1 Topic: first Partition: 1 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0 Topic: first Partition: 2 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2 Topic: first Partition: 3 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0 Topic: first Partition: 4 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2 Topic: first Partition: 5 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1 Topic: first Partition: 6 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| public class CustomerConsumer1 {
public static void main(String[] args) { Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
List<String> topics = new ArrayList<>(); topics.add("first"); consumer.subscribe(topics);
while (true) { ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1)); for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { System.out.println(consumerRecord); } } }
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| public class CustomerConsumer2 {
public static void main(String[] args) { Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
List<String> topics = new ArrayList<>(); topics.add("first"); consumer.subscribe(topics);
while (true) { ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1)); for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { System.out.println(consumerRecord); } } }
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| public class CustomerConsumer3 {
public static void main(String[] args) { Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
List<String> topics = new ArrayList<>(); topics.add("first"); consumer.subscribe(topics);
while (true) { ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1)); for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { System.out.println(consumerRecord); } } }
}
|
提示
由于 Kafka 默认的分区分配策略就是 Range
+ CooperativeSticky
,所以在消费者的代码中不需要指定分区分配策略。
第一步:在 IDE 工具中执行上面的消费者代码,分别启动同一个消费者组中的三个消费者
第二步:执行以下的生产者代码,创建一个生产者往 first
主题的 7 个分区发送消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| public class CustomerProducer {
public static void main(String[] args) { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(properties); for (int i = 0; i < 7; i++) { producer.send(new ProducerRecord<>("first", i, "", "hello kafka " + i), new Callback() { @Override public void onCompletion(RecordMetadata recordMetadata, Exception exception) { if (exception != null) { exception.printStackTrace(); } else { System.out.println("topic: " + recordMetadata.topic() + ", partition: " + recordMetadata.partition()); } } }); } producer.close(); }
}
|
第三步:在 IDE 工具控制台中,会输出以下日志信息,可以看到三个消费者各自消费不同分区(两个)的消息,而且其中一个消费者(Leader) 会多消费一个分区,如下所示:
1 2 3
| ConsumerRecord(topic = first, partition = 0, leaderEpoch = 0, offset = 2, CreateTime = 1732855355141, serialized key size = 0, serialized value size = 13, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = hello kafka 0) ConsumerRecord(topic = first, partition = 1, leaderEpoch = 0, offset = 2, CreateTime = 1732855355164, serialized key size = 0, serialized value size = 13, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = hello kafka 1) ConsumerRecord(topic = first, partition = 2, leaderEpoch = 0, offset = 2, CreateTime = 1732855355165, serialized key size = 0, serialized value size = 13, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = hello kafka 2)
|
1 2
| ConsumerRecord(topic = first, partition = 3, leaderEpoch = 0, offset = 2, CreateTime = 1732855355165, serialized key size = 0, serialized value size = 13, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = hello kafka 3) ConsumerRecord(topic = first, partition = 4, leaderEpoch = 0, offset = 2, CreateTime = 1732855355165, serialized key size = 0, serialized value size = 13, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = hello kafka 4)
|
1 2
| ConsumerRecord(topic = first, partition = 5, leaderEpoch = 0, offset = 2, CreateTime = 1732855355165, serialized key size = 0, serialized value size = 13, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = hello kafka 5) ConsumerRecord(topic = first, partition = 6, leaderEpoch = 0, offset = 2, CreateTime = 1732855355165, serialized key size = 0, serialized value size = 13, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = hello kafka 6)
|
Range 分区再平衡的案例
- 第一步:执行完上述案例代码的测试步骤后,首先停止消费者一的运行,然后快速(必须是 45 秒内,因为此时消费者一还没有被踢出消费者组)再次执行生产者的代码往 7 个分区发送消息;等待 45 秒后,会发现原本由消费者一负责消费的分区,会整体重新分配给消费者二或者消费者三继续消费(如下所示)。
1 2 3 4 5
| ConsumerRecord(topic = first, partition = 0, leaderEpoch = 0, offset = 4, CreateTime = 1732856785264, serialized key size = 0, serialized value size = 13, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = hello kafka 0) ConsumerRecord(topic = first, partition = 1, leaderEpoch = 0, offset = 4, CreateTime = 1732856785285, serialized key size = 0, serialized value size = 13, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = hello kafka 1) ConsumerRecord(topic = first, partition = 2, leaderEpoch = 0, offset = 4, CreateTime = 1732856785285, serialized key size = 0, serialized value size = 13, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = hello kafka 2) ConsumerRecord(topic = first, partition = 3, leaderEpoch = 0, offset = 4, CreateTime = 1732856785286, serialized key size = 0, serialized value size = 13, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = hello kafka 3) ConsumerRecord(topic = first, partition = 4, leaderEpoch = 0, offset = 4, CreateTime = 1732856785286, serialized key size = 0, serialized value size = 13, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = hello kafka 4)
|
1 2
| ConsumerRecord(topic = first, partition = 5, leaderEpoch = 0, offset = 4, CreateTime = 1732856785286, serialized key size = 0, serialized value size = 13, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = hello kafka 5) ConsumerRecord(topic = first, partition = 6, leaderEpoch = 0, offset = 4, CreateTime = 1732856785286, serialized key size = 0, serialized value size = 13, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = hello kafka 6)
|
- 第二步:从上面输出的日志信息可以看到,消费者一挂掉后,消费者组需要根据心跳超时时间(45 秒)来判断消费者一是否退出,所以需要等待一段时间;等时间到了 45 秒后,判断消费者一真的退出了,就会将原本由消费者一负责消费的分区整体重新分配给组内的某个消费者来消费。
- 第三步:等时间过了 45 秒后,再次执行生产者的代码往 7 个分区发送消息;由于消费者一已经被踢出消费者组,即消费者组内只剩下两个消费者了,此时分区会重新按照 Range 分配策略分配给组内剩下的两个消费者,如下所示:
1 2 3 4
| ConsumerRecord(topic = first, partition = 0, leaderEpoch = 0, offset = 5, CreateTime = 1732858089294, serialized key size = 0, serialized value size = 13, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = hello kafka 0) ConsumerRecord(topic = first, partition = 1, leaderEpoch = 0, offset = 5, CreateTime = 1732858089317, serialized key size = 0, serialized value size = 13, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = hello kafka 1) ConsumerRecord(topic = first, partition = 2, leaderEpoch = 0, offset = 5, CreateTime = 1732858089318, serialized key size = 0, serialized value size = 13, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = hello kafka 2) ConsumerRecord(topic = first, partition = 3, leaderEpoch = 0, offset = 5, CreateTime = 1732858089318, serialized key size = 0, serialized value size = 13, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = hello kafka 3)
|
1 2 3
| ConsumerRecord(topic = first, partition = 4, leaderEpoch = 0, offset = 5, CreateTime = 1732858089319, serialized key size = 0, serialized value size = 13, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = hello kafka 4) ConsumerRecord(topic = first, partition = 5, leaderEpoch = 0, offset = 5, CreateTime = 1732858089319, serialized key size = 0, serialized value size = 13, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = hello kafka 5) ConsumerRecord(topic = first, partition = 6, leaderEpoch = 0, offset = 5, CreateTime = 1732858089320, serialized key size = 0, serialized value size = 13, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = hello kafka 6)
|
RoundRobin 分区分配策略
RoundRobin 策略的概述
- RoundRobin 分区分配策略是针对集群中所有 Topic 而言。
- RoundRobin 轮询分区策略,是将所有的 Partition 和所有的 Consumer 都列出来,然后按照对应的 HashCode 进行排序,最后通过轮询算法来将 Partition 分配给各个 Consumer。
RoundRobin 策略的使用
本节将创建主题 first
,拥有 7 个分区和 3 个副本。准备 3 个消费者(同一消费组)和 1 个 生产者,采用 RoundRobin 分区分配策略并进行消费,观察分区的消费分配情况。然后,再停止其中一个消费者,再次观察分区的消费分配情况。
提示
本节所需的案例代码,可以直接从 GitHub 下载对应章节 kafka-lesson-12
。
RoundRobin 策略的案例
- 创建
first
主题,包含 7 个分区和 3 个副本
1 2 3 4 5
| $ ./kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --topic first --partitions 7 --replication-factor 3
$ ./kafka-topics.sh --describe --bootstrap-server 127.0.0.1:9092 --topic first
|
1 2 3 4 5 6 7 8
| Topic: first TopicId: s4_N3GKgQAul_LtfVc_AoQ PartitionCount: 7 ReplicationFactor: 3 Configs: segment.bytes=1073741824 Topic: first Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1 Topic: first Partition: 1 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0 Topic: first Partition: 2 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2 Topic: first Partition: 3 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0 Topic: first Partition: 4 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2 Topic: first Partition: 5 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1 Topic: first Partition: 6 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| public class CustomerConsumer1 {
public static void main(String[] args) { Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test2"); properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RoundRobinAssignor");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
List<String> topics = new ArrayList<>(); topics.add("first"); consumer.subscribe(topics);
while (true) { ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1)); for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { System.out.println(consumerRecord); } } }
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| public class CustomerConsumer2 {
public static void main(String[] args) { Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test2"); properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RoundRobinAssignor");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
List<String> topics = new ArrayList<>(); topics.add("first"); consumer.subscribe(topics);
while (true) { ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1)); for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { System.out.println(consumerRecord); } } }
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| public class CustomerConsumer3 {
public static void main(String[] args) { Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test2"); properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RoundRobinAssignor");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
List<String> topics = new ArrayList<>(); topics.add("first"); consumer.subscribe(topics);
while (true) { ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1)); for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { System.out.println(consumerRecord); } } }
}
|
第一步:在 IDE 工具中执行上面的消费者代码,分别启动同一个消费者组中的三个消费者
第二步:执行以下的生产者代码,创建一个生产者往 first
主题的 7 个分区发送消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| public class CustomerProducer {
public static void main(String[] args) { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(properties); for (int i = 0; i < 7; i++) { producer.send(new ProducerRecord<>("first", i, "", "hello kafka " + i), new Callback() { @Override public void onCompletion(RecordMetadata recordMetadata, Exception exception) { if (exception != null) { exception.printStackTrace(); } else { System.out.println("topic: " + recordMetadata.topic() + ", partition: " + recordMetadata.partition()); } } }); } producer.close(); }
}
|
第三步:在 IDE 工具控制台中,会输出以下日志信息,可以看到三个消费者各自消费不同分区的消息,如下所示:
1 2 3
| ConsumerRecord(topic = first, partition = 0, leaderEpoch = 8, offset = 9, CreateTime = 1732934991668, serialized key size = 0, serialized value size = 13, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = hello kafka 0) ConsumerRecord(topic = first, partition = 3, leaderEpoch = 8, offset = 9, CreateTime = 1732934991689, serialized key size = 0, serialized value size = 13, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = hello kafka 3) ConsumerRecord(topic = first, partition = 6, leaderEpoch = 8, offset = 9, CreateTime = 1732934991690, serialized key size = 0, serialized value size = 13, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = hello kafka 6)
|
1 2
| ConsumerRecord(topic = first, partition = 1, leaderEpoch = 6, offset = 9, CreateTime = 1732934991689, serialized key size = 0, serialized value size = 13, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = hello kafka 1) ConsumerRecord(topic = first, partition = 4, leaderEpoch = 6, offset = 9, CreateTime = 1732934991690, serialized key size = 0, serialized value size = 13, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = hello kafka 4)
|
1 2
| ConsumerRecord(topic = first, partition = 2, leaderEpoch = 6, offset = 9, CreateTime = 1732934991689, serialized key size = 0, serialized value size = 13, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = hello kafka 2) ConsumerRecord(topic = first, partition = 5, leaderEpoch = 6, offset = 9, CreateTime = 1732934991690, serialized key size = 0, serialized value size = 13, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = hello kafka 5)
|
RoundRobin 分区再平衡的案例
- 第一步:执行完上述案例代码的测试步骤后,首先停止消费者一的运行,然后快速(必须是 45 秒内,因为此时消费者一还没有被踢出消费者组)再次执行生产者的代码往 7 个分区发送消息;等待 45 秒后,发现原本由消费者一负责消费的分区,会按照 RoundRobin 轮询的方式,将分区轮询分成 0 、6 和 3 号分区,然后分配给消费者二和消费者三继续消费。
1 2 3 4
| ConsumerRecord(topic = first, partition = 1, leaderEpoch = 6, offset = 12, CreateTime = 1732936140359, serialized key size = 0, serialized value size = 13, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = hello kafka 1) ConsumerRecord(topic = first, partition = 4, leaderEpoch = 6, offset = 12, CreateTime = 1732936140360, serialized key size = 0, serialized value size = 13, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = hello kafka 4) ConsumerRecord(topic = first, partition = 0, leaderEpoch = 8, offset = 12, CreateTime = 1732936140337, serialized key size = 0, serialized value size = 13, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = hello kafka 0) ConsumerRecord(topic = first, partition = 6, leaderEpoch = 8, offset = 12, CreateTime = 1732936140360, serialized key size = 0, serialized value size = 13, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = hello kafka 6)
|
1 2 3
| ConsumerRecord(topic = first, partition = 2, leaderEpoch = 6, offset = 12, CreateTime = 1732936140359, serialized key size = 0, serialized value size = 13, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = hello kafka 2) ConsumerRecord(topic = first, partition = 5, leaderEpoch = 6, offset = 12, CreateTime = 1732936140360, serialized key size = 0, serialized value size = 13, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = hello kafka 5) ConsumerRecord(topic = first, partition = 3, leaderEpoch = 8, offset = 12, CreateTime = 1732936140360, serialized key size = 0, serialized value size = 13, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = hello kafka 3)
|
- 第二步:从上面输出的日志信息可以看到,消费者一挂掉后,消费者组需要根据心跳超时时间(45 秒)来判断消费者一是否退出,所以需要等待一段时间;等时间到了 45 秒后,判断消费者一真的退出了,就会将原本由消费者一负责消费的分区按照 RoundRobin 轮询的方式重新分配给组内的所有消费者继续消费。
- 第三步:等时间过了 45 秒后,再次执行生产者的代码往 7 个分区发送消息;由于消费者一已经被踢出消费者组,即消费者组内只剩下两个消费者了,此时分区会重新按照 RoundRobin 分配策略分配给组内剩下的两个消费者,如下所示:
1 2 3 4
| ConsumerRecord(topic = first, partition = 0, leaderEpoch = 8, offset = 13, CreateTime = 1732936983383, serialized key size = 0, serialized value size = 13, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = hello kafka 0) ConsumerRecord(topic = first, partition = 2, leaderEpoch = 6, offset = 13, CreateTime = 1732936983406, serialized key size = 0, serialized value size = 13, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = hello kafka 2) ConsumerRecord(topic = first, partition = 4, leaderEpoch = 6, offset = 13, CreateTime = 1732936983407, serialized key size = 0, serialized value size = 13, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = hello kafka 4) ConsumerRecord(topic = first, partition = 6, leaderEpoch = 8, offset = 13, CreateTime = 1732936983407, serialized key size = 0, serialized value size = 13, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = hello kafka 6)
|
1 2 3
| ConsumerRecord(topic = first, partition = 1, leaderEpoch = 6, offset = 13, CreateTime = 1732936983405, serialized key size = 0, serialized value size = 13, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = hello kafka 1) ConsumerRecord(topic = first, partition = 3, leaderEpoch = 8, offset = 13, CreateTime = 1732936983406, serialized key size = 0, serialized value size = 13, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = hello kafka 3) ConsumerRecord(topic = first, partition = 5, leaderEpoch = 6, offset = 13, CreateTime = 1732936983407, serialized key size = 0, serialized value size = 13, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = hello kafka 5)
|
Sticky 分区分配策略
Sticky 策略的概述
- 粘性分区定义:可以理解为分区分配的结果带有 “粘性的”。即在执行一次新的分配之前,考虑上一次分配的结果,尽可能少地调整分区分配的变动,这样可以节省大量的开销。
- Kafka 从
0.11.x
版本开始引入 Sticky 分区分配策略,首先会尽可能均衡地随机分配分区给消费者;在同一消费者组内,当消费者出现宕机并被踢出消费者组的时候,会尽量保持原有分配的分区不变动。 - 值得注意的是,采用 Sticky 分区分配策略后,在执行第一次分区分配(之前没有执行过分区分配)时,默认是随机分配分区的。
Sticky 策略的使用
本节将创建主题 first
,拥有 7 个分区和 3 个副本。准备 3 个消费者(同一消费组)和 1 个 生产者,采用 Sticky 分区分配策略并进行消费,观察分区的消费分配情况。然后,再停止其中一个消费者,再次观察分区的消费分配情况。
提示
本节所需的案例代码,可以直接从 GitHub 下载对应章节 kafka-lesson-13
。
Sticky 策略的案例
- 创建
first
主题,包含 7 个分区和 3 个副本
1 2 3 4 5
| $ ./kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --topic first --partitions 7 --replication-factor 3
$ ./kafka-topics.sh --describe --bootstrap-server 127.0.0.1:9092 --topic first
|
1 2 3 4 5 6 7 8
| Topic: first TopicId: s4_N3GKgQAul_LtfVc_AoQ PartitionCount: 7 ReplicationFactor: 3 Configs: segment.bytes=1073741824 Topic: first Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1 Topic: first Partition: 1 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0 Topic: first Partition: 2 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2 Topic: first Partition: 3 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0 Topic: first Partition: 4 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2 Topic: first Partition: 5 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1 Topic: first Partition: 6 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| public class CustomerConsumer1 {
public static void main(String[] args) { Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test3"); properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.StickyAssignor");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
List<String> topics = new ArrayList<>(); topics.add("first"); consumer.subscribe(topics);
while (true) { ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1)); for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { System.out.println(consumerRecord); } } }
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| public class CustomerConsumer2 {
public static void main(String[] args) { Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test3"); properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.StickyAssignor");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
List<String> topics = new ArrayList<>(); topics.add("first"); consumer.subscribe(topics);
while (true) { ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1)); for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { System.out.println(consumerRecord); } } }
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| public class CustomerConsumer3 {
public static void main(String[] args) { Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test3"); properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.StickyAssignor");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
List<String> topics = new ArrayList<>(); topics.add("first"); consumer.subscribe(topics);
while (true) { ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1)); for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { System.out.println(consumerRecord); } } }
}
|
第一步:在 IDE 工具中执行上面的消费者代码,分别启动同一个消费者组中的三个消费者
第二步:执行以下的生产者代码,创建一个生产者往 first
主题的 7 个分区发送消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| public class CustomerProducer {
public static void main(String[] args) { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(properties); for (int i = 0; i < 7; i++) { producer.send(new ProducerRecord<>("first", i, "", "hello kafka " + i), new Callback() { @Override public void onCompletion(RecordMetadata recordMetadata, Exception exception) { if (exception != null) { exception.printStackTrace(); } else { System.out.println("topic: " + recordMetadata.topic() + ", partition: " + recordMetadata.partition()); } } }); } producer.close(); }
}
|
第三步:在 IDE 工具控制台中,会输出以下日志信息,可以看到三个消费者各自消费不同分区的消息,如下所示:
1 2 3
| ConsumerRecord(topic = first, partition = 0, leaderEpoch = 8, offset = 16, CreateTime = 1732939292544, serialized key size = 0, serialized value size = 13, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = hello kafka 0) ConsumerRecord(topic = first, partition = 3, leaderEpoch = 8, offset = 16, CreateTime = 1732939292570, serialized key size = 0, serialized value size = 13, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = hello kafka 3) ConsumerRecord(topic = first, partition = 6, leaderEpoch = 8, offset = 16, CreateTime = 1732939292571, serialized key size = 0, serialized value size = 13, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = hello kafka 6)
|
1 2
| ConsumerRecord(topic = first, partition = 1, leaderEpoch = 6, offset = 16, CreateTime = 1732939292569, serialized key size = 0, serialized value size = 13, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = hello kafka 1) ConsumerRecord(topic = first, partition = 4, leaderEpoch = 6, offset = 16, CreateTime = 1732939292570, serialized key size = 0, serialized value size = 13, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = hello kafka 4)
|
1 2
| ConsumerRecord(topic = first, partition = 2, leaderEpoch = 6, offset = 16, CreateTime = 1732939292570, serialized key size = 0, serialized value size = 13, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = hello kafka 2) ConsumerRecord(topic = first, partition = 5, leaderEpoch = 6, offset = 16, CreateTime = 1732939292571, serialized key size = 0, serialized value size = 13, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = hello kafka 5)
|
Sticky 分区再平衡的案例
- 第一步:执行完上述案例代码的测试步骤后,首先停止消费者一的运行,然后快速(必须是 45 秒内,因为此时消费者一还没有被踢出消费者组)再次执行生产者的代码往 7 个分区发送消息;等待 45 秒后,发现原本由消费者一负责消费的分区,会尽可能均衡地随机分成 0、3、6 号分区,然后随机分配给消费者二和消费者三继续消费。
1 2 3 4
| ConsumerRecord(topic = first, partition = 1, leaderEpoch = 6, offset = 17, CreateTime = 1732940272374, serialized key size = 0, serialized value size = 13, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = hello kafka 1) ConsumerRecord(topic = first, partition = 4, leaderEpoch = 6, offset = 17, CreateTime = 1732940272375, serialized key size = 0, serialized value size = 13, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = hello kafka 4) ConsumerRecord(topic = first, partition = 0, leaderEpoch = 8, offset = 17, CreateTime = 1732940272351, serialized key size = 0, serialized value size = 13, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = hello kafka 0) ConsumerRecord(topic = first, partition = 6, leaderEpoch = 8, offset = 17, CreateTime = 1732940272381, serialized key size = 0, serialized value size = 13, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = hello kafka 6)
|
1 2 3
| ConsumerRecord(topic = first, partition = 2, leaderEpoch = 6, offset = 17, CreateTime = 1732940272375, serialized key size = 0, serialized value size = 13, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = hello kafka 2) ConsumerRecord(topic = first, partition = 5, leaderEpoch = 6, offset = 17, CreateTime = 1732940272375, serialized key size = 0, serialized value size = 13, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = hello kafka 5) ConsumerRecord(topic = first, partition = 3, leaderEpoch = 8, offset = 17, CreateTime = 1732940272375, serialized key size = 0, serialized value size = 13, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = hello kafka 3)
|
- 第二步:从上面输出的日志信息可以看到,消费者一挂掉后,消费者组需要根据心跳超时时间(45 秒)来判断消费者一是否退出,所以需要等待一段时间;等时间到了 45 秒后,判断消费者一真的退出了,就会将原本由消费者一负责消费的分区尽可能均衡地随机重新分配给组内的所有消费者继续消费。
- 第三步:等时间过了 45 秒后,再次执行生产者的代码往 7 个分区发送消息;由于消费者一已经被踢出消费者组,即消费者组内只剩下两个消费者了,此时分区会重新按照 Sticky 分配策略分配给组内剩下的两个消费者,如下所示:
1 2 3 4
| ConsumerRecord(topic = first, partition = 0, leaderEpoch = 8, offset = 18, CreateTime = 1732940641037, serialized key size = 0, serialized value size = 13, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = hello kafka 0) ConsumerRecord(topic = first, partition = 1, leaderEpoch = 6, offset = 18, CreateTime = 1732940641059, serialized key size = 0, serialized value size = 13, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = hello kafka 1) ConsumerRecord(topic = first, partition = 4, leaderEpoch = 6, offset = 18, CreateTime = 1732940641060, serialized key size = 0, serialized value size = 13, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = hello kafka 4) ConsumerRecord(topic = first, partition = 6, leaderEpoch = 8, offset = 18, CreateTime = 1732940641061, serialized key size = 0, serialized value size = 13, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = hello kafka 6)
|
1 2 3
| ConsumerRecord(topic = first, partition = 2, leaderEpoch = 6, offset = 18, CreateTime = 1732940641060, serialized key size = 0, serialized value size = 13, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = hello kafka 2) ConsumerRecord(topic = first, partition = 3, leaderEpoch = 8, offset = 18, CreateTime = 1732940641060, serialized key size = 0, serialized value size = 13, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = hello kafka 3) ConsumerRecord(topic = first, partition = 5, leaderEpoch = 6, offset = 18, CreateTime = 1732940641060, serialized key size = 0, serialized value size = 13, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = hello kafka 5)
|
Kafaka 消费者的 offset
offset 的默认存储位置
值得注意的是,在 Kafka 的不同版本中,存储消费者的 offset 的地方是不一样的:
Kafka 旧版本(0.9.0
及以前)
- 存储方式:
- 消费者的 offset 信息存储在 Zookeeper 中。
- 存储特点:
- 每个消费者组都会在 Zookeeper 中维护其消费的 offset 信息。
- Zookeeper 主要负责集群的元数据管理,频繁地写入 offset 数据会导致 Zookeeper 的性能问题,尤其是在高吞吐量场景下。
- 存储偏移时,Zookeeper 会受到延迟和负载的影响,扩展性较差。
Kafka 新版本(0.10.0
及以后)
- 存储方式:
- 消费者的 offset 信息存储在 Kafka 的内置主题
__consumer_offsets
中,该主题默认拥有 50 个分区。
- 存储特点:
- 在 Kafka 内置主题
__consumer_offsets
里面,采用键值对的方式存储数据。其中 Key 是 GroupId + Topic + 分区号
,而 Value 就是当前 offset
的值。每隔一段时间,Kafka 内部会对这个内置 Topic 进行压缩(Compact),也就是针对每个 GroupId + Topic + 分区号
只保留最新的数据。 - Kafka 内置的
__consumer_offsets
是一个特殊的主题,用于存储消费者组的 offset 信息。这个主题支持分区和副本,提供了更高的可靠性和扩展性。 - Kafka 的 Broker 可以高效地管理 offset 的存储和读取,减轻了 Zookeeper 的压力。
- 支持更细粒度的控制,例如自动提交(
auto.commit
)或手动提交 offset。 - 提高了整体性能,尤其是在高吞吐场景下,消费者提交 offset 的操作不会影响整个集群的性能。
在 Kafka 中,offset 相关的参数如下:
参数名称 | 参数描述 |
---|
enable.auto.commit | 是否开启自动提交 offset 功能,默认值是 true 。开启后,消费者会自动周期性地向 Kafka 服务器提交 offset。 |
auto.commit.interval.ms | 如果设置了 enable.auto.commit 的值为 true ,则该参数定义了消费者自动提交 offset 的时间间隔(频率),默认值是 5 秒 。 |
offsets.retention.minutes | Kafka 中内置主题 __consumer_offsets 存储 offset 的保留时间,默认值是 24 小时 。具体来说,当一个消费者组的 offset 变为无效(例如消费者组停止消费或从未重新平衡中恢复),Kafka 会在超过这个时间后清理该组的 offset。 |
消费 offset 的案例
提示
因为 __consumer_offsets
是 Kafka 的内置主题,所以可以通过消费者对该主题进行消费。
- (1) 在 Kafka 的配置文件
config/consumer.properties
中添加配置参数 exclude.internal.topics
,默认值是 true
,即表示不能消费 Kafka 的内置主题。为了查看该内置主题的数据,需要将该参数修改为 false
。
1
| exclude.internal.topics=false
|
1
| [centos@node02 kafka]$ bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --create --topic test --partitions 3 --replication-factor 3
|
- (3) 启动命令行生产者往
test
主题发送多条消息
1
| [centos@node02 kafka]$ bin/kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic test
|
- (4) 启动命令行消费者消费
test
主题,同时指定消费者组的 ID,这是为了可以更好地观察 offset 的存储状况
1
| [centos@node02 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test --group mygroup
|
- (5) 启动命令行消费者消费内置主题
__consumer_offsets
,这样就可以查看内置主题 __consumer_offsets
的数据
1
| [centos@node02 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --consumer.config ../config/consumer.properties --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --topic __consumer_offsets --from-beginning
|
1 2 3
| [mygroup,test,1]::OffsetAndMetadata(offset=1, leaderEpoch=Optional[0], metadata=, commitTimestamp=1732965368784, expireTimestamp=None) [mygroup,test,0]::OffsetAndMetadata(offset=0, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1732965368784, expireTimestamp=None) [mygroup,test,2]::OffsetAndMetadata(offset=1, leaderEpoch=Optional[0], metadata=, commitTimestamp=1732965368784, expireTimestamp=None)
|
其中的 [mygroup,test,2]
是 Key,分别对应 GroupId + Topic + 分区号
。
自动提交 offset
自动提交 offset 的概述
为了让开发者能够专注于业务逻辑的开发,Kafka 提供了消费者自动提交 offset 的功能,如下图所示:
消费者自动提交 offset 的相关参数如下:
参数名称 | 描述 |
---|
enable.auto.commit | 是否开启自动提交 offset 功能,默认值是 true 。开启后,消费者会自动周期性地向 Kafka 服务器提交 offset。 |
auto.commit.interval.ms | 如果设置了 enable.auto.commit 的值为 true ,则该参数定义了消费者自动提交 offset 的时间间隔(频率),默认值是 5 秒 。 |
自动提交 offset 的案例
提示
本节所需的案例代码,可以直接从 GitHub 下载对应章节 kafka-lesson-14
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| public class CustomerConsumer {
public static void main(String[] args) { Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test"); properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
List<String> topics = new ArrayList<>(); topics.add("first"); consumer.subscribe(topics);
while (true) { ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1)); for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { System.out.println(consumerRecord); } } }
}
|
手动提交 offset
手动提交 offset 的概述
- 虽然自动提交 offset 十分简单便利,但由于其是基于时间提交的,开发人员难以把握 offset 提交的时机。因此,Kafka 还提供了手动提交 offset 的 API。
- Kafka 手动提交 offset 有以下两种方式:
commitSync(同步提交)
:必须阻塞等待 offset 提交完毕,再去消费下一批数据。commitAsync(异步提交)
:发送完提交 offset 的请求后,就可以立刻消费下一批数据了。- 两者的相同点是,都会将本次提交的一批数据的最高偏移量(offset)提交。
- 两者的不同点是,同步提交会阻塞当前线程,直到提交成功为止,并且会自动失败重试(由于诸多不可控因素导致,也可能会出现提交失败的情况); 而异步提交则没有失败重试机制,因此有可能提交失败。
手动提交 offset 的使用
提示
本节所需的案例代码,可以直接从 GitHub 下载对应章节 kafka-lesson-15
。
同步提交 offset 的案例
由于同步提交 offset 有失败重试机制,因此更加可靠;但是,需要一直阻塞等待提交的结果,这也导致了提交效率比较低。以下为同步提交 offset 的案例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| public class CustomerConsumer {
public static void main(String[] args) { Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test"); properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
List<String> topics = new ArrayList<>(); topics.add("first"); consumer.subscribe(topics);
while (true) { ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1)); for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { System.out.println(consumerRecord); } consumer.commitSync(); } }
}
|
异步提交 offset 的案例
虽然同步提交 offset 有失败重试机制,更可靠一些,但是由于其会阻塞当前线程,直到 offset 提交成功。因此,吞吐量会受到很大的影响。在通常的情况下,更多会选用异步提交 offset 的方式。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| public class CustomerConsumer {
public static void main(String[] args) { Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test"); properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
List<String> topics = new ArrayList<>(); topics.add("first"); consumer.subscribe(topics);
while (true) { ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1)); for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { System.out.println(consumerRecord); } consumer.commitAsync(); } }
}
|
消费者指定 offset 消费
消费者指定 offset 消费的概述
当 Kafka 中没有初始偏移量(消费者组第一次消费)或者服务器上不再存在当前偏移量时(例如该数据已被删除),该怎么办呢?Kafka 为此提供了 auto.offset.reset
配置参数,该参数的值有三种类型:
- (1)
earliest
:自动将偏移量重置为最早的偏移量,相当于 --from-beginning
。 - (2)
latest
:自动将偏移量重置为最新(最晚)的偏移量,这是默认值。 - (3)
none
:如果未找到消费者组的先前偏移量,则向消费者抛出异常。
参数名称 | 参数描述 |
---|
auto.offset.reset | 指定在消费者找不到有效偏移量时的处理策略,该参数的值有三种类型:earliest 、latest 、none ,默认值是 latest 。特别注意,该参数仅在首次消费或偏移量丢失时生效,平时消费者会按已提交的偏移量消费。 |
消费者指定 offset 消费的案例
本节将让消费者从任意指定的 offset 位置开始消费数据。
提示
本节所需的案例代码,可以直接从 GitHub 下载对应章节 kafka-lesson-16
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| public class CustomerConsumer {
public static void main(String[] args) { Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test2");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
List<String> topics = new ArrayList<>(); topics.add("first"); consumer.subscribe(topics);
Set<TopicPartition> assignment = new HashSet<>(); while (assignment.size() == 0) { consumer.poll(Duration.ofSeconds(1)); assignment = consumer.assignment(); }
for (TopicPartition topicPartition : assignment) { consumer.seek(topicPartition, 23); }
while (true) { ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1)); for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { System.out.println(consumerRecord); } } }
}
|
特别注意
- 上述代码每次执行完后,如果希望再次执行,那么就需要更改消费者组的 ID,否则消费者会消费不到数据。
- 这是因为 Kafka 的 offset 是按消费者组来存储的,如果更换了消费者组,新组没有之前的 offset 记录,就会从手动指定的 offset 开始消费消息,又或者根据
auto.offset.reset
配置来重新消费消息。 - 值得一提的是,如果消费者组 ID 不变,Kafka 会记录该组上一次提交的 offset,即使开发者手动指定了
seek
偏移量,也可能会出现偏移冲突,导致消费者会消费不到数据。
消费者按照时间消费
在生产环境中,会遇到最近消费的几个小时存在数据异常,想按照时间对消息进行重新消费。例如,要求按照时间重新消费前一天的数据,那么应该怎么处实现呢?
提示
本节所需的案例代码,可以直接从 GitHub 下载对应章节 kafka-lesson-17
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
| public class CustomerConsumer {
public static void main(String[] args) { Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test2");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
List<String> topics = new ArrayList<>(); topics.add("first"); consumer.subscribe(topics);
Set<TopicPartition> assignment = new HashSet<>(); while (assignment.size() == 0) { consumer.poll(Duration.ofSeconds(1)); assignment = consumer.assignment(); }
Map<TopicPartition, Long> timestampToSearch = new HashMap<>(); for (TopicPartition topicPartition : assignment) { timestampToSearch.put(topicPartition, System.currentTimeMillis() - 1 * 24 * 3600 * 1000); }
Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestampToSearch);
for (TopicPartition topicPartition : assignment) { OffsetAndTimestamp offsetAndTimestamp = offsets.get(topicPartition); if (offsetAndTimestamp != null) { consumer.seek(topicPartition, offsetAndTimestamp.offset()); } }
while (true) { ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1)); for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { System.out.println(consumerRecord); } } }
}
|
Kafka 消费者的最佳实践
Kafka 重复消费与漏消费问题
重复消费
- 已经消费了数据,但是 offset 没有提交。
- 重复消费通常是由于 Kafka 消费者启用自动提交 offset 引起的。
- 比如:设置了 offset 为自动提交,当消费者将消息数据处理完(写库),还没等到 offset 提交,此时刚好消费者线程被 kill 掉,等消费者再次启动后,则会从上一次提交的 offset 位置继续消费,最终导致消息重复消费。
漏消费
- 先提交 offset 再消费,有可能会造成消息的漏消费。
- 比如:设置了 offset 为手动提交,当 offset 被提交时,消息数据还在内存中未处理,此时刚好消费者线程被 Kill 掉,导致内存中的消息数据丢失;由于 offset 已经提交,但是消息数据未处理,最终导致消息漏消费。
思考
Kafka 消费者如何才能做到既不漏消费,也不重复消费呢?详见下面介绍的消费者事务。
Kafka 消费者事务的实现
如果希望实现消费者端的精准一次性消费,那么需要 Kafka 消费端将消费过程和提交 offset 过程做原子绑定。此时,开发者需要将 Kafka 的 offset 存储到支持事务的自定义介质中(比如 MySQL)。
Kafka 消费者提高吞吐量
Kafka 如何提高消费者的消费速度呢?
(1) 如果是 Kafka 消费能力不足,则可以考虑增加 Topic 的分区数量,并且同时增加消费组的消费者数量,这两个条件缺一不可,即 消费者的数量 = Topic 的分区数量
。
(2) 如果是下游的数据处理不及时(有较大延迟),则可以提高消费者每批次拉取消息的数量(默认每批次拉取 500 条消息)。每批次拉取数据过少(拉取的数据量 / 数据处理时间 < 生产速度),会使消费者处理数据的速度小于生产者生产数据的速度,从而可能导致消息积压。
参数名称 | 参数描述 |
---|
fetch.max.bytes | 消费者获取服务器端一批消息的最大字节数,默认值为 50M 。如果服务器端一批次的消息大于该值,仍然可以将这批消息拉取回来,所以这不是一个绝对最大值。消费者拉取一批次消息的大小受 message.max.bytes (Broker 配置)或者 max.message.bytes (Topic 配置)影响。 |
max.poll.records | 消费者每次调用 poll() 方法时,最多能拉取的消息数量,默认值为 500 。 |