大纲 前言 官方文档 学习路线 Kafka 生产者 生产者消息发送流程 生产者消息发送原理 Kafka 的 Producer 发送消息采用的是异步发送的方式。在消息发送的过程中,涉及到了两个线程 — main
线程和 Sender
线程。在 main
线程中,会创建一个双端队列 RecordAccumulator
。值得一提的是,main
线程将消息发送给 RecordAccumulator
时,Sender
线程会不断从 RecordAccumulator
中拉取消息并发送到 Kafka Broker。
生产者重要参数列表
生产者异步发送 API 普通的异步发送 提示
本节所需的案例代码,可以直接从 GitHub 下载对应章节 kafka-lesson-01
。
1 2 3 4 5 <dependency > <groupId > org.apache.kafka</groupId > <artifactId > kafka-clients</artifactId > <version > 3.2.1</version > </dependency >
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public class CustomerProducer { public static void main (String[] args) { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.2.127:9092" ); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); KafkaProducer<String, String> producer = new KafkaProducer<>(properties); for (int i = 0 ; i < 5 ; i++) { producer.send(new ProducerRecord<>("test" , "hello kafka " + i)); } producer.close(); } }
第一步:启动 Kafka 的控制台消费者:
第二步:在 IDE 工具中执行代码,观察控制台消费者中是否接收到消息,如下所示:
1 2 3 4 5 hello kafka 0 hello kafka 1 hello kafka 2 hello kafka 3 hello kafka 4
带回调函数的异步发送 回调方法会在 Producer 收到 ack
时调用,且为异步调用;该方法有两个参数,分别是元数据信息(RecordMetadata)和异常信息(Exception)。如果 Exception
为 null
,则说明消息发送成功,如果 Exception
不为 null
,则说明消息发送失败。值得一提的是,消息发送失败会自动重试发送,不需要在回调函数中手动重试发送。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public class CustomerProducer2 { public static void main (String[] args) { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.2.127:9092" ); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); KafkaProducer<String, String> producer = new KafkaProducer<>(properties); for (int i = 0 ; i < 5 ; i++) { producer.send(new ProducerRecord<>("test" , "hello kafka " + i), new Callback() { @Override public void onCompletion (RecordMetadata recordMetadata, Exception exception) { if (exception != null ) { exception.printStackTrace(); } else { System.out.println("topic: " + recordMetadata.topic() + ", partition: " + recordMetadata.partition()); } } }); } producer.close(); } }
除了在 Kafka 的控制台消费者中接收到消息之外,还可以在 IDE 的控制台看到如下的输出信息:
1 2 3 4 5 topic: test, partition: 0 topic: test, partition: 0 topic: test, partition: 0 topic: test, partition: 0 topic: test, partition: 0
生产者同步发送 API 提示
本节所需的案例代码,可以直接从 GitHub 下载对应章节 kafka-lesson-02
。
普通的同步发送 同步发送的意思就是,当一条消息发送之后,会阻塞当前线程,直至收到 ack
应答。由于 send()
方法返回的是一个 Future
对象,根据 Futrue
对象的特点,只需调用 Future
对象的 get()
方法即可实现同步发送。
1 2 3 4 5 <dependency > <groupId > org.apache.kafka</groupId > <artifactId > kafka-clients</artifactId > <version > 3.2.1</version > </dependency >
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 public class CustomerProducer { public static void main (String[] args) { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.2.127:9092" ); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); KafkaProducer<String, String> producer = new KafkaProducer<>(properties); for (int i = 0 ; i < 5 ; i++) { try { producer.send(new ProducerRecord<>("test" , "hello kafka " + i)).get(); } catch (Exception e) { e.printStackTrace(); } } producer.close(); } }
第一步:启动 Kafka 的控制台消费者:
第二步:在 IDE 工具中执行代码,观察控制台消费者中是否接收到消息,如下所示:
1 2 3 4 5 hello kafka 0 hello kafka 1 hello kafka 2 hello kafka 3 hello kafka 4
带回调函数的同步发送 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 public class CustomerProducer2 { public static void main (String[] args) { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.2.127:9092" ); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); KafkaProducer<String, String> producer = new KafkaProducer<>(properties); for (int i = 0 ; i < 5 ; i++) { try { producer.send(new ProducerRecord<>("test" , "hello kafka " + i), new Callback() { @Override public void onCompletion (RecordMetadata recordMetadata, Exception exception) { if (exception == null ) { System.out.println("topic: " + recordMetadata.topic() + ", partition: " + recordMetadata.partition()); } } }).get(); } catch (Exception e) { e.printStackTrace(); } } producer.close(); } }
除了在 Kafka 的控制台消费者中接收到消息之外,还可以在 IDE 的控制台看到如下的输出信息:
1 2 3 4 5 topic: test, partition: 0 topic: test, partition: 2 topic: test, partition: 0 topic: test, partition: 1 topic: test, partition: 2
生产者分区 生产者分区的优点 提高并行度,生产者支持以分区为单位发送数据,消费者支持以分区为单位消费数据。 便于合理使用存储资源,每个 Partition 在一台 Broker 上存储,可以把海量的数据按照分区切割成一块一块的数据并存储在多台 Broker 上。合理控制分区的任务,可以实现负载均衡的效果。
生产者发送消息的分区策略 默认的分区器类是 DefaultPartitioner
,部分源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public class DefaultPartitioner implements Partitioner { ...... }
通过 KafkaProducer
类的 send()
方法发送消息时,需要指定 ProducerRecord
对象作为参数,ProducerRecord
类的构造方法如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 public class ProducerRecord <K , V > { public ProducerRecord (String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers) { ...... } public ProducerRecord (String topic, Integer partition, Long timestamp, K key, V value) { ...... } public ProducerRecord (String topic, Integer partition, K key, V value, Iterable<Header> headers) { ...... } public ProducerRecord (String topic, Integer partition, K key, V value) { ...... } public ProducerRecord (String topic, K key, V value) { ...... } public ProducerRecord (String topic, V value) { ...... }
调用 ProducerRecord
类不同的构造方法时,有以下几种分区策略:
在指明 partition
的情况下,直接将指明的值作为 partition
值。例如:partition=0
,那么数据会被写入分区 0。
在没有指明 partition
值,但有指定 key
的情况下,将 key
的 Hash 值与 topic
的 partition
数进行取余来得到 partition
值。例如:key
的 Hash 值是 5,topic
的 partition
数是 2,那么 key
对应的 value
会被写入 1 号分区。
在既没有指明 partition
值,又没有指定 key
的情况下,Kafka 会采用 Sticky Partition
黏性分区器,也就是会随机选择一个分区,并尽可能一直使用该分区,等该分区的 batch
已满或者已完成,Kafka 再随机一个分区进行使用(和上一次选的分区不同)。例如:第一次随机选择 0 号分区,等 0 号分区当前批次满了(默认 16K 大小)或者 linger.ms
设置的时间到了,Kafka 会再随机选择一个分区进行使用(如果还是 0 分区会继续随机选择一个分区)。
自定义生产者的分区器 开发人员可以根据业务需求自定义分区器,只需要实现 Partitioner
接口即可。
提示
本节所需的案例代码,可以直接从 GitHub 下载对应章节 kafka-lesson-03
。
自定义分区器类,实现 Partitioner
接口,并重写 partition()
方法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 public class CustomPartitioner implements Partitioner { @Override public int partition (String topic, Object key, byte [] keyBytes, Object value, byte [] valueBytes, Cluster cluster) { String msgValue = value.toString(); int partition; if (msgValue.contains("order" )) { partition = 0 ; } else { partition = 1 ; } return partition; } @Override public void close () { } @Override public void configure (Map<String, ?> configs) { } }
在生产者的配置中添加分区器参数,以此来指定自定义分区器 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public class CustomerProducer { public static void main (String[] args) { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.2.127:9092" ); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class.getName()); KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties); for (int i = 0 ; i < 5 ; i++) { producer.send(new ProducerRecord<>("test" , "hello kafka " + i), new Callback() { @Override public void onCompletion (RecordMetadata metadata, Exception exception) { System.out.println("Partition : " + metadata.partition()); } }); } producer.close(); } }
除了在 Kafka 的控制台消费者中接收到消息之外,还可以在 IDE 的控制台看到如下的输出信息:
1 2 3 4 5 Partition : 1 Partition : 1 Partition : 1 Partition : 1 Partition : 1
生产者最佳实践 生产者如何提高吞吐量 参数优化 为了让生产者提高吞吐量(发送消息的效率),可以优化以下几个参数:
batch.size
:批次大小,默认 16k
linger.ms
:等待时间,默认 0ms
,修改为 5-100ms
compression.type
:压缩方式,默认是 none
,修改过为 snappy
RecordAccumulator
:缓冲区(双端队列)大小,默认是 32m
,修改为 64m
特别注意
上述的四个参数值并不是设置得越大就越好,设置得过大会导致 Kafka 中的消息被延迟消费。 当 Topic 的分区数量比较多的时候,可以适当增加 RecordAccumulator(缓冲区)
的大小。 参数说明
示例代码 提示
本节所需的案例代码,可以直接从 GitHub 下载对应章节 kafka-lesson-04
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public class CustomerProducer { public static void main (String[] args) { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.2.127:9092" ); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.LINGER_MS_CONFIG, 5 ); properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * 1024 ); properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy" ); properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 64 * 1024 * 1024 ); KafkaProducer<String, String> producer = new KafkaProducer<>(properties); for (int i = 0 ; i < 5 ; i++) { producer.send(new ProducerRecord<>("test" , "hello kafka " + i)); } producer.close(); } }
生产者如何保证数据可靠性 这里的数据可靠性是指生产者如何保证消息可以发送给 Kafka,并保证 Kafka 可以持久化消息内容。
消息发送流程
ISR 介绍
Leader 维护了一个动态的 in-sync replica set (ISR)
,意为和 Leader 保持同步的 Follower + Leader 集合(leader: 0,isr: 0, 1, 2)。
ACK 应答原理
数据可靠性总结
特别注意
当 ACK 级别设置为 -1
的时候,虽然可以完全保证数据的可靠性,但会存在数据重复的情况,详细的介绍可以看 这里 。
示例代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public class CustomerProducer { public static void main (String[] args) { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.2.127:9092" ); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.ACKS_CONFIG, "all" ); properties.put(ProducerConfig.RETRIES_CONFIG, 3 ); KafkaProducer<String, String> producer = new KafkaProducer<>(properties); for (int i = 0 ; i < 5 ; i++) { producer.send(new ProducerRecord<>("test" , "hello kafka " + i)); } producer.close(); } }
生产者如何处理数据去重 Kafka 从 0.11
版本以后,引入了一项重大特性:幂等性和事务,可用于解决数据重复的问题。
数据重复分析
数据传递语义 至少一次 (At Least Once) = ACK 级别设置为 - 1
+ 分区副本大于等于 2
+ ISR 里应答的最小副本数量大于等于 2
最多一次 (At Most Once) = ACK 级别设置为 0
总结
至少一次 (At Least Once) 可以保证数据不丢失,但是不能保证数据不重复。 最多一次 (At Most Once) 可以保证数据不重复,但是不能保证数据不丢失。 精确一次 (Exactly Once):对于一些非常重要的信息,比如和钱相关的数据,要求数据既不能重复,也不能丢失。 幂等性使用 幂等性介绍 幂等性就是指 Producer (生产者) 无论向 Broker 发送多少次重复数据,Broker 端都只会持久化一条数据,保证了数据不重复。 精确一次 (Exactly Once) = 幂等性 + 至少一次 (acks = -1
+ 分区副本数 >= 2
+ ISR 里应答的最小副本数量 >= 2
)。 重复数据的判断标准:具有 <PID, Partition, SeqNumber>
相同主键的消息提交时,Broker 只会持久化一条。其中 PID 是 Kafka 每次重启都会分配一个新的,Partition 表示分区号,Sequence Number 是单调自增的序列号,所以幂等性只能保证数据在单分区单会话内不重复,这里的单会话是相对于 Kafka 单次重启来说。如果需要解决多分区多会话的数据重复问题,需要结合幂等性与事务来解决。
幂等性开启 开启幂等性的配置参数是 enable.idempotence
,默认值为 true
,设置 false
会关闭幂等性。
提示
Kafka 官方文档中的幂等性详细介绍可以看 这里 。
事务使用 特别注意
由于事务的底层是基于幂等性,因此在生产者使用事务之前,必须开启幂等性(默认开启)。 更多关于 Kafka 事务的使用介绍可以看 这里 的教程。 事务原理介绍
事务流程介绍
如上图所示,整个事务流程分为以下几个步骤:
事务初始化
:initTransactions事务启动
:beginTransaction发送消息
:一般发送多条消息,可以向 1 个或多个 Topic 发送消息事务提交
:commitTransaction事务回滚
:abortTransaction消费消息
当 Producer 发送多条事务消息的话:
事务初始化是一次性的 事务开始、发送消息、事务提交 / 回滚则会一直循环运行 事务的常用 API 1 2 3 4 5 6 7 8 9 10 11 12 13 14 void initTransactions () ;void beginTransaction () throws ProducerFencedException ;void sendOffsetsToTransaction (Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException ;void commitTransaction () throws ProducerFencedException ;void abortTransaction () throws ProducerFencedException ;
示例代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 public class CustomerProducer { public static void main (String[] args) { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.2.127:9092" ); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction_id_01" ); KafkaProducer<String, String> producer = new KafkaProducer<>(properties); producer.initTransactions(); producer.beginTransaction(); try { for (int i = 0 ; i < 5 ; i++) { producer.send(new ProducerRecord<>("test" , "hello kafka " + i)); } producer.commitTransaction(); } catch (Exception e) { producer.abortTransaction(); } finally { producer.close(); } } }
生产者如何保证数据有序 数据有序分析 提示
单分区内,数据是有序的(需要符合一定的条件)。 多分区,分区与分区之间无序。 如果要求多分区的数据有序,可以让消费者读取多个分区的数据,并存储在本地内存中,然后对内存中的数据进行排序,最后再进一步统一处理数据。但这会引申出一个问题,也就是消费者需要等待所有分区的数据都读取完了才能进一步处理数据,造成数据的处理效率比较低。
保证数据有序