概述
<!-- mqtt依赖 --> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> </dependency>
-- application.yml配置
mqtt: client: url: tcp://ip:port username: admin password: public topic: /test_topic/# completionTimeout: 3000 clientId: 123456 enabled: true qos: 0
-- 创建mq连接信息实体
@Component @Data public class MqttBase { @Value("${spring.mqtt.client.url}") private String mqttHost; @Value("${spring.mqtt.client.username}") private String mqttUserName; @Value("${spring.mqtt.client.password}") private String mqttPwd; @Value("${spring.mqtt.client.topic}") private String topic; @Value("${spring.mqtt.client.completionTimeout}") private Integer completionTimeout; @Value("${spring.mqtt.client.qos}") private Integer qos; @Value("${spring.mqtt.client.clientId}") private String clientId; }
-- mq工具类
@Configuration @Slf4j public class MqttConfig { @Autowired private MqttBase mqttBase; /***** * 创建MqttPahoClientFactory,设置MQTT Broker连接属性,如果使用SSL验证,也在这里设置。 * @return */ @Bean public MqttPahoClientFactory mqttClientFactory() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); MqttConnectOptions options = new MqttConnectOptions(); options.setServerURIs(new String[]{mqttBase.getMqttHost()}); options.setUserName(mqttBase.getMqttUserName()); options.setPassword(mqttBase.getMqttPwd().toCharArray()); factory.setConnectionOptions(options); return factory; } @Bean public MessageChannel mqttInputChannel() { return new DirectChannel(); } @Bean public MessageProducer inbound() { MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(mqttBase.getClientId() + System.currentTimeMillis(), mqttClientFactory(), mqttBase.getTopic()); adapter.setCompletionTimeout(mqttBase.getCompletionTimeout()); adapter.setConverter(new DefaultPahoMessageConverter()); adapter.setQos(mqttBase.getQos()); adapter.setOutputChannel(mqttInputChannel()); return adapter; } @Bean //ServiceActivator注解表明当前方法用于处理MQTT消息,inputChannel参数指定了用于消费消息的channel。 @ServiceActivator(inputChannel = "mqttInputChannel") public MessageHandler handler() { return message -> { String payload = message.getPayload().toString(); String topic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC).toString(); // 根据topic分别进行消息处理。 log.info(topic + "主题,消息为{}",payload); switch (topic) { case YUAN_AN_ELECTRIC_TOPIC: break; case YUAN_AN_ENVIRONMENT_TOPIC: break; default: log.info(topic + ": 未处理消息" + payload); } }; } // 发送消息 @Bean public MessageChannel mqttOutboundChannel() { return new DirectChannel(); } /***** * 发送消息和消费消息Channel可以使用相同MqttPahoClientFactory * @return */ @Bean @ServiceActivator(inputChannel = "mqttOutboundChannel") public MessageHandler outbound() { // 在这里进行mqttOutboundChannel的相关设置 MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttBase.getClientId() + System.currentTimeMillis(), mqttClientFactory()); messageHandler.setAsync(true); //如果设置成true,发送消息时将不会阻塞。 messageHandler.setDefaultTopic("testTopic"); return messageHandler; } @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel") public interface MqttGateway { // 定义重载方法,用于消息发送 void sendToMqtt(String payload); // 指定topic进行消息发送 void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload); // 指定topic和qos进行消息发送 void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload); } }
最后
以上就是疯狂墨镜为你收集整理的Mqtt消息发送、主题订阅的全部内容,希望文章能够帮你解决Mqtt消息发送、主题订阅所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复