我是靠谱客的博主 花痴鸵鸟,最近开发中收集的这篇文章主要介绍Android中MQTT的使用(MQTT的使用大同小异),觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

mqtt是轻量级基于代理的发布/订阅的消息传输协议。

作为安卓开发而言,我们一般将其用来作为推送协议或者将其作为与嵌入式通信的协议。

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是IBM开发的一个即时通讯协议,有可能成为物联网的重要组成部分。该协议支持所有平台,几乎可以把所有联网物品和外部连接起来,被用来当做传感器和致动器(比如通过Twitter让房屋联网)的通信协议。

1.轻量级的 machine-to-machine 通信协议。
2.publish/subscribe模式。
3.基于TCP/IP。
4.支持QoS。QoS(Quality of Service,服务质量)
5.适合于低带宽、不可靠连接、嵌入式设备、CPU内存资源紧张。

无意中看到个Demo的地址下载Demo.

上面这个解释有些我也是不懂,今天讲的主要还是怎么使用这个MQTT来接收推送信息

接收MQTT消息分几步,就跟把大象放进冰箱里一样
第一步:与MQTT服务端建立一个心跳的长连接,这个连接是干啥的呢,他就是维持连接用的,主要用于PUBLISH(发布态)消息的

代码有点多哈,其实没多少是有用的,createConnectAsync就是创建连接用的,这里的MyMqttCallback回调基本没什么太大的用,继续向下看实际业务中的处理吧

public class HeartbeatMqttManager {
    private static final String TAG = HeartbeatMqttManager.class.getName();
    private static HeartbeatMqttManager mInstance = null;
    private MqttConfig mConfig;
    private String mGroupId;
    private String mClientId;
    private String mTopic;
    private MqttAsyncClient mClient;
    private MyMqttCallback mCallback;
    private ScheduledExecutorService mScheduledExecutorService;

    private HeartbeatMqttManager() {
        this.mConfig = new MqttConfig();
        mGroupId = mConfig.getHeartbeatGroupId();
        mTopic = mConfig.getHeartbeatTopic();
        String deviceId = MainApplication.getInstance().getDeviceId();
        mClientId = "GID_" + mGroupId + "@@@ClientID_" + deviceId;
        Log.d(TAG, "mClientId = " + mClientId);
        Log.d(TAG, "mTopic = " + mTopic);

        mScheduledExecutorService = new ScheduledThreadPoolExecutor(1, new BasicThreadFactory.Builder().namingPattern("MQTT-Heartbeat-thread-%d").daemon(true).build());
        mScheduledExecutorService.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                try {
                    if (MainApplication.getInstance().getUserCurrent() == null || mClient == null || !mClient.isConnected()) {
                        return;
                    }
                    ReqHeartBeat bean = new ReqHeartBeat();
                    bean.setSpeed("0.0");
                    bean.setAppType("B");
                    bean.setAppVersion(MainApplication.getInstance().getPackageVersionName());
                    bean.setDeviceId(MainApplication.getInstance().getDeviceId());
                    bean.setDeviceVersion("2.0.0.0");
                    bean.setEmpName(MainApplication.getInstance().getUserCurrent().getPerson_name());
                    bean.setEmpNo(MainApplication.getInstance().getUserCurrent().getErp_person_no());
                    bean.setOrgId(MainApplication.getInstance().getUserCurrent().getOrg_no());
                    bean.setLatitude(MainApplication.getInstance().getLocationLatitude());
                    //bean.setLatitude("4.9E-324");
                    bean.setHeight("0");
                    //bean.setHeight("4.9E-324");
                    bean.setLongitude(MainApplication.getInstance().getLocationLongitude());
                    // bean.setLongitude("4.9E-324");
                    bean.setImeiNo(MainApplication.getInstance().getDeviceId());
                    bean.setAllocaterFlag(3);
                    //bean.setGmtModified(DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss"));
                    bean.setGmtModified("");
                    bean.setCreateUserId(MainApplication.getInstance().getUserCurrent().getPerson_id());
                    bean.setEndRetType(4);
                    //String msg = "{jsonBody:" + JsonUtils.toJson(bean) + "}";
                    String paramAnonymousMessage = JsonUtils.toJson(bean);
                    HashMap<String, String> localHashMap = new HashMap<String, String>();
                    localHashMap.put("jsonBody", paramAnonymousMessage);
                    paramAnonymousMessage = JsonUtils.toJson(localHashMap);
                    Log.d(TAG, paramAnonymousMessage);
                    publish(mTopic, 2, paramAnonymousMessage.getBytes());
                } catch (Exception e) {
                    Log.e(TAG, e.getMessage(), e);
                }
            }
        }, 5, 60, TimeUnit.SECONDS);
    }

    public static HeartbeatMqttManager getInstance() {
        if (mInstance == null) {
            mInstance = new HeartbeatMqttManager();
        }
        return mInstance;
    }

    public static void release() {
        Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    if (mInstance != null) {
                        mInstance.disConnect();
                        mInstance = null;
                        Log.e(TAG, "release:" + TAG);
                    }
                } catch (Exception ex) {
                    Log.e(TAG, "release:" + ex.getMessage());
                }
            }
        });
        thread.start();
    }

    public void createConnectAsync() {
        AsyncTask task =new AsyncTask() {
            @Override
            protected Object doInBackground(Object[] objects) {
                MemoryPersistence localMemoryPersistence = new MemoryPersistence();
                try {
                    mClient = new MqttAsyncClient(mConfig.getBroker(), mClientId, localMemoryPersistence);
                    MqttConnectOptions connOpt = new MqttConnectOptions();
                    connOpt.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);
                    connOpt.setUserName(mConfig.getAcessKey());
                    connOpt.setServerURIs(new String[]{mConfig.getBroker()});
                    connOpt.setPassword(MacSignature.macSignature("GID_" + mGroupId, mConfig.getSecretKey()).toCharArray());
                    connOpt.setCleanSession(false);
                    connOpt.setKeepAliveInterval(100);
                    connOpt.setAutomaticReconnect(true);
                    mCallback = new MyMqttCallback();
                    mClient.setCallback(mCallback);
                    mClient.connect(connOpt);
                    Log.i(TAG, "createConnect: " + mConfig.getBroker());
                } catch (Exception ex) {
                    Log.e(TAG, ex.getMessage(), ex);
                }
                return null;
            }
        };
        task.execute();
    }

    public void disConnect() throws MqttException {
        if (mScheduledExecutorService != null) {
            mScheduledExecutorService.shutdown();
            mScheduledExecutorService = null;
        }
        if ((this.mClient != null)) {
            try {
                this.mClient.disconnect();
            } catch (Exception e) {
                this.mClient.disconnectForcibly();
                e.printStackTrace();
            } finally {
                this.mClient.close();
            }
        }
    }

    public boolean isConnected() {
        return this.mClient != null && this.mClient.isConnected();
    }

    public boolean publish(String topic, int qos, byte[] paramArrayOfByte) {
        if (this.mClient == null) {
            return false;
        }
        try {
            MqttMessage publicMsg = new MqttMessage(paramArrayOfByte);
            publicMsg.setQos(qos);
            this.mClient.publish(topic, publicMsg);
            Log.d(TAG, "Publishing to topic "" + topic + "" qos " + qos);
            return true;
        } catch (MqttException ex) {
            Log.e(TAG, ex.getMessage(), ex);
        }
        return false;
    }

    public boolean subscribe(String topic, int qos) {
        if (this.mClient == null) {
            return false;
        }
        try {
            this.mClient.subscribe(topic, qos);
            Log.d(TAG, "Subscribing to topic "" + topic + "" qos " + qos);
            return true;
        } catch (MqttException ex) {
            Log.e(TAG, ex.getMessage(), ex);
        }
        return false;
    }

    private class MyMqttCallback implements MqttCallbackExtended {

        @Override
        public void connectComplete(boolean reconnect, String serverURI) {
            Log.d(TAG, "connectComplete[" + reconnect + "]:" + serverURI);
        }

        @Override
        public void connectionLost(Throwable cause) {
            Log.d(TAG, "connectionLost:" + cause.getMessage());
        }

        @Override
        public void messageArrived(String topic, MqttMessage message) throws Exception {
            Log.d(TAG, "messageArrived[" + topic + "]:" + message.toString());
        }

        @Override
        public void deliveryComplete(IMqttDeliveryToken token) {
            Log.d(TAG, "deliveryComplete[" + token.getTopics()[0] + "]:" + token.getMessageId());
        }
    }
}

下面来看下实际业务中的处理代码,实际业务的代码我都给删掉了
这里的MyMqttCallback就是MQTT到达我们程序后的接收地点了,里边的 messageArrived便是数据的到达地,对数据加以处理即可

public class QuickReceiveMqttManager {
    private static final String TAG = QuickReceiveMqttManager.class.getName();
    private static QuickReceiveMqttManager mInstance = null;
    private MqttConfig mConfig;
    private String mGroupId;
    private String mTopic;
    private String mClientId;
    private MqttAsyncClient mClient;
    private MyMqttCallback mCallback;

    private QuickReceiveMqttManager() {
        this.mConfig = new MqttConfig();
        mGroupId = mConfig.getQuickReceiveGroupId();
        mTopic = mConfig.getQuickReceiveTopic();
        String deviceId = MainApplication.getInstance().getDeviceId();
        mClientId = "GID_" + mGroupId + "@@@ClientID_" + deviceId;
        Log.d(TAG, "mClientId = " + mClientId);
        Log.d(TAG, "mTopic = " + mTopic);
    }

    public static QuickReceiveMqttManager getInstance() {
        if (mInstance == null) {
            mInstance = new QuickReceiveMqttManager();
        }
        return mInstance;
    }

    public static void release() {
        Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    if (mInstance != null) {
                        mInstance.disConnect();
                        mInstance = null;
                        Log.e(TAG, "release:" + TAG);
                    }
                } catch (Exception ex) {
                    Log.e(TAG, "release:" + ex.getMessage());
                }
            }
        });
        thread.start();
    }

    public void createConnectAsync() {
        AsyncTask task =new AsyncTask() {
            @Override
            protected Object doInBackground(Object[] objects) {
                MemoryPersistence localMemoryPersistence = new MemoryPersistence();
                try {
                    mClient = new MqttAsyncClient(mConfig.getBroker(), mClientId, localMemoryPersistence);
                    MqttConnectOptions connOpt = new MqttConnectOptions();
                    connOpt.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);
                    connOpt.setUserName(mConfig.getAcessKey());
                    connOpt.setServerURIs(new String[]{mConfig.getBroker()});
                    connOpt.setPassword(MacSignature.macSignature("GID_" + mGroupId, mConfig.getSecretKey()).toCharArray());
                    connOpt.setCleanSession(false);
                    connOpt.setKeepAliveInterval(100);
                    connOpt.setAutomaticReconnect(true);
                    mCallback = new MyMqttCallback();
                    mClient.setCallback(mCallback);
                    mClient.connect(connOpt);
                    Log.i(TAG, "createConnect: " + mConfig.getBroker());

                } catch (Exception ex) {
                    Log.e(TAG, ex.getMessage(), ex);
                }
                return null;
            }
        };
        task.execute();
    }

    public void disConnect() throws MqttException {
        if ((this.mClient != null)) {
            try {
                this.mClient.disconnect();
            } catch (Exception e) {
                this.mClient.disconnectForcibly();
                e.printStackTrace();
            } finally {
                this.mClient.close();
            }
        }
    }

    public boolean isConnected() {
        return this.mClient != null && this.mClient.isConnected();
    }

    public boolean publish(String topic, int qos, byte[] paramArrayOfByte) {
        if (this.mClient == null) {
            return false;
        }
        try {
            MqttMessage publicMsg = new MqttMessage(paramArrayOfByte);
            publicMsg.setQos(qos);
            this.mClient.publish(topic, publicMsg);
            Log.d(TAG, "Publishing to topic "" + topic + "" qos " + qos);
            return true;
        } catch (MqttException ex) {
            Log.e(TAG, ex.getMessage(), ex);
        }
        return false;
    }

    public boolean subscribe(String topic, int qos) {
        if (this.mClient == null) {
            return false;
        }
        try {
            this.mClient.subscribe(topic, qos);
            Log.d(TAG, "Subscribing to topic "" + topic + "" qos " + qos);
            return true;
        } catch (MqttException ex) {
            Log.e(TAG, ex.getMessage(), ex);
        }
        return false;
    }

    private class MyMqttCallback implements MqttCallbackExtended {

        @Override
        public void connectComplete(boolean reconnect, String serverURI) {
            Log.d(TAG, "connectComplete[" + reconnect + "]:" + serverURI);
            if (mClient != null && mClient.isConnected()) {
                subscribe(mTopic, 2);
            }
        }

        @Override
        public void connectionLost(Throwable cause) {
            Log.d(TAG, "connectionLost:" + cause.getMessage());
        }

        @Override
        public void messageArrived(String topic, MqttMessage message) throws Exception {
            Log.d(TAG, "messageArrived[" + topic + "]:" + message.toString());
//            if (mTopic.equals(topic)) {
            CommonUtils.writeLog("mqtt", "QuickReceiveMqttManager -->>> messageArrived-topic:[" + topic + "]-mytopic:[" + mTopic + "]n" + message + "n");
            saveData(message);
//            }
        }

        @Override
        public void deliveryComplete(IMqttDeliveryToken token) {
            Log.d(TAG, "deliveryComplete[" + token.getTopics()[0] + "]:" + token.getMessageId());
        }
    }

    private void saveData(MqttMessage message) {
        // 实际业务中的处理
    }

最后

以上就是花痴鸵鸟为你收集整理的Android中MQTT的使用(MQTT的使用大同小异)的全部内容,希望文章能够帮你解决Android中MQTT的使用(MQTT的使用大同小异)所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(73)

评论列表共有 0 条评论

立即
投稿
返回
顶部