我是靠谱客的博主 负责鼠标,最近开发中收集的这篇文章主要介绍MQTT简单使用总结前言一、MQTT工具二、使用步骤总结,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

MQTT简单使用总结

  • 前言
  • 一、MQTT工具
  • 二、使用步骤
    • 1.引入库
    • 2.注入配置信息
    • 3.MQTT消费者
    • 4.MQTT回调函数
  • 总结


前言

本文介绍MQTT消息队列的简单使用

一、MQTT工具

MQTT的工具有很多,这里选择MQTTX测试: MQTTX客户端

二、使用步骤

1.引入库

       <dependency>
           <groupId>org.springframework.integration</groupId>
           <artifactId>spring-integration-mqtt</artifactId>
       </dependency>

2.注入配置信息

mqtt:
  host: tcp://127.0.0.1
  clientid: admin
  username: admin
  password: admin
  timeout: 10000
  keepalive: 60

这里选择自己的配置信息

public class MqttCofigBean {

    @Value("${mqtt.username}")
    private String username;

    @Value("${mqtt.password}")
    private String password;

    @Value("${mqtt.host}")
    private String hostUrl;

    @Value("${mqtt.clientid}")
    private String clientId;

    @Value("${mqtt.timeout}")
    private int completionTimeout;
}

3.MQTT消费者

主要功能是初始化连接信息,推送MQTT消息,订阅MQTT主题.

@Slf4j
@Component
public class MqttConsumer implements ApplicationRunner {


    private static MqttClient client;

    private static MqttTopic mqttTopic;

    private static volatile MqttConsumer mqttConsumer = null;

    public static MqttConsumer getInstance() {

        if (null == mqttConsumer) {
            synchronized (MqttConsumer.class) {
                if (null == mqttConsumer) {
                    mqttConsumer = new MqttConsumer();
                }
            }
        }
        return mqttConsumer;

    }

    /**
     * MQTT连接属性配置对象
     */
    @Autowired
    public MqttCofigBean mqttCofigBean;

    /**
     * 初始化参数配置
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        log.info("初始化启动MQTT连接");
        this.connect();
    }

    /**
     * 用来连接服务器
     * @throws Exception 异常
     */
    private void connect() throws Exception {
        client = new MqttClient(mqttCofigBean.getHostUrl(), mqttCofigBean.getClientId() + "_" + StrUtil.uuid(), new MemoryPersistence());
        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName(mqttCofigBean.getUsername());
        options.setPassword(mqttCofigBean.getPassword().toCharArray());
        //是否清除session
        options.setCleanSession(true);
        // 设置超时时间
        options.setConnectionTimeout(30);
        // 设置会话心跳时间
        options.setKeepAliveInterval(60);
        try {
            String[] msgtopic = MqttCofigBean.SUB_TOPICS;
            int[] qos = new int[msgtopic.length];
            client.setCallback(new TopMsgCallback(client, options, msgtopic, qos));
            client.connect(options);
            log.info("MQTT连接成功:" + mqttCofigBean.getClientId());
            if (msgtopic.length != 0 && !"".equals(msgtopic[0])) {
                //订阅消息
                for (int i = 0; i < msgtopic.length; i++) {
                    qos[i] = 0;
                }
                client.subscribe(msgtopic, qos);
            }

        } catch (Exception e) {
            log.error("MQTT连接异常:" + e);
        }
    }

    /**
     * 重新连接
     * @throws Exception 异常
     */
    public void reConnect() throws Exception {
        if (null != client) {
            this.connect();
        }
    }

    /**
     * 发布,默认qos为0,非持久化
     *
     * @param topic       主题
     * @param pushMessage 推送消息
     */
    public void publish(String topic, String pushMessage) throws Exception {
        publish(0, false, topic, pushMessage);
    }

    /**
     * 发布
     *
     * @param qos         qos
     * @param retained    保留
     * @param topic       主题
     * @param pushMessage 推送消息
     */
    public void publish(int qos, boolean retained, String topic, String pushMessage) {
        MqttMessage message = new MqttMessage();
        message.setQos(qos);
        message.setRetained(retained);
        message.setPayload(pushMessage.getBytes());
        MqttTopic mTopic = client.getTopic(topic);
        if (null == mTopic) {
            log.error("topic not exist");
        }
        MqttDeliveryToken token;
        try {
            token = mTopic.publish(message);
            token.waitForCompletion();
        } catch (MqttPersistenceException e) {
            e.printStackTrace();
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    /**
     * 订阅某个主题
     *
     * @param topic 主题
     * @param qos   qos
     */
    public void subscribe(String topic, int qos) {
        try {
            log.info("topic:" + topic);
            client.subscribe(topic, qos);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    public MqttClient getClient() {
        return client;
    }

    public MqttTopic getMqttTopic() {
        return mqttTopic;
    }
}

4.MQTT回调函数

回调函数主要是负责,连接中断后重连,收到消息的处理,也可以使用监听器的方式,来处理收到消息的业务在处理.

@Slf4j
public class TopMsgCallback implements MqttCallback {

    /**
     * MqttClient
     */
    private MqttClient client;
    /**
     * MqttConnectOptions
     */
    private MqttConnectOptions options;
    /**
     * topic数组
     */
    private String[] topic;
    /**
     * qos
     */
    private int[] qos;


    public TopMsgCallback() {
    }

    public TopMsgCallback(MqttClient client, MqttConnectOptions options) {
        this.client = client;
        this.options = options;
    }

    public TopMsgCallback(MqttClient client, MqttConnectOptions options, String[] topic, int[] qos) {
        this.client = client;
        this.options = options;
        this.topic = topic;
        this.qos = qos;
    }

    /**
     * 断开重连
     */
    @Override
    public void connectionLost(Throwable cause) {
        log.info("MQTT连接断开,发起重连");
        while (true) {
            try {
                Thread.sleep(30000);
                client.connect(options);
                log.info("MQTT重新连接成功:" + client);
                String[] msgtopic = MqttCofigBean.SUB_TOPICS;
                int[] qos = new int[msgtopic.length];
                if (msgtopic.length != 0 && !"".equals(msgtopic[0])) {
                    //订阅消息
                    for (int i = 0; i < msgtopic.length; i++) {
                        qos[i] = 0;
                    }
                    client.subscribe(msgtopic, qos);
                }
                break;
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

    }

    /**
     * 接收到消息调用令牌中调用
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
    }

    /**
     * 消息处理
     * 封装数据,储存存活设备存入Redis,推送数据到kafka
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) {
        JSONObject jsonObject = JSONObject.parseObject(JSONObject.toJSONString(message));
        // TODO 升级结果推送至业务处理
        log.info("接收到的消息是{}", message);
    }
}

总结

没有总结

最后

以上就是负责鼠标为你收集整理的MQTT简单使用总结前言一、MQTT工具二、使用步骤总结的全部内容,希望文章能够帮你解决MQTT简单使用总结前言一、MQTT工具二、使用步骤总结所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(45)

评论列表共有 0 条评论

立即
投稿
返回
顶部