概述
在这篇文章中将会讲述本人在研究使用过程中遇到的一些问题,怎样解决的以及如何与Springboot进行集成,如有不对,欢迎指正。
是什么
MQTT(消息队列遥测传输),是一种基于发布订阅模式的消息协议,他工作在TCP/IP协议簇上,他的存在解决的是硬件性能低下的远程设备以及网络状况糟糕的情况下能正常发送消息这一问题,所以他需要一个消息中间件(也就是mqtt服务器)。
优点
MQTT协议是轻量、简单、开放和易于实现的。
实际应用
话不多说上干货
在开始进行mqtt通信之前,需要搭建mqtt服务器,这里不需要我们写代码,我们直接使用开源的emqx即可,所以需要在自己的机器上运行emqx服务器
运行条件:最好在docker中运行,优点是简便快捷。
docker中下载运行emqx命令:
拉取 Docker 镜像
docker pull emqx/emqx-ee:4.4.4
启动 Docker 容器
docker run -d --name emqx-ee -p 1883:1883 -p 8081:8081 -p 8083:8083 -p 8084:8084 -p 8883:8883 -p 18083:18083 emqx/emqx-ee:4.4.4
启动成功后
如果docker在本地访问地址:http://localhost:18083/
如果docker不在本地访问地址:http://你的docker所在服务器ip:18083(可能需要开放接口)
emqx还是很值得研究研究的,他可以订阅,可以发送,还有注册上来的客户端信息,client及其订阅了那些topic
(topic有特殊写法,比如+代表此处有值)
qos消息级别,这个值影响的只是消息的安全性,你是否接收到消息由topic是否监听以及发送端是否发了消息决定
JAVA代码
订阅者
package com;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class MqttClientTest {
//订阅的主题
public static final String TOPIC = "mqtt-fabu";
public static void main(String[] args) {
MyClient myClient = new MyClient();
myClient.subscribe(TOPIC, 1);
}
}
/**
* 订阅方
*/
class MyClient {
//mqtt服务器默认的地址和端口号
//这里的tcp地址就是上述mqtt服务端地址(emqx地址,端口号就是1883,emqx服务器的端口号是18083)
public static final String HOST = "tcp://172.16.9.205:1883";
//连接MQTT的客户端ID,一般以唯一标识符表示
private static final String CLIENTID = "client-1";
//连接的用户名密码(非必需)
private String userName = "admin";
private String password = "public";
private MqttClient mqttClient;
public MyClient() {
try {
mqttClient = new MqttClient(HOST, CLIENTID, new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(true);
options.setConnectionTimeout(60);
options.setKeepAliveInterval(10);
options.setUserName(userName);
options.setPassword(password.toCharArray());
//定义回调函数
mqttClient.setCallback(new PushCallBack());
mqttClient.connect(options);
} catch (MqttException e) {
e.printStackTrace();
}
}
//订阅主题
public void subscribe(String topic, int qos) {
try {
mqttClient.subscribe(topic, qos);
} catch (MqttException e) {
e.printStackTrace();
}
}
}
发布者
package com;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class MqttServerTest {
//发布的主题
public static final String TOPIC = "mqtt-fabu";
public static void main(String[] args) throws InterruptedException {
MqttServer mqttServer = new MqttServer();
MqttMessage message = new MqttMessage();
message.setQos(1);
message.setPayload("第一次广播".getBytes());
mqttServer.publish(TOPIC, message);
Thread.sleep(1000);
message.setPayload("第二次广播".getBytes());
mqttServer.publish(TOPIC, message);
Thread.sleep(1000);
message.setPayload("第三次广播".getBytes());
mqttServer.publish(TOPIC, message);
}
}
/**
* 发布方
*/
class MqttServer {
//mqtt服务器默认的地址和端口号
private static final String HOST = "tcp://172.16.9.205:1883";
//连接MQTT的客户端ID,一般以唯一标识符表示
private static final String CLIENTID = "server-1";
//连接的用户名密码(非必需)
private String userName = "admin";
private String password = "public";
private MqttClient mqttClient;
//构造方法,启动mqttClient
public MqttServer() {
try {
// MemoryPersistence设置clientid的保存形式,默认为以内存保存
mqttClient = new MqttClient(HOST, CLIENTID, new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(true);
options.setConnectionTimeout(60);
options.setKeepAliveInterval(10);
options.setUserName(userName);
options.setPassword(password.toCharArray());
//定义回调函数
mqttClient.setCallback(new PushCallBack());
mqttClient.connect(options);
} catch (MqttException e) {
e.printStackTrace();
}
}
//发布主题
public void publish(String topic, MqttMessage message) {
try {
mqttClient.publish(topic, message);
} catch (MqttException e) {
e.printStackTrace();
}
}
}
回调方法类
package com;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
public class PushCallBack implements MqttCallback {
/**
* mqtt连接丢失时触发(不包括主动disconnect)
* @param throwable
*/
@Override
public void connectionLost(Throwable throwable) {
System.out.println("连接失败,可做重连");
}
/**
* 收到订阅消息后调用
* @param s
* @param mqttMessage
* @throws Exception
*/
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
System.out.println("---------------------------");
System.out.println("接收到的主题为:" + s);
System.out.println("接收到的消息为:" + new String(mqttMessage.getPayload()));
}
/**
* 发布消息完成后调用
* @param iMqttDeliveryToken
*/
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
System.out.println("---------------------------");
System.out.println("广播完成");
}
}
整体项目在我的Gitee仓库中,有需要的可以自取(maven项目,下载依赖时会比较慢一点,耐心等待~~)
Gitee地址:https://gitee.com/monkeywu/mqtt3.git
运行结果
先启动订阅者,再启动发布者
订阅者结果:
发布者结果:
SpringBoot集成MQTT
非常简单,无论是maven项目还是gradle项目,只需要添加对应依赖即可
mvn仓库地址:https://mvnrepository.com/?__cf_chl_rt_tk=oPlq9Zc4102gw4LbWKmWeQ3cOdKx3YylLgF927cgDoA-1654756590-0-gaNycGzNCr0
implementation("org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.5")
链接地址
菜鸟教程:https://www.runoob.com/w3cnote/mqtt-intro.html
百度百科:https://baike.baidu.com/item/MQTT/3618851?fr=aladdin
官网:http://mqtt.p2hp.com/
总结
东西是好东西,用也是真好用,大家学会之后有什么新的想法和新的知识点,欢迎讨论
最后
以上就是腼腆大碗为你收集整理的MQTT研究之路(mqtt3)—保姆级的全部内容,希望文章能够帮你解决MQTT研究之路(mqtt3)—保姆级所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复