Kafka 2.x Getting-Started Tutorial, Part 7

Outline

Preface

Learning Resources

Kafka KRaft Mode

Overview of KRaft Mode

  • The diagram on the left shows Kafka's ZooKeeper-mode architecture: metadata is stored in ZooKeeper, and at runtime one Broker is dynamically elected as the (single) Controller, which then manages the Kafka cluster.
  • The diagram on the right shows Kafka's KRaft-mode architecture: it no longer depends on a ZooKeeper cluster; instead, multiple Controller nodes replace ZooKeeper, metadata is kept in the Controllers, and the Controllers manage the Kafka cluster directly.

Feature                    | ZooKeeper Mode                                             | KRaft Mode
Leader election dependency | Requires ZooKeeper for metadata storage and notifications  | Does not depend on ZooKeeper; implemented entirely by Kafka itself
Metadata management        | Managed by ZooKeeper                                        | Managed by Kafka's own Raft quorum
Consistency guarantee      | Consistency provided by ZooKeeper                           | Consistency guaranteed by the Raft protocol
Introduced in              | Early Kafka versions (ZooKeeper mode is the default)        | Kafka 2.8.0+ (KRaft mode)

Version Note

Starting with Kafka 2.8.0, Kafka implements a Raft-based distributed consensus mechanism of its own, which means a Kafka cluster can run independently of ZooKeeper.

Advantages of KRaft Mode

  • Kafka no longer depends on an external service and can run on its own.
  • When the Controller manages the cluster, it no longer has to read metadata from ZooKeeper first, which improves cluster performance.
  • Because ZooKeeper is no longer involved, scaling a Kafka cluster is no longer limited by ZooKeeper's read/write capacity.
  • Controller nodes are no longer chosen by dynamic election; they can be designated in the configuration file, so operators can deliberately provision those nodes instead of having no answer to high load landing on a randomly elected Controller.

Election Concepts in Kafka

Elections in ZooKeeper Mode

In ZooKeeper mode, the responsibilities of the Controller

  • The Controller manages and distributes the Kafka cluster's metadata, such as the state of Topics, partitions, and replicas.
  • The Controller manages Brokers joining and leaving the cluster, including assigning partition replicas for all Topics and electing partition replica Leaders.
  • When a partition's Leader replica fails, the Controller triggers a new Leader election for that partition's replicas and pushes the new Leader information to ZooKeeper and the other Brokers.
  • Controller election

    • Election mechanism
      • The Kafka cluster's Controller is elected through ZooKeeper.
      • When a Broker starts, it tries to create an ephemeral node at the /controller path in ZooKeeper (a minimal sketch of this pattern follows this list).
      • The Broker that successfully creates the node becomes the Controller; the other Brokers remain ordinary Brokers.
    • Trigger condition
      • If the current Controller goes down (for example, because of a network failure), ZooKeeper deletes that Broker's ephemeral node, which triggers a new Controller election.
  • Partition replica Leader election

    • Election mechanism
      • The Controller reads the ISR list from ZooKeeper, chooses a new partition replica Leader, and updates the metadata.
      • Healthy replicas from the ISR list are preferred as the Leader.
    • Trigger conditions
      • A partition's Leader replica fails (for example, its Broker goes down).
      • The Controller detects a change in the ISR (in-sync replica) list.
    • Update notification
      • After the election, the metadata is propagated to all Brokers, and clients continue reading and writing against the new Leader.
  • Differences between the two elections

    • Controller election: decided through ZooKeeper; its purpose is to choose the cluster's manager.
    • Partition replica Leader election: initiated by the Controller; its purpose is to choose, for each partition, a new Leader replica from the ISR list.
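Below is a minimal, hand-written sketch of the ephemeral-node pattern referenced above; it is not Kafka's actual Controller code. It assumes a ZooKeeper instance at localhost:2181 and only uses the standard org.apache.zookeeper client API: the broker that creates /controller first wins the election, and the node disappears automatically when that session dies, which is what triggers a re-election.

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class ControllerElectionSketch {
    public static void main(String[] args) throws Exception {
        // Placeholder connection string; 30 s session timeout; empty watcher
        ZooKeeper zk = new ZooKeeper("localhost:2181", 30_000, event -> { });
        try {
            // An ephemeral node is removed automatically when the creating session dies,
            // which is exactly what triggers a new Controller election.
            zk.create("/controller",
                    "{\"brokerid\":1}".getBytes(),      // payload is illustrative only
                    ZooDefs.Ids.OPEN_ACL_UNSAFE,
                    CreateMode.EPHEMERAL);
            System.out.println("This broker won the election and now acts as Controller");
        } catch (KeeperException.NodeExistsException e) {
            // Another broker created /controller first; watch the node so we can react to failover.
            zk.exists("/controller", true);
            System.out.println("Another broker is already the Controller");
        }
    }
}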

Special Notes

  • In ZooKeeper mode, the Controller is elected through ZooKeeper, and only one active Controller exists in the cluster at any given moment; the other Brokers stay in the non-Controller role and only handle partition read/write requests.
  • This single-Controller design simplifies management, but it also increases the failover overhead and latency when the Controller fails.
  • In KRaft mode, Kafka uses the Raft protocol to run multiple Quorum Controller nodes, which improves both availability and consistency.

Elections in KRaft Mode

In KRaft mode, the responsibilities of the Controller

  • The Controller is a dedicated role responsible for managing metadata.
  • The Quorum Controller is a logical role and can be spread across multiple Kafka Brokers.
  • The Quorum Controller cluster uses the Raft consensus protocol to guarantee metadata consistency and high availability.
  • Controller election

    • In KRaft mode, the Controller election is performed by the Raft consensus algorithm and no longer depends on ZooKeeper.
    • Among the Controller nodes, one is elected as the Leader Controller, which handles metadata updates and change requests.
    • The other Controller nodes (Follower Controllers) replicate the Leader Controller's metadata log through the Raft protocol.
  • Partition replica Leader election

    • The Controller manages metadata and elects partition replica Leaders directly via the Raft protocol, without relying on ZooKeeper.
    • The Controller decides the new partition replica Leader based on replica state (for example, the ISR list).
    • After a metadata change, it is replicated to all Quorum Controller nodes via the Raft protocol, again without ZooKeeper.

Kafka Producer Source Code Analysis

Tips

  • Before reading and debugging the Kafka source code, you must first set up a Kafka source-reading environment; see the detailed tutorial here.
  • Two signs that your study of the source code is paying off: first, you can debug it on your own; second, you can independently build advanced features on top of it.

How the Producer Works

Message-Sending Flow

Main thread initialization

Sender thread initialization

Sending data to the buffer

The Sender thread sends the data

Producer Source Code Walkthrough

Main Thread Initialization

The core class here is KafkaProducer in the clients module, and the most important piece is the KafkaProducer() constructor, shown below:

KafkaProducer(ProducerConfig config,
Serializer<K> keySerializer,
Serializer<V> valueSerializer,
ProducerMetadata metadata,
KafkaClient kafkaClient,
ProducerInterceptors<K, V> interceptors,
Time time) {
try {
this.producerConfig = config;
this.time = time;

// Get the transactional ID
String transactionalId = config.getString(ProducerConfig.TRANSACTIONAL_ID_CONFIG);

// Get the client ID
this.clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);

LogContext logContext;
if (transactionalId == null)
logContext = new LogContext(String.format("[Producer clientId=%s] ", clientId));
else
logContext = new LogContext(String.format("[Producer clientId=%s, transactionalId=%s] ", clientId, transactionalId));
log = logContext.logger(KafkaProducer.class);
log.trace("Starting the Kafka producer");

Map<String, String> metricTags = Collections.singletonMap("client-id", clientId);
MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG))
.timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
.recordLevel(Sensor.RecordingLevel.forName(config.getString(ProducerConfig.METRICS_RECORDING_LEVEL_CONFIG)))
.tags(metricTags);
List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
MetricsReporter.class,
Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));

// JMX monitoring-related configuration
JmxReporter jmxReporter = new JmxReporter();
jmxReporter.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)));
reporters.add(jmxReporter);
MetricsContext metricsContext = new KafkaMetricsContext(JMX_PREFIX,
config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
this.metrics = new Metrics(metricConfig, reporters, time, metricsContext);
// Partitioner configuration
this.partitioner = config.getConfiguredInstance(
ProducerConfig.PARTITIONER_CLASS_CONFIG,
Partitioner.class,
Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
// Retry backoff interval; default is 100 ms
long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
// Serializer configuration
if (keySerializer == null) {
this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
Serializer.class);
this.keySerializer.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)), true);
} else {
config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
this.keySerializer = keySerializer;
}
if (valueSerializer == null) {
this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
Serializer.class);
this.valueSerializer.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)), false);
} else {
config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
this.valueSerializer = valueSerializer;
}

// Interceptor configuration
List<ProducerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(
ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
ProducerInterceptor.class,
Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
if (interceptors != null)
this.interceptors = interceptors;
else
this.interceptors = new ProducerInterceptors<>(interceptorList);
ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keySerializer,
valueSerializer, interceptorList, reporters);
// Maximum size of a single request the producer sends to Kafka; default is 1 MB
this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
// Total buffer memory; default is 32 MB
this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));

this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
int deliveryTimeoutMs = configureDeliveryTimeout(config, log);

this.apiVersions = new ApiVersions();
this.transactionManager = configureTransactionState(config, logContext);

// Initialize the RecordAccumulator buffer
// batch.size, default 16 KB
// compression.type, default none
// linger.ms, default 0
// retry.backoff.ms, default 100 ms
// delivery.timeout.ms, default 2 minutes
// request.timeout.ms, default 30 s
this.accumulator = new RecordAccumulator(logContext,
config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
this.compressionType,
lingerMs(config),
retryBackoffMs,
deliveryTimeoutMs,
metrics,
PRODUCER_METRIC_GROUP_NAME,
time,
apiVersions,
transactionManager,
new BufferPool(this.totalMemorySize, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), metrics, time, PRODUCER_METRIC_GROUP_NAME));

// Kafka Broker addresses from bootstrap.servers (not ZooKeeper addresses)
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(
config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG),
config.getString(ProducerConfig.CLIENT_DNS_LOOKUP_CONFIG));

// Obtain the producer metadata object
if (metadata != null) {
this.metadata = metadata;
} else {
this.metadata = new ProducerMetadata(retryBackoffMs,
// metadata.max.age.ms: how often the producer forces a metadata refresh; default is 5 minutes
config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG),
// metadata.max.idle.ms: how long metadata for a topic the producer is no longer writing to is retained; default is 5 minutes
config.getLong(ProducerConfig.METADATA_MAX_IDLE_CONFIG),
logContext,
clusterResourceListeners,
Time.SYSTEM);
this.metadata.bootstrap(addresses);
}
this.errors = this.metrics.sensor("errors");
// Create the Sender (the I/O thread's Runnable)
this.sender = newSender(logContext, kafkaClient, this.metadata);
String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
// Start the Sender thread
this.ioThread.start();
config.logUnused();
AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics, time.milliseconds());
log.debug("Kafka producer started");
} catch (Throwable t) {
// call close methods if internal objects are already constructed this is to prevent resource leak. see KAFKA-2121
close(Duration.ofMillis(0), true);
// now propagate the exception
throw new KafkaException("Failed to construct kafka producer", t);
}
}
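To tie the constructor parameters above back to everyday use, here is a minimal, hand-written usage sketch (not taken from the Kafka source); the bootstrap address and topic name are placeholders, and the values simply echo the defaults mentioned in the comments:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");            // placeholder address
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);                              // batch.size, 16 KB by default
        props.put(ProducerConfig.LINGER_MS_CONFIG, 5);                                   // linger.ms, 0 by default
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432L);                       // buffer.memory, 32 MB by default

        // new KafkaProducer(Properties) eventually reaches the constructor analyzed above.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("demo-topic", "key", "value"));
        }
    }
}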

Sender Thread Initialization

The core class here is again KafkaProducer in the clients module; the key method is newSender(), shown below:

Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadata metadata) {
// maxInflightRequests: maximum number of requests that may be in flight without an acknowledgement; default is 5
int maxInflightRequests = configureInflightRequests(producerConfig);
// request.timeout.ms, default 30 s
int requestTimeoutMs = producerConfig.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(producerConfig, time, logContext);
ProducerMetrics metricsRegistry = new ProducerMetrics(this.metrics);
Sensor throttleTimeSensor = Sender.throttleTimeSensor(metricsRegistry.senderMetrics);

KafkaClient client = kafkaClient != null ? kafkaClient : new NetworkClient(
new Selector(producerConfig.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
this.metrics, time, "producer", channelBuilder, logContext),
metadata,
clientId,
// maxInflightRequests: maximum number of unacknowledged in-flight requests; default 5
// reconnect.backoff.ms: base interval between reconnect attempts; default 50 ms
// reconnect.backoff.max.ms: upper bound for the reconnect backoff, which grows exponentially on repeated failures; default 1000 ms
// send.buffer.bytes: socket send buffer size; default 128 KB
// receive.buffer.bytes: socket receive buffer size; default 32 KB
// request.timeout.ms, default 30 s
// socket.connection.setup.timeout.ms: time allowed for the producer to establish a connection to the broker before the attempt is closed; default 10 s
// socket.connection.setup.timeout.max.ms: upper bound for the connection-setup timeout, which grows exponentially on repeated failures; default 30 s
maxInflightRequests,
producerConfig.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
producerConfig.getLong(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),
producerConfig.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
producerConfig.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
requestTimeoutMs,
producerConfig.getLong(ProducerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG),
producerConfig.getLong(ProducerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG),
time,
true,
apiVersions,
throttleTimeSensor,
logContext);
// acks, default -1
// acks=0: the producer does not wait for any acknowledgement from the Kafka broker
// acks=1: the producer waits only for the Leader to acknowledge the write
// acks=-1 (all): the producer waits for the Leader and every Follower in the ISR to acknowledge
short acks = configureAcks(producerConfig, log);
return new Sender(logContext,
client,
metadata,
this.accumulator,
maxInflightRequests == 1,
// Maximum size of a single request sent to Kafka; default is 1 MB
producerConfig.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
acks,
// retries: number of retries on failure; default is Integer.MAX_VALUE
producerConfig.getInt(ProducerConfig.RETRIES_CONFIG),
metricsRegistry.senderMetrics,
time,
requestTimeoutMs,
// retry.backoff.ms: interval between retries; default 100 ms
producerConfig.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),
this.transactionManager,
apiVersions);
}
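The acks, retries, and in-flight parameters read in newSender() correspond to the following producer configuration keys. This is an illustrative sketch with example values, not a recommendation from the original text; note that limiting in-flight requests to 1 additionally preserves strict ordering when retries happen.

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public class SenderConfigSketch {
    public static Properties reliabilityProps() {
        Properties props = new Properties();
        props.put(ProducerConfig.ACKS_CONFIG, "all");                          // wait for the Leader and the ISR
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);           // default
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 100);                // default 100 ms
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);    // default 5
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30_000);           // default 30 s
        return props;
    }
}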

The Sender class implements the Runnable interface, and the Main thread wraps the Sender object in a thread (the KafkaThread class) and starts it. The run() method of the Sender class is shown below:

@Override
public void run() {
log.debug("Starting Kafka producer I/O thread.");

// main loop, runs until close is called
while (running) {
try {
// The Sender thread drains data from the accumulator and sends it to Kafka; right after startup there is nothing to drain yet
runOnce();
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}

log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");

// okay we stopped accepting requests but there may still be
// requests in the transaction manager, accumulator or waiting for acknowledgment,
// wait until these are completed.
while (!forceClose && ((this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0) || hasPendingTransactionalRequests())) {
try {
runOnce();
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}

// Abort the transaction if any commit or abort didn't go through the transaction manager's queue
while (!forceClose && transactionManager != null && transactionManager.hasOngoingTransaction()) {
if (!transactionManager.isCompleting()) {
log.info("Aborting incomplete transaction due to shutdown");
transactionManager.beginAbort();
}
try {
runOnce();
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}

if (forceClose) {
// We need to fail all the incomplete transactional requests and batches and wake up the threads waiting on
// the futures.
if (transactionManager != null) {
log.debug("Aborting incomplete transactional requests due to forced shutdown");
transactionManager.close();
}
log.debug("Aborting incomplete batches due to forced shutdown");
this.accumulator.abortIncompleteBatches();
}
try {
this.client.close();
} catch (Exception e) {
log.error("Failed to close network client", e);
}

log.debug("Shutdown of Kafka producer I/O thread has completed.");
}
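KafkaThread is essentially a thin wrapper around Thread that names the thread and marks it as a daemon. A rough, simplified approximation of how the producer wires Sender.run() into a background I/O thread (this is not the real KafkaThread source):

public class SenderThreadSketch {
    public static void main(String[] args) throws InterruptedException {
        // In the real producer this Runnable is the Sender instance; its run() method is shown above.
        Runnable sender = () -> System.out.println("the I/O loop would run here");

        // KafkaThread boils down to new Thread(runnable, name) plus setDaemon(true).
        Thread ioThread = new Thread(sender, "kafka-producer-network-thread | demo-client");
        ioThread.setDaemon(true);
        ioThread.start();
        ioThread.join();
    }
}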

The Main Thread Sends Data to the Buffer

Overall Send Flow

The core class here is KafkaProducer in the clients module; the key method is send(), shown below:

@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
// Let the interceptors process the record before sending
ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
// Hand the record off for sending to Kafka
return doSend(interceptedRecord, callback);
}
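A minimal caller-side sketch of the asynchronous send() entry point analyzed here (the topic name is a placeholder); the Callback is invoked later, once the broker has acknowledged the record or the send has failed:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class SendWithCallbackSketch {
    static void sendOne(KafkaProducer<String, String> producer) {
        ProducerRecord<String, String> record = new ProducerRecord<>("demo-topic", "key", "value");
        producer.send(record, (RecordMetadata metadata, Exception exception) -> {
            if (exception != null) {
                exception.printStackTrace();   // the send failed
            } else {
                // partition and offset assigned by the broker
                System.out.println("partition=" + metadata.partition() + ", offset=" + metadata.offset());
            }
        });
    }
}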

send() calls onSend() so that the interceptors can process the record before it is sent, as shown below:

public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
ProducerRecord<K, V> interceptRecord = record;
for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
try {
// Apply each interceptor in turn
interceptRecord = interceptor.onSend(interceptRecord);
} catch (Exception e) {
// do not propagate interceptor exception, log and continue calling other interceptors
// be careful not to throw exception from here
if (record != null)
log.warn("Error executing interceptor onSend callback for topic: {}, partition: {}", record.topic(), record.partition(), e);
else
log.warn("Error executing interceptor onSend callback", e);
}
}
return interceptRecord;
}
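For context, a custom interceptor only has to implement the ProducerInterceptor interface. The hypothetical example below prefixes every value; it is an illustration, not part of the original article:

import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class PrefixInterceptor implements ProducerInterceptor<String, String> {

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // Runs on the Main thread inside KafkaProducer.send(), before serialization and partitioning.
        return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(),
                record.key(), "intercepted-" + record.value(), record.headers());
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // Called when the broker acknowledges the record or the send finally fails.
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}

It would be enabled with props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, PrefixInterceptor.class.getName()), which populates the interceptorList read in the constructor shown earlier.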

send() then calls doSend() to send the record to Kafka, as shown below:

private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
TopicPartition tp = null;
try {
throwIfProducerClosed();
// first make sure the metadata for the topic is available
long nowMs = time.milliseconds();
ClusterAndWaitTime clusterAndWaitTime;
try {
// Fetch metadata for the topic; maxBlockTimeMs is the longest we are allowed to block
clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), nowMs, maxBlockTimeMs);
} catch (KafkaException e) {
if (metadata.isClosed())
throw new KafkaException("Producer closed while send in progress", e);
throw e;
}
nowMs += clusterAndWaitTime.waitedOnMetadataMs;
// Remaining wait time = maximum wait time - time already spent waiting for metadata
long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
// Cluster metadata obtained above
Cluster cluster = clusterAndWaitTime.cluster;
byte[] serializedKey;
try {
serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
" to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
" specified in key.serializer", cce);
}
// Serialize the value (the key was serialized just above)
byte[] serializedValue;
try {
serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
" to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
" specified in value.serializer", cce);
}
// Choose the partition (based on the cluster metadata)
int partition = partition(record, serializedKey, serializedValue, cluster);
tp = new TopicPartition(record.topic(), partition);

setReadOnly(record.headers());
Header[] headers = record.headers().toArray();

int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
compressionType, serializedKey, serializedValue, headers);
// Validate that the serialized record does not exceed the maximum request size; default limit is 1 MB
ensureValidRecordSize(serializedSize);
long timestamp = record.timestamp() == null ? nowMs : record.timestamp();
if (log.isTraceEnabled()) {
log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
}
// Callback invoked when the send completes
// producer callback will make sure to call both 'callback' and interceptor callback
Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);

if (transactionManager != null && transactionManager.isTransactional()) {
transactionManager.failIfNotReadyForSend();
}
// Append to the accumulator buffer (32 MB by default), which holds batches of 16 KB by default
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);

if (result.abortForNewBatch) {
int prevPartition = partition;
partitioner.onNewBatch(record.topic(), cluster, prevPartition);
partition = partition(record, serializedKey, serializedValue, cluster);
tp = new TopicPartition(record.topic(), partition);
if (log.isTraceEnabled()) {
log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition);
}
// producer callback will make sure to call both 'callback' and interceptor callback
interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);

result = accumulator.append(tp, timestamp, serializedKey,
serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs);
}

if (transactionManager != null && transactionManager.isTransactional())
transactionManager.maybeAddPartitionToTransaction(tp);

// If the batch is full or a new batch was created, wake up the Sender thread to send data to Kafka
if (result.batchIsFull || result.newBatchCreated) {
log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
this.sender.wakeup();
}
return result.future;
// handling exceptions and record the errors;
// for API exceptions return them in the future,
// for other exceptions throw directly
} catch (ApiException e) {
log.debug("Exception occurred during message send:", e);
if (callback != null)
callback.onCompletion(null, e);
this.errors.record();
this.interceptors.onSendError(record, tp, e);
return new FutureFailure(e);
} catch (InterruptedException e) {
this.errors.record();
this.interceptors.onSendError(record, tp, e);
throw new InterruptException(e);
} catch (KafkaException e) {
this.errors.record();
this.interceptors.onSendError(record, tp, e);
throw e;
} catch (Exception e) {
// we notify interceptor about all exceptions, since onSend is called before anything else in this method
this.interceptors.onSendError(record, tp, e);
throw e;
}
}

Partition Selection

The core class here is KafkaProducer in the clients module; the key method is partition(), shown below:

private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
// If a partition number was specified on the record, use it directly
Integer partition = record.partition();
return partition != null ?
partition :
// Otherwise let the partitioner choose the partition
partitioner.partition(
record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}
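The branches of partition() can be seen from the records themselves; a small sketch (placeholder topic name) showing which path each record takes:

import org.apache.kafka.clients.producer.ProducerRecord;

public class PartitionChoiceSketch {
    public static void main(String[] args) {
        // record.partition() != null -> the explicit partition (2) is used as-is.
        ProducerRecord<String, String> explicit = new ProducerRecord<>("demo-topic", 2, "key-a", "v1");

        // No partition, but a key -> the partitioner hashes the key (murmur2 % numPartitions).
        ProducerRecord<String, String> keyed = new ProducerRecord<>("demo-topic", "key-a", "v2");

        // No partition and no key -> the sticky partitioner picks a partition and reuses it.
        ProducerRecord<String, String> unkeyed = new ProducerRecord<>("demo-topic", "v3");

        // Prints "2 / null / null": only the first record carries an explicit partition.
        System.out.println(explicit.partition() + " / " + keyed.partition() + " / " + unkeyed.partition());
    }
}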

Every partitioner implements the Partitioner interface; the default implementation is DefaultPartitioner, shown below:

public class DefaultPartitioner implements Partitioner {

private final StickyPartitionCache stickyPartitionCache = new StickyPartitionCache();

public void configure(Map<String, ?> configs) {}

/**
* Compute the partition for the given record.
*
* @param topic The topic name
* @param key The key to partition on (or null if no key)
* @param keyBytes serialized key to partition on (or null if no key)
* @param value The value to partition on or null
* @param valueBytes serialized value to partition on or null
* @param cluster The current cluster metadata
*/
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
return partition(topic, key, keyBytes, value, valueBytes, cluster, cluster.partitionsForTopic(topic).size());
}

/**
* Compute the partition for the given record.
*
* @param topic The topic name
* @param numPartitions The number of partitions of the given {@code topic}
* @param key The key to partition on (or null if no key)
* @param keyBytes serialized key to partition on (or null if no key)
* @param value The value to partition on or null
* @param valueBytes serialized value to partition on or null
* @param cluster The current cluster metadata
*/
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster, int numPartitions) {
// If the record has no key
if (keyBytes == null) {
return stickyPartitionCache.partition(topic, cluster);
}
// If a key was provided, take its hash modulo the number of partitions
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}

......
}

If no key was specified when the record was sent, StickyPartitionCache.partition() is called to obtain the partition number, as shown below:

public class StickyPartitionCache {
private final ConcurrentMap<String, Integer> indexCache;
public StickyPartitionCache() {
this.indexCache = new ConcurrentHashMap<>();
}

public int partition(String topic, Cluster cluster) {
Integer part = indexCache.get(topic);
if (part == null) {
return nextPartition(topic, cluster, -1);
}
return part;
}

public int nextPartition(String topic, Cluster cluster, int prevPartition) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
Integer oldPart = indexCache.get(topic);
Integer newPart = oldPart;
// Check that the current sticky partition for the topic is either not set or that the partition that
// triggered the new batch matches the sticky partition that needs to be changed.
if (oldPart == null || oldPart == prevPartition) {
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() < 1) {
Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
newPart = random % partitions.size();
} else if (availablePartitions.size() == 1) {
newPart = availablePartitions.get(0).partition();
} else {
while (newPart == null || newPart.equals(oldPart)) {
int random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
newPart = availablePartitions.get(random % availablePartitions.size()).partition();
}
}
// Only change the sticky partition if it is null or prevPartition matches the current sticky partition.
if (oldPart == null) {
indexCache.putIfAbsent(topic, newPart);
} else {
indexCache.replace(topic, prevPartition, newPart);
}
return indexCache.get(topic);
}
return indexCache.get(topic);
}

}

Kafka's Partitioning Strategies

  • (1) If a partition is explicitly specified, that value is used directly. For example, with partition=0 the record is written to partition 0.
  • (2) If no partition is specified but a key is, the key's hash is taken modulo the topic's partition count to obtain the partition. For example, if the key's hash is 5 and the topic has 2 partitions, the record is written to partition 1.
  • (3) If neither a partition nor a key is specified, Kafka uses the sticky partitioner: it picks a partition at random and keeps using it until that partition's batch is full or completed, then randomly picks a different partition (different from the previous one). For example, if partition 0 is chosen first, Kafka keeps writing to it until the current batch is full (16 KB by default) or the linger.ms time is up, and then randomly picks another partition (re-rolling if it lands on 0 again). A hypothetical custom partitioner sketch follows this list.
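As referenced in item (3) above, here is a hypothetical custom partitioner; it is only an illustration of the Partitioner interface that DefaultPartitioner implements, not a recommendation. It pins records whose key starts with "audit" to partition 0 and spreads everything else with a simple counter (kept deliberately simple and not thread-safe):

import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class AuditAwarePartitioner implements Partitioner {

    private int counter = 0;

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        if (key instanceof String && ((String) key).startsWith("audit")) {
            return 0; // pin audit records to partition 0
        }
        counter = (counter + 1) % numPartitions; // naive round-robin for everything else
        return counter;
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}

It would be registered through the partitioner.class producer property.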

Validating the Record Size

The core class here is KafkaProducer in the clients module; the key method is ensureValidRecordSize(), shown below:

private void ensureValidRecordSize(int size) {
// Maximum size of a single request; default is 1 MB (max.request.size)
if (size > maxRequestSize)
throw new RecordTooLargeException("The message is " + size +
" bytes when serialized which is larger than " + maxRequestSize + ", which is the value of the " +
ProducerConfig.MAX_REQUEST_SIZE_CONFIG + " configuration.");
// Total buffer size; default is 32 MB (buffer.memory)
if (size > totalMemorySize)
throw new RecordTooLargeException("The message is " + size +
" bytes when serialized which is larger than the total memory buffer you have configured with the " +
ProducerConfig.BUFFER_MEMORY_CONFIG +
" configuration.");
}
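If genuinely larger records are needed, both limits checked above can be raised through configuration; the values below are only examples, and the broker-side message.max.bytes (or the topic-level max.message.bytes) must be raised as well:

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public class RecordSizeConfigSketch {
    public static Properties largeRecordProps() {
        Properties props = new Properties();
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 5 * 1024 * 1024);    // max.request.size: 5 MB (example)
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 64L * 1024 * 1024);     // buffer.memory: 64 MB (example)
        return props;
    }
}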

Using the Memory Pool

In the doSend() method of the KafkaProducer class in the clients module there is the following snippet:

RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);

The append() method of the RecordAccumulator class is shown below:

public RecordAppendResult append(TopicPartition tp,
long timestamp,
byte[] key,
byte[] value,
Header[] headers,
Callback callback,
long maxTimeToBlock,
boolean abortOnNewBatch,
long nowMs) throws InterruptedException {
// We keep track of the number of appending thread to make sure we do not miss batches in
// abortIncompleteBatches().
appendsInProgress.incrementAndGet();
ByteBuffer buffer = null;
if (headers == null) headers = Record.EMPTY_HEADERS;
try {
// Create or fetch the double-ended queue (deque) for this partition
// check if we have an in-progress batch
Deque<ProducerBatch> dq = getOrCreateDeque(tp);
synchronized (dq) {
if (closed)
throw new KafkaException("Producer closed while send in progress");
// Try to append to the deque (the very first append fails because neither memory nor a batch object has been allocated yet)
RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);
if (appendResult != null)
return appendResult;
}

// we don't have an in-progress record batch try to allocate a new batch
if (abortOnNewBatch) {
// Return a result that will cause another call to append.
return new RecordAppendResult(null, false, false, true);
}

byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
// Take the larger of the batch size (default 16 KB) and the estimated record size (capped by max.request.size, default 1 MB); a single record can be bigger than the batch size
int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
log.trace("Allocating a new {} byte message buffer for topic {} partition {} with remaining timeout {}ms", size, tp.topic(), tp.partition(), maxTimeToBlock);
// Allocate a buffer of that size from the memory pool
buffer = free.allocate(size, maxTimeToBlock);

// Update the current time in case the buffer allocation blocked above.
nowMs = time.milliseconds();
synchronized (dq) {
// Need to check if producer is closed again after grabbing the dequeue lock.
if (closed)
throw new KafkaException("Producer closed while send in progress");

// Try to append again (memory is allocated, but there is still no batch object)
RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);
if (appendResult != null) {
// Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
return appendResult;
}

MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
// Wrap the allocated buffer into a ProducerBatch (now there is both memory and a batch object)
ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, nowMs);
// Append the record to the new batch
FutureRecordMetadata future = Objects.requireNonNull(batch.tryAppend(timestamp, key, value, headers,
callback, nowMs));
// Put the newly created batch at the tail of the deque
dq.addLast(batch);
incomplete.add(batch);

// Don't deallocate this buffer in the finally block as it's being used in the record batch
buffer = null;
return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true, false);
}
} finally {
if (buffer != null)
free.deallocate(buffer);
appendsInProgress.decrementAndGet();
}
}

The getOrCreateDeque() method of the RecordAccumulator class is shown below:

private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;

private Deque<ProducerBatch> getOrCreateDeque(TopicPartition tp) {
// Look up the deque for this partition
Deque<ProducerBatch> d = this.batches.get(tp);
if (d != null)
return d;
d = new ArrayDeque<>();
Deque<ProducerBatch> previous = this.batches.putIfAbsent(tp, d);
if (previous == null)
return d;
else
return previous;
}
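The return-previous pattern at the end of getOrCreateDeque() is the standard putIfAbsent idiom: if two threads race to create the deque for the same partition, both end up using whichever instance won the race. A stripped-down illustration with generic names (not Kafka code):

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class DequeCacheSketch {
    private final ConcurrentMap<String, Deque<Integer>> cache = new ConcurrentHashMap<>();

    Deque<Integer> getOrCreate(String partitionKey) {
        Deque<Integer> d = cache.get(partitionKey);
        if (d != null)
            return d;
        Deque<Integer> fresh = new ArrayDeque<>();
        Deque<Integer> previous = cache.putIfAbsent(partitionKey, fresh);
        // previous == null means our instance was stored; otherwise another thread won and we use theirs.
        return previous == null ? fresh : previous;
    }
}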

The Sender Thread Sends Data to the Broker

In the doSend() method of the KafkaProducer class in the clients module there is the following snippet:

if (result.batchIsFull || result.newBatchCreated) {
log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
// Wake up the Sender thread
this.sender.wakeup();
}

The Sender class implements the Runnable interface; looking again at its run() method:

@Override
public void run() {
log.debug("Starting Kafka producer I/O thread.");

// main loop, runs until close is called
while (running) {
try {
runOnce();
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}

log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");

// okay we stopped accepting requests but there may still be
// requests in the transaction manager, accumulator or waiting for acknowledgment,
// wait until these are completed.
while (!forceClose && ((this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0) || hasPendingTransactionalRequests())) {
try {
runOnce();
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}

......
}

The runOnce() method of the Sender class is shown below:

void runOnce() {
// Handle transactional state first, if transactions are in use
if (transactionManager != null) {
try {
transactionManager.maybeResolveSequences();

// do not continue sending if the transaction manager is in a failed state
if (transactionManager.hasFatalError()) {
RuntimeException lastError = transactionManager.lastError();
if (lastError != null)
maybeAbortBatches(lastError);
client.poll(retryBackoffMs, time.milliseconds());
return;
}

// Check whether we need a new producerId. If so, we will enqueue an InitProducerId
// request which will be sent below
transactionManager.bumpIdempotentEpochAndResetIdIfNeeded();

if (maybeSendAndPollTransactionalRequest()) {
return;
}
} catch (AuthenticationException e) {
// This is already logged as error, but propagated here to perform any clean ups.
log.trace("Authentication exception while processing transactional request", e);
transactionManager.authenticationFailed(e);
}
}

long currentTimeMs = time.milliseconds();
// Send any data that is ready to Kafka
long pollTimeout = sendProducerData(currentTimeMs);
// Poll the network client for the responses to the sends
client.poll(pollTimeout, currentTimeMs);
}

The sendProducerData() method of the Sender class is shown below:

private long sendProducerData(long now) {
// Fetch the cluster metadata
Cluster cluster = metadata.fetch();
// get the list of partitions with data ready to send
// Check which data in the accumulator (32 MB buffer by default) is ready to send
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

// If a partition's Leader is unknown, its data cannot be sent yet
// if there are any partitions whose leaders are not known yet, force metadata update
if (!result.unknownLeaderTopics.isEmpty()) {
// The set of topics with unknown leader contains topics with leader election pending as well as
// topics which may have expired. Add the topic again to metadata to ensure it is included
// and request metadata update, since there are messages to send to the topic.
for (String topic : result.unknownLeaderTopics)
this.metadata.add(topic, now);

log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}",
result.unknownLeaderTopics);
this.metadata.requestUpdate();
}

// Remove nodes that are not ready (no data will be sent to them)
// remove any nodes we aren't ready to send to
Iterator<Node> iter = result.readyNodes.iterator();
long notReadyTimeout = Long.MAX_VALUE;
while (iter.hasNext()) {
Node node = iter.next();
if (!this.client.ready(node, now)) {
iter.remove();
notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
}
}

// Group the batches destined for the same Broker node into a single request
// create produce requests
Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
addToInflightBatches(batches);
if (guaranteeMessageOrder) {
// Mute all the partitions drained
for (List<ProducerBatch> batchList : batches.values()) {
for (ProducerBatch batch : batchList)
this.accumulator.mutePartition(batch.topicPartition);
}
}

accumulator.resetNextBatchExpiryTime();
List<ProducerBatch> expiredInflightBatches = getExpiredInflightBatches(now);
List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(now);
expiredBatches.addAll(expiredInflightBatches);

// Reset the producer id if an expired batch has previously been sent to the broker. Also update the metrics
// for expired batches. see the documentation of @TransactionState.resetIdempotentProducerId to understand why
// we need to reset the producer id here.
if (!expiredBatches.isEmpty())
log.trace("Expired {} batches in accumulator", expiredBatches.size());
for (ProducerBatch expiredBatch : expiredBatches) {
String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition
+ ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation";
failBatch(expiredBatch, new TimeoutException(errorMessage), false);
if (transactionManager != null && expiredBatch.inRetry()) {
// This ensures that no new batches are drained until the current in flight batches are fully resolved.
transactionManager.markSequenceUnresolved(expiredBatch);
}
}
sensors.updateProduceRequestMetrics(batches);

// If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
// loop and try sending more data. Otherwise, the timeout will be the smaller value between next batch expiry
// time, and the delay time for checking data availability. Note that the nodes may have data that isn't yet
// sendable due to lingering, backing off, etc. This specifically does not include nodes with sendable data
// that aren't ready to send since they would cause busy looping.
long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
pollTimeout = Math.min(pollTimeout, this.accumulator.nextExpiryTimeMs() - now);
pollTimeout = Math.max(pollTimeout, 0);
if (!result.readyNodes.isEmpty()) {
log.trace("Nodes with data ready to send: {}", result.readyNodes);
// if some partitions are already ready to be sent, the select time would be 0;
// otherwise if some partition already has some data accumulated but not ready yet,
// the select time will be the time difference between now and its linger expiry time;
// otherwise the select time will be the time difference between now and the metadata expiry time;
pollTimeout = 0;
}
// Send the produce requests
sendProduceRequests(batches, now);
return pollTimeout;
}

The ready() method of the RecordAccumulator class is shown below:

// Check which data in the accumulator (32 MB buffer by default) is ready to send
public ReadyCheckResult ready(Cluster cluster, long nowMs) {
Set<Node> readyNodes = new HashSet<>();
long nextReadyCheckDelayMs = Long.MAX_VALUE;
Set<String> unknownLeaderTopics = new HashSet<>();

boolean exhausted = this.free.queued() > 0;
for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches.entrySet()) {
Deque<ProducerBatch> deque = entry.getValue();
synchronized (deque) {
// When producing to a large number of partitions, this path is hot and deques are often empty.
// We check whether a batch exists first to avoid the more expensive checks whenever possible.
ProducerBatch batch = deque.peekFirst();
if (batch != null) {
TopicPartition part = entry.getKey();
Node leader = cluster.leaderFor(part);
if (leader == null) {
// This is a partition for which leader is not known, but messages are available to send.
// Note that entries are currently not removed from batches when deque is empty.
unknownLeaderTopics.add(part.topic());
} else if (!readyNodes.contains(leader) && !isMuted(part)) {
long waitedTimeMs = batch.waitedTimeMs(nowMs);
// backingOff=true if this batch has been tried before and the retry backoff has not yet elapsed
boolean backingOff = batch.attempts() > 0 && waitedTimeMs < retryBackoffMs;
// If backing off, the wait target is retry.backoff.ms; otherwise it is linger.ms
long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
boolean full = deque.size() > 1 || batch.isFull();
// expired=true once the batch has waited at least timeToWaitMs, meaning it may be sent
boolean expired = waitedTimeMs >= timeToWaitMs;
boolean transactionCompleting = transactionManager != null && transactionManager.isCompleting();
boolean sendable = full
|| expired
|| exhausted
|| closed
|| flushInProgress()
|| transactionCompleting;
if (sendable && !backingOff) {
readyNodes.add(leader);
} else {
long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
// Note that this results in a conservative estimate since an un-sendable partition may have
// a leader that will later be found to have sendable data. However, this is good enough
// since we'll just wake up and then sleep again for the remaining time.
nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
}
}
}
}
}
return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);
}

The drain() method of the RecordAccumulator class is shown below:

// Group the batches destined for the same Broker node into a single request
public Map<Integer, List<ProducerBatch>> drain(Cluster cluster, Set<Node> nodes, int maxSize, long now) {
if (nodes.isEmpty())
return Collections.emptyMap();

Map<Integer, List<ProducerBatch>> batches = new HashMap<>();
for (Node node : nodes) {
List<ProducerBatch> ready = drainBatchesForOneNode(cluster, node, maxSize, now);
batches.put(node.id(), ready);
}
return batches;
}

The sendProduceRequest() method of the Sender class is shown below:

/**
* Transfer the record batches into a list of produce requests on a per-node basis
*/
private void sendProduceRequests(Map<Integer, List<ProducerBatch>> collated, long now) {
for (Map.Entry<Integer, List<ProducerBatch>> entry : collated.entrySet())
sendProduceRequest(now, entry.getKey(), acks, requestTimeoutMs, entry.getValue());
}

/**
* Create a produce request from the given record batches
*/
private void sendProduceRequest(long now, int destination, short acks, int timeout, List<ProducerBatch> batches) {
if (batches.isEmpty())
return;

final Map<TopicPartition, ProducerBatch> recordsByPartition = new HashMap<>(batches.size());

// find the minimum magic version used when creating the record sets
byte minUsedMagic = apiVersions.maxUsableProduceMagic();
for (ProducerBatch batch : batches) {
if (batch.magic() < minUsedMagic)
minUsedMagic = batch.magic();
}
ProduceRequestData.TopicProduceDataCollection tpd = new ProduceRequestData.TopicProduceDataCollection();
for (ProducerBatch batch : batches) {
TopicPartition tp = batch.topicPartition;
MemoryRecords records = batch.records();

// down convert if necessary to the minimum magic used. In general, there can be a delay between the time
// that the producer starts building the batch and the time that we send the request, and we may have
// chosen the message format based on out-dated metadata. In the worst case, we optimistically chose to use
// the new message format, but found that the broker didn't support it, so we need to down-convert on the
// client before sending. This is intended to handle edge cases around cluster upgrades where brokers may
// not all support the same message format version. For example, if a partition migrates from a broker
// which is supporting the new magic version to one which doesn't, then we will need to convert.
if (!records.hasMatchingMagic(minUsedMagic))
records = batch.records().downConvert(minUsedMagic, 0, time).records();
ProduceRequestData.TopicProduceData tpData = tpd.find(tp.topic());
if (tpData == null) {
tpData = new ProduceRequestData.TopicProduceData().setName(tp.topic());
tpd.add(tpData);
}
tpData.partitionData().add(new ProduceRequestData.PartitionProduceData()
.setIndex(tp.partition())
.setRecords(records));
recordsByPartition.put(tp, batch);
}

String transactionalId = null;
if (transactionManager != null && transactionManager.isTransactional()) {
transactionalId = transactionManager.transactionalId();
}

ProduceRequest.Builder requestBuilder = ProduceRequest.forMagic(minUsedMagic,
new ProduceRequestData()
.setAcks(acks)
.setTimeoutMs(timeout)
.setTransactionalId(transactionalId)
.setTopicData(tpd));
RequestCompletionHandler callback = response -> handleProduceResponse(response, recordsByPartition, time.milliseconds());

String nodeId = Integer.toString(destination);
// Build the client request
ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0,
requestTimeoutMs, callback);
// Send the request
client.send(clientRequest, now);
log.trace("Sent produce request to {}: {}", nodeId, requestBuilder);
}

The send() method of NetworkClient is shown below:

/**
* Queue up the given request for sending. Requests can only be sent out to ready nodes.
* @param request The request
* @param now The current timestamp
*/
@Override
public void send(ClientRequest request, long now) {
doSend(request, false, now);
}

private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now) {
ensureActive();
String nodeId = clientRequest.destination();
if (!isInternalRequest) {
// If this request came from outside the NetworkClient, validate
// that we can send data. If the request is internal, we trust
// that internal code has done this validation. Validation
// will be slightly different for some internal requests (for
// example, ApiVersionsRequests can be sent prior to being in
// READY state.)
if (!canSendRequest(nodeId, now))
throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");
}
AbstractRequest.Builder<?> builder = clientRequest.requestBuilder();
try {
NodeApiVersions versionInfo = apiVersions.get(nodeId);
short version;
// Note: if versionInfo is null, we have no server version information. This would be
// the case when sending the initial ApiVersionRequest which fetches the version
// information itself. It is also the case when discoverBrokerVersions is set to false.
if (versionInfo == null) {
version = builder.latestAllowedVersion();
if (discoverBrokerVersions && log.isTraceEnabled())
log.trace("No version information found when sending {} with correlation id {} to node {}. " +
"Assuming version {}.", clientRequest.apiKey(), clientRequest.correlationId(), nodeId, version);
} else {
version = versionInfo.latestUsableVersion(clientRequest.apiKey(), builder.oldestAllowedVersion(),
builder.latestAllowedVersion());
}
// The call to build may also throw UnsupportedVersionException, if there are essential
// fields that cannot be represented in the chosen version.
// Send the request with the negotiated API version
doSend(clientRequest, isInternalRequest, now, builder.build(version));
} catch (UnsupportedVersionException unsupportedVersionException) {
// If the version is not supported, skip sending the request over the wire.
// Instead, simply add it to the local queue of aborted requests.
log.debug("Version mismatch when attempting to send {} with correlation id {} to {}", builder,
clientRequest.correlationId(), clientRequest.destination(), unsupportedVersionException);
ClientResponse clientResponse = new ClientResponse(clientRequest.makeHeader(builder.latestAllowedVersion()),
clientRequest.callback(), clientRequest.destination(), now, now,
false, unsupportedVersionException, null, null);

if (!isInternalRequest)
abortedSends.add(clientResponse);
else if (clientRequest.apiKey() == ApiKeys.METADATA)
metadataUpdater.handleFailedRequest(now, Optional.of(unsupportedVersionException));
}
}

private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {
String destination = clientRequest.destination();
RequestHeader header = clientRequest.makeHeader(request.version());
if (log.isDebugEnabled()) {
log.debug("Sending {} request with header {} and timeout {} to node {}: {}",
clientRequest.apiKey(), header, clientRequest.requestTimeoutMs(), destination, request);
}
Send send = request.toSend(header);
InFlightRequest inFlightRequest = new InFlightRequest(
clientRequest,
header,
isInternalRequest,
request,
send,
now);
// Track the request in inFlightRequests
this.inFlightRequests.add(inFlightRequest);
// Hand the send off to the Selector
selector.send(new NetworkSend(clientRequest.destination(), send));
}

The poll() method of NetworkClient is shown below:

// Collect responses for previously sent requests
@Override
public List<ClientResponse> poll(long timeout, long now) {
ensureActive();

if (!abortedSends.isEmpty()) {
// If there are aborted sends because of unsupported version exceptions or disconnects,
// handle them immediately without waiting for Selector#poll.
List<ClientResponse> responses = new ArrayList<>();
handleAbortedSends(responses);
completeResponses(responses);
return responses;
}

long metadataTimeout = metadataUpdater.maybeUpdate(now);
try {
this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs));
} catch (IOException e) {
log.error("Unexpected error during I/O", e);
}

// process completed actions
// Gather the responses produced by this poll
long updatedNow = this.time.milliseconds();
List<ClientResponse> responses = new ArrayList<>();
handleCompletedSends(responses, updatedNow);
handleCompletedReceives(responses, updatedNow);
handleDisconnections(responses, updatedNow);
handleConnections();
handleInitiateApiVersionRequests(updatedNow);
handleTimedOutConnections(responses, updatedNow);
handleTimedOutRequests(responses, updatedNow);
completeResponses(responses);

return responses;
}