Kafka 入门教程之二

大纲

前言

官方文档

学习路线

Kafka 生产者

生产者消息发送流程

生产者消息发送原理

Kafka 的 Producer 发送消息采用的是异步发送的方式。在消息发送的过程中,涉及到了两个线程 — main 线程和 Sender 线程。在 main 线程中,会创建一个双端队列 RecordAccumulator。值得一提的是,main 线程将消息发送给 RecordAccumulator 时,Sender 线程会不断从 RecordAccumulator 中拉取消息并发送到 Kafka Broker。

生产者重要参数列表

生产者异步发送 API

普通的异步发送

提示

本节所需的案例代码,可以直接从 GitHub 下载对应章节 kafka-lesson-01

  • Maven 依赖
1
2
3
4
5
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.2.1</version>
</dependency>
  • Java 代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class CustomerProducer {

public static void main(String[] args) {
Properties properties = new Properties();
// 指定Kafka的连接信息(若是Kafka集群,多个节点之间使用逗号分隔)
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.2.127:9092");
// 指定序列化器(必需)
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// 创建生产者对象
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
for (int i = 0; i < 5; i++) {
// 异步发送消息
producer.send(new ProducerRecord<>("test", "hello kafka " + i));
}
// 关闭资源
producer.close();
}

}
  • 测试代码

第一步:启动 Kafka 的控制台消费者:

1
# ./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test

第二步:在 IDE 工具中执行代码,观察控制台消费者中是否接收到消息,如下所示:

1
2
3
4
5
hello kafka 0
hello kafka 1
hello kafka 2
hello kafka 3
hello kafka 4

带回调函数的异步发送

回调方法会在 Producer 收到 ack 时调用,且为异步调用;该方法有两个参数,分别是元数据信息(RecordMetadata)和异常信息(Exception)。如果 Exceptionnull,则说明消息发送成功,如果 Exception 不为 null,则说明消息发送失败。值得一提的是,消息发送失败会自动重试发送,不需要在回调函数中手动重试发送。

  • Java 代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class CustomerProducer2 {

public static void main(String[] args) {
Properties properties = new Properties();
// 指定Kafka的连接信息(若是Kafka集群,多个节点之间使用逗号分隔)
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.2.127:9092");
// 指定序列化器(必需)
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// 创建生产者对象
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
for (int i = 0; i < 5; i++) {
// 异步发送消息(带回调函数)
producer.send(new ProducerRecord<>("test", "hello kafka " + i), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception exception) {
if (exception != null) {
exception.printStackTrace();
} else {
System.out.println("topic: " + recordMetadata.topic() + ", partition: " + recordMetadata.partition());
}
}
});
}
// 关闭资源
producer.close();
}

}
  • 测试代码

除了在 Kafka 的控制台消费者中接收到消息之外,还可以在 IDE 的控制台看到如下的输出信息:

1
2
3
4
5
topic: test, partition: 0
topic: test, partition: 0
topic: test, partition: 0
topic: test, partition: 0
topic: test, partition: 0

生产者同步发送 API

提示

本节所需的案例代码,可以直接从 GitHub 下载对应章节 kafka-lesson-02

普通的同步发送

同步发送的意思就是,当一条消息发送之后,会阻塞当前线程,直至收到 ack 应答。由于 send() 方法返回的是一个 Future 对象,根据 Futrue 对象的特点,只需调用 Future 对象的 get() 方法即可实现同步发送。

  • Maven 依赖
1
2
3
4
5
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.2.1</version>
</dependency>
  • Java 代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class CustomerProducer {

public static void main(String[] args) {
Properties properties = new Properties();
// 指定Kafka的连接信息(若是Kafka集群,多个节点之间使用逗号分隔)
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.2.127:9092");
// 指定序列化器(必需)
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// 创建生产者对象
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
for (int i = 0; i < 5; i++) {
// 同步发送消息
try {
producer.send(new ProducerRecord<>("test", "hello kafka " + i)).get();
} catch (Exception e) {
e.printStackTrace();
}
}
// 关闭资源
producer.close();
}

}
  • 测试代码

第一步:启动 Kafka 的控制台消费者:

1
# ./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test

第二步:在 IDE 工具中执行代码,观察控制台消费者中是否接收到消息,如下所示:

1
2
3
4
5
hello kafka 0
hello kafka 1
hello kafka 2
hello kafka 3
hello kafka 4

带回调函数的同步发送

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class CustomerProducer2 {

public static void main(String[] args) {
Properties properties = new Properties();
// 指定Kafka的连接信息(若是Kafka集群,多个节点之间使用逗号分隔)
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.2.127:9092");
// 指定序列化器(必需)
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// 创建生产者对象
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
for (int i = 0; i < 5; i++) {
// 同步发送消息(带回调函数)
try {
producer.send(new ProducerRecord<>("test", "hello kafka " + i), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception exception) {
if (exception == null) {
System.out.println("topic: " + recordMetadata.topic() + ", partition: " + recordMetadata.partition());
}
}
}).get();
} catch (Exception e) {
e.printStackTrace();
}
}
// 关闭资源
producer.close();
}

}
  • 测试代码

除了在 Kafka 的控制台消费者中接收到消息之外,还可以在 IDE 的控制台看到如下的输出信息:

1
2
3
4
5
topic: test, partition: 0
topic: test, partition: 2
topic: test, partition: 0
topic: test, partition: 1
topic: test, partition: 2

生产者分区

生产者分区的优点

  • 提高并行度,生产者支持以分区为单位发送数据,消费者支持以分区为单位消费数据。
  • 便于合理使用存储资源,每个 Partition 在一台 Broker 上存储,可以把海量的数据按照分区切割成一块一块的数据并存储在多台 Broker 上。合理控制分区的任务,可以实现负载均衡的效果。

生产者发送消息的分区策略

默认的分区器类是 DefaultPartitioner,部分源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* The default partitioning strategy:
* <ul>
* <li>If a partition is specified in the record, use it
* <li>If no partition is specified but a key is present choose a partition based on a hash of the key
* <li>If no partition or key is present choose the sticky partition that changes when the batch is full.
*
* See KIP-480 for details about sticky partitioning.
*/
public class DefaultPartitioner implements Partitioner {

......

}

通过 KafkaProducer 类的 send() 方法发送消息时,需要指定 ProducerRecord 对象作为参数,ProducerRecord 类的构造方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class ProducerRecord<K, V> {

public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers) {
......
}

public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {
......
}

public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers) {
......
}

public ProducerRecord(String topic, Integer partition, K key, V value) {
......
}

public ProducerRecord(String topic, K key, V value) {
......
}

public ProducerRecord(String topic, V value) {
......
}

调用 ProducerRecord 类不同的构造方法时,有以下几种分区策略:

  • 在指明 partition 的情况下,直接将指明的值作为 partition 值。例如:partition=0,那么数据会被写入分区 0。

  • 在没有指明 partition 值,但有指定 key 的情况下,将 key 的 Hash 值与 topicpartition 数进行取余来得到 partition 值。例如:key 的 Hash 值是 5,topicpartition 数是 2,那么 key 对应的 value 会被写入 1 号分区。

  • 在既没有指明 partition 值,又没有指定 key 的情况下,Kafka 会采用 Sticky Partition 黏性分区器,也就是会随机选择一个分区,并尽可能一直使用该分区,等该分区的 batch 已满或者已完成,Kafka 再随机一个分区进行使用(和上一次选的分区不同)。例如:第一次随机选择 0 号分区,等 0 号分区当前批次满了(默认 16K 大小)或者 linger.ms 设置的时间到了,Kafka 会再随机选择一个分区进行使用(如果还是 0 分区会继续随机选择一个分区)。

自定义生产者的分区器

开发人员可以根据业务需求自定义分区器,只需要实现 Partitioner 接口即可。

提示

本节所需的案例代码,可以直接从 GitHub 下载对应章节 kafka-lesson-03

  • 自定义分区器类,实现 Partitioner 接口,并重写 partition() 方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
/**
* 自定义分区器
*/
public class CustomPartitioner implements Partitioner {

/**
* 返回消息对应的分区
*
* @param topic 主题
* @param key 消息的 key
* @param keyBytes 消息的 key 序列化后的字节数组
* @param value 消息的 value
* @param valueBytes 消息的 value 序列化后的字节数组
* @param cluster 集群元数据可以查看分区信息
* @return
*/
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// 获取消息内容
String msgValue = value.toString();

// 定义分区号
int partition;

if (msgValue.contains("order")) {
partition = 0;
} else {
partition = 1;
}
// 返回分区号
return partition;
}

/**
* 关闭资源
*/
@Override
public void close() {

}

/**
* 配置信息
*
* @param configs
*/
@Override
public void configure(Map<String, ?> configs) {

}

}
  • 在生产者的配置中添加分区器参数,以此来指定自定义分区器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class CustomerProducer {

public static void main(String[] args) {
Properties properties = new Properties();
// 指定Kafka的连接信息(若是Kafka集群,多个节点之间使用逗号分隔)
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.2.127:9092");
// 指定序列化器(必需)
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 指定自定义分区器
properties.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class.getName());

// 创建生产者对象
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
for (int i = 0; i < 5; i++) {
// 异步发送消息
producer.send(new ProducerRecord<>("test", "hello kafka " + i), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
System.out.println("Partition : " + metadata.partition());
}
});
}
// 关闭资源
producer.close();
}

}
  • 测试代码

除了在 Kafka 的控制台消费者中接收到消息之外,还可以在 IDE 的控制台看到如下的输出信息:

1
2
3
4
5
Partition : 1
Partition : 1
Partition : 1
Partition : 1
Partition : 1

生产者最佳实践

生产者如何提高吞吐量

参数优化

为了让生产者提高吞吐量(发送消息的效率),可以优化以下几个参数:

  • batch.size:批次大小,默认 16k
  • linger.ms:等待时间,默认 0ms,修改为 5-100ms
  • compression.type:压缩方式,默认是 none,修改过为 snappy
  • RecordAccumulator:缓冲区(双端队列)大小,默认是 32m,修改为 64m

特别注意

    1. 上述的四个参数值并不是设置得越大就越好,设置得过大会导致 Kafka 中的消息被延迟消费。
    1. 当 Topic 的分区数量比较多的时候,可以适当增加 RecordAccumulator(缓冲区) 的大小。
参数说明

示例代码

提示

本节所需的案例代码,可以直接从 GitHub 下载对应章节 kafka-lesson-04

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class CustomerProducer {

public static void main(String[] args) {
Properties properties = new Properties();
// 指定Kafka的连接信息(若是Kafka集群,多个节点之间使用逗号分隔)
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.2.127:9092");
// 指定序列化器(必需)
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// 等待时间(默认 0ms)
properties.put(ProducerConfig.LINGER_MS_CONFIG, 5);
// 批次大小(默认 16K),单位是字节
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * 1024);
// 压缩方式(默认 none)
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
// 缓冲区大小(默认 32M),单位是字节
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 64 * 1024 * 1024);

// 创建生产者对象
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
for (int i = 0; i < 5; i++) {
// 异步发送消息
producer.send(new ProducerRecord<>("test", "hello kafka " + i));
}
// 关闭资源
producer.close();
}

}

生产者如何保证数据可靠性

这里的数据可靠性是指生产者如何保证消息可以发送给 Kafka,并保证 Kafka 可以持久化消息内容。

消息发送流程

ISR 介绍

Leader 维护了一个动态的 in-sync replica set (ISR),意为和 Leader 保持同步的 Follower + Leader 集合(leader: 0,isr: 0, 1, 2)。

ACK 应答原理


数据可靠性总结

特别注意

当 ACK 级别设置为 -1 的时候,虽然可以完全保证数据的可靠性,但会存在数据重复的情况,详细的介绍可以看 这里

示例代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class CustomerProducer {

public static void main(String[] args) {
Properties properties = new Properties();
// 指定Kafka的连接信息(若是Kafka集群,多个节点之间使用逗号分隔)
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.2.127:9092");
// 指定序列化器(必需)
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// 设置 ACK 应答级别,默认值是 "all"
properties.put(ProducerConfig.ACKS_CONFIG, "all");
// 设置重试次数,默认值是 int 类型的最大值 2147483647
properties.put(ProducerConfig.RETRIES_CONFIG, 3);

// 创建生产者对象
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
for (int i = 0; i < 5; i++) {
// 异步发送消息
producer.send(new ProducerRecord<>("test", "hello kafka " + i));
}
// 关闭资源
producer.close();
}

}

生产者如何处理数据去重

Kafka 从 0.11 版本以后,引入了一项重大特性:幂等性和事务,可用于解决数据重复的问题。

数据重复分析

数据传递语义
  • 至少一次 (At Least Once) = ACK 级别设置为 - 1 + 分区副本大于等于 2 + ISR 里应答的最小副本数量大于等于 2
  • 最多一次 (At Most Once) = ACK 级别设置为 0

总结

  • 至少一次 (At Least Once) 可以保证数据不丢失,但是不能保证数据不重复。
  • 最多一次 (At Most Once) 可以保证数据不重复,但是不能保证数据不丢失。
  • 精确一次 (Exactly Once):对于一些非常重要的信息,比如和钱相关的数据,要求数据既不能重复,也不能丢失。
幂等性使用
幂等性介绍
  • 幂等性就是指 Producer (生产者) 无论向 Broker 发送多少次重复数据,Broker 端都只会持久化一条数据,保证了数据不重复。
  • 精确一次 (Exactly Once) = 幂等性 + 至少一次 (acks = -1 + 分区副本数 >= 2 + ISR 里应答的最小副本数量 >= 2)。
  • 重复数据的判断标准:具有 <PID, Partition, SeqNumber> 相同主键的消息提交时,Broker 只会持久化一条。其中 PID 是 Kafka 每次重启都会分配一个新的,Partition 表示分区号,Sequence Number 是单调自增的序列号,所以幂等性只能保证数据在单分区单会话内不重复,这里的单会话是相对于 Kafka 单次重启来说。如果需要解决多分区多会话的数据重复问题,需要结合幂等性与事务来解决。

幂等性开启

开启幂等性的配置参数是 enable.idempotence,默认值为 true,设置 false 会关闭幂等性。

提示

Kafka 官方文档中的幂等性详细介绍可以看 这里

事务使用

特别注意

  1. 由于事务的底层是基于幂等性,因此在生产者使用事务之前,必须开启幂等性(默认开启)。
  2. 更多关于 Kafka 事务的使用介绍可以看 这里 的教程。
事务原理介绍

事务流程介绍

如上图所示,整个事务流程分为以下几个步骤:

  • 事务初始化:initTransactions
  • 事务启动:beginTransaction
  • 发送消息:一般发送多条消息,可以向 1 个或多个 Topic 发送消息
  • 事务提交:commitTransaction
  • 事务回滚:abortTransaction
  • 消费消息

当 Producer 发送多条事务消息的话:

  • 事务初始化是一次性的
  • 事务开始、发送消息、事务提交 / 回滚则会一直循环运行
事务的常用 API
1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 1.初始化事务
void initTransactions();

// 2.开启事务
void beginTransaction() throws ProducerFencedException;

// 3.在事务内提交已经消费的偏移量(主要用于消费者)
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException;

// 4.提交事务
void commitTransaction() throws ProducerFencedException;

// 5.放弃事务(类似于回滚事务的操作)
void abortTransaction() throws ProducerFencedException;
示例代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class CustomerProducer {

public static void main(String[] args) {
Properties properties = new Properties();
// 指定Kafka的连接信息(若是Kafka集群,多个节点之间使用逗号分隔)
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.2.127:9092");
// 指定序列化器(必需)
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// 设置事务 ID(必需)
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction_id_01");

// 创建生产者对象
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

// 初始化事务
producer.initTransactions();

// 开启事务
producer.beginTransaction();

try {
// 发送数据
for (int i = 0; i < 5; i++) {
producer.send(new ProducerRecord<>("test", "hello kafka " + i));
}
// 提交事务
producer.commitTransaction();
} catch (Exception e) {
// 放弃事务
producer.abortTransaction();
} finally {
// 关闭资源
producer.close();
}

}

}

生产者如何保证数据有序

数据有序分析

提示

  • 单分区内,数据是有序的(需要符合一定的条件)。
  • 多分区,分区与分区之间无序。
  • 如果要求多分区的数据有序,可以让消费者读取多个分区的数据,并存储在本地内存中,然后对内存中的数据进行排序,最后再进一步统一处理数据。但这会引申出一个问题,也就是消费者需要等待所有分区的数据都读取完了才能进一步处理数据,造成数据的处理效率比较低。

保证数据有序
  • Kafka 在 1.x 版本之前可以保证数据单分区有序,条件如下:

    • max.in.flight.requests.per.connection = 1,不需要考虑是否开启幂等性
  • Kafka 在 1.x 及以后版本可以保证数据单分区有序,条件如下:

    • 未开启幂等性
      • max.in.flight.requests.per.connection 需要设置为 1
    • 开启幂等性
      • max.in.flight.requests.per.connection 需要设置小于等于 5
      • 原因说明:因为在 Kafka 1.x 版本以后,启用幂等性后,Kafka 服务端会缓存 Producer 发来的最近 5 个 Request 的元数据,因此无论如何,都可以保证最近 5 个 Request 的数据都是有序的(如下图所示)