概述
一、简介
记录学习java语言中进行mqtt通讯的一些代码,通讯解析采用fastjson
二、工程代码
gradle依赖
repositories {
mavenCentral()
maven {
url "https://repository.mulesoft.org/nexus/content/repositories/public"
}
}
dependencies {
implementation group: 'org.eclipse.paho', name: 'org.eclipse.paho.client.mqttv3', version: '1.2.2'
implementation group: 'com.alibaba', name: 'fastjson', version: '1.2.75'
}
MqttCommunication
package app;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
/*
mqtt 通讯类
*/
public class MqttCommunication {
private MqttClient client;
private String userName = "admin";
private String passWord = "password";
private MqttMessage message;
private MqttTopic mqttTopic;
public MqttCommunication(String id) {
this(id, null, "tcp://127.0.0.1:1883");
}
/*
* 初始化化时每个应用应该具有唯一性 host = "tcp://127.0.0.1:1883";
*/
public MqttCommunication(String id, MqttCallback callback, String host) {
try {
client = new MqttClient(host, id, new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(false);
options.setUserName(userName);
options.setPassword(passWord.toCharArray());
options.setConnectionTimeout(10);
options.setKeepAliveInterval(20);
message = new MqttMessage();
if (callback == null) {
client.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable arg0) {
System.out.println(" connectionLost " + arg0);
}
@Override
public void deliveryComplete(IMqttDeliveryToken arg0) {
System.out.println(" deliveryComplete " + arg0);
}
@Override
public void messageArrived(String topic, MqttMessage msg) throws Exception {
System.out.println(topic + " messageArrived: " + msg.toString());
}
});
} else {
client.setCallback(callback);
}
client.connect(options);
} catch (MqttException e) {
e.printStackTrace();
}
}
public void sendMessage(String topic, String msg) {
try {
message.setQos(0);
message.setRetained(true);
message.setPayload(msg.getBytes());
mqttTopic = client.getTopic(topic);
MqttDeliveryToken token = mqttTopic.publish(message);// 发布主题
token.waitForCompletion();
} catch (MqttPersistenceException e) {
e.printStackTrace();
} catch (MqttException e) {
e.printStackTrace();
}
}
/*
* String[] topics = new String[] { "Tcs/test1" }; int[] qoss = new int[] { 0 };
*/
public void subscribe(String[] topicFilters, int[] qos) {
try {
client.subscribe(topicFilters, qos);
} catch (MqttException e) {
e.printStackTrace();
}
}
}
进行json通讯,网上好像没有像cjson一样简单快捷的方式来操作json,目前使用了阿里的fastjson,但json对象生成json必须符合一定规则,即Java Bean对象
// 这个是符合规则的Java Bean 对象
public class TestObject {
private int key1;
private String key2;
private double key3=3.14;
public void setKey1(int key1) {
this.key1 = key1;
}
public int getKey1() {
return key1;
}
public void setKey2(String key2) {
this.key2 = key2;
}
public String getKey2() {
return key2;
}
public void setKey3(double key1) {
this.key3 = key1;
}
public double getKey3() {
return key3;
}
}
使用fastjson进行序列化后结果如下
{
"key1" : 780,
"key2" : "key2",
"key3" : 3.14
}
App
package app;
import com.alibaba.fastjson.JSON;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
public class App extends Thread {
private TestObject obj;
public App() {
obj = new TestObject();
}
//实现Javabaice类转换成json字符串
public String JsonDataCnt(int cnt) {
obj.setKey1(cnt);
obj.setKey2("key2");
// 这里将javabean转化成json字符串
String jsonString = JSON.toJSONString(obj);
return jsonString;
}
public void TopicTest() {
MqttCallback callback = new MqttCallback() {
@Override
/* 断开连接回调 */
public void connectionLost(Throwable arg0) {
System.out.println(" connectionLost " + arg0);
}
@Override
/* 发送完成回调函数 */
public void deliveryComplete(IMqttDeliveryToken arg0) {
// System.out.println(" deliveryComplete " + arg0);
}
@Override
/* 接收回调 */
public void messageArrived(String Topic, MqttMessage msg) throws Exception {
System.out.println(Topic + " messageArrived: " + msg.toString());
}
};
MqttCommunication myMqtt = new MqttCommunication("Server", callback, "tcp://192.168.2.18:1883");
String[] topics = new String[] { "Tcs/test1" };
int[] qoss = new int[] { 0 };
myMqtt.subscribe(topics, qoss);
int cnt = 0;
while (true) {
myMqtt.sendMessage("Topic", JsonDataCnt(cnt++));
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
System.out.println("hello");
App a = new App();
a.TopicTest();
System.out.println("end");
}
}
三、其他
最后
以上就是冷傲衬衫为你收集整理的Java使用Json+mqtt进行通讯一、简介二、工程代码三、其他的全部内容,希望文章能够帮你解决Java使用Json+mqtt进行通讯一、简介二、工程代码三、其他所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复