我是靠谱客的博主 明亮悟空,最近开发中收集的这篇文章主要介绍4.6 Kafka Consumer API之控制偏移量的起始位置,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

1.简介
手动指定offset起始位置的应用场景主要有两个,一是人为控制offset起始位置,二是如果出现程序错误,重新消费一次。

2.代码示例

public class ConsumerSample {
    private static final String topicName = "steven";

    public static void main(String[] args) {
        //Consumer配置
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "127.0.0.1:9092");
        props.setProperty("group.id", "test");
        props.setProperty("enable.auto.commit", "false");
        //enable.auto.commit设置为false后,下面这一项便不会生效
        props.setProperty("auto.commit.interval.ms", "1000");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer(props);

        TopicPartition p0 = new TopicPartition(topicName, 0);
        //消费订阅某个topic的某个分区
        consumer.assign(Arrays.asList(p0));
        while (true) {
            //手动指定offset起始位置
            consumer.seek(p0, 0);
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
            //每个partition单独处理
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> pRecord = records.records(partition);
                for (ConsumerRecord<String, String> record : pRecord) {
                    System.out.printf("partition = %d,offset = %d,key = %s,value = %s%n", record.partition(), record.offset(), record.key(), record.value());
                }
                long lastOffset = pRecord.get(pRecord.size() - 1).offset();
                //单个partition中的offset,并且进行提交
                Map<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
                offset.put(partition, new OffsetAndMetadata(lastOffset + 1));
                //提交offset
                consumer.commitSync(offset);
                System.out.println("=============partition-" + partition + "end================");
            }
        }
    }
}

3.代码运行结果
(1).控制台输出
执行ConsumerSample代码,可以看到控制台循环输出partition0的offset从0开始的消息。

partition = 0,offset = 0,key = key-0,value = value-0
partition = 0,offset = 1,key = key-2,value = value-2
partition = 0,offset = 2,key = key-4,value = value-4
partition = 0,offset = 3,key = key-6,value = value-6
partition = 0,offset = 4,key = key-8,value = value-8
=============partition - steven-0 end================

最后

以上就是明亮悟空为你收集整理的4.6 Kafka Consumer API之控制偏移量的起始位置的全部内容,希望文章能够帮你解决4.6 Kafka Consumer API之控制偏移量的起始位置所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(48)

评论列表共有 0 条评论

立即
投稿
返回
顶部