我是靠谱客的博主 直率项链,最近开发中收集的这篇文章主要介绍offset分析一 提交offset二 获取offset,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

一 提交offset

提交offset的功能是由ConsumerCoordinator实现的,它会发送OffsetCommitRequest请求和接受OffsetCommitResponse响应

我们先分析一下OffsetCommitRequest和OffsetCommitResponse消息体格式


OffsetCommitRequest:

group_id: 消费者组的id

group_generation_id: 消费者组保存的generation信息

member_id: GroupCoordinator分配给消费者的id

retention_time: 该offset的最长保存时间

topic: topic名称

partition: 分区编号

offset: 提交消息的offset

metadata: 希望与offset一起保存的自定义数据



OffsetCommitResponse:

topic: topic名称

partition: 分区编号

error_code: 错误码

 

提交Offset的流程:

public void commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
   invokeCompletedOffsetCommitCallbacks();
    // 如果GroupCoordinator已知,那么就开始提交
   
if (!coordinatorUnknown()) {
        doCommitOffsetsAsync(offsets, callback);
    } else {
        // 查找到GroupCoordinator再进行提交
       
lookupCoordinator().addListener(new RequestFutureListener<Void>() {
            @Override
           
public void onSuccess(Void value) {
                doCommitOffsetsAsync(offsets, callback);
            }

            @Override
           
public void onFailure(RuntimeException e) {
                completedOffsetCommits.add(new OffsetCommitCompletion(callback, offsets, new RetriableCommitFailedException(e)));
            }
        });
    }
    client.pollNoWakeup();
}

 

private void doCommitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
    // needsFetchCommittedOffsets置为true
    this.subscriptions.needRefreshCommits();
    // 发送OffsetCommitRequest请求
    RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
    final OffsetCommitCallback cb = callback == null ? defaultOffsetCommitCallback : callback;
    // RequestFuture添加监听器,如果成功则completedOffsetCommits添加OffsetCommitCompletion
    // 如果失败也往completedOffsetCommits添加OffsetCommitCompletion,但是会带有异常
    future.addListener(new RequestFutureListener<Void>() {
        @Override
        public void onSuccess(Void value) {
            if (interceptors != null)
                interceptors.onCommit(offsets);

            completedOffsetCommits.add(new OffsetCommitCompletion(cb, offsets, null));
        }

        @Override
        public void onFailure(RuntimeException e) {
            Exception commitException = e;

            if (e instanceof RetriableException)
                commitException = new RetriableCommitFailedException(e);

            completedOffsetCommits.add(new OffsetCommitCompletion(cb, offsets, commitException));
        }
    });
}

 

private RequestFuture<Void> sendOffsetCommitRequest(final Map<TopicPartition, OffsetAndMetadata> offsets) {
    if (offsets.isEmpty())
        return RequestFuture.voidSuccess();
    // 获取GroupCoordinator
    Node coordinator = coordinator();
    if (coordinator == null)
        return RequestFuture.coordinatorNotAvailable();

    // 遍历offsets,并且往offsetDataput数据
    Map<TopicPartition, OffsetCommitRequest.PartitionData> offsetData = new HashMap<>(offsets.size());
    for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
        OffsetAndMetadata offsetAndMetadata = entry.getValue();
        offsetData.put(entry.getKey(), new OffsetCommitRequest.PartitionData(
                offsetAndMetadata.offset(), offsetAndMetadata.metadata()));
    }
    // 获取generation信息
    final Generation generation;
    if (subscriptions.partitionsAutoAssigned())
        generation = generation();
    else
        generation = Generation.NO_GENERATION;

    // if the generation is null, we are not part of an active group (and we expect to be).
    // the only thing we can do is fail the commit and let the user rejoin the group in poll()
    if (generation == null)
        return RequestFuture.failure(new CommitFailedException());
    // 构建OffsetCommitRequest
    OffsetCommitRequest req = new OffsetCommitRequest(
            this.groupId,
            generation.generationId,
            generation.memberId,
            OffsetCommitRequest.DEFAULT_RETENTION_TIME,
            offsetData);

    log.trace("Sending offset-commit request with {} to coordinator {} for group {}", offsets, coordinator, groupId);
    // 将请求放到unsent集合等待被发送
    return client.send(coordinator, ApiKeys.OFFSET_COMMIT, req)
            .compose(new OffsetCommitResponseHandler(offsets));
}

 

二 获取offset

在rebalance操作结束之后,每一个消费者都确定了其需要的分区。在开始消费之前,消费者需要确定拉取消息的起始位置。此时消费者可以通过OffsetFetchRequest请求获取上次提交的offset并从此开始消费

 

OffsetFetchRequest和OffsetFetchResponse消息体格式:

 

获取offset源码分析:

public Map<TopicPartition, OffsetAndMetadata> fetchCommittedOffsets(Set<TopicPartition> partitions) {
    while (true) {
        // 确认已经准备好接受请求,否则一直阻塞
        ensureCoordinatorReady();
        // 发送OffsetFetchRequest请求
        RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future = sendOffsetFetchRequest(partitions);
        // 阻塞发送OffsetFetchRequest请求
        client.poll(future);
        if (future.succeeded())
            return future.value(); // 返回从服务器端得到的offset的值
        if (!future.isRetriable())
            throw future.exception();
        time.sleep(retryBackoffMs);// 否则等待重试
    }
}

 

private RequestFuture<Map<TopicPartition, OffsetAndMetadata>> sendOffsetFetchRequest(Set<TopicPartition> partitions) {
    // 获取GroupCoordinator
    Node coordinator = coordinator();
    if (coordinator == null)
        return RequestFuture.coordinatorNotAvailable();

    log.debug("Group {} fetching committed offsets for partitions: {}", groupId, partitions);
    // 构建OffsetFetchRequest请求
    OffsetFetchRequest request = new OffsetFetchRequest(this.groupId, new ArrayList<>(partitions));

    // 发送请求到unsent请求队列,等待被发送
    return client.send(coordinator, ApiKeys.OFFSET_FETCH, request)
            .compose(new OffsetFetchResponseHandler());
}

 

处理OffsetFetchResponse:在返回RequestFuture的时候,会通过OffsetFetchResponseHandlerOffsetFetchResponse做一些处理:
    public void handle(OffsetFetchResponse response, RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future) {
        // 构造一个<TopicPartition, OffsetAndMetadata> 集合offsets
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(response.responseData().size());
        // 遍历返回的相应的数据
        for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry : response.responseData().entrySet()) {
            TopicPartition tp = entry.getKey();
            OffsetFetchResponse.PartitionData data = entry.getValue();
            if (data.hasError()) {
                Errors error = Errors.forCode(data.errorCode);
                log.debug("Group {} failed to fetch offset for partition {}: {}", groupId, tp, error.message());

                if (error == Errors.GROUP_LOAD_IN_PROGRESS) {
                    // just retry
                    future.raise(error);
                } else if (error == Errors.NOT_COORDINATOR_FOR_GROUP) {
                    // re-discover the coordinator and retry
                    coordinatorDead();
                    future.raise(error);
                } else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
                    future.raise(new KafkaException("Partition " + tp + " may not exist or user may not have Describe access to topic"));
                } else {
                    future.raise(new KafkaException("Unexpected error in fetch offset response: " + error.message()));
                }
                return;
            } else if (data.offset >= 0) {// 如果没有错误则把数据放到offsets集合里
                // record the position with the offset (-1 indicates no committed offset to fetch)
                offsets.put(tp, new OffsetAndMetadata(data.offset, data.metadata));
            } else {
                log.debug("Group {} has no committed offset for partition {}", groupId, tp);
            }
        }
        // 传播offset集合最终会通过fetchCommitedOffsets取到offset的值
        future.complete(offsets);
    }
}

最后

以上就是直率项链为你收集整理的offset分析一 提交offset二 获取offset的全部内容,希望文章能够帮你解决offset分析一 提交offset二 获取offset所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(46)

评论列表共有 0 条评论

立即
投稿
返回
顶部