概述
一 提交offset
提交offset的功能是由ConsumerCoordinator实现的,它会发送OffsetCommitRequest请求和接受OffsetCommitResponse响应
我们先分析一下OffsetCommitRequest和OffsetCommitResponse消息体格式
OffsetCommitRequest:
group_id: 消费者组的id
group_generation_id: 消费者组保存的generation信息
member_id: GroupCoordinator分配给消费者的id
retention_time: 该offset的最长保存时间
topic: topic名称
partition: 分区编号
offset: 提交消息的offset
metadata: 希望与offset一起保存的自定义数据
OffsetCommitResponse:
topic: topic名称
partition: 分区编号
error_code: 错误码
提交Offset的流程:
public void commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
invokeCompletedOffsetCommitCallbacks();
// 如果GroupCoordinator已知,那么就开始提交
if (!coordinatorUnknown()) {
doCommitOffsetsAsync(offsets, callback);
} else {
// 查找到GroupCoordinator再进行提交
lookupCoordinator().addListener(new RequestFutureListener<Void>() {
@Override
public void onSuccess(Void value) {
doCommitOffsetsAsync(offsets, callback);
}
@Override
public void onFailure(RuntimeException e) {
completedOffsetCommits.add(new OffsetCommitCompletion(callback, offsets, new RetriableCommitFailedException(e)));
}
});
}
client.pollNoWakeup();
}
private void doCommitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
// 将needsFetchCommittedOffsets置为true
this.subscriptions.needRefreshCommits();
// 发送OffsetCommitRequest请求
RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
final OffsetCommitCallback cb = callback == null ? defaultOffsetCommitCallback : callback;
// 对RequestFuture添加监听器,如果成功则completedOffsetCommits添加OffsetCommitCompletion
// 如果失败也往completedOffsetCommits添加OffsetCommitCompletion,但是会带有异常
future.addListener(new RequestFutureListener<Void>() {
@Override
public void onSuccess(Void value) {
if (interceptors != null)
interceptors.onCommit(offsets);
completedOffsetCommits.add(new OffsetCommitCompletion(cb, offsets, null));
}
@Override
public void onFailure(RuntimeException e) {
Exception commitException = e;
if (e instanceof RetriableException)
commitException = new RetriableCommitFailedException(e);
completedOffsetCommits.add(new OffsetCommitCompletion(cb, offsets, commitException));
}
});
}
private RequestFuture<Void> sendOffsetCommitRequest(final Map<TopicPartition, OffsetAndMetadata> offsets) {
if (offsets.isEmpty())
return RequestFuture.voidSuccess();
// 获取GroupCoordinator
Node coordinator = coordinator();
if (coordinator == null)
return RequestFuture.coordinatorNotAvailable();
// 遍历offsets,并且往offsetData里put数据
Map<TopicPartition, OffsetCommitRequest.PartitionData> offsetData = new HashMap<>(offsets.size());
for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
OffsetAndMetadata offsetAndMetadata = entry.getValue();
offsetData.put(entry.getKey(), new OffsetCommitRequest.PartitionData(
offsetAndMetadata.offset(), offsetAndMetadata.metadata()));
}
// 获取generation信息
final Generation generation;
if (subscriptions.partitionsAutoAssigned())
generation = generation();
else
generation = Generation.NO_GENERATION;
// if the generation is null, we are not part of an active group (and we expect to be).
// the only thing we can do is fail the commit and let the user rejoin the group in poll()
if (generation == null)
return RequestFuture.failure(new CommitFailedException());
// 构建OffsetCommitRequest
OffsetCommitRequest req = new OffsetCommitRequest(
this.groupId,
generation.generationId,
generation.memberId,
OffsetCommitRequest.DEFAULT_RETENTION_TIME,
offsetData);
log.trace("Sending offset-commit request with {} to coordinator {} for group {}", offsets, coordinator, groupId);
// 将请求放到unsent集合等待被发送
return client.send(coordinator, ApiKeys.OFFSET_COMMIT, req)
.compose(new OffsetCommitResponseHandler(offsets));
}
二 获取offset
在rebalance操作结束之后,每一个消费者都确定了其需要的分区。在开始消费之前,消费者需要确定拉取消息的起始位置。此时消费者可以通过OffsetFetchRequest请求获取上次提交的offset并从此开始消费
OffsetFetchRequest和OffsetFetchResponse消息体格式:
获取offset源码分析:
public Map<TopicPartition, OffsetAndMetadata> fetchCommittedOffsets(Set<TopicPartition> partitions) {
while (true) {
// 确认已经准备好接受请求,否则一直阻塞
ensureCoordinatorReady();
// 发送OffsetFetchRequest请求
RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future = sendOffsetFetchRequest(partitions);
// 阻塞发送OffsetFetchRequest请求
client.poll(future);
if (future.succeeded())
return future.value(); // 返回从服务器端得到的offset的值
if (!future.isRetriable())
throw future.exception();
time.sleep(retryBackoffMs);// 否则等待重试
}
}
private RequestFuture<Map<TopicPartition, OffsetAndMetadata>> sendOffsetFetchRequest(Set<TopicPartition> partitions) {
// 获取GroupCoordinator
Node coordinator = coordinator();
if (coordinator == null)
return RequestFuture.coordinatorNotAvailable();
log.debug("Group {} fetching committed offsets for partitions: {}", groupId, partitions);
// 构建OffsetFetchRequest请求
OffsetFetchRequest request = new OffsetFetchRequest(this.groupId, new ArrayList<>(partitions));
// 发送请求到unsent请求队列,等待被发送
return client.send(coordinator, ApiKeys.OFFSET_FETCH, request)
.compose(new OffsetFetchResponseHandler());
}
处理OffsetFetchResponse:在返回RequestFuture的时候,会通过OffsetFetchResponseHandler对OffsetFetchResponse做一些处理:
public void handle(OffsetFetchResponse response, RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future) {
// 构造一个<TopicPartition, OffsetAndMetadata> 集合offsets
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(response.responseData().size());
// 遍历返回的相应的数据
for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry : response.responseData().entrySet()) {
TopicPartition tp = entry.getKey();
OffsetFetchResponse.PartitionData data = entry.getValue();
if (data.hasError()) {
Errors error = Errors.forCode(data.errorCode);
log.debug("Group {} failed to fetch offset for partition {}: {}", groupId, tp, error.message());
if (error == Errors.GROUP_LOAD_IN_PROGRESS) {
// just retry
future.raise(error);
} else if (error == Errors.NOT_COORDINATOR_FOR_GROUP) {
// re-discover the coordinator and retry
coordinatorDead();
future.raise(error);
} else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
future.raise(new KafkaException("Partition " + tp + " may not exist or user may not have Describe access to topic"));
} else {
future.raise(new KafkaException("Unexpected error in fetch offset response: " + error.message()));
}
return;
} else if (data.offset >= 0) {// 如果没有错误则把数据放到offsets集合里
// record the position with the offset (-1 indicates no committed offset to fetch)
offsets.put(tp, new OffsetAndMetadata(data.offset, data.metadata));
} else {
log.debug("Group {} has no committed offset for partition {}", groupId, tp);
}
}
// 传播offset集合最终会通过fetchCommitedOffsets取到offset的值
future.complete(offsets);
}
}
最后
以上就是直率项链为你收集整理的offset分析一 提交offset二 获取offset的全部内容,希望文章能够帮你解决offset分析一 提交offset二 获取offset所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复