我是靠谱客的博主 敏感月饼,这篇文章主要介绍spring-boot配置kafka、kafka使用、kafka生产者和消费者导入对应的包配置文件配置发送kafka消息方法接收kafka消息方法其他步骤,现在分享给大家,希望可以做个参考。

spring-boot配置kafka、kafka使用、kafka生产者和消费者

  • 导入对应的包
  • 配置文件配置
  • 发送kafka消息方法
  • 接收kafka消息方法
  • 其他步骤

导入对应的包

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
<!--kafka--> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <version>1.1.0</version> <exclusions> <exclusion> <artifactId>jackson-databind</artifactId> <groupId>com.fasterxml.jackson.core</groupId> </exclusion> <exclusion> <artifactId>jackson-core</artifactId> <groupId>com.fasterxml.jackson.core</groupId> </exclusion> <exclusion> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-annotations</artifactId> </exclusion> </exclusions> </dependency> <!--kafka-->

配置文件配置

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
spring: kafka: bootstrap-servers: 127.0.0.1:9092 #地址,多个中间使用,分割:127.0.0.1:9092,127.0.0.1:9093 producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer #发送key类型 value-serializer: org.apache.kafka.common.serialization.StringSerializer #发送value类型 consumer: group-id: TEST #群组ID enable-auto-commit: false #停止(取消)自动发送 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer #接收key类型 value-deserializer: org.apache.kafka.common.serialization.StringDeserializer #接收value类型

发送kafka消息方法

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//KafkaTemplate的类型为配置文件中定义的类型 @Component public class SendKafka{ @Autowired private KafkaTemplate<String, String> kafkaTemplate; public void sendKafka(){ try { //发送消息 testtopic为topic主题,testkey为key键,测试值为传的内容 kafkaTemplate.send("testtopic","testkey","测试值") } catch (Exception e) { e.printStackTrace(); log.error("{}",e); } } }

接收kafka消息方法

复制代码
1
2
3
4
5
6
7
8
9
//接收的方法(消费者) @KafkaListener(topics = {"testtopic"}) //testtopic为topic主题 public void testtopic(ConsumerRecord<String, String> record) { //获取key和value并设置编码 String key = new String(record.key().getBytes("UTF-8"),"UTF-8"); String value = new String(record.value().getBytes("UTF-8"),"UTF-8"); System.out.println("key:"+key+",value:"+value); }

其他步骤

  1. kafka服务自带zookeeper下载与启动
  2. Spring boot配置kafka服务
  3. kafka生产者发送消息成功回调
  4. kafka根据ip端口获取消息队列上的topic
  5. kafka动态设置监听哪些topic
  6. 动态启动关闭kafka监听、设置默认不监听kafka
  7. kafka设置:1只接受消息、不发送消息;2只发送消息不接受消息;3既接受消息也发送消息;4既不接收消息也不发送消息
  8. kafka会把历史数据都获取下来
  9. Spring boot kafka执行多次多次消费

最后

以上就是敏感月饼最近收集整理的关于spring-boot配置kafka、kafka使用、kafka生产者和消费者导入对应的包配置文件配置发送kafka消息方法接收kafka消息方法其他步骤的全部内容,更多相关spring-boot配置kafka、kafka使用、kafka生产者和消费者导入对应内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(73)

评论列表共有 0 条评论

立即
投稿
返回
顶部