Kafka 入门教程之二

Kafka 生产者

生产者消息发送流程

生产者消息发送原理

Kafka 的 Producer 发送消息采用的是异步发送的方式。在消息发送的过程中,涉及到了两个线程 — main 线程和 Sender 线程。在 main 线程中,会创建一个双端队列 RecordAccumulator。值得一提的是,main 线程将消息发送给 RecordAccumulator 时,Sender 线程会不断从 RecordAccumulator 中拉取消息并发送到 Kafka Broker。

生产者重要参数列表

生产者异步发送 API

普通的异步发送

提示

本节所需的案例代码,可以直接从 GitHub 下载对应章节 kafka-lesson-01

  • Maven 依赖
1
2
3
4
5
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.2.1</version>
</dependency>
  • Java 代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class CustomerProducer {

public static void main(String[] args) {
Properties properties = new Properties();
// 指定Kafka集群的连接信息
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092,127.0.0.1:9093");
// 指定序列化器(必需)
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// 创建生产者对象
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
for (int i = 0; i < 5; i++) {
// 异步发送消息
producer.send(new ProducerRecord<>("test", "hello kafka " + i));
}
// 关闭资源
producer.close();
}

}
  • 测试代码

第一步:启动 Kafka 的控制台消费者:

1
# ./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test

第二步:在 IDE 工具中执行代码,观察控制台消费者中是否接收到消息,如下所示:

1
2
3
4
5
hello kafka 0
hello kafka 1
hello kafka 2
hello kafka 3
hello kafka 4

带回调函数的异步发送

回调方法会在 Producer 收到 ack 时调用,且为异步调用;该方法有两个参数,分别是元数据信息(RecordMetadata)和异常信息(Exception)。如果 Exceptionnull,则说明消息发送成功,如果 Exception 不为 null,则说明消息发送失败。值得一提的是,消息发送失败会自动重试发送,不需要在回调函数中手动重试发送。

  • Java 代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class CustomerProducer2 {

public static void main(String[] args) {
Properties properties = new Properties();
// 指定Kafka集群的连接信息
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092,127.0.0.1:9093");
// 指定序列化器(必需)
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// 创建生产者对象
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
for (int i = 0; i < 5; i++) {
// 异步发送消息(带回调函数)
producer.send(new ProducerRecord<>("test", "hello kafka " + i), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception exception) {
if (exception == null) {
System.out.println("topic: " + recordMetadata.topic() + ", partition: " + recordMetadata.partition());
}
}
});
}
// 关闭资源
producer.close();
}

}
  • 测试代码

除了在 Kafka 的控制台消费者中接收到消息之外,还可以在 IDE 的控制台看到如下的输出信息:

1
2
3
4
5
topic: test, partition: 0
topic: test, partition: 0
topic: test, partition: 0
topic: test, partition: 0
topic: test, partition: 0

生产者同步发送 API

提示

本节所需的案例代码,可以直接从 GitHub 下载对应章节 kafka-lesson-02

普通的同步发送

同步发送的意思就是,当一条消息发送之后,会阻塞当前线程,直至收到 ack 应答。由于 send() 方法返回的是一个 Future 对象,根据 Futrue 对象的特点,只需调用 Future 对象的 get() 方法即可实现同步发送。

  • Maven 依赖
1
2
3
4
5
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.2.1</version>
</dependency>
  • Java 代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class CustomerProducer {

public static void main(String[] args) {
Properties properties = new Properties();
// 指定Kafka集群的连接信息
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092,127.0.0.1:9093");
// 指定序列化器(必需)
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// 创建生产者对象
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
for (int i = 0; i < 5; i++) {
// 同步发送消息
try {
producer.send(new ProducerRecord<>("test", "hello kafka " + i)).get();
} catch (Exception e) {
e.printStackTrace();
}
}
// 关闭资源
producer.close();
}

}
  • 测试代码

第一步:启动 Kafka 的控制台消费者:

1
# ./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test

第二步:在 IDE 工具中执行代码,观察控制台消费者中是否接收到消息,如下所示:

1
2
3
4
5
hello kafka 0
hello kafka 1
hello kafka 2
hello kafka 3
hello kafka 4

带回调函数的同步发送

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class CustomerProducer2 {

public static void main(String[] args) {
Properties properties = new Properties();
// 指定Kafka集群的连接信息
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092,127.0.0.1:9093");
// 指定序列化器(必需)
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// 创建生产者对象
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
for (int i = 0; i < 5; i++) {
// 同步发送消息(带回调函数)
try {
producer.send(new ProducerRecord<>("test", "hello kafka " + i), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception exception) {
if (exception == null) {
System.out.println("topic: " + recordMetadata.topic() + ", partition: " + recordMetadata.partition());
}
}
}).get();
} catch (Exception e) {
e.printStackTrace();
}
}
// 关闭资源
producer.close();
}

}
  • 测试代码

除了在 Kafka 的控制台消费者中接收到消息之外,还可以在 IDE 的控制台看到如下的输出信息:

1
2
3
4
5
topic: test, partition: 0
topic: test, partition: 2
topic: test, partition: 0
topic: test, partition: 1
topic: test, partition: 2