概述
MQTT消息订阅与解析
-
通过配置文件创建mqtt客户端
1.1 xml文件中添加mqtt客户端配置项
<!-- mqtt客户端 --> <bean id="clientFactory" class="org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory"> <property name="userName" value="test" /> <property name="password" value="test" /> </bean> <!-- 消息适配器 --> <int-mqtt:message-driven-channel-adapter id="mqtttest" client-id="mqttTest" url="tcp://MQTT服务器地址:1883" topics="/data/#" qos="2" client-factory="clientFactory" auto-startup="true" send-timeout="12" channel="startCase" /> <int:channel id="startCase" /> <!-- 消息处理类 --> <int:service-activator id="startCaseService" input-channel="startCase" ref="mqttCaseService" method="messageArrived" /> <bean id="mqttCaseService" class="com.iamapsycho.mqtt.DataHandler" />
1.2 创建消息解析的类DataHandler
public class DataHandler{ public void messageArrived(Message<?> message) throws IllegalAccessException, IllegalArgumentException, InvocationTargetException, NoSuchMethodException, SecurityException { //获取消息体订阅的主题 final String topic = (String) message.getHeaders().get("mqtt_topic"); //获取消息体中的消息 String contentStr = message.getPayload().toString(); //消息转json JSONObject jo = JSONObject.parseObject(contentStr); // TODO Auto-generated method stub // 根据具体情况添加解析消息的方法 }
-
自定义mqtt客户端并创建连接
2.1 xml文件中注入bean
<!-- mqttDataHandler mqtt构造类 --> <bean id="mqttDataHandler" class="com.iamapsycho.mqtt.MqttDataHandler"></bean> <!-- 自定义监听springmvc启动完成执行内部方法 --> <bean id="mqttBeanListener" class="com.iamapsycho.mqtt.listener.MqttBeanListener"></bean>
2.2 声明MqttDataHandler类
public class MqttDataHandler implements MqttCallback{ public static Logger logger = Logger.getLogger(MqttDataHandler.class); Service1 service1; Service2 service2; Service3 service3; Service4 service4; MqttBeanListener mqttBeanListener; public MqttDataHandler() {} /** * 构造函数 * @param service1 * @param service2 * @param service3 * @param service4 * @param mqttBeanListener */ public MqttDataHandler(Service1 service1, Service2 service2, Service3 service3, Service4 service4, MqttBeanListener mqttBeanListener) { super(); this.service1= service1; this.service2= service2; this.service3= service3; this.service4= service4; this.mqttBeanListener = mqttBeanListener; } /** * 连接丢失处理方法 */ @Override public void connectionLost(Throwable cause) { // 连接丢失后,一般在这里面进行重连 logger.info("连接断开了,准备进行重连"); while (true){ try {//如果没有发生异常说明连接成功,如果发生异常,则死循环 Thread.sleep(6000); logger.info("正在尝试重连"); mqttBeanListener.connect(); mqttBeanListener.autoSubsrcribe(); break; }catch (Exception e){ continue; } } } /** * 消息处理方法 */ @Override public void messageArrived(String topic, MqttMessage message) throws Exception { String eqno = topic.split("/")[2]; String contentStr = new String(message.getPayload()); JSONObject jo = JSONObject.parseObject(contentStr); // TODO Auto-generated method stub // 根据具体情况添加解析消息的方法 } @Override public void deliveryComplete(IMqttDeliveryToken token) { System.out.println("deliveryComplete---------" + token.isComplete()); } }
2.3 声明MqttBeanListener类
/**
*
* @TODO: Spring/SpringMVC在启动完成后执行方法
*/
public class MqttBeanListener implements ApplicationListener<ContextRefreshedEvent> {
//ContextRefreshedEvent为初始化完毕事件,spring还有很多事件可以利用
@Value("#{prop.mqttServerURI}")
private String mqttServerURI; //在配置文件中定义,也可以在类中写死
@Value("#{prop.mqttUsername}")
private String mqttUsername;//在配置文件中定义,也可以在类中写死
@Value("#{prop.mqttPassword}")
private String mqttPassword;//在配置文件中定义,也可以在类中写死
@Autowired
Service1 service1;
@Autowired
Service2 service2;
@Autowired
Service3 service3;
@Autowired
Service4 service4;
private static MqttClient sampleClient ;
MqttDataHandler mqttDataHandler;
@Override
public void onApplicationEvent(ContextRefreshedEvent ev) {
// 防止重复执行。
if (ev.getApplicationContext().getParent() == null) {
//需要执行的逻辑代码,当spring容器初始化完成后就会执行该方法。
this.createMqttClient();
//订阅默认的主题
this.autoSubsrcribe();
}
}
/**
*
* @TODO: mqtt链接
* @throws MqttException
*/
public void connect() throws MqttException {
MemoryPersistence persistence = new MemoryPersistence();
mqttDataHandler = new MqttDataHandler(service1, service2,
service3, service4, MqttBeanListener.this);
//防止重复创建MQTTClient实例
if (sampleClient==null) {
String clientId = IDUtils.createUUID().toString();
sampleClient = new MqttClient(mqttServerURI, clientId, persistence);
sampleClient.setCallback(mqttDataHandler);
}
MqttConnectOptions options = getOptions();
//判断拦截状态,这里注意一下,如果没有这个判断,是非常坑的
if (!sampleClient.isConnected()) {
sampleClient.connect(options);
System.out.println("连接成功");
}else {
//这里的逻辑是如果连接成功就重新连接
sampleClient.disconnect();
sampleClient.connect(getOptions());
System.out.println("连接成功");
}
}
/**
* @TODO(生成配置对象,用户名,密码等)
* @return MqttConnectOptions
*/
public MqttConnectOptions getOptions() {
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(false);
connOpts.setUserName(mqttUsername);
connOpts.setPassword(mqttPassword.toCharArray());
//超时设置
connOpts.setConnectionTimeout(10);
return connOpts;
}
/**
* @author iamapsycho
* @TODO 创建Mqtt客户端
*/
public void createMqttClient() {
MemoryPersistence persistence = new MemoryPersistence();
try {
MqttConnectOptions connOpts = getOptions();
mqttDataHandler = new MqttDataHandler(service1, service2,
service3, service4, MqttBeanListener.this);
String clientId = IDUtils.createUUID().toString();
sampleClient = new MqttClient(mqttServerURI, clientId, persistence);
sampleClient.setCallback(mqttDataHandler);
sampleClient.connect(connOpts);
} catch (MqttException me) {
me.printStackTrace();
}
}
/**
* 订阅主题
*/
public void subsrcribe(String topic) {
try {
//设置消息级别
int Qos=0;
sampleClient.subscribe(topic,Qos);
} catch (MqttException e) {
e.printStackTrace();
}
}
/**
* @author iamapsycho
* 订阅 data/upload/equNo,
* 其中equipmentNoList代表数据库中设备编号
*/
public void autoSubsrcribe() {
List<String> equNoList = equipmentManageService.getEquipmentNoList();
String uploadTopic = "data/upload/";
for(String equNo: equipmentNoList) {
this.subsrcribe(equNo, uploadTopic);
}
}
}
后期需要优化的地方
在添加新设备的同时,要对相应的设备进行订阅
最后
以上就是孝顺冰棍为你收集整理的MQTT消息订阅与解析MQTT消息订阅与解析的全部内容,希望文章能够帮你解决MQTT消息订阅与解析MQTT消息订阅与解析所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复