我是靠谱客的博主 直率项链,这篇文章主要介绍offset分析一 提交offset二 获取offset,现在分享给大家,希望可以做个参考。

一 提交offset

提交offset的功能是由ConsumerCoordinator实现的,它会发送OffsetCommitRequest请求和接受OffsetCommitResponse响应

我们先分析一下OffsetCommitRequest和OffsetCommitResponse消息体格式


OffsetCommitRequest:

group_id: 消费者组的id

group_generation_id: 消费者组保存的generation信息

member_id: GroupCoordinator分配给消费者的id

retention_time: 该offset的最长保存时间

topic: topic名称

partition: 分区编号

offset: 提交消息的offset

metadata: 希望与offset一起保存的自定义数据



OffsetCommitResponse:

topic: topic名称

partition: 分区编号

error_code: 错误码

 

提交Offset的流程:

public void commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
   invokeCompletedOffsetCommitCallbacks();
    // 如果GroupCoordinator已知,那么就开始提交
   
if (!coordinatorUnknown()) {
        doCommitOffsetsAsync(offsets, callback);
    } else {
        // 查找到GroupCoordinator再进行提交
       
lookupCoordinator().addListener(new RequestFutureListener<Void>() {
            @Override
           
public void onSuccess(Void value) {
                doCommitOffsetsAsync(offsets, callback);
            }

            @Override
           
public void onFailure(RuntimeException e) {
                completedOffsetCommits.add(new OffsetCommitCompletion(callback, offsets, new RetriableCommitFailedException(e)));
            }
        });
    }
    client.pollNoWakeup();
}

 

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
private void doCommitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {     // needsFetchCommittedOffsets置为true     this.subscriptions.needRefreshCommits();     // 发送OffsetCommitRequest请求     RequestFuture<Void> future = sendOffsetCommitRequest(offsets);     final OffsetCommitCallback cb = callback == null ? defaultOffsetCommitCallback : callback;     // RequestFuture添加监听器,如果成功则completedOffsetCommits添加OffsetCommitCompletion     // 如果失败也往completedOffsetCommits添加OffsetCommitCompletion,但是会带有异常     future.addListener(new RequestFutureListener<Void>() {         @Override         public void onSuccess(Void value) {             if (interceptors != null)                 interceptors.onCommit(offsets);             completedOffsetCommits.add(new OffsetCommitCompletion(cb, offsets, null));         }         @Override         public void onFailure(RuntimeException e) {             Exception commitException = e;             if (e instanceof RetriableException)                 commitException = new RetriableCommitFailedException(e);             completedOffsetCommits.add(new OffsetCommitCompletion(cb, offsets, commitException));         }     }); }

 

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
private RequestFuture<Void> sendOffsetCommitRequest(final Map<TopicPartition, OffsetAndMetadata> offsets) {     if (offsets.isEmpty())         return RequestFuture.voidSuccess();     // 获取GroupCoordinator     Node coordinator = coordinator();     if (coordinator == null)         return RequestFuture.coordinatorNotAvailable();     // 遍历offsets,并且往offsetDataput数据     Map<TopicPartition, OffsetCommitRequest.PartitionData> offsetData = new HashMap<>(offsets.size());     for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {         OffsetAndMetadata offsetAndMetadata = entry.getValue();         offsetData.put(entry.getKey(), new OffsetCommitRequest.PartitionData(                 offsetAndMetadata.offset(), offsetAndMetadata.metadata()));     }     // 获取generation信息     final Generation generation;     if (subscriptions.partitionsAutoAssigned())         generation = generation();     else         generation = Generation.NO_GENERATION;     // if the generation is null, we are not part of an active group (and we expect to be).     // the only thing we can do is fail the commit and let the user rejoin the group in poll()     if (generation == null)         return RequestFuture.failure(new CommitFailedException());     // 构建OffsetCommitRequest     OffsetCommitRequest req = new OffsetCommitRequest(             this.groupId,             generation.generationId,             generation.memberId,             OffsetCommitRequest.DEFAULT_RETENTION_TIME,             offsetData);     log.trace("Sending offset-commit request with {} to coordinator {} for group {}", offsets, coordinator, groupId);     // 将请求放到unsent集合等待被发送     return client.send(coordinator, ApiKeys.OFFSET_COMMIT, req)             .compose(new OffsetCommitResponseHandler(offsets)); }

 

二 获取offset

在rebalance操作结束之后,每一个消费者都确定了其需要的分区。在开始消费之前,消费者需要确定拉取消息的起始位置。此时消费者可以通过OffsetFetchRequest请求获取上次提交的offset并从此开始消费

 

OffsetFetchRequest和OffsetFetchResponse消息体格式:

 

获取offset源码分析:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public Map<TopicPartition, OffsetAndMetadata> fetchCommittedOffsets(Set<TopicPartition> partitions) {     while (true) {         // 确认已经准备好接受请求,否则一直阻塞         ensureCoordinatorReady();         // 发送OffsetFetchRequest请求         RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future = sendOffsetFetchRequest(partitions);         // 阻塞发送OffsetFetchRequest请求         client.poll(future);         if (future.succeeded())             return future.value(); // 返回从服务器端得到的offset的值         if (!future.isRetriable())             throw future.exception();         time.sleep(retryBackoffMs);// 否则等待重试     } }

 

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
private RequestFuture<Map<TopicPartition, OffsetAndMetadata>> sendOffsetFetchRequest(Set<TopicPartition> partitions) {     // 获取GroupCoordinator     Node coordinator = coordinator();     if (coordinator == null)         return RequestFuture.coordinatorNotAvailable();     log.debug("Group {} fetching committed offsets for partitions: {}", groupId, partitions);     // 构建OffsetFetchRequest请求     OffsetFetchRequest request = new OffsetFetchRequest(this.groupId, new ArrayList<>(partitions));     // 发送请求到unsent请求队列,等待被发送     return client.send(coordinator, ApiKeys.OFFSET_FETCH, request)             .compose(new OffsetFetchResponseHandler()); }

 

复制代码
1
处理OffsetFetchResponse:在返回RequestFuture的时候,会通过OffsetFetchResponseHandlerOffsetFetchResponse做一些处理:
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
    public void handle(OffsetFetchResponse response, RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future) {         // 构造一个<TopicPartition, OffsetAndMetadata> 集合offsets         Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(response.responseData().size());         // 遍历返回的相应的数据         for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry : response.responseData().entrySet()) {             TopicPartition tp = entry.getKey();             OffsetFetchResponse.PartitionData data = entry.getValue();             if (data.hasError()) {                 Errors error = Errors.forCode(data.errorCode);                 log.debug("Group {} failed to fetch offset for partition {}: {}", groupId, tp, error.message());                 if (error == Errors.GROUP_LOAD_IN_PROGRESS) {                     // just retry                     future.raise(error);                 } else if (error == Errors.NOT_COORDINATOR_FOR_GROUP) {                     // re-discover the coordinator and retry                     coordinatorDead();                     future.raise(error);                 } else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {                     future.raise(new KafkaException("Partition " + tp + " may not exist or user may not have Describe access to topic"));                 } else {                     future.raise(new KafkaException("Unexpected error in fetch offset response: " + error.message()));                 }                 return;             } else if (data.offset >= 0) {// 如果没有错误则把数据放到offsets集合里                 // record the position with the offset (-1 indicates no committed offset to fetch)                 offsets.put(tp, new OffsetAndMetadata(data.offset, data.metadata));             } else {                 log.debug("Group {} has no committed offset for partition {}", groupId, tp);             }         }         // 传播offset集合最终会通过fetchCommitedOffsets取到offset的值         future.complete(offsets);     } }

最后

以上就是直率项链最近收集整理的关于offset分析一 提交offset二 获取offset的全部内容,更多相关offset分析一内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(60)

评论列表共有 0 条评论

立即
投稿
返回
顶部