我是靠谱客的博主 秀丽玉米,最近开发中收集的这篇文章主要介绍kafka springboot+@KafkaListener 自动提交的简单使用,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

spring-boot 版本 1.5.12

依赖使用spring-kafka1.3.3(对应kafka-clients版本0.11.0.0,请使用于kafka版本对应版本的依赖)

        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>1.5.12.RELEASE</version>
            <relativePath/>
        </parent>        

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>1.3.3.RELEASE</version>
        </dependency>

 

1、自定义监听工厂  (resources目录下面kafka.properties文件中定义对应参数)

kafka.bootstrapServers=127.0.0.1::9093
kafka.groupId=test
kafka.sessionTimeout=30000
kafka.maxPollRecords=50
kafka.autoOffsetReset=latest
#kafka.max.poll.interval.ms=
kafka.autoCommitIntervalMs=2000
kafka.consumerRequestTimeoutMs=320000
#消费者并发启动个数(对应分区个数)每个listener方法
kafka.concurrency=10
@Configuration
@PropertySource("kafka.properties")
@ConfigurationProperties(prefix = "kafka")
@Data
public class KonkaKafkaConfig {

    private String bootstrapServers;

    private String groupId;

    private String sessionTimeout;

    private String maxPollRecords;

    private String autoOffsetReset;

    private String autoCommitIntervalMs;

    private String consumerRequestTimeoutMs;

    private Integer concurrency;
    @Bean("kafkaListenerContainerFactory")
    public KafkaListenerContainerFactory<?> batchFactory(){
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
        factory.setBatchListener(true);
        factory.setConcurrency(concurrency);
        return factory;
    }

    private Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG,consumerRequestTimeoutMs);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaLogMessageDeSer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);//每一批数量
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
//        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG) 超过对接时间认为是lock
        return props;
    }
}

2、监听器

@Component
public class KafkaListener {

    private final static Logger LOGGER = LoggerFactory.getLogger(KafkaListener.class);


    @KafkaListener(containerFactory = "kafkaListenerContainerFactory", topics = "ucenter")
    public void consumerListener(List<ConsumerRecord> consumerRecords) {
        if (consumerRecords.size() > 0) {
            PartitionCounter.addCounter(consumerRecords.get(0).partition(), consumerRecords.size());
        }
        Iterator<ConsumerRecord> iterator = consumerRecords.iterator();
        while (iterator.hasNext()) {
            ConsumerRecord consumerRecord = iterator.next();
            String key = consumerRecord.key().toString();
            String value = consumerRecord.value().toString();
        }
    }

}

3、spring-boot容器即可
(参数详解看后面文章)

 

 

 

最后

以上就是秀丽玉米为你收集整理的kafka springboot+@KafkaListener 自动提交的简单使用的全部内容,希望文章能够帮你解决kafka springboot+@KafkaListener 自动提交的简单使用所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(40)

评论列表共有 0 条评论

立即
投稿
返回
顶部