概述
我有一个3节点kafka群集设置.我正在使用风暴来阅读来自kafka的消息.我系统中的每个主题都有7个分区.
现在我面临一个奇怪的问题.直到3天前,一切都运转良好.但是,现在看来我的风暴拓扑无法从2个分区 – #1和#4中专门读取.
我试图深入研究这个问题并发现在我的kafka日志中,对于这两个分区,一个偏移量丢失,即在5964511之后,下一个偏移量是5964513而不是5964512.
由于缺少偏移,Simple Consumer无法继续下一个偏移.我做错了什么或者它是一个已知的错误?
可能是这种行为的原因是什么?
我使用以下代码来读取有效偏移的窗口:
public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
long whichTime, String clientName) {
TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
Map requestInfoMap = new HashMap();
requestInfoMap.put(topicAndPartition, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 100));
OffsetRequest request = new OffsetRequest( requestInfoMap, kafka.api.OffsetRequest.CurrentVersion() , clientName);
OffsetResponse response = consumer.getOffsetsBefore(request);
long[] validOffsets = response.offsets(topic, partition);
for (long validOffset : validOffsets) {
System.out.println(validOffset + " : ");
}
long largestOffset = validOffsets[0];
long smallestOffset = validOffsets[validOffsets.length - 1];
System.out.println(smallestOffset + " : " + largestOffset );
return largestOffset;
}
这给了我以下输出:
4529948 : 6000878
因此,我提供的偏移量在偏移范围内.
最后
以上就是清秀大米为你收集整理的java获取kafka偏移量_java – Kafka日志中缺少偏移量 – 简单消费者无法继续的全部内容,希望文章能够帮你解决java获取kafka偏移量_java – Kafka日志中缺少偏移量 – 简单消费者无法继续所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复