1. Committing offsets
Offset commits are implemented by ConsumerCoordinator, which sends an OffsetCommitRequest and receives an OffsetCommitResponse.
Let's first look at the message formats of OffsetCommitRequest and OffsetCommitResponse.
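OffsetCommitRequest:
group_id: ID of the consumer group
group_generation_id: generation ID held by the consumer group
member_id: ID that the GroupCoordinator assigned to the consumer
retention_time: maximum time the offset is retained
topic: topic name
partition: partition number
offset: offset being committed
metadata: custom data to store together with the offset
OffsetCommitResponse:
topic: topic name
partition: partition number
error_code: error code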
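To make the field list above concrete, here is a minimal plain-Java sketch of the request payload. The class names `CommitRequestSketch` and `PartitionOffset`, and the "topic-partition" string key, are assumptions made for illustration; they are not the real Kafka classes.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of one per-partition entry: the offset plus its metadata.
final class PartitionOffset {
    final long offset;
    final String metadata;

    PartitionOffset(long offset, String metadata) {
        this.offset = offset;
        this.metadata = metadata;
    }
}

// Hypothetical model of the request: group-level fields plus per-partition entries.
final class CommitRequestSketch {
    final String groupId;
    final int generationId;
    final String memberId;
    final long retentionTimeMs;
    // Key is "topic-partition" (an assumption of this sketch), value is the offset entry.
    final Map<String, PartitionOffset> offsets = new LinkedHashMap<>();

    CommitRequestSketch(String groupId, int generationId, String memberId, long retentionTimeMs) {
        this.groupId = groupId;
        this.generationId = generationId;
        this.memberId = memberId;
        this.retentionTimeMs = retentionTimeMs;
    }

    // Adds one partition entry and returns this so calls can be chained.
    CommitRequestSketch add(String topic, int partition, long offset, String metadata) {
        offsets.put(topic + "-" + partition, new PartitionOffset(offset, metadata));
        return this;
    }
}
```

The group-level fields are sent once per request, while topic, partition, offset and metadata repeat for every partition being committed.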
The offset commit flow:
public void commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
    // Run the callbacks of commits that have already completed
    invokeCompletedOffsetCommitCallbacks();
    // If the GroupCoordinator is known, commit right away
    if (!coordinatorUnknown()) {
        doCommitOffsetsAsync(offsets, callback);
    } else {
        // Otherwise look up the GroupCoordinator first, then commit
        lookupCoordinator().addListener(new RequestFutureListener<Void>() {
            @Override
            public void onSuccess(Void value) {
                doCommitOffsetsAsync(offsets, callback);
            }

            @Override
            public void onFailure(RuntimeException e) {
                // The lookup failed: record the commit as failed with a retriable exception
                completedOffsetCommits.add(new OffsetCommitCompletion(callback, offsets, new RetriableCommitFailedException(e)));
            }
        });
    }
    // Poll the network client once, without blocking on the commit result
    client.pollNoWakeup();
}
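The branching in commitOffsetsAsync (commit immediately when the coordinator is known, otherwise wait for the lookup and commit afterwards) can be sketched with the JDK's CompletableFuture. This is only an analogy: the real code uses Kafka's own RequestFuture, and the class `LookupThenCommitSketch` with its string-based "coordinator" and event log is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the coordinator-lookup branch; not Kafka code.
final class LookupThenCommitSketch {
    // Records what happened, so the behaviour can be inspected.
    final List<String> events = new ArrayList<>();
    // null means the coordinator is unknown.
    private String coordinator;

    LookupThenCommitSketch(String coordinator) {
        this.coordinator = coordinator;
    }

    void commitAsync(long offset, CompletableFuture<String> lookup) {
        if (coordinator != null) {
            // Coordinator known: commit right away.
            doCommit(offset);
            return;
        }
        // Coordinator unknown: commit only once the lookup has finished.
        lookup.whenComplete((node, error) -> {
            if (error != null) {
                // Lookup failed: record the failed commit instead of sending it.
                events.add("failed: " + error.getMessage());
            } else {
                coordinator = node;
                doCommit(offset);
            }
        });
    }

    private void doCommit(long offset) {
        events.add("commit " + offset + " -> " + coordinator);
    }
}
```

Note that the failure path does not throw to the caller; like the original code, it only records the failed commit so that it can be reported later.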
private void doCommitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
    // Set needsFetchCommittedOffsets to true
    this.subscriptions.needRefreshCommits();
    // Send the OffsetCommitRequest
    RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
    final OffsetCommitCallback cb = callback == null ? defaultOffsetCommitCallback : callback;
    // Attach a listener to the RequestFuture: on success, add an OffsetCommitCompletion to
    // completedOffsetCommits; on failure, add one as well, but carrying the exception
    future.addListener(new RequestFutureListener<Void>() {
        @Override
        public void onSuccess(Void value) {
            if (interceptors != null)
                interceptors.onCommit(offsets);
            completedOffsetCommits.add(new OffsetCommitCompletion(cb, offsets, null));
        }

        @Override
        public void onFailure(RuntimeException e) {
            Exception commitException = e;
            if (e instanceof RetriableException)
                commitException = new RetriableCommitFailedException(e);
            completedOffsetCommits.add(new OffsetCommitCompletion(cb, offsets, commitException));
        }
    });
}
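Notice that the listeners above never invoke the user callback directly. They only append a completion record to completedOffsetCommits, and the callbacks run later, when invokeCompletedOffsetCommitCallbacks() is called. A minimal sketch of that deferred-callback pattern follows; the class `DeferredCallbackSketch` is hypothetical, and using a ConcurrentLinkedQueue as the backing queue is an assumption of this sketch.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

// Hypothetical illustration of "record now, invoke later"; not Kafka code.
final class DeferredCallbackSketch {
    // One finished commit: the callback to run and the error (null on success).
    private static final class Completion {
        final Consumer<Exception> callback;
        final Exception error;

        Completion(Consumer<Exception> callback, Exception error) {
            this.callback = callback;
            this.error = error;
        }
    }

    private final ConcurrentLinkedQueue<Completion> completed = new ConcurrentLinkedQueue<>();

    // Called when a response arrives: only record the outcome, do not run the callback.
    void onCommitFinished(Consumer<Exception> callback, Exception error) {
        completed.add(new Completion(callback, error));
    }

    // Called later from the application thread: drain the queue and run each callback.
    int invokeCompleted() {
        int invoked = 0;
        Completion c;
        while ((c = completed.poll()) != null) {
            c.callback.accept(c.error);
            invoked++;
        }
        return invoked;
    }
}
```

Deferring the invocation this way keeps user code off the path that handles network responses, at the cost of callbacks firing only on the next call that drains the queue.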
private RequestFuture<Void> sendOffsetCommitRequest(final Map<TopicPartition, OffsetAndMetadata> offsets) {
    if (offsets.isEmpty())
        return RequestFuture.voidSuccess();

    // Get the GroupCoordinator
    Node coordinator = coordinator();
    if (coordinator == null)
        return RequestFuture.coordinatorNotAvailable();

    // Iterate over offsets and put each entry into offsetData
    Map<TopicPartition, OffsetCommitRequest.PartitionData> offsetData = new HashMap<>(offsets.size());
    for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
        OffsetAndMetadata offsetAndMetadata = entry.getValue();
        offsetData.put(entry.getKey(), new OffsetCommitRequest.PartitionData(
                offsetAndMetadata.offset(), offsetAndMetadata.metadata()));
    }

    // Get the generation information
    final Generation generation;
    if (subscriptions.partitionsAutoAssigned())
        generation = generation();
    else
        generation = Generation.NO_GENERATION;

    // if the generation is null, we are not part of an active group (and we expect to be).
    // the only thing we can do is fail the commit and let the user rejoin the group in poll()
    if (generation == null)
        return RequestFuture.failure(new CommitFailedException());

    // Build the OffsetCommitRequest
    OffsetCommitRequest req = new OffsetCommitRequest(
            this.groupId,
            generation.generationId,
            generation.memberId,
            OffsetCommitRequest.DEFAULT_RETENTION_TIME,
            offsetData);

    log.trace("Sending offset-commit request with {} to coordinator {} for group {}", offsets, coordinator, groupId);

    // Queue the request in the unsent collection, where it waits to be sent
    return client.send(coordinator, ApiKeys.OFFSET_COMMIT, req)
            .compose(new OffsetCommitResponseHandler(offsets));
}
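2. Fetching offsets
Once a rebalance has finished, every consumer knows which partitions it is responsible for. Before it starts consuming, it has to determine the position from which to fetch messages. To do so, the consumer can send an OffsetFetchRequest to obtain the last committed offset and resume consuming from there.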
Message formats of OffsetFetchRequest and OffsetFetchResponse:
Source code walkthrough for fetching offsets:
public Map<TopicPartition, OffsetAndMetadata> fetchCommittedOffsets(Set<TopicPartition> partitions) {
    while (true) {
        // Block until the GroupCoordinator is ready to accept requests
        ensureCoordinatorReady();

        // Build the OffsetFetchRequest and queue it for sending
        RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future = sendOffsetFetchRequest(partitions);
        // Block until the request has completed
        client.poll(future);

        if (future.succeeded())
            return future.value(); // the offsets returned by the server

        if (!future.isRetriable())
            throw future.exception();

        time.sleep(retryBackoffMs); // otherwise back off, then retry
    }
}
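The loop above follows a common shape: try, return on success, rethrow on a non-retriable error, otherwise sleep and try again. A minimal standalone sketch of that shape is shown here. `RetryLoopSketch` and `RetriableSketchException` are hypothetical names, and unlike the original, the sketch signals failure through exceptions rather than a future.

```java
import java.util.function.Supplier;

// Hypothetical illustration of a blocking retry-with-backoff loop; not Kafka code.
final class RetryLoopSketch {
    // Stand-in for an error that is worth retrying.
    static final class RetriableSketchException extends RuntimeException {
        RetriableSketchException(String message) {
            super(message);
        }
    }

    // Repeats the attempt until it succeeds or fails with a non-retriable exception.
    static <T> T callWithRetry(Supplier<T> attempt, long backoffMs) {
        while (true) {
            try {
                return attempt.get();
            } catch (RetriableSketchException e) {
                // Retriable: back off before the next attempt.
                try {
                    Thread.sleep(backoffMs);
                } catch (InterruptedException interrupted) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while backing off", interrupted);
                }
            }
            // Any other exception propagates to the caller unchanged.
        }
    }
}
```

As in the original loop, there is no upper bound on the number of attempts, so a caller that needs a deadline has to add one itself.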
private RequestFuture<Map<TopicPartition, OffsetAndMetadata>> sendOffsetFetchRequest(Set<TopicPartition> partitions) {
    // Get the GroupCoordinator
    Node coordinator = coordinator();
    if (coordinator == null)
        return RequestFuture.coordinatorNotAvailable();

    log.debug("Group {} fetching committed offsets for partitions: {}", groupId, partitions);
    // Build the OffsetFetchRequest
    OffsetFetchRequest request = new OffsetFetchRequest(this.groupId, new ArrayList<>(partitions));

    // Queue the request in the unsent collection, where it waits to be sent
    return client.send(coordinator, ApiKeys.OFFSET_FETCH, request)
            .compose(new OffsetFetchResponseHandler());
}
Handling the OffsetFetchResponse: the RequestFuture returned by sendOffsetFetchRequest is composed with an OffsetFetchResponseHandler, which processes the OffsetFetchResponse as follows:
public void handle(OffsetFetchResponse response, RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future) {
    // Create the <TopicPartition, OffsetAndMetadata> map that will hold the result
    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(response.responseData().size());
    // Iterate over the response data
    for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry : response.responseData().entrySet()) {
        TopicPartition tp = entry.getKey();
        OffsetFetchResponse.PartitionData data = entry.getValue();
        if (data.hasError()) {
            Errors error = Errors.forCode(data.errorCode);
            log.debug("Group {} failed to fetch offset for partition {}: {}", groupId, tp, error.message());

            if (error == Errors.GROUP_LOAD_IN_PROGRESS) {
                // just retry
                future.raise(error);
            } else if (error == Errors.NOT_COORDINATOR_FOR_GROUP) {
                // re-discover the coordinator and retry
                coordinatorDead();
                future.raise(error);
            } else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
                future.raise(new KafkaException("Partition " + tp + " may not exist or user may not have Describe access to topic"));
            } else {
                future.raise(new KafkaException("Unexpected error in fetch offset response: " + error.message()));
            }
            return;
        } else if (data.offset >= 0) {
            // No error: put the entry into the offsets map
            // record the position with the offset (-1 indicates no committed offset to fetch)
            offsets.put(tp, new OffsetAndMetadata(data.offset, data.metadata));
        } else {
            log.debug("Group {} has no committed offset for partition {}", groupId, tp);
        }
    }

    // Complete the future with the offsets map; fetchCommittedOffsets eventually returns it
    future.complete(offsets);
}
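The handler's decisions can be condensed into three rules: some errors are retriable, every other error is fatal, and a negative offset simply means nothing has been committed for that partition. The sketch below restates those rules in plain Java. All names in it (`FetchResponseSketch`, `ErrorCode`, `PartitionResult`, `RetriableFetchException`) are hypothetical, and it throws exceptions where the original raises errors on the future.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical restatement of the response-handling rules; not Kafka code.
final class FetchResponseSketch {
    enum ErrorCode { NONE, GROUP_LOAD_IN_PROGRESS, NOT_COORDINATOR_FOR_GROUP, UNKNOWN_TOPIC_OR_PARTITION }

    // Per-partition result: an error code and the committed offset (-1 if none).
    static final class PartitionResult {
        final ErrorCode error;
        final long offset;

        PartitionResult(ErrorCode error, long offset) {
            this.error = error;
            this.offset = offset;
        }
    }

    // Signals that the whole fetch should be retried.
    static final class RetriableFetchException extends RuntimeException {
        RetriableFetchException(String message) {
            super(message);
        }
    }

    // Returns the committed offsets, skipping partitions without a committed offset.
    static Map<String, Long> handle(Map<String, PartitionResult> response) {
        Map<String, Long> offsets = new HashMap<>();
        for (Map.Entry<String, PartitionResult> entry : response.entrySet()) {
            PartitionResult result = entry.getValue();
            if (result.error == ErrorCode.GROUP_LOAD_IN_PROGRESS
                    || result.error == ErrorCode.NOT_COORDINATOR_FOR_GROUP) {
                // Retriable: the caller's retry loop should try again.
                throw new RetriableFetchException(entry.getKey() + ": " + result.error);
            }
            if (result.error != ErrorCode.NONE) {
                // Anything else is treated as fatal.
                throw new IllegalStateException(entry.getKey() + ": " + result.error);
            }
            if (result.offset >= 0) {
                offsets.put(entry.getKey(), result.offset);
            }
            // A negative offset means no committed offset; the partition is left out.
        }
        return offsets;
    }
}
```

A single failing partition aborts the whole result here, which mirrors the early return in the original handler.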