我是靠谱客的博主 神勇战斗机,最近开发中收集的这篇文章主要介绍6. 使用SpringBoot来开发基于Java的MQTT客户端1、环境准备2、使用SpringBoot进行开发,觉得挺不错的,现在分享给大家,希望可以做个参考。
概述
1、环境准备
参考上一篇: 5. ubuntu系统配置IDEA并创建Maven项目
2、使用SpringBoot进行开发
需要的前置SpringBoot知识:
SpringBoot学习教程
-
1、创建SpringBoot项目
点击下一步后,使用JAVA 8改个项目名字后,一直点下一步即可。 -
2、创建完后,把不需要的文件删除即可,然后再在pom.xml里进行编辑,右下方弹出Maven项目的包是否自动导入,点击Enable-Auto import自动导入
再把下述依赖添加到pom.xml中
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.0</version>
</dependency>
- (MQTT单个客户端的开发) 3、添加客户端代码
public static void main(String[] args) {
String broker = "tcp://localhost:1883";
String clientId = "JavaSample";
MemoryPersistence persistence = new MemoryPersistence();
try{
MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true);
System.out.println("Connecting to broker:" + broker);
sampleClient.connect(connOpts);
System.out.println("Connected");
String topic="demo/topics";
System.out.println("Subscribe to topic:" + topic);
sampleClient.subscribe(topic);
sampleClient.setCallback(new MqttCallback() {
public void messageArrived(String topic, MqttMessage message) throws Exception {
String theMsg = MessageFormat.format("{0} is arrived for topic {1}.", new String(message.getPayload()), topic);
System.out.println(theMsg);
}
public void deliveryComplete(IMqttDeliveryToken token) {
}
public void connectionLost(Throwable throwable) {
}
});
String content = "Message from MqttPublishSample";
int qos = 2;
System.out.println("Publishing message:" + content);
MqttMessage message =new MqttMessage(content.getBytes());
message.setQos(qos);
sampleClient.publish(topic, message);
System.out.println("Message published");
}catch(MqttException me){
System.out.println("reason" + me.getReasonCode());
System.out.println("msg" + me.getMessage());
System.out.println("loc" + me.getLocalizedMessage());
System.out.println("cause" + me.getCause());
System.out.println("excep" + me);
me.printStackTrace();
}
SpringApplication.run(MqttClient1Application.class, args);
}
运行后效果
参考示例1:使用 Java 开发 MQTT 客户端
参考示例2:MQTT Java客户端的使用
- (MQTT多客户端的开发) 4、添加客户端代码
首先,这是本次的项目结构,MQTT的构建放到了MyMqttClient里,之后用主函数去调用创建。
(pom.xml中添加的依赖与上方相同)
MyMqttClinet.java
package mqttclient2.mqtt_client_mulit_1;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.text.MessageFormat;
public class MyMqttClient {
public static MqttClient mqttClient = null;
private static MemoryPersistence memoryPersistence = null;
private static MqttConnectOptions mqttConnectOptions = null;
// static {
// init("123");
// }
public static void init(String clientId) {
//初始化连接设置对象
mqttConnectOptions = new MqttConnectOptions();
//初始化MqttClient
if(null != mqttConnectOptions) {
// true可以安全地使用内存持久性作为客户端断开连接时清除的所有状态
mqttConnectOptions.setCleanSession(true);
// 设置连接超时
mqttConnectOptions.setConnectionTimeout(30);
// 设置持久化方式
memoryPersistence = new MemoryPersistence();
if(null != memoryPersistence && null != clientId) {
try {
mqttClient = new MqttClient("tcp://127.0.0.1:1883", clientId,memoryPersistence);
} catch (MqttException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}else {
}
}else {
System.out.println("mqttConnectOptions对象为空");
}
System.out.println(mqttClient.isConnected());
//设置连接和回调
if(null != mqttClient) {
if(!mqttClient.isConnected()) {
// 创建回调函数对象
// MqttReceriveCallback mqttReceriveCallback = new MqttReceriveCallback();
// 客户端添加回调函数
MqttCallback mqttCallback = new MqttCallback() {
@Override
public void connectionLost(Throwable throwable) {
}
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
String theMsg = MessageFormat.format("{0} is arrived for topic {1}.", new String(mqttMessage.getPayload()), s);
System.out.println(theMsg);
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
}
};
mqttClient.setCallback(mqttCallback);
// mqttClient.setCallback(mqttReceriveCallback);
// 创建连接
try {
System.out.println("create connection.");
mqttClient.connect(mqttConnectOptions);
} catch (MqttException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}else {
System.out.println("mqttClient is empty.");
}
System.out.println(mqttClient.isConnected());
}
// 关闭连接
public void closeConnect() {
//关闭存储方式
if(null != memoryPersistence) {
try {
memoryPersistence.close();
} catch (MqttPersistenceException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}else {
System.out.println("memoryPersistence is null");
}
// 关闭连接
if(null != mqttClient) {
if(mqttClient.isConnected()) {
try {
mqttClient.disconnect();
mqttClient.close();
} catch (MqttException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}else {
System.out.println("mqttClient is not connect");
}
}else {
System.out.println("mqttClient is null");
}
}
// 发布消息
public void publishMessage(String pubTopic,String message,int qos) {
if(null != mqttClient&& mqttClient.isConnected()) {
System.out.println("Publish the message "+mqttClient.isConnected());
System.out.println("id:"+mqttClient.getClientId());
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setQos(qos);
mqttMessage.setPayload(message.getBytes());
MqttTopic topic = mqttClient.getTopic(pubTopic);
if(null != topic) {
try {
MqttDeliveryToken publish = topic.publish(mqttMessage);
if(!publish.isComplete()) {
System.out.println("The message publish success!");
}
} catch (MqttException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}else {
reConnect();
}
}
// 重新连接
public void reConnect() {
if(null != mqttClient) {
if(!mqttClient.isConnected()) {
if(null != mqttConnectOptions) {
try {
mqttClient.connect(mqttConnectOptions);
} catch (MqttException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}else {
System.out.println("mqttConnectOptions is null");
}
}else {
System.out.println("mqttClient is null or connect");
}
}else {
init("123");
}
}
// 订阅主题
public void subTopic(String topic) {
if(null != mqttClient&& mqttClient.isConnected()) {
try {
mqttClient.subscribe(topic, 1);
} catch (MqttException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}else {
System.out.println("mqttClient is error");
}
}
// 清空主题
public void cleanTopic(String topic) {
if(null != mqttClient&& !mqttClient.isConnected()) {
try {
mqttClient.unsubscribe(topic);
} catch (MqttException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}else {
System.out.println("mqttClient is error");
}
}
}
MqttClientMulit1Application.java
package mqttclient2.mqtt_client_mulit_1;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import java.util.ArrayList;
@SpringBootApplication
public class MqttClientMulit1Application {
public static void main(String[] args) {
// ArrayList<MyMqttClient> mqttClient = new ArrayList<MyMqttClient>();
MyMqttClient mqttClient1 = new MyMqttClient();
MyMqttClient mqttClient2 = new MyMqttClient();
mqttClient1.init("test_1");
mqttClient1.subTopic("world/test_1");
mqttClient1.publishMessage("world/test_1","hello:test_1",1);
mqttClient2.init("test_2");
mqttClient2.subTopic("world/test_1");
mqttClient2.publishMessage("world/test_1","hello:test_1",1);
SpringApplication.run(MqttClientMulit1Application.class, args);
}
}
实现效果
参考示例1:java 实现mqtt发送和接收消息 客户端代码
参考示例2:MQTT客户端实现
拓展知识:
Paho MqttClient 回调函数使用总结
mqtt入门指南 实战经验
最后
以上就是神勇战斗机为你收集整理的6. 使用SpringBoot来开发基于Java的MQTT客户端1、环境准备2、使用SpringBoot进行开发的全部内容,希望文章能够帮你解决6. 使用SpringBoot来开发基于Java的MQTT客户端1、环境准备2、使用SpringBoot进行开发所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复