我是靠谱客的博主 任性大船,最近开发中收集的这篇文章主要介绍Kafka应用点滴(一)——Springboot集成Kafka之手动提交offset,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

        Springboot集成kafka,在实验consumer手动提交offset时,发现设置spring.kafka.consumer.enable-auto-commit=false后,没有调用consumer.commitAsync()的情况下,offset仍然被提交,经查其实spring在负责“手动”提交,按官网说法,有以下几种提交模式:

  • RECORD - commit the offset when the listener returns after processing the record.
  • BATCH - commit the offset when all the records returned by the poll() have been processed.
  • TIME - commit the offset when all the records returned by the poll() have been processed as long as the ackTime since the last commit has been exceeded.
  • COUNT - commit the offset when all the records returned by the poll() have been processed as long as ackCount records have been received since the last commit.
  • COUNT_TIME - similar to TIME and COUNT but the commit is performed if either condition is true.
  • MANUAL - the message listener is responsible to acknowledge() the Acknowledgment; after which, the same semantics as BATCH are applied.
  • MANUAL_IMMEDIATE - commit the offset immediately when the Acknowledgment.acknowledge() method is called by the listener.

也就是说除开MANUAL和MANUAL_IMMEDIATE,其他模式都是由spring根据约定的条件控制提交,也就是说我们在代码中调用consumer.commitAsync()是不起作用的。

实例代码:

appliaction.properties:

spring.kafka.consumer.enable-auto-commit=false

spring.kafka.consumer.max-poll-records=5

spring.kafka.consumer.heartbeat-interval=900

spring.kafka.consumer.properties.session.timeout.ms=1000

spring.kafka.consumer.properties.auto.commit.interval=100

spring.kafka.consumer.properties.auto.offset.reset=earliest

spring.kafka.consumer.properties.max.poll.interval.ms=1000

spring.kafka.listener.ack-mode=MANUAL_IMMEDIATE

consumer:

@KafkaListener(id="C1",topicPartitions= {@TopicPartition(topic = "acktest",partitions= "1")},groupId = "group1")

public void receiveOne(ConsumerRecord<?, ?>> record,Consumer consumer) throws Exception {

    System.out.println("The message one is :" + record.offset() + "-" + record.partition() + "-" + record.value());

    consumer.commitAsync();

}

 

最后

以上就是任性大船为你收集整理的Kafka应用点滴(一)——Springboot集成Kafka之手动提交offset的全部内容,希望文章能够帮你解决Kafka应用点滴(一)——Springboot集成Kafka之手动提交offset所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(71)

评论列表共有 0 条评论

立即
投稿
返回
顶部