大纲 Kafka 生产者 生产者消息发送流程 生产者消息发送原理 Kafka 的 Producer 发送消息采用的是异步发送的方式。在消息发送的过程中,涉及到了两个线程 — main
线程和 Sender
线程。在 main
线程中,会创建一个双端队列 RecordAccumulator
。值得一提的是,main
线程将消息发送给 RecordAccumulator
时,Sender
线程会不断从 RecordAccumulator
中拉取消息并发送到 Kafka Broker。
生产者重要参数列表
生产者异步发送 API 普通的异步发送 提示
本节所需的案例代码,可以直接从 GitHub 下载对应章节 kafka-lesson-01
。
1 2 3 4 5 <dependency > <groupId > org.apache.kafka</groupId > <artifactId > kafka-clients</artifactId > <version > 3.2.1</version > </dependency >
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public class CustomerProducer { public static void main (String[] args) { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092,127.0.0.1:9093" ); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); KafkaProducer<String, String> producer = new KafkaProducer<>(properties); for (int i = 0 ; i < 5 ; i++) { producer.send(new ProducerRecord<>("test" , "hello kafka " + i)); } producer.close(); } }
第一步:启动 Kafka 的控制台消费者:
第二步:在 IDE 工具中执行代码,观察控制台消费者中是否接收到消息,如下所示:
1 2 3 4 5 hello kafka 0 hello kafka 1 hello kafka 2 hello kafka 3 hello kafka 4
带回调函数的异步发送 回调方法会在 Producer 收到 ack
时调用,且为异步调用;该方法有两个参数,分别是元数据信息(RecordMetadata)和异常信息(Exception)。如果 Exception
为 null
,则说明消息发送成功,如果 Exception
不为 null
,则说明消息发送失败。值得一提的是,消息发送失败会自动重试发送,不需要在回调函数中手动重试发送。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public class CustomerProducer2 { public static void main (String[] args) { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092,127.0.0.1:9093" ); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); KafkaProducer<String, String> producer = new KafkaProducer<>(properties); for (int i = 0 ; i < 5 ; i++) { producer.send(new ProducerRecord<>("test" , "hello kafka " + i), new Callback() { @Override public void onCompletion (RecordMetadata recordMetadata, Exception exception) { if (exception == null ) { System.out.println("topic: " + recordMetadata.topic() + ", partition: " + recordMetadata.partition()); } } }); } producer.close(); } }
除了在 Kafka 的控制台消费者中接收到消息之外,还可以在 IDE 的控制台看到如下的输出信息:
1 2 3 4 5 topic: test, partition: 0 topic: test, partition: 0 topic: test, partition: 0 topic: test, partition: 0 topic: test, partition: 0
生产者同步发送 API 提示
本节所需的案例代码,可以直接从 GitHub 下载对应章节 kafka-lesson-02
。
普通的同步发送 同步发送的意思就是,当一条消息发送之后,会阻塞当前线程,直至收到 ack
应答。由于 send()
方法返回的是一个 Future
对象,根据 Futrue
对象的特点,只需调用 Future
对象的 get()
方法即可实现同步发送。
1 2 3 4 5 <dependency > <groupId > org.apache.kafka</groupId > <artifactId > kafka-clients</artifactId > <version > 3.2.1</version > </dependency >
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 public class CustomerProducer { public static void main (String[] args) { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092,127.0.0.1:9093" ); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); KafkaProducer<String, String> producer = new KafkaProducer<>(properties); for (int i = 0 ; i < 5 ; i++) { try { producer.send(new ProducerRecord<>("test" , "hello kafka " + i)).get(); } catch (Exception e) { e.printStackTrace(); } } producer.close(); } }
第一步:启动 Kafka 的控制台消费者:
第二步:在 IDE 工具中执行代码,观察控制台消费者中是否接收到消息,如下所示:
1 2 3 4 5 hello kafka 0 hello kafka 1 hello kafka 2 hello kafka 3 hello kafka 4
带回调函数的同步发送 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 public class CustomerProducer2 { public static void main (String[] args) { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092,127.0.0.1:9093" ); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); KafkaProducer<String, String> producer = new KafkaProducer<>(properties); for (int i = 0 ; i < 5 ; i++) { try { producer.send(new ProducerRecord<>("test" , "hello kafka " + i), new Callback() { @Override public void onCompletion (RecordMetadata recordMetadata, Exception exception) { if (exception == null ) { System.out.println("topic: " + recordMetadata.topic() + ", partition: " + recordMetadata.partition()); } } }).get(); } catch (Exception e) { e.printStackTrace(); } } producer.close(); } }
除了在 Kafka 的控制台消费者中接收到消息之外,还可以在 IDE 的控制台看到如下的输出信息:
1 2 3 4 5 topic: test, partition: 0 topic: test, partition: 2 topic: test, partition: 0 topic: test, partition: 1 topic: test, partition: 2
生产者分区 生产者分区分区的优点 提高并行度,生产者可以以分区为单位发送数据,消费者可以以分区为单位消费数据 便于合理使用存储资源,每个 Partition 在一个 Broker 上存储,可以把海量的数据按照分区切割成一块一块的数据并存储在多台 Broker 上。合理控制分区的任务,可以实现负载均衡的效果
生产者发送消息的分区策略 默认的分区器类是 DefaultPartitioner
,部分源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public class DefaultPartitioner implements Partitioner { ...... }
通过 KafkaProducer
类的 send()
方法发送消息时,需要指定 ProducerRecord
对象作为参数,ProducerRecord
类的构造方法如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 public class ProducerRecord <K , V > { public ProducerRecord (String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers) { ...... } public ProducerRecord (String topic, Integer partition, Long timestamp, K key, V value) { ...... } public ProducerRecord (String topic, Integer partition, K key, V value, Iterable<Header> headers) { ...... } public ProducerRecord (String topic, Integer partition, K key, V value) { ...... } public ProducerRecord (String topic, K key, V value) { ...... } public ProducerRecord (String topic, V value) { ...... }
调用 ProducerRecord
类不同的构造方法时,有以下几种分区策略:
在指明 partition
的情况下,直接将指明的值作为 partition
值。例如:partition=0
,那么数据会被写入分区 0。
在没有指明 partition
值,但有指定 key
的情况下,将 key
的 Hash 值与 topic
的 partition
数进行取余来得到 partition
值。例如:key
的 Hash 值是 5,topic
的 partition
数是 2,那么 key
对应的 value
会被写入 1 号分区。
在既没有指明 partition
值,又没有指定 key
的情况下,Kafka 会采用 Sticky Partition
黏性分区器,也就是会随机选择一个分区,并尽可能一直使用该分区,等该分区的 batch
已满或者已完成,Kafka 再随机一个分区进行使用(和上一次选的分区不同)。例如:第一次随机选择 0 号分区,等 0 号分区当前批次满了(默认 16K 大小)或者 linger.ms
设置的时间到了,Kafka 会再随机选择一个分区进行使用(如果还是 0 分区会继续随机选择一个分区)。
自定义生产者的分区器 开发人员可以根据业务需求自定义分区器,只需要实现 Partitioner
接口即可。
提示
本节所需的案例代码,可以直接从 GitHub 下载对应章节 kafka-lesson-03
。
自定义分区器类,实现 Partitioner
接口,并重写 partition()
方法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 public class CustomPartitioner implements Partitioner { @Override public int partition (String topic, Object key, byte [] keyBytes, Object value, byte [] valueBytes, Cluster cluster) { String msgValue = value.toString(); int partition; if (msgValue.contains("order" )) { partition = 0 ; } else { partition = 1 ; } return partition; } @Override public void close () { } @Override public void configure (Map<String, ?> configs) { } }
在生产者的配置中添加分区器参数,以此来指定自定义分区器 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 public class CustomerProducer { public static void main (String[] args) { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092,127.0.0.1:9093" ); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class.getName()); KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties); for (int i = 0 ; i < 5 ; i++) { producer.send(new ProducerRecord<>("test" , "hello kafka " + i), new Callback() { @Override public void onCompletion (RecordMetadata metadata, Exception exception) { System.out.println("Partition : " + metadata.partition()); } }); } producer.close(); } }
生产者最佳实践 生产者如何提高吞吐量 参数优化 为了让生产者提高吞吐量(发送消息的效率),可以优化以下几个参数:
batch.size
:批次大小,默认 16k
linger.ms
:等待时间,默认 0ms
,修改为 5-100ms
compression.type
:压缩方式,默认是 none
,修改过为 snappy
RecordAccumulator
:缓冲区(双端队列)大小,默认是 32m
,修改为 64m
参数说明
示例代码 提示
本节所需的案例代码,可以直接从 GitHub 下载对应章节 kafka-lesson-04
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public class CustomerProducer { public static void main (String[] args) { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092,127.0.0.1:9093" ); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.LINGER_MS_CONFIG, 5 ); properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * 1024 ); properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy" ); properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 64 * 1024 * 1024 ); KafkaProducer<String, String> producer = new KafkaProducer<>(properties); for (int i = 0 ; i < 5 ; i++) { producer.send(new ProducerRecord<>("test" , "hello kafka " + i)); } producer.close(); } }