Kafka 2.x Beginner Tutorial, Part 8

Outline

Preface

Learning resources

Kafka consumer source code analysis

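Tip

  • Before reading and debugging the Kafka source code, you must first set up an environment for reading the Kafka source; see the detailed tutorial here
  • Two signs that your reading of the Kafka source has paid off: first, you can debug the source on your own; second, you can independently develop advanced features on top of it.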

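How the consumer works

Consumer initialization

Subscribing to topics

Pulling and processing messages

Consumption flow of a consumer group

Flow for pulling and processing messages

Committing offsets

Consumer group initialization flow

Consumer source code analysis

Consumer initialization

The core class here is KafkaConsumer in the clients module, and the key entry point is the KafkaConsumer() constructor, shown below: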

KafkaConsumer(ConsumerConfig config, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
try {
// Configuration for consumer group partition rebalancing
GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig(config,
GroupRebalanceConfig.ProtocolType.CONSUMER);

// Get the consumer group ID
this.groupId = Optional.ofNullable(groupRebalanceConfig.groupId);
// Get the client ID
this.clientId = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG);

LogContext logContext;

// If group.instance.id is set, we will append it to the log context.
if (groupRebalanceConfig.groupInstanceId.isPresent()) {
logContext = new LogContext("[Consumer instanceId=" + groupRebalanceConfig.groupInstanceId.get() +
", clientId=" + clientId + ", groupId=" + groupId.orElse("null") + "] ");
} else {
logContext = new LogContext("[Consumer clientId=" + clientId + ", groupId=" + groupId.orElse("null") + "] ");
}

this.log = logContext.logger(getClass());
boolean enableAutoCommit = config.maybeOverrideEnableAutoCommit();
groupId.ifPresent(groupIdStr -> {
if (groupIdStr.isEmpty()) {
log.warn("Support for using the empty group id by consumers is deprecated and will be removed in the next major release.");
}
});

log.debug("Initializing the Kafka consumer");
// Maximum time to wait for a response from the broker (request.timeout.ms, default 30 s)
this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
this.defaultApiTimeoutMs = config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
this.time = Time.SYSTEM;
this.metrics = buildMetrics(config, time, clientId);
// Backoff between retries (retry.backoff.ms)
this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);

// Interceptor configuration
List<ConsumerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(
ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
ConsumerInterceptor.class,
Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));
this.interceptors = new ConsumerInterceptors<>(interceptorList);
// Key deserializer configuration
if (keyDeserializer == null) {
this.keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
this.keyDeserializer.configure(config.originals(Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId)), true);
} else {
config.ignore(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
this.keyDeserializer = keyDeserializer;
}
// Value deserializer configuration
if (valueDeserializer == null) {
this.valueDeserializer = config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
this.valueDeserializer.configure(config.originals(Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId)), false);
} else {
config.ignore(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
this.valueDeserializer = valueDeserializer;
}
// Where to start consuming when no committed offset is found (auto.offset.reset): earliest | latest | none, default latest
OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase(Locale.ROOT));
this.subscriptions = new SubscriptionState(logContext, offsetResetStrategy);
ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keyDeserializer,
valueDeserializer, metrics.reporters(), interceptorList);
// Create the consumer metadata holder (the metadata itself is fetched later)
this.metadata = new ConsumerMetadata(retryBackoffMs,
config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG),
// Whether internal topics may be consumed
!config.getBoolean(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG),
// Whether topics may be created automatically
config.getBoolean(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG),
subscriptions, logContext, clusterResourceListeners);
// Bootstrap addresses of the Kafka brokers
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(
config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG), config.getString(ConsumerConfig.CLIENT_DNS_LOOKUP_CONFIG));
this.metadata.bootstrap(addresses);
String metricGrpPrefix = "consumer";

FetcherMetricsRegistry metricsRegistry = new FetcherMetricsRegistry(Collections.singleton(CLIENT_ID_METRIC_TAG), metricGrpPrefix);
ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config, time, logContext);
this.isolationLevel = IsolationLevel.valueOf(
config.getString(ConsumerConfig.ISOLATION_LEVEL_CONFIG).toUpperCase(Locale.ROOT));
Sensor throttleTimeSensor = Fetcher.throttleTimeSensor(metrics, metricsRegistry);
// Heartbeat interval (heartbeat.interval.ms, default 3 s)
int heartbeatIntervalMs = config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG);

ApiVersions apiVersions = new ApiVersions();
// Create the network client
NetworkClient netClient = new NetworkClient(
new Selector(config.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, channelBuilder, logContext),
this.metadata,
clientId,
100, // a fixed large enough value will suffice for max in-flight requests
config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG),
config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),
config.getInt(ConsumerConfig.SEND_BUFFER_CONFIG),
config.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG),
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG),
config.getLong(ConsumerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG),
config.getLong(ConsumerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG),
time,
true,
apiVersions,
throttleTimeSensor,
logContext);
// Create the consumer network client
this.client = new ConsumerNetworkClient(
logContext,
netClient,
metadata,
time,
retryBackoffMs,
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG),
heartbeatIntervalMs); //Will avoid blocking an extended period of time to prevent heartbeat thread starvation

// Load the configured partition assignment strategies
this.assignors = ConsumerPartitionAssignor.getAssignorInstances(
config.getList(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG),
config.originals(Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId))
);

// no coordinator will be constructed for the default (null) group id
// Create the consumer coordinator
this.coordinator = !groupId.isPresent() ? null :
new ConsumerCoordinator(groupRebalanceConfig,
logContext,
this.client,
assignors,
this.metadata,
this.subscriptions,
metrics,
metricGrpPrefix,
this.time,
enableAutoCommit,
// Interval between automatic offset commits (auto.commit.interval.ms, default 5 s)
config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
this.interceptors,
config.getBoolean(ConsumerConfig.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED));
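// Fetcher that pulls data from the brokers on behalf of the consumer
this.fetcher = new Fetcher<>(
logContext,
this.client,
// Minimum bytes per fetch (fetch.min.bytes, default 1 byte)
config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG),
// Maximum bytes per fetch (fetch.max.bytes, default 50 MB)
config.getInt(ConsumerConfig.FETCH_MAX_BYTES_CONFIG),
// Maximum wait per fetch (fetch.max.wait.ms, default 500 ms)
config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG),
// Maximum bytes per partition per fetch (max.partition.fetch.bytes, default 1 MB)
config.getInt(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG),
// Maximum number of records returned by one poll() (max.poll.records, default 500)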
config.getInt(ConsumerConfig.MAX_POLL_RECORDS_CONFIG),
config.getBoolean(ConsumerConfig.CHECK_CRCS_CONFIG),
config.getString(ConsumerConfig.CLIENT_RACK_CONFIG),
// Key deserializer
this.keyDeserializer,
// Value deserializer
this.valueDeserializer,
this.metadata,
this.subscriptions,
metrics,
metricsRegistry,
this.time,
this.retryBackoffMs,
this.requestTimeoutMs,
isolationLevel,
apiVersions);

this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics, metricGrpPrefix);

config.logUnused();
AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics, time.milliseconds());
log.debug("Kafka consumer initialized");
} catch (Throwable t) {
// call close methods if internal objects are already constructed; this is to prevent resource leak. see KAFKA-2121
// we do not need to call `close` at all when `log` is null, which means no internal objects were initialized.
if (this.log != null) {
close(0, true);
}
// now propagate the exception
throw new KafkaException("Failed to construct kafka consumer", t);
}
}
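
The constructor above takes most of its behaviour from ConsumerConfig. As a rough illustration, the sketch below collects the configuration keys behind the commented lines into a plain java.util.Properties, using the default values quoted in those comments. The broker address, the group name, and the class name ConsumerConfigSketch are placeholders invented for this example, not values from the tutorial.

```java
import java.util.Properties;

// Illustrative only: gathers the settings that the KafkaConsumer constructor reads.
class ConsumerConfigSketch {

    static Properties consumerProperties() {
        Properties props = new Properties();
        // Placeholder connection settings (assumptions, adjust to your cluster).
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "demo-group");
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Defaults quoted in the constructor comments above.
        props.setProperty("request.timeout.ms", "30000");
        props.setProperty("auto.offset.reset", "latest");
        props.setProperty("heartbeat.interval.ms", "3000");
        props.setProperty("auto.commit.interval.ms", "5000");
        props.setProperty("fetch.min.bytes", "1");
        props.setProperty("fetch.max.bytes", "52428800");
        props.setProperty("fetch.max.wait.ms", "500");
        props.setProperty("max.partition.fetch.bytes", "1048576");
        props.setProperty("max.poll.records", "500");
        return props;
    }

    public static void main(String[] args) {
        consumerProperties().forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

With the kafka-clients library on the classpath, a Properties object like this can be passed to new KafkaConsumer<>(props), which ends up in the constructor shown above.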

Subscribing to topics

The core class here is KafkaConsumer in the clients module, and the key method is subscribe(), shown below:

@Override
public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
acquireAndEnsureOpen();
try {
maybeThrowInvalidGroupIdException();
if (topics == null)
throw new IllegalArgumentException("Topic collection to subscribe to cannot be null");
if (topics.isEmpty()) {
// treat subscribing to empty topic list as the same as unsubscribing
this.unsubscribe();
} else {
for (String topic : topics) {
if (Utils.isBlank(topic))
throw new IllegalArgumentException("Topic collection to subscribe to cannot contain null or empty topic");
}

throwIfNoAssignorsConfigured();
// Clear buffered data for topics that are no longer subscribed
fetcher.clearBufferedDataForUnassignedTopics(topics);
log.info("Subscribed to topic(s): {}", Utils.join(topics, ", "));
// If the subscription actually changed, request a metadata update
if (this.subscriptions.subscribe(new HashSet<>(topics), listener))
metadata.requestUpdateForNewTopics();
}
} finally {
release();
}
}

The subscribe() method of SubscriptionState:

public synchronized boolean subscribe(Set<String> topics, ConsumerRebalanceListener listener) {
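// Register the rebalance listener (for example, another consumer in the group leaving or crashing triggers a partition rebalance)
registerRebalanceListener(listener);
// Subscribe by topic name, with partitions assigned automatically
setSubscriptionType(SubscriptionType.AUTO_TOPICS);
// Change the subscribed topics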
return changeSubscription(topics);
}

The changeSubscription() method of SubscriptionState:

private boolean changeSubscription(Set<String> topicsToSubscribe) {
// If the requested topics equal the current subscription, nothing changes; otherwise the subscription is replaced.
if (subscription.equals(topicsToSubscribe))
return false;

subscription = topicsToSubscribe;
return true;
}

The requestUpdateForNewTopics() method of the Metadata class:

// Called when the subscription has changed, so the metadata must be refreshed
public synchronized int requestUpdateForNewTopics() {
// Override the timestamp of last refresh to let immediate update.
this.lastRefreshMs = 0;
this.needPartialUpdate = true;
this.requestVersion++;
return this.updateVersion;
}
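
The three methods above form a short chain: subscribe() asks SubscriptionState whether the topic set really changed, and only then asks Metadata for a refresh. The following is a minimal sketch of that chain in plain Java, not Kafka's actual classes. SubscriptionSketch and its counter are invented for the example, and the counter merely stands in for metadata.requestUpdateForNewTopics().

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Simplified model of the subscribe() chain; not Kafka's actual classes.
class SubscriptionSketch {
    private Set<String> subscription = Collections.emptySet();
    private int requestVersion = 0;

    // Mirrors changeSubscription(): report whether the topic set really changed.
    private boolean changeSubscription(Set<String> topicsToSubscribe) {
        if (subscription.equals(topicsToSubscribe)) {
            return false;
        }
        subscription = topicsToSubscribe;
        return true;
    }

    // Mirrors KafkaConsumer.subscribe(): only a real change asks for new metadata.
    void subscribe(Collection<String> topics) {
        if (changeSubscription(new HashSet<>(topics))) {
            requestVersion++; // stands in for metadata.requestUpdateForNewTopics()
        }
    }

    int requestVersion() {
        return requestVersion;
    }

    public static void main(String[] args) {
        SubscriptionSketch state = new SubscriptionSketch();
        state.subscribe(Arrays.asList("orders", "payments"));
        state.subscribe(Arrays.asList("payments", "orders")); // same set, different order
        state.subscribe(Arrays.asList("orders"));
        System.out.println("metadata update requests: " + state.requestVersion()); // prints 2
    }
}
```

Because the comparison is between sets, re-subscribing to the same topics in a different order does not trigger a metadata refresh.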

Pulling and processing messages

Pulling messages

The core class here is KafkaConsumer in the clients module, and the key method is poll(), shown below:

@Override
public ConsumerRecords<K, V> poll(final Duration timeout) {
return poll(time.timer(timeout), true);
}

private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
acquireAndEnsureOpen();
try {
// Record the time at which this poll started
this.kafkaConsumerMetrics.recordPollStart(timer.currentTimeMs());

if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {
throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
}

do {
client.maybeTriggerWakeup();

if (includeMetadataInTimeout) {
// try to update assignment metadata BUT do not need to block on the timer for join group
// Initialize the consumer or consumer group
updateAssignmentMetadataIfNeeded(timer, false);
} else {
while (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE), true)) {
log.warn("Still waiting for metadata");
}
}

// Start pulling messages
final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);
if (!records.isEmpty()) {
// before returning the fetched records, we can send off the next round of fetches
// and avoid block waiting for their responses to enable pipelining while the user
// is handling the fetched records.
//
// NOTE: since the consumed position has already been updated, we must not allow
// wakeups or any other errors to be triggered prior to returning the fetched records.
if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {
client.transmitSends();
}

// Run the fetched records through the interceptors
return this.interceptors.onConsume(new ConsumerRecords<>(records));
}
} while (timer.notExpired());

return ConsumerRecords.empty();
} finally {
release();
this.kafkaConsumerMetrics.recordPollEnd(timer.currentTimeMs());
}
}
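
The shape of the loop in poll() matters: it is a do/while bounded by a timer, so the body runs at least once even when the timeout is zero. The sketch below models only that control flow in plain Java. PollLoopSketch and the Supplier standing in for pollForFetches() are assumptions made for the illustration.

```java
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

// Simplified model of the poll() loop; the names are invented for this example.
class PollLoopSketch {

    // Keeps asking for records until some arrive or the timeout elapses.
    // Like KafkaConsumer.poll(), the do/while body runs at least once,
    // so even a zero timeout returns records that are already available.
    static List<String> poll(Supplier<List<String>> pollForFetches, long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        do {
            List<String> records = pollForFetches.get();
            if (!records.isEmpty()) {
                return records;
            }
        } while (System.currentTimeMillis() < deadline);
        return Collections.emptyList();
    }

    public static void main(String[] args) {
        // Records are available immediately, so a zero timeout still returns them.
        System.out.println(poll(() -> Collections.singletonList("record-1"), 0));
        // Nothing ever arrives, so the call gives up after roughly 10 ms.
        System.out.println(poll(Collections::emptyList, 10));
    }
}
```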

The pollForFetches() method of KafkaConsumer:

private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(Timer timer) {
long pollTimeout = coordinator == null ? timer.remainingMs() :
Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()), timer.remainingMs());

// if data is available already, return it immediately
// First try the local in-memory queue; if it already holds data, return it directly
final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
if (!records.isEmpty()) {
return records;
}

// send any new fetches (won't resend pending fetches)
// Send fetch requests to the brokers (at most 50 MB per fetch by default)
fetcher.sendFetches();

// We do not want to be stuck blocking in poll if we are missing some positions
// since the offset lookup may be backing off after a failure

// NOTE: the use of cachedSubscriptionHashAllFetchPositions means we MUST call
// updateAssignmentMetadataIfNeeded before this method.
if (!cachedSubscriptionHashAllFetchPositions && pollTimeout > retryBackoffMs) {
pollTimeout = retryBackoffMs;
}

log.trace("Polling for fetches with timeout {}", pollTimeout);

Timer pollTimer = time.timer(pollTimeout);
client.poll(pollTimer, () -> {
// since a fetch might be completed by the background thread, we need this poll condition
// to ensure that we do not block unnecessarily in poll()
return !fetcher.hasAvailableFetches();
});
timer.update(pollTimer.currentTimeMs());

// Read from the local in-memory queue, at most 500 records per call by default
return fetcher.fetchedRecords();
}
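
The timeout that pollForFetches() passes to the network client is derived in two steps: first it is bounded by the coordinator's next required poll, then it is capped at the retry backoff when some fetch positions are still missing. The sketch below restates that arithmetic as a pure function. PollTimeoutSketch and its parameter names are made up for the example, and the numbers in main() are arbitrary.

```java
// Simplified restatement of the timeout logic in pollForFetches(); not Kafka code.
class PollTimeoutSketch {

    static long pollTimeout(boolean hasCoordinator, long timeToNextPollMs, long remainingMs,
                            boolean hasAllFetchPositions, long retryBackoffMs) {
        // Without a coordinator only the caller's remaining time matters.
        long timeout = hasCoordinator ? Math.min(timeToNextPollMs, remainingMs) : remainingMs;
        // Do not block for long while offsets are still being looked up.
        if (!hasAllFetchPositions && timeout > retryBackoffMs) {
            timeout = retryBackoffMs;
        }
        return timeout;
    }

    public static void main(String[] args) {
        System.out.println(pollTimeout(true, 3000, 1000, true, 100));  // 1000
        System.out.println(pollTimeout(true, 3000, 1000, false, 100)); // 100
        System.out.println(pollTimeout(false, 3000, 1000, true, 100)); // 1000
    }
}
```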

The sendFetches() method of the Fetcher class:

public synchronized int sendFetches() {
// Update metrics in case there was an assignment change
sensors.maybeUpdateAssignment(subscriptions);

// Prepare the fetch request data for each target node
Map<Node, FetchSessionHandler.FetchRequestData> fetchRequestMap = prepareFetchRequests();
for (Map.Entry<Node, FetchSessionHandler.FetchRequestData> entry : fetchRequestMap.entrySet()) {
final Node fetchTarget = entry.getKey();
final FetchSessionHandler.FetchRequestData data = entry.getValue();
final FetchRequest.Builder request = FetchRequest.Builder
// Maximum wait time, default 500 ms
// Minimum bytes to fetch, default 1 byte
.forConsumer(this.maxWaitMs, this.minBytes, data.toSend())
.isolationLevel(isolationLevel)
// Maximum bytes to fetch, default 50 MB
.setMaxBytes(this.maxBytes)
.metadata(data.metadata())
.toForget(data.toForget())
.rackId(clientRackId);

if (log.isDebugEnabled()) {
log.debug("Sending {} {} to broker {}", isolationLevel, data.toString(), fetchTarget);
}
// Send the fetch request
RequestFuture<ClientResponse> future = client.send(fetchTarget, request);
// We add the node to the set of nodes with pending fetch requests before adding the
// listener because the future may have been fulfilled on another thread (e.g. during a
// disconnection being handled by the heartbeat thread) which will mean the listener
// will be invoked synchronously.
this.nodesWithPendingFetchRequests.add(entry.getKey().id());
// Listen for the broker's response
future.addListener(new RequestFutureListener<ClientResponse>() {

// The broker's response was received successfully
@Override
public void onSuccess(ClientResponse resp) {
synchronized (Fetcher.this) {
try {
// Extract the response body
FetchResponse response = (FetchResponse) resp.responseBody();
FetchSessionHandler handler = sessionHandler(fetchTarget.id());
if (handler == null) {
log.error("Unable to find FetchSessionHandler for node {}. Ignoring fetch response.",
fetchTarget.id());
return;
}
if (!handler.handleResponse(response)) {
return;
}

Set<TopicPartition> partitions = new HashSet<>(response.responseData().keySet());
FetchResponseMetricAggregator metricAggregator = new FetchResponseMetricAggregator(sensors, partitions);

for (Map.Entry<TopicPartition, FetchResponseData.PartitionData> entry : response.responseData().entrySet()) {
TopicPartition partition = entry.getKey();
FetchRequest.PartitionData requestData = data.sessionPartitions().get(partition);
if (requestData == null) {
String message;
if (data.metadata().isFull()) {
message = MessageFormatter.arrayFormat(
"Response for missing full request partition: partition={}; metadata={}",
new Object[]{partition, data.metadata()}).getMessage();
} else {
message = MessageFormatter.arrayFormat(
"Response for missing session request partition: partition={}; metadata={}; toSend={}; toForget={}",
new Object[]{partition, data.metadata(), data.toSend(), data.toForget()}).getMessage();
}

// Received fetch response for missing session partition
throw new IllegalStateException(message);
} else {
long fetchOffset = requestData.fetchOffset;
FetchResponseData.PartitionData partitionData = entry.getValue();

log.debug("Fetch {} at offset {} for partition {} returned fetch data {}",
isolationLevel, fetchOffset, partition, partitionData);

Iterator<? extends RecordBatch> batches = FetchResponse.recordsOrFail(partitionData).batches().iterator();
short responseVersion = resp.requestHeader().apiVersion();

// Add the fetched data, per partition, to the local in-memory queue
completedFetches.add(new CompletedFetch(partition, partitionData,
metricAggregator, batches, fetchOffset, responseVersion));
}
}

sensors.fetchLatency.record(resp.requestLatencyMs());
} finally {
nodesWithPendingFetchRequests.remove(fetchTarget.id());
}
}
}

@Override
public void onFailure(RuntimeException e) {
synchronized (Fetcher.this) {
try {
FetchSessionHandler handler = sessionHandler(fetchTarget.id());
if (handler != null) {
handler.handleError(e);
}
} finally {
nodesWithPendingFetchRequests.remove(fetchTarget.id());
}
}
}
});

}
return fetchRequestMap.size();
}
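
sendFetches() issues one request per node in fetchRequestMap and records the node in nodesWithPendingFetchRequests until the response or failure arrives. The body of prepareFetchRequests() is not shown here, so the sketch below is only an assumed model of the grouping step: partitions are grouped by the broker that leads them, and brokers with a fetch still in flight are skipped, in line with the "won't resend pending fetches" comment in pollForFetches(). FetchPlanSketch, planFetches(), and the partition names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Assumed model of how fetch requests are grouped per broker; not Kafka code.
class FetchPlanSketch {

    // partitionLeaders maps a partition name to the id of the broker leading it.
    static Map<Integer, List<String>> planFetches(Map<String, Integer> partitionLeaders,
                                                  Set<Integer> nodesWithPendingFetchRequests) {
        Map<Integer, List<String>> requests = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> entry : partitionLeaders.entrySet()) {
            int leader = entry.getValue();
            if (nodesWithPendingFetchRequests.contains(leader)) {
                continue; // a fetch to this broker is still in flight
            }
            requests.computeIfAbsent(leader, id -> new ArrayList<>()).add(entry.getKey());
        }
        return requests;
    }

    public static void main(String[] args) {
        Map<String, Integer> leaders = new LinkedHashMap<>();
        leaders.put("orders-0", 1);
        leaders.put("orders-1", 1);
        leaders.put("orders-2", 2);
        Set<Integer> pending = new HashSet<>(Arrays.asList(2));
        System.out.println(planFetches(leaders, pending)); // {1=[orders-0, orders-1]}
    }
}
```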

The fetchedRecords() method of the Fetcher class:

// Read from the local in-memory queue, at most 500 records per call by default
public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
Map<TopicPartition, List<ConsumerRecord<K, V>>> fetched = new HashMap<>();
Queue<CompletedFetch> pausedCompletedFetches = new ArrayDeque<>();
// Maximum number of records per call (max.poll.records, default 500)
int recordsRemaining = maxPollRecords;

try {
// Loop until the record budget is used up or the queue is empty
while (recordsRemaining > 0) {
if (nextInLineFetch == null || nextInLineFetch.isConsumed) {
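// Look at the head of the local in-memory queue
// peek() returns the head element without removing it, or null (no exception) if the queue is empty
CompletedFetch records = completedFetches.peek();
// If the local in-memory queue is empty, leave the loop
if (records == null) break;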

if (records.notInitialized()) {
try {
nextInLineFetch = initializeCompletedFetch(records);
} catch (Exception e) {
// Remove a completedFetch upon a parse with exception if (1) it contains no records, and
// (2) there are no fetched records with actual content preceding this exception.
// The first condition ensures that the completedFetches is not stuck with the same completedFetch
// in cases such as the TopicAuthorizationException, and the second condition ensures that no
// potential data loss due to an exception in a following record.
FetchResponseData.PartitionData partition = records.partitionData;
if (fetched.isEmpty() && FetchResponse.recordsOrFail(partition).sizeInBytes() == 0) {
completedFetches.poll();
}
throw e;
}
} else {
nextInLineFetch = records;
}
// Remove the entry from the local in-memory queue
// poll() removes and returns the head element, or null (no exception) if the queue is empty
completedFetches.poll();
} else if (subscriptions.isPaused(nextInLineFetch.partition)) {
// when the partition is paused we add the records back to the completedFetches queue instead of draining
// them so that they can be returned on a subsequent poll if the partition is resumed at that time
log.debug("Skipping fetching records for assigned partition {} because it is paused", nextInLineFetch.partition);
pausedCompletedFetches.add(nextInLineFetch);
nextInLineFetch = null;
} else {
List<ConsumerRecord<K, V>> records = fetchRecords(nextInLineFetch, recordsRemaining);

if (!records.isEmpty()) {
TopicPartition partition = nextInLineFetch.partition;
List<ConsumerRecord<K, V>> currentRecords = fetched.get(partition);
if (currentRecords == null) {
fetched.put(partition, records);
} else {
// this case shouldn't usually happen because we only send one fetch at a time per partition,
// but it might conceivably happen in some rare cases (such as partition leader changes).
// we have to copy to a new list because the old one may be immutable
List<ConsumerRecord<K, V>> newRecords = new ArrayList<>(records.size() + currentRecords.size());
newRecords.addAll(currentRecords);
newRecords.addAll(records);
fetched.put(partition, newRecords);
}
recordsRemaining -= records.size();
}
}
}
} catch (KafkaException e) {
if (fetched.isEmpty())
throw e;
} finally {
// add any polled completed fetches for paused partitions back to the completed fetches queue to be
// re-evaluated in the next poll
completedFetches.addAll(pausedCompletedFetches);
}

return fetched;
}
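
One consequence of fetchedRecords() is that a single fetch response can feed several poll() calls: completed fetches sit in a local queue and each call drains at most max.poll.records of them. The sketch below models that draining with plain strings instead of ConsumerRecord objects. FetchBufferSketch is an invented name, and paused partitions and error handling are left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Simplified model of the completedFetches queue; not Kafka's Fetcher.
class FetchBufferSketch {
    private final Deque<Deque<String>> completedFetches = new ArrayDeque<>();
    private final int maxPollRecords;

    FetchBufferSketch(int maxPollRecords) {
        this.maxPollRecords = maxPollRecords;
    }

    // Stands in for the response listener adding a CompletedFetch to the queue.
    void addCompletedFetch(List<String> records) {
        completedFetches.add(new ArrayDeque<>(records));
    }

    // Drains at most maxPollRecords records, keeping the rest for the next call.
    List<String> fetchedRecords() {
        List<String> fetched = new ArrayList<>();
        int recordsRemaining = maxPollRecords;
        while (recordsRemaining > 0) {
            Deque<String> head = completedFetches.peek();
            if (head == null) {
                break; // nothing buffered locally
            }
            while (recordsRemaining > 0 && !head.isEmpty()) {
                fetched.add(head.poll());
                recordsRemaining--;
            }
            if (head.isEmpty()) {
                completedFetches.poll(); // this fetch is fully consumed
            }
        }
        return fetched;
    }

    public static void main(String[] args) {
        FetchBufferSketch buffer = new FetchBufferSketch(3);
        buffer.addCompletedFetch(Arrays.asList("a", "b", "c", "d", "e"));
        System.out.println(buffer.fetchedRecords()); // [a, b, c]
        System.out.println(buffer.fetchedRecords()); // [d, e]
        System.out.println(buffer.fetchedRecords()); // []
    }
}
```
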

Processing messages

The poll() method of KafkaConsumer contains the following code:

// Start pulling data
final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);
if (!records.isEmpty()) {
// before returning the fetched records, we can send off the next round of fetches
// and avoid block waiting for their responses to enable pipelining while the user
// is handling the fetched records.
//
// NOTE: since the consumed position has already been updated, we must not allow
// wakeups or any other errors to be triggered prior to returning the fetched records.
if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {
client.transmitSends();
}

// Run the fetched records through the interceptors
return this.interceptors.onConsume(new ConsumerRecords<>(records));
}

The onConsume() method of ConsumerInterceptors:

public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) {
ConsumerRecords<K, V> interceptRecords = records;
for (ConsumerInterceptor<K, V> interceptor : this.interceptors) {
try {
// Let each interceptor process the fetched records
interceptRecords = interceptor.onConsume(interceptRecords);
} catch (Exception e) {
// do not propagate interceptor exception, log and continue calling other interceptors
log.warn("Error executing interceptor onConsume callback", e);
}
}
return interceptRecords;
}
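
The loop in onConsume() feeds each interceptor the output of the previous one, and an interceptor that throws is logged and skipped rather than aborting the poll. The sketch below reproduces that behaviour with java.util.function.UnaryOperator in place of ConsumerInterceptor. InterceptorChainSketch and the three sample interceptors are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;

// Simplified model of the interceptor chain; not Kafka's ConsumerInterceptors.
class InterceptorChainSketch {

    static List<String> onConsume(List<UnaryOperator<List<String>>> interceptors,
                                  List<String> records) {
        List<String> interceptRecords = records;
        for (UnaryOperator<List<String>> interceptor : interceptors) {
            try {
                interceptRecords = interceptor.apply(interceptRecords);
            } catch (Exception e) {
                // Do not propagate: log and continue with the next interceptor.
                System.err.println("Error executing interceptor: " + e);
            }
        }
        return interceptRecords;
    }

    public static void main(String[] args) {
        List<UnaryOperator<List<String>>> chain = new ArrayList<>();
        chain.add(rs -> rs.stream().map(String::toUpperCase).collect(Collectors.toList()));
        chain.add(rs -> { throw new IllegalStateException("broken interceptor"); });
        chain.add(rs -> rs.stream().filter(r -> !r.isEmpty()).collect(Collectors.toList()));
        System.out.println(onConsume(chain, Arrays.asList("a", "", "b"))); // [A, B]
    }
}
```

The failing interceptor leaves the records as the previous interceptor produced them, so the third one still sees the upper-cased list.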

Committing offsets

Manual synchronous offset commit

The core class here is KafkaConsumer in the clients module, and the key method is commitSync(), shown below:

@Override
public void commitSync() {
commitSync(Duration.ofMillis(defaultApiTimeoutMs));
}

@Override
public void commitSync(Duration timeout) {
commitSync(subscriptions.allConsumed(), timeout);
}

@Override
public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets, final Duration timeout) {
acquireAndEnsureOpen();
try {
maybeThrowInvalidGroupIdException();
offsets.forEach(this::updateLastSeenEpochIfNewer);
// Commit the offsets synchronously
if (!coordinator.commitOffsetsSync(new HashMap<>(offsets), time.timer(timeout))) {
throw new TimeoutException("Timeout of " + timeout.toMillis() + "ms expired before successfully " +
"committing offsets " + offsets);
}
} finally {
release();
}
}

The commitOffsetsSync() method of ConsumerCoordinator:

// Commit the offsets synchronously
public boolean commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets, Timer timer) {
invokeCompletedOffsetCommitCallbacks();

if (offsets.isEmpty())
return true;

// Retry until the commit succeeds, fails with a non-retriable error, or the timer expires
do {
if (coordinatorUnknown() && !ensureCoordinatorReady(timer)) {
return false;
}

// Send the offset commit request
RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
// Wait for the response to the offset commit request
client.poll(future, timer);

// We may have had in-flight offset commits when the synchronous commit began. If so, ensure that
// the corresponding callbacks are invoked prior to returning in order to preserve the order that
// the offset commits were applied.
invokeCompletedOffsetCommitCallbacks();

// The commit succeeded
if (future.succeeded()) {
if (interceptors != null)
interceptors.onCommit(offsets);
return true;
}

if (future.failed() && !future.isRetriable())
throw future.exception();

timer.sleep(rebalanceConfig.retryBackoffMs);
} while (timer.notExpired());

return false;
}
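
commitOffsetsSync() is essentially a bounded retry loop: success returns true, a non-retriable failure is thrown, and a retriable failure sleeps for the backoff and tries again until the timer runs out. The sketch below models just that loop in plain Java. SyncCommitRetrySketch, the Outcome enum, and the Supplier standing in for sendOffsetCommitRequest() are assumptions made for the illustration.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.function.Supplier;

// Simplified model of the synchronous commit retry loop; not Kafka code.
class SyncCommitRetrySketch {
    enum Outcome { SUCCESS, RETRIABLE_FAILURE, FATAL_FAILURE }

    static boolean commitSync(Supplier<Outcome> sendCommit, long timeoutMs, long retryBackoffMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        do {
            Outcome outcome = sendCommit.get();
            if (outcome == Outcome.SUCCESS) {
                return true;
            }
            if (outcome == Outcome.FATAL_FAILURE) {
                throw new IllegalStateException("non-retriable commit failure");
            }
            try {
                Thread.sleep(retryBackoffMs); // back off before the next attempt
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        } while (System.currentTimeMillis() < deadline);
        return false; // timed out; the caller turns this into a TimeoutException
    }

    public static void main(String[] args) {
        Iterator<Outcome> outcomes = Arrays.asList(
                Outcome.RETRIABLE_FAILURE, Outcome.RETRIABLE_FAILURE, Outcome.SUCCESS).iterator();
        System.out.println(commitSync(outcomes::next, 1000, 10)); // true
    }
}
```
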

Manual asynchronous offset commit

The core class here is KafkaConsumer in the clients module, and the key method is commitAsync(), shown below:

@Override
public void commitAsync() {
commitAsync(null);
}

@Override
public void commitAsync(OffsetCommitCallback callback) {
commitAsync(subscriptions.allConsumed(), callback);
}

@Override
public void commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
acquireAndEnsureOpen();
try {
maybeThrowInvalidGroupIdException();
log.debug("Committing offsets: {}", offsets);
offsets.forEach(this::updateLastSeenEpochIfNewer);
// Commit the offsets asynchronously
coordinator.commitOffsetsAsync(new HashMap<>(offsets), callback);
} finally {
release();
}
}

The commitOffsetsAsync() method of ConsumerCoordinator:

public void commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
invokeCompletedOffsetCommitCallbacks();

// Check whether the coordinator is known
if (!coordinatorUnknown()) {
// The coordinator is known: commit the offsets asynchronously
doCommitOffsetsAsync(offsets, callback);
} else {
// we don't know the current coordinator, so try to find it and then send the commit
// or fail (we don't want recursive retries which can cause offset commits to arrive
// out of order). Note that there may be multiple offset commits chained to the same
// coordinator lookup request. This is fine because the listeners will be invoked in
// the same order that they were added. Note also that AbstractCoordinator prevents
// multiple concurrent coordinator lookup requests.
pendingAsyncCommits.incrementAndGet();
// The coordinator is unknown: look it up and attach a listener
lookupCoordinator().addListener(new RequestFutureListener<Void>() {
@Override
public void onSuccess(Void value) {
pendingAsyncCommits.decrementAndGet();
// Once the coordinator is found, commit the offsets asynchronously
doCommitOffsetsAsync(offsets, callback);
client.pollNoWakeup();
}

@Override
public void onFailure(RuntimeException e) {
pendingAsyncCommits.decrementAndGet();
completedOffsetCommits.add(new OffsetCommitCompletion(callback, offsets,
new RetriableCommitFailedException(e)));
}
});
}

// ensure the commit has a chance to be transmitted (without blocking on its completion).
// Note that commits are treated as heartbeats by the coordinator, so there is no need to
// explicitly allow heartbeats through delayed task execution.
client.pollNoWakeup();
}

private void doCommitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
// Send the offset commit request
RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
final OffsetCommitCallback cb = callback == null ? defaultOffsetCommitCallback : callback;
// Listen for the result of the offset commit
future.addListener(new RequestFutureListener<Void>() {
@Override
public void onSuccess(Void value) {
// The offset commit succeeded
if (interceptors != null)
interceptors.onCommit(offsets);
completedOffsetCommits.add(new OffsetCommitCompletion(cb, offsets, null));
}

@Override
public void onFailure(RuntimeException e) {
Exception commitException = e;

if (e instanceof RetriableException) {
commitException = new RetriableCommitFailedException(e);
}
completedOffsetCommits.add(new OffsetCommitCompletion(cb, offsets, commitException));
if (commitException instanceof FencedInstanceIdException) {
asyncCommitFenced.set(true);
}
}
});
}
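
In the asynchronous path the response listener does not run the user's callback directly. It appends an OffsetCommitCompletion to completedOffsetCommits, and the callbacks are run later by invokeCompletedOffsetCommitCallbacks(), which the code above calls at the start of each commit and each coordinator poll. The sketch below models that deferral with a queue of Runnable. AsyncCommitSketch and onCommitResponse() are invented names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Simplified model of deferred commit callbacks; not Kafka's ConsumerCoordinator.
class AsyncCommitSketch {
    private final Queue<Runnable> completedOffsetCommits = new ConcurrentLinkedQueue<>();

    // Called when a commit response arrives: only enqueue the callback.
    void onCommitResponse(Runnable callback) {
        completedOffsetCommits.add(callback);
    }

    // Called at the start of the next poll or commit: run callbacks in arrival order.
    void invokeCompletedOffsetCommitCallbacks() {
        Runnable callback;
        while ((callback = completedOffsetCommits.poll()) != null) {
            callback.run();
        }
    }

    public static void main(String[] args) {
        AsyncCommitSketch coordinator = new AsyncCommitSketch();
        List<String> log = new ArrayList<>();
        coordinator.onCommitResponse(() -> log.add("commit-1 done"));
        coordinator.onCommitResponse(() -> log.add("commit-2 done"));
        System.out.println(log); // []
        coordinator.invokeCompletedOffsetCommitCallbacks();
        System.out.println(log); // [commit-1 done, commit-2 done]
    }
}
```

This is why a commitAsync() callback only fires when the application calls poll() or another commit method again.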

Consumer group initialization flow

The core class here is KafkaConsumer in the clients module, and the key method is updateAssignmentMetadataIfNeeded(), shown below:

boolean updateAssignmentMetadataIfNeeded(final Timer timer, final boolean waitForJoinGroup) {
if (coordinator != null && !coordinator.poll(timer, waitForJoinGroup)) {
return false;
}

return updateFetchPositions(timer);
}

The poll() method of ConsumerCoordinator:

public boolean poll(Timer timer, boolean waitForJoinGroup) {
// Refresh the subscription's view of the cluster metadata if it has changed
maybeUpdateSubscriptionMetadata();

invokeCompletedOffsetCommitCallbacks();

if (subscriptions.hasAutoAssignedPartitions()) {
if (protocol == null) {
throw new IllegalStateException("User configured " + ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG +
" to empty while trying to subscribe for group protocol to auto assign partitions");
}
// Always update the heartbeat last poll time so that the heartbeat thread does not leave the
// group proactively due to application inactivity even if (say) the coordinator cannot be found.
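// Record this poll for the heartbeat thread, which sends heartbeats to the coordinator (every 3 s by default)
pollHeartbeat(timer.currentTimeMs());
// Make sure the client can talk to the coordinator (look it up on the broker side if needed)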
if (coordinatorUnknown() && !ensureCoordinatorReady(timer)) {
return false;
}

// Check whether the consumer needs to join (or rejoin) the group
if (rejoinNeededOrPending()) {
// due to a race condition between the initial metadata fetch and the initial rebalance,
// we need to ensure that the metadata is fresh before joining initially. This ensures
// that we have matched the pattern against the cluster's topics at least once before joining.
if (subscriptions.hasPatternSubscription()) {
// For consumer group that uses pattern-based subscription, after a topic is created,
// any consumer that discovers the topic after metadata refresh can trigger rebalance
// across the entire consumer group. Multiple rebalances can be triggered after one topic
// creation if consumers refresh metadata at vastly different times. We can significantly
// reduce the number of rebalances caused by single topic creation by asking consumer to
// refresh metadata before re-joining the group as long as the refresh backoff time has
// passed.
if (this.metadata.timeToAllowUpdate(timer.currentTimeMs()) == 0) {
this.metadata.requestUpdate();
}

if (!client.ensureFreshMetadata(timer)) {
return false;
}

maybeUpdateSubscriptionMetadata();
}

// if not wait for join group, we would just use a timer of 0
if (!ensureActiveGroup(waitForJoinGroup ? timer : time.timer(0L))) {
// since we may use a different timer in the callee, we'd still need
// to update the original timer's current time after the call
timer.update(time.milliseconds());

return false;
}
}
} else {
// For manually assigned partitions, if there are no ready nodes, await metadata.
// If connections to all nodes fail, wakeups triggered while attempting to send fetch
// requests result in polls returning immediately, causing a tight loop of polls. Without
// the wakeup, poll() with no channels would block for the timeout, delaying re-connection.
// awaitMetadataUpdate() initiates new connections with configured backoff and avoids the busy loop.
// When group management is used, metadata wait is already performed for this scenario as
// coordinator is unknown, hence this check is not required.
if (metadata.updateRequested() && !client.hasReadyNodes(timer.currentTimeMs())) {
client.awaitMetadataUpdate(timer);
}
}

// Auto-commit offsets asynchronously if auto-commit is enabled and due
maybeAutoCommitOffsetsAsync(timer.currentTimeMs());
return true;
}
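
The poll() method above relies on the brokers to say which node is the group's coordinator. The listings here do not show how the brokers decide. As background, and stated as general Kafka behaviour rather than something taken from these listings, the coordinator is the leader of the __consumer_offsets partition selected by hashing the group id modulo the partition count of that topic (offsets.topic.num.partitions, 50 by default). A minimal sketch of that calculation follows. CoordinatorPartitionSketch is an invented name and "demo-group" is a placeholder.

```java
// Sketch of how a group id maps to a __consumer_offsets partition; not Kafka code.
class CoordinatorPartitionSketch {

    static int coordinatorPartition(String groupId, int offsetsTopicPartitions) {
        int hash = groupId.hashCode();
        // Math.abs(Integer.MIN_VALUE) is still negative, so that case maps to 0.
        int nonNegative = (hash == Integer.MIN_VALUE) ? 0 : Math.abs(hash);
        return nonNegative % offsetsTopicPartitions;
    }

    public static void main(String[] args) {
        int partition = coordinatorPartition("demo-group", 50);
        System.out.println("__consumer_offsets partition for demo-group: " + partition);
    }
}
```

Every consumer in the same group computes to the same partition, which is why all members end up talking to the same coordinator broker.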

The ensureCoordinatorReady() method of AbstractCoordinator:

protected synchronized boolean ensureCoordinatorReady(final Timer timer) {
    // If the coordinator is already known, return immediately
    if (!coordinatorUnknown())
        return true;

    // Otherwise keep sending lookup requests to the server until the coordinator is found or the timer expires
    do {
        if (fatalFindCoordinatorException != null) {
            final RuntimeException fatalException = fatalFindCoordinatorException;
            fatalFindCoordinatorException = null;
            throw fatalException;
        }
        // Send a FindCoordinator request to the server
        final RequestFuture<Void> future = lookupCoordinator();
        // Wait for the response to the FindCoordinator request
        client.poll(future, timer);

        if (!future.isDone()) {
            // ran out of time
            break;
        }

        RuntimeException fatalException = null;

        if (future.failed()) {
            if (future.isRetriable()) {
                log.debug("Coordinator discovery failed, refreshing metadata", future.exception());
                client.awaitMetadataUpdate(timer);
            } else {
                fatalException = future.exception();
                log.info("FindCoordinator request hit fatal exception", fatalException);
            }
        } else if (coordinator != null && client.isUnavailable(coordinator)) {
            // we found the coordinator, but the connection has failed, so mark
            // it dead and backoff before retrying discovery
            markCoordinatorUnknown("coordinator unavailable");
            timer.sleep(rebalanceConfig.retryBackoffMs);
        }

        clearFindCoordinatorFuture();
        if (fatalException != null)
            throw fatalException;
    } while (coordinatorUnknown() && timer.notExpired());

    return !coordinatorUnknown();
}

protected synchronized RequestFuture<Void> lookupCoordinator() {
    if (findCoordinatorFuture == null) {
        // find a node to ask about the coordinator
        Node node = this.client.leastLoadedNode();
        if (node == null) {
            log.debug("No broker available to send FindCoordinator request");
            return RequestFuture.noBrokersAvailable();
        } else {
            // Send a FindCoordinator request to the chosen node and cache the in-flight future
            findCoordinatorFuture = sendFindCoordinatorRequest(node);
        }
    }
    return findCoordinatorFuture;
}

private RequestFuture<Void> sendFindCoordinatorRequest(Node node) {
    // initiate the group metadata request
    log.debug("Sending FindCoordinator request to broker {}", node);
    // Build the request payload
    FindCoordinatorRequestData data = new FindCoordinatorRequestData()
            .setKeyType(CoordinatorType.GROUP.id())
            .setKey(this.rebalanceConfig.groupId);
    FindCoordinatorRequest.Builder requestBuilder = new FindCoordinatorRequest.Builder(data);
    // Send the FindCoordinator request to the server
    return client.send(node, requestBuilder)
            .compose(new FindCoordinatorResponseHandler());
}

Kafka Server Source Code Analysis

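Tips

  • Before reading and debugging the Kafka source code, you must first set up a Kafka source-reading environment; see the linked tutorial for detailed steps.
  • Two signs that your source reading has paid off: first, you can debug the source code on your own; second, you can independently build advanced features on top of it.
  • The Kafka server (Broker) is written mainly in Scala, so you should be proficient in Scala before reading the server source code.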

How the Server Works

Overall Server Workflow

Server Source Code Walkthrough

The Server's Main Startup Class

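The main startup class of the Kafka server is core/src/main/scala/kafka/Kafka.scala. Open this class in IntelliJ IDEA and run its main() method to start the server (this assumes the Kafka source-reading environment is already set up), as shown in the figure below:

The main() method of the Kafka server's startup class is shown below: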

def main(args: Array[String]): Unit = {
  try {
    // Read the server configuration properties from the command-line arguments
    val serverProps = getPropsFromArgs(args)
    // Build the server instance from those properties
    val server = buildServer(serverProps)

    try {
      if (!OperatingSystem.IS_WINDOWS && !Java.isIbmJdk)
        new LoggingSignalHandler().register()
    } catch {
      case e: ReflectiveOperationException =>
        warn("Failed to register optional signal handler that logs a message when the process is terminated " +
          s"by a signal. Reason for registration failure is: $e", e)
    }

    // attach shutdown handler to catch terminating signals as well as normal termination
    Exit.addShutdownHook("kafka-shutdown-hook", {
      try server.shutdown()
      catch {
        case _: Throwable =>
          fatal("Halting Kafka.")
          // Calling exit() can lead to deadlock as exit() can be called multiple times. Force exit.
          Exit.halt(1)
      }
    })

    // Start the server
    try server.startup()
    catch {
      case _: Throwable =>
        // KafkaServer.startup() calls shutdown() in case of exceptions, so we invoke `exit` to set the status code
        fatal("Exiting Kafka.")
        Exit.exit(1)
    }

    server.awaitShutdown()
  }
  catch {
    case e: Throwable =>
      fatal("Exiting Kafka due to fatal exception", e)
      Exit.exit(1)
  }
  Exit.exit(0)
}
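The main() method follows a common lifecycle: register a shutdown hook, start the server, then block until shutdown completes. The sketch below shows that shape in plain Java. `DemoServer` is a hypothetical stand-in that models only the three lifecycle calls, and backing awaitShutdown() with a latch is an assumption made for this illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class ServerLifecycleSketch {

    /** Hypothetical server exposing only startup, shutdown and awaitShutdown. */
    public static final class DemoServer {
        private final CountDownLatch shutdownLatch = new CountDownLatch(1);
        private volatile boolean started = false;

        public void startup() { started = true; }

        /** Safe to call more than once: counting down a released latch is a no-op. */
        public void shutdown() {
            started = false;
            shutdownLatch.countDown();
        }

        public boolean isStarted() { return started; }

        /** Blocks until shutdown() has run or the timeout elapses; returns true if shut down. */
        public boolean awaitShutdown(long timeoutMs) {
            try {
                return shutdownLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for callers
                return false;
            }
        }
    }

    public static void main(String[] args) {
        DemoServer server = new DemoServer();

        // Register the hook before startup so termination signals also trigger a clean stop.
        Runtime.getRuntime().addShutdownHook(new Thread(server::shutdown, "demo-shutdown-hook"));

        server.startup();

        // Simulate an external stop request arriving on another thread.
        new Thread(server::shutdown, "demo-stopper").start();

        boolean stopped = server.awaitShutdown(5000);
        System.out.println("stopped=" + stopped);
    }
}
```

Making shutdown() idempotent matters here because it can run both from the explicit stop request and again from the shutdown hook when the JVM exits.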