我是靠谱客的博主 神勇战斗机,最近开发中收集的这篇文章主要介绍6. 使用SpringBoot来开发基于Java的MQTT客户端1、环境准备2、使用SpringBoot进行开发,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

1、环境准备

参考上一篇: 5. ubuntu系统配置IDEA并创建Maven项目

2、使用SpringBoot进行开发

需要的前置SpringBoot知识:
SpringBoot学习教程

  • 1、创建SpringBoot项目
    在这里插入图片描述
    点击下一步后,使用JAVA 8改个项目名字后,一直点下一步即可。

  • 2、创建完后,把不需要的文件删除即可,然后再在pom.xml里进行编辑,右下方弹出Maven项目的包是否自动导入,点击Enable-Auto import自动导入
    在这里插入图片描述
    再把下述依赖添加到pom.xml中

<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.0</version>
</dependency>

在这里插入图片描述

  • (MQTT单个客户端的开发) 3、添加客户端代码
public static void main(String[] args) {
        String broker = "tcp://localhost:1883";
        String clientId = "JavaSample";
        MemoryPersistence persistence = new MemoryPersistence();

        try{
            MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setCleanSession(true);
            System.out.println("Connecting to broker:" + broker);
            sampleClient.connect(connOpts);
            System.out.println("Connected");

            String topic="demo/topics";
            System.out.println("Subscribe to topic:" + topic);
            sampleClient.subscribe(topic);

            sampleClient.setCallback(new MqttCallback() {
                public void messageArrived(String topic, MqttMessage message) throws Exception {
                    String theMsg = MessageFormat.format("{0} is arrived for topic {1}.", new String(message.getPayload()), topic);
                    System.out.println(theMsg);
                }

                public void deliveryComplete(IMqttDeliveryToken token) {
                }

                public void connectionLost(Throwable throwable) {
                }
            });

            String content = "Message from MqttPublishSample";
            int qos = 2;
            System.out.println("Publishing message:" + content);
            MqttMessage message =new MqttMessage(content.getBytes());
            message.setQos(qos);
            sampleClient.publish(topic, message);
            System.out.println("Message published");

        }catch(MqttException me){
            System.out.println("reason" + me.getReasonCode());
            System.out.println("msg" + me.getMessage());
            System.out.println("loc" + me.getLocalizedMessage());
            System.out.println("cause" + me.getCause());
            System.out.println("excep" + me);
            me.printStackTrace();
        }
        SpringApplication.run(MqttClient1Application.class, args);
    }

运行后效果

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
参考示例1:使用 Java 开发 MQTT 客户端
参考示例2:MQTT Java客户端的使用

  • (MQTT多客户端的开发) 4、添加客户端代码
    首先,这是本次的项目结构,MQTT的构建放到了MyMqttClient里,之后用主函数去调用创建。
    (pom.xml中添加的依赖与上方相同)
    在这里插入图片描述

MyMqttClinet.java

package mqttclient2.mqtt_client_mulit_1;


import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

import java.text.MessageFormat;


public class MyMqttClient  {

    public static MqttClient mqttClient = null;
    private static MemoryPersistence memoryPersistence = null;
    private static MqttConnectOptions mqttConnectOptions = null;


//    static {
//        init("123");
//    }


    public static  void init(String clientId) {
        //初始化连接设置对象
        mqttConnectOptions = new MqttConnectOptions();
        //初始化MqttClient
        if(null != mqttConnectOptions) {
//			true可以安全地使用内存持久性作为客户端断开连接时清除的所有状态
            mqttConnectOptions.setCleanSession(true);
//			设置连接超时
            mqttConnectOptions.setConnectionTimeout(30);
//			设置持久化方式
            memoryPersistence = new MemoryPersistence();
            if(null != memoryPersistence && null != clientId) {
                try {
                    mqttClient = new MqttClient("tcp://127.0.0.1:1883", clientId,memoryPersistence);
                } catch (MqttException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }else {

            }
        }else {
            System.out.println("mqttConnectOptions对象为空");
        }

        System.out.println(mqttClient.isConnected());
        //设置连接和回调
        if(null != mqttClient) {
            if(!mqttClient.isConnected()) {

//			创建回调函数对象
 //               MqttReceriveCallback mqttReceriveCallback = new MqttReceriveCallback();
//			客户端添加回调函数
                MqttCallback mqttCallback = new MqttCallback() {
                    @Override
                    public void connectionLost(Throwable throwable) {

                    }

                    @Override
                    public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
                        String theMsg = MessageFormat.format("{0} is arrived for topic {1}.", new String(mqttMessage.getPayload()), s);
                        System.out.println(theMsg);
                    }

                    @Override
                    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {

                    }
                };
                mqttClient.setCallback(mqttCallback);
 //               mqttClient.setCallback(mqttReceriveCallback);
//			创建连接
                try {
                    System.out.println("create connection.");
                    mqttClient.connect(mqttConnectOptions);
                } catch (MqttException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }

            }
        }else {
            System.out.println("mqttClient is empty.");
        }
        System.out.println(mqttClient.isConnected());
    }

    //	关闭连接
    public void closeConnect() {
        //关闭存储方式
        if(null != memoryPersistence) {
            try {
                memoryPersistence.close();
            } catch (MqttPersistenceException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }else {
            System.out.println("memoryPersistence is null");
        }

//		关闭连接
        if(null != mqttClient) {
            if(mqttClient.isConnected()) {
                try {
                    mqttClient.disconnect();
                    mqttClient.close();
                } catch (MqttException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }else {
                System.out.println("mqttClient is not connect");
            }
        }else {
            System.out.println("mqttClient is null");
        }
    }

    //	发布消息
    public void publishMessage(String pubTopic,String message,int qos) {
        if(null != mqttClient&& mqttClient.isConnected()) {
            System.out.println("Publish the message "+mqttClient.isConnected());
            System.out.println("id:"+mqttClient.getClientId());
            MqttMessage mqttMessage = new MqttMessage();
            mqttMessage.setQos(qos);
            mqttMessage.setPayload(message.getBytes());

            MqttTopic topic = mqttClient.getTopic(pubTopic);

            if(null != topic) {
                try {
                    MqttDeliveryToken publish = topic.publish(mqttMessage);
                    if(!publish.isComplete()) {
                        System.out.println("The message publish success!");
                    }
                } catch (MqttException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }

        }else {
            reConnect();
        }

    }
    //	重新连接
    public void reConnect() {
        if(null != mqttClient) {
            if(!mqttClient.isConnected()) {
                if(null != mqttConnectOptions) {
                    try {
                        mqttClient.connect(mqttConnectOptions);
                    } catch (MqttException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }else {
                    System.out.println("mqttConnectOptions is null");
                }
            }else {
                System.out.println("mqttClient is null or connect");
            }
        }else {
            init("123");
        }

    }
    //	订阅主题
    public void subTopic(String topic) {
        if(null != mqttClient&& mqttClient.isConnected()) {
            try {
                mqttClient.subscribe(topic, 1);
            } catch (MqttException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }else {
            System.out.println("mqttClient is error");
        }
    }


    //	清空主题
    public void cleanTopic(String topic) {
        if(null != mqttClient&& !mqttClient.isConnected()) {
            try {
                mqttClient.unsubscribe(topic);
            } catch (MqttException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }else {
            System.out.println("mqttClient is error");
        }
    }

}

MqttClientMulit1Application.java

package mqttclient2.mqtt_client_mulit_1;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import java.util.ArrayList;

@SpringBootApplication
public class MqttClientMulit1Application {

    public static void main(String[] args) {

//        ArrayList<MyMqttClient> mqttClient = new ArrayList<MyMqttClient>();
        MyMqttClient mqttClient1 = new MyMqttClient();
        MyMqttClient mqttClient2 = new MyMqttClient();
        
        mqttClient1.init("test_1");
        mqttClient1.subTopic("world/test_1");
        mqttClient1.publishMessage("world/test_1","hello:test_1",1);

        mqttClient2.init("test_2");
        mqttClient2.subTopic("world/test_1");
        mqttClient2.publishMessage("world/test_1","hello:test_1",1);

        SpringApplication.run(MqttClientMulit1Application.class, args);
    }

}

实现效果
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
参考示例1:java 实现mqtt发送和接收消息 客户端代码
参考示例2:MQTT客户端实现

拓展知识:
Paho MqttClient 回调函数使用总结
mqtt入门指南 实战经验

最后

以上就是神勇战斗机为你收集整理的6. 使用SpringBoot来开发基于Java的MQTT客户端1、环境准备2、使用SpringBoot进行开发的全部内容,希望文章能够帮你解决6. 使用SpringBoot来开发基于Java的MQTT客户端1、环境准备2、使用SpringBoot进行开发所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(56)

评论列表共有 0 条评论

立即
投稿
返回
顶部