概述
MQTT简单使用总结
- 前言
- 一、MQTT工具
- 二、使用步骤
- 1.引入库
- 2.注入配置信息
- 3.MQTT消费者
- 4.MQTT回调函数
- 总结
前言
本文介绍MQTT消息队列的简单使用
一、MQTT工具
MQTT的工具有很多,这里选择MQTTX测试: MQTTX客户端
二、使用步骤
1.引入库
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
2.注入配置信息
mqtt:
host: tcp://127.0.0.1
clientid: admin
username: admin
password: admin
timeout: 10000
keepalive: 60
这里选择自己的配置信息
public class MqttCofigBean {
@Value("${mqtt.username}")
private String username;
@Value("${mqtt.password}")
private String password;
@Value("${mqtt.host}")
private String hostUrl;
@Value("${mqtt.clientid}")
private String clientId;
@Value("${mqtt.timeout}")
private int completionTimeout;
}
3.MQTT消费者
主要功能是初始化连接信息,推送MQTT消息,订阅MQTT主题.
@Slf4j
@Component
public class MqttConsumer implements ApplicationRunner {
private static MqttClient client;
private static MqttTopic mqttTopic;
private static volatile MqttConsumer mqttConsumer = null;
public static MqttConsumer getInstance() {
if (null == mqttConsumer) {
synchronized (MqttConsumer.class) {
if (null == mqttConsumer) {
mqttConsumer = new MqttConsumer();
}
}
}
return mqttConsumer;
}
/**
* MQTT连接属性配置对象
*/
@Autowired
public MqttCofigBean mqttCofigBean;
/**
* 初始化参数配置
*/
@Override
public void run(ApplicationArguments args) throws Exception {
log.info("初始化启动MQTT连接");
this.connect();
}
/**
* 用来连接服务器
* @throws Exception 异常
*/
private void connect() throws Exception {
client = new MqttClient(mqttCofigBean.getHostUrl(), mqttCofigBean.getClientId() + "_" + StrUtil.uuid(), new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName(mqttCofigBean.getUsername());
options.setPassword(mqttCofigBean.getPassword().toCharArray());
//是否清除session
options.setCleanSession(true);
// 设置超时时间
options.setConnectionTimeout(30);
// 设置会话心跳时间
options.setKeepAliveInterval(60);
try {
String[] msgtopic = MqttCofigBean.SUB_TOPICS;
int[] qos = new int[msgtopic.length];
client.setCallback(new TopMsgCallback(client, options, msgtopic, qos));
client.connect(options);
log.info("MQTT连接成功:" + mqttCofigBean.getClientId());
if (msgtopic.length != 0 && !"".equals(msgtopic[0])) {
//订阅消息
for (int i = 0; i < msgtopic.length; i++) {
qos[i] = 0;
}
client.subscribe(msgtopic, qos);
}
} catch (Exception e) {
log.error("MQTT连接异常:" + e);
}
}
/**
* 重新连接
* @throws Exception 异常
*/
public void reConnect() throws Exception {
if (null != client) {
this.connect();
}
}
/**
* 发布,默认qos为0,非持久化
*
* @param topic 主题
* @param pushMessage 推送消息
*/
public void publish(String topic, String pushMessage) throws Exception {
publish(0, false, topic, pushMessage);
}
/**
* 发布
*
* @param qos qos
* @param retained 保留
* @param topic 主题
* @param pushMessage 推送消息
*/
public void publish(int qos, boolean retained, String topic, String pushMessage) {
MqttMessage message = new MqttMessage();
message.setQos(qos);
message.setRetained(retained);
message.setPayload(pushMessage.getBytes());
MqttTopic mTopic = client.getTopic(topic);
if (null == mTopic) {
log.error("topic not exist");
}
MqttDeliveryToken token;
try {
token = mTopic.publish(message);
token.waitForCompletion();
} catch (MqttPersistenceException e) {
e.printStackTrace();
} catch (MqttException e) {
e.printStackTrace();
}
}
/**
* 订阅某个主题
*
* @param topic 主题
* @param qos qos
*/
public void subscribe(String topic, int qos) {
try {
log.info("topic:" + topic);
client.subscribe(topic, qos);
} catch (MqttException e) {
e.printStackTrace();
}
}
public MqttClient getClient() {
return client;
}
public MqttTopic getMqttTopic() {
return mqttTopic;
}
}
4.MQTT回调函数
回调函数主要是负责,连接中断后重连,收到消息的处理,也可以使用监听器的方式,来处理收到消息的业务在处理.
@Slf4j
public class TopMsgCallback implements MqttCallback {
/**
* MqttClient
*/
private MqttClient client;
/**
* MqttConnectOptions
*/
private MqttConnectOptions options;
/**
* topic数组
*/
private String[] topic;
/**
* qos
*/
private int[] qos;
public TopMsgCallback() {
}
public TopMsgCallback(MqttClient client, MqttConnectOptions options) {
this.client = client;
this.options = options;
}
public TopMsgCallback(MqttClient client, MqttConnectOptions options, String[] topic, int[] qos) {
this.client = client;
this.options = options;
this.topic = topic;
this.qos = qos;
}
/**
* 断开重连
*/
@Override
public void connectionLost(Throwable cause) {
log.info("MQTT连接断开,发起重连");
while (true) {
try {
Thread.sleep(30000);
client.connect(options);
log.info("MQTT重新连接成功:" + client);
String[] msgtopic = MqttCofigBean.SUB_TOPICS;
int[] qos = new int[msgtopic.length];
if (msgtopic.length != 0 && !"".equals(msgtopic[0])) {
//订阅消息
for (int i = 0; i < msgtopic.length; i++) {
qos[i] = 0;
}
client.subscribe(msgtopic, qos);
}
break;
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* 接收到消息调用令牌中调用
*/
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
}
/**
* 消息处理
* 封装数据,储存存活设备存入Redis,推送数据到kafka
*/
@Override
public void messageArrived(String topic, MqttMessage message) {
JSONObject jsonObject = JSONObject.parseObject(JSONObject.toJSONString(message));
// TODO 升级结果推送至业务处理
log.info("接收到的消息是{}", message);
}
}
总结
没有总结
最后
以上就是负责鼠标为你收集整理的MQTT简单使用总结前言一、MQTT工具二、使用步骤总结的全部内容,希望文章能够帮你解决MQTT简单使用总结前言一、MQTT工具二、使用步骤总结所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复