概述
1.简介
手动指定offset起始位置的应用场景主要有两个,一是人为控制offset起始位置,二是如果出现程序错误,重新消费一次。
2.代码示例
public class ConsumerSample {
private static final String topicName = "steven";
public static void main(String[] args) {
//Consumer配置
Properties props = new Properties();
props.setProperty("bootstrap.servers", "127.0.0.1:9092");
props.setProperty("group.id", "test");
props.setProperty("enable.auto.commit", "false");
//enable.auto.commit设置为false后,下面这一项便不会生效
props.setProperty("auto.commit.interval.ms", "1000");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer(props);
TopicPartition p0 = new TopicPartition(topicName, 0);
//消费订阅某个topic的某个分区
consumer.assign(Arrays.asList(p0));
while (true) {
//手动指定offset起始位置
consumer.seek(p0, 0);
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
//每个partition单独处理
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> pRecord = records.records(partition);
for (ConsumerRecord<String, String> record : pRecord) {
System.out.printf("partition = %d,offset = %d,key = %s,value = %s%n", record.partition(), record.offset(), record.key(), record.value());
}
long lastOffset = pRecord.get(pRecord.size() - 1).offset();
//单个partition中的offset,并且进行提交
Map<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
offset.put(partition, new OffsetAndMetadata(lastOffset + 1));
//提交offset
consumer.commitSync(offset);
System.out.println("=============partition-" + partition + "end================");
}
}
}
}
3.代码运行结果
(1).控制台输出
执行ConsumerSample代码,可以看到控制台循环输出partition0的offset从0开始的消息。
partition = 0,offset = 0,key = key-0,value = value-0
partition = 0,offset = 1,key = key-2,value = value-2
partition = 0,offset = 2,key = key-4,value = value-4
partition = 0,offset = 3,key = key-6,value = value-6
partition = 0,offset = 4,key = key-8,value = value-8
=============partition - steven-0 end================
最后
以上就是明亮悟空为你收集整理的4.6 Kafka Consumer API之控制偏移量的起始位置的全部内容,希望文章能够帮你解决4.6 Kafka Consumer API之控制偏移量的起始位置所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复