概述
引入依赖
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>1.3.11.RELEASE</version> </dependency>
因为我的项目的 springboot 版本是 1.5.22.RELEASE,所以引的是 1.3.11.RELEASE 的包。读者可以根据下图来自行选择对应的版本。图片更新可能不及时,详情可查看spring-kafka 官方网站。
注:这里有个踩坑点,如果引入包版本不对,项目启动时会抛出org.springframework.core.log.LogAccessor 异常:
java.lang.ClassNotFoundException: org.springframework.core.log.LogAccessor
创建配置类
/** * kafka 配置类 */ @Configuration @EnableKafka public class KafkaConsumerConfig { private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(KafkaConsumerConfig.class); @Value("${kafka.bootstrap.servers}") private String kafkaBootstrapServers; @Value("${kafka.group.id}") private String kafkaGroupId; @Value("${kafka.topic}") private String kafkaTopic; public static final String CONFIG_PATH = "/home/admin/xxx/BOOT-INF/classes/kafka_client_jaas.conf"; public static final String LOCATION_PATH = "/home/admin/xxx/BOOT-INF/classes/kafka.client.truststore.jks"; @Bean public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); // 设置并发量,小于或者等于 Topic 的分区数 factory.setConcurrency(5); // 设置为批量监听 factory.setBatchListener(Boolean.TRUE); factory.getContainerProperties().setPollTimeout(30000); return factory; } public ConsumerFactory<String, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } public Map<String, Object> consumerConfigs() { Map<String, Object> props = new HashMap<>(); //设置接入点,请通过控制台获取对应Topic的接入点。 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers); //设置SSL根证书的路径,请记得将XXX修改为自己的路径。 //与SASL路径类似,该文件也不能被打包到jar中。 System.setProperty("java.security.auth.login.config", CONFIG_PATH); props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, LOCATION_PATH); //根证书存储的密码,保持不变。 props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient"); //接入协议,目前支持使用SASL_SSL协议接入。 props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL"); //SASL鉴权方式,保持不变。 props.put(SaslConfigs.SASL_MECHANISM, "PLAIN"); // 自动提交 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.TRUE); //两次Poll之间的最大允许间隔。 //消费者超过该值没有返回心跳,服务端判断消费者处于非存活状态,服务端将消费者从Consumer Group移除并触发Rebalance,默认30s。 props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000); //设置单次拉取的量,走公网访问时,该参数会有较大影响。 props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 32000); props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 32000); //每次Poll的最大数量。 //注意该值不要改得太大,如果Poll太多数据,而不能在下次Poll之前消费完,则会触发一次负载均衡,产生卡顿。 props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30); //消息的反序列化方式。 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); //当前消费实例所属的消费组,请在控制台申请之后填写。 //属于同一个组的消费实例,会负载消费消息。 props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId); //Hostname校验改成空。 props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ""); return props; } }
注:此处通过 factory.setConcurrency(5); 配置了并发量为 5 ,假设我们线上的 Topic 有 12 个分区。那么将会是 3 个线程分配到 2 个分区,2 个线程分配到 3 个分区,3 * 2 + 2 * 3 = 12。
Kafka 消费者
/** * kafka 消息消费类 */ @Component public class KafkaMessageListener { private static final Logger LOGGER = LoggerFactory.getLogger(KafkaMessageListener.class); @KafkaListener(topics = {"${kafka.topic}"}) public void listen(List<ConsumerRecord<String, String>> recordList) { for (ConsumerRecord<String,String> record : recordList) { // 打印消息的分区以及偏移量 LOGGER.info("Kafka Consume partition:{}, offset:{}", record.partition(), record.offset()); String value = record.value(); System.out.println("value = " + value); // 处理业务逻辑 ... } } }
因为我在配置类中设置了批量监听,所以此处 listen 方法的入参是List:List<ConsumerRecord<String, String>>。
到此这篇关于Springboot集成Kafka进行批量消费及踩坑点的文章就介绍到这了,更多相关Springboot Kafka批量消费内容请搜索靠谱客以前的文章或继续浏览下面的相关文章希望大家以后多多支持靠谱客!
最后
以上就是如意酒窝为你收集整理的Springboot集成Kafka进行批量消费及踩坑点的全部内容,希望文章能够帮你解决Springboot集成Kafka进行批量消费及踩坑点所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复