概述
一、依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Parent POM. The dependencies below declare no version of their own;
         presumably their versions are managed by this parent (confirm against
         the parent's dependency management). -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.10.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <!-- Coordinates of this demo project. -->
    <groupId>com.citydo</groupId>
    <artifactId>kafka</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>kafka</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <!-- Web starter. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- Kafka: streams library and the Spring integration that provides
             KafkaTemplate and @KafkaListener used by the Java classes. -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <!-- JSON encoding/decoding of the message payload. -->
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
        </dependency>

        <!-- Lombok annotations (@Getter, @Setter, @ToString, @Slf4j); optional,
             so it is not passed on to projects that depend on this one. -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

        <!-- Test-scoped dependencies. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <!-- NOTE(review): confirm that the test starter at this parent
                 version actually brings in junit-vintage-engine; if it does
                 not, this exclusion is a harmless no-op. -->
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Spring Boot build plugin. -->
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
二、配置
# HTTP port of the embedded web server.
server.port=8999

# Kafka broker address shared by the producer and the consumer.
spring.kafka.bootstrap-servers=192.168.0.195:9092

# Consumer: group id and String deserializers for record key and value.
spring.kafka.consumer.group-id=Group
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

# Producer: String serializers for record key and value.
# The earlier producer keys (groupId, keyDeserializer, valueDserializer) did not
# match any producer property: producers have no consumer group and are
# configured with serializers, not deserializers.
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
三、参数
package com.citydo.kafka;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
/**
 * Payment message payload.
 *
 * <p>Accessors and {@code toString()} are generated by Lombok. The field
 * declaration order is kept as-is because it determines the order in the
 * generated {@code toString()} output.
 */
@ToString
@Setter
@Getter
public class PayMessage {

    /** Order code identifying the payment. */
    private String orderCode;

    /** Fee amount. */
    private Float fee;

    /** Send timestamp; presumably epoch milliseconds (confirm against the sender). */
    private Long sendTime;
}
四、消费者
package com.citydo.kafka;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
/**
 * Kafka listener that receives payment messages from {@link #PAY_TOPIC},
 * decodes them from JSON and logs them.
 */
@Service
@Slf4j
public class MessageConsumer {

    /** Name of the topic this listener subscribes to. */
    public static final String PAY_TOPIC = "payTopic";

    /** Gson instance used to decode the JSON payload; never reassigned. */
    private final Gson gson = new GsonBuilder().create();

    /**
     * Decodes one record value as a JSON {@link PayMessage} and logs it.
     *
     * @param payMessage raw record value, expected to be a JSON document
     */
    @KafkaListener(topics = PAY_TOPIC)
    public void onMessage(String payMessage) {
        PayMessage msg = gson.fromJson(payMessage, PayMessage.class);
        // Parameterized form: same output as "msg" + msg, but the message text
        // is only assembled when INFO logging is enabled.
        log.info("msg{}", msg);
    }
}
五、生产者
package com.citydo.kafka;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
/**
 * Publishes payment messages to {@link #PAY_TOPIC} as JSON strings.
 */
@Component
@Slf4j
public class MessageProducer {

    /** Name of the topic messages are sent to. */
    public static final String PAY_TOPIC = "payTopic";

    /**
     * Template used to send records. Typed with String key and value instead of
     * the raw type so that {@code send} is checked by the compiler; the value
     * sent below is always a JSON string.
     */
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /** Gson instance used to encode the payload as JSON; never reassigned. */
    private final Gson gson = new GsonBuilder().create();

    /**
     * Serializes the message to JSON, sends it to {@link #PAY_TOPIC} without a
     * key, and logs the JSON that was handed to the template.
     *
     * @param payMessage message to publish
     */
    public void send(PayMessage payMessage) {
        String msg = gson.toJson(payMessage);
        kafkaTemplate.send(PAY_TOPIC, msg);
        // Parameterized form: same output as "msg" + msg, but the message text
        // is only assembled when INFO logging is enabled.
        log.info("msg{}", msg);
    }
}
六、启动
package com.citydo.kafka;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
import java.util.UUID;
/**
 * Application entry point. Starts the Spring context and then publishes a
 * freshly generated {@link PayMessage} every two seconds until the main thread
 * is interrupted.
 */
@SpringBootApplication
@Slf4j
public class KafkaApplication {

    /**
     * Boots the application and runs the demo send loop on the calling thread.
     *
     * @param args command-line arguments passed through to Spring Boot
     */
    public static void main(String[] args) {
        ApplicationContext applicationContext = SpringApplication.run(KafkaApplication.class, args);
        // Send messages.
        MessageProducer producer = applicationContext.getBean(MessageProducer.class);
        while (true) {
            PayMessage message = new PayMessage();
            message.setFee((float) System.currentTimeMillis());
            message.setOrderCode(UUID.randomUUID().toString());
            message.setSendTime(System.currentTimeMillis());
            producer.send(message);
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop the loop. Carrying on would
                // either lose the interrupt request or, with the flag restored,
                // make every later sleep() fail immediately.
                Thread.currentThread().interrupt();
                log.info("{}", e.getMessage(), e);
                return;
            }
        }
    }
}
最后
以上就是无情含羞草为你收集整理的springboot与kafka的实例的全部内容,希望文章能够帮你解决springboot与kafka的实例所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复