我是靠谱客的博主 安详橘子,最近开发中收集的这篇文章主要介绍Kafka多个消费者监听消费同一个Topic主题一、需求介绍二、@kafkaListener注解三、代码实现四、测试,觉得挺不错的,现在分享给大家,希望可以做个参考。
概述
多个消费者监听消费同一个Topic主题
- 一、需求介绍
- 二、@kafkaListener注解
- 三、代码实现
- 3.1 第一个消费者
- 3.2 第二个消费者
- 3.3 生产者
- 四、测试
一、需求介绍
有一个Topic:hw_data 有3个分区 3个副本
组:hw-data-group
将这个主题的消息分发给两个(或者多个)消费者消费,(不能消费相同的消息)
二、@kafkaListener注解
@Target({ ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE })
@Retention(RetentionPolicy.RUNTIME)
@MessageMapping
@Documented
@Repeatable(KafkaListeners.class)
public @interface KafkaListener {
/**
* 消费者的id,当GroupId没有被配置的时候,默认id为GroupId
*/
String id() default "";
/**
* 监听容器工厂,当监听时需要区分单数据还是多数据消费需要配置containerFactory 属性
*/
String containerFactory() default "";
/**
* 需要监听的Topic,可监听多个,和 topicPattern 属性互斥
*/
String[] topics() default {};
/**
* 需要监听的Topic的正则表达。和 topics,topicPartitions属性互斥
*/
String topicPattern() default "";
/**
* 可配置更加详细的监听信息,必须监听某个Topic中的指定分区,或者从offset为200的偏移量开始监听,可配置该参数, 和 topicPattern 属性互斥
*/
TopicPartition[] topicPartitions() default {};
/**
*侦听器容器组
*/
String containerGroup() default "";
/**
* 监听异常处理器,配置BeanName
*/
String errorHandler() default "";
/**
* 消费组ID
*/
String groupId() default "";
/**
* id是否为GroupId
*/
boolean idIsGroup() default true;
/**
* 消费者Id前缀
*/
String clientIdPrefix() default "";
/**
* 真实监听容器的BeanName,需要在 BeanName前加 "__"
*/
String beanRef() default "__listener";
}
三、代码实现
3.1 第一个消费者
package com.dataWarehouseOss.consumer;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Component;
/**
* @author :LiuShihao
* @date :Created in 2020/9/16 4:15 下午
* @desc :
* containerGroup:侦听器容器组
* topicPartitions:可配置更加详细的监听信息,必须监听某个Topic中的指定分区,或者从offset为200的偏移量开始监听,可配置该参数, 和 topicPattern 属性互斥
*/
@Slf4j
@Component
public class Consumer1 {
@KafkaListener(containerGroup="first-group",topicPartitions = {@TopicPartition(topic = "first",partitions = {"0","1"})})
public void m1(ConsumerRecord<String, String> record){
log.info("分区0,1 :"+record.topic()+" : "+record.value());
}
}
3.2 第二个消费者
package com.dataWarehouseOss.consumer;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Component;
/**
* @author :LiuShihao
* @date :Created in 2020/9/16 4:15 下午
* @desc :
* containerGroup:侦听器容器组
* topicPartitions:可配置更加详细的监听信息,必须监听某个Topic中的指定分区,或者从offset为200的偏移量开始监听,可配置该参数, 和 topicPattern 属性互斥
*/
@Slf4j
@Component
public class Consumer2 {
@KafkaListener(containerGroup="first-group",topicPartitions = {@TopicPartition(topic = "first",partitions = {"2"})})
public void m1(ConsumerRecord<String, String> record){
log.info("分区2 :"+record.topic()+" : "+record.value());
}
}
3.3 生产者
@Component
@Slf4j
public class SendKafkaToFirst {
@Autowired
KafkaTemplate kafkaTemplate;
public static final String TOPIC = "first";
@Scheduled(cron = "0 */2 * * * ?")
public void sendKafka(){
log.info("---====定时任务执行了:向first发送10条数据====---");
for (int i = 1; i <=10 ; i++) {
kafkaTemplate.send(TOPIC,i+"");
log.info("---==="+i+"===---");
}
}
}
四、测试
创建first
主题 、 三个分区 、 三个副本
向first
主题中发送10
条消息,会到first
的三个分区中
可以看到,我们发送了10条消息到 first 主题的三个分区,
然后第一个消费者消费的 0和1 分区的消息,第二个分区 消费的是 2 分区的消息。
通过日志显示,消息并没有被重复消费。
最后
以上就是安详橘子为你收集整理的Kafka多个消费者监听消费同一个Topic主题一、需求介绍二、@kafkaListener注解三、代码实现四、测试的全部内容,希望文章能够帮你解决Kafka多个消费者监听消费同一个Topic主题一、需求介绍二、@kafkaListener注解三、代码实现四、测试所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复