复制代码
1
2
3
4
5<!-- mqtt依赖 --> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> </dependency>
-- application.yml配置
复制代码
1
2
3
4
5
6
7
8
9
10mqtt: client: url: tcp://ip:port username: admin password: public topic: /test_topic/# completionTimeout: 3000 clientId: 123456 enabled: true qos: 0
-- 创建mq连接信息实体
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23@Component @Data public class MqttBase { @Value("${spring.mqtt.client.url}") private String mqttHost; @Value("${spring.mqtt.client.username}") private String mqttUserName; @Value("${spring.mqtt.client.password}") private String mqttPwd; @Value("${spring.mqtt.client.topic}") private String topic; @Value("${spring.mqtt.client.completionTimeout}") private Integer completionTimeout; @Value("${spring.mqtt.client.qos}") private Integer qos; @Value("${spring.mqtt.client.clientId}") private String clientId; }
-- mq工具类
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98@Configuration @Slf4j public class MqttConfig { @Autowired private MqttBase mqttBase; /***** * 创建MqttPahoClientFactory,设置MQTT Broker连接属性,如果使用SSL验证,也在这里设置。 * @return */ @Bean public MqttPahoClientFactory mqttClientFactory() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); MqttConnectOptions options = new MqttConnectOptions(); options.setServerURIs(new String[]{mqttBase.getMqttHost()}); options.setUserName(mqttBase.getMqttUserName()); options.setPassword(mqttBase.getMqttPwd().toCharArray()); factory.setConnectionOptions(options); return factory; } @Bean public MessageChannel mqttInputChannel() { return new DirectChannel(); } @Bean public MessageProducer inbound() { MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(mqttBase.getClientId() + System.currentTimeMillis(), mqttClientFactory(), mqttBase.getTopic()); adapter.setCompletionTimeout(mqttBase.getCompletionTimeout()); adapter.setConverter(new DefaultPahoMessageConverter()); adapter.setQos(mqttBase.getQos()); adapter.setOutputChannel(mqttInputChannel()); return adapter; } @Bean //ServiceActivator注解表明当前方法用于处理MQTT消息,inputChannel参数指定了用于消费消息的channel。 @ServiceActivator(inputChannel = "mqttInputChannel") public MessageHandler handler() { return message -> { String payload = message.getPayload().toString(); String topic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC).toString(); // 根据topic分别进行消息处理。 log.info(topic + "主题,消息为{}",payload); switch (topic) { case YUAN_AN_ELECTRIC_TOPIC: break; case YUAN_AN_ENVIRONMENT_TOPIC: break; default: log.info(topic + ": 未处理消息" + payload); } }; } // 发送消息 @Bean public MessageChannel mqttOutboundChannel() { return new DirectChannel(); } /***** * 发送消息和消费消息Channel可以使用相同MqttPahoClientFactory * @return */ @Bean @ServiceActivator(inputChannel = "mqttOutboundChannel") public MessageHandler outbound() { // 在这里进行mqttOutboundChannel的相关设置 MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttBase.getClientId() + System.currentTimeMillis(), mqttClientFactory()); messageHandler.setAsync(true); //如果设置成true,发送消息时将不会阻塞。 messageHandler.setDefaultTopic("testTopic"); return messageHandler; } @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel") public interface MqttGateway { // 定义重载方法,用于消息发送 void sendToMqtt(String payload); // 指定topic进行消息发送 void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload); // 指定topic和qos进行消息发送 void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload); } }
最后
以上就是疯狂墨镜最近收集整理的关于Mqtt消息发送、主题订阅的全部内容,更多相关Mqtt消息发送、主题订阅内容请搜索靠谱客的其他文章。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复