我是靠谱客的博主 勤奋电灯胆,最近开发中收集的这篇文章主要介绍android mqtt详解_MQTT基于安卓的使用,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

前言

上一篇提到了MQTT的通用方式,由于智能家居TV的项目网络波动频繁,通用的方式已经无法满足需求,经常会出现重复订阅导致收到多条消息,那就只能另辟蹊径了,最终找到了梦寐以求的MqttAndroidClient。

1.集成

集成方式和上一篇的MQTT简介和使用要新增配置,build.gradle新增

implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'

manifest文件里面需要注册服务

2.MqttAndroidClient重要源码解析

MqttAndroidClient是专门对MQTTClient的再封装拓展类,包含了订阅、连接以及多线程的处理,直接看MqttAndroidClient对于连接的封装源码,非关键代码已省略

public IMqttToken connect(MqttConnectOptions options, Object userContext,

IMqttActionListener callback) throws MqttException {

if (mqttService == null) {

}

else {

pool.execute(new Runnable() {

@Override

public void run() {

doConnect();

}

});

}

return token;

}

doConnect()连接操作放在子线程,有效避免网络波动连接时间过长阻塞主线程

private void doConnect() {

...

mqttService.connect(clientHandle, connectOptions, null,

activityToken);

...

}

阿里专门针对安卓客户端写了一个MQTTService,方便统一管理,除了连接操作,重连,断开连接都是在MQTTService中完成。

public void connect(String clientHandle, MqttConnectOptions connectOptions,

String invocationContext, String activityToken)throws MqttSecurityException, MqttException {

MqttConnection client = getConnection(clientHandle);

client.connect(connectOptions, null, activityToken);

}

public void connect(MqttConnectOptions options, String invocationContext,

String activityToken) {

...

if (myClient != null) {

if (isConnecting ) {

}else if(!disconnected){

}

else {

service.traceDebug(TAG, "myClient != null and the client is not connected");

service.traceDebug(TAG,"Do Real connect!");

setConnectingState(true);

myClient.connect(connectOptions, invocationContext, listener);

}

}

...

}

public IMqttToken connect(MqttConnectOptions options, Object userContext, IMqttActionListener callback)throws MqttException, MqttSecurityException {

final String methodName = "connect";

if (comms.isConnected()) {

throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_CLIENT_CONNECTED);

}

if (comms.isConnecting()) {

throw new MqttException(MqttException.REASON_CODE_CONNECT_IN_PROGRESS);

}

if (comms.isDisconnecting()) {

throw new MqttException(MqttException.REASON_CODE_CLIENT_DISCONNECTING);

}

if (comms.isClosed()) {

throw new MqttException(MqttException.REASON_CODE_CLIENT_CLOSED);

}

...

connectActionListener.connect();

return userToken;

}

源码自身对于isConnected、isConnecting、isDisconnecting、isClosed做了异常处理,避免正在连接或者断开连接时连接造成重复连接。后面的源码就没贴的必要了,就是开启一个连接线程。

3.MqttAndroidClient的使用

一行代码完成MQTT的连接

mqttAndroidClient.connect(mqttConnectOptions, null, iMqttActionListener);

当然,这样是远远不够的,在实际应用中发现有一个问题,断网一段时间后重连网络MQTT不会自动重连,所以还得我们来做手动优化。思路很简单,在断开的节点开启重连线程,连接成功后关闭重连线程。以下贴的是完整代码,主要注意重连机制和订阅前记得取消订阅再订阅。(部分包含自己的代码,可以忽略,注释很详细)

public class MQTTManager {

private Context mContext;

private MqttAndroidClient mqttAndroidClient;

private String clientId;//自定义

private MqttConnectOptions mqttConnectOptions;

private ScheduledExecutorService reconnectPool;//重连线程池

public MQTTManager(Context mContext) {

this.mContext = mContext;

}

public void buildClient() {

closeMQTT();//先关闭上一个连接

buildMQTTClient();

}

private IMqttActionListener iMqttActionListener = new IMqttActionListener() {

@Override

public void onSuccess(IMqttToken asyncActionToken) {

TVLog.i("connect-"+"onSuccess");

closeReconnectTask();

subscribeToTopic();

}

@Override

public void onFailure(IMqttToken asyncActionToken, Throwable exception) {

//connect-onFailure-MqttException (0) - java.net.UnknownHostException

TVLog.i("connect-"+ "onFailure-"+exception);

startReconnectTask();

}

};

private MqttCallback mqttCallback = new MqttCallback() {

@Override

public void connectionLost(Throwable cause) {

//close-connectionLost-等待来自服务器的响应时超时 (32000)

//close-connectionLost-已断开连接 (32109)

TVLog.i("close-"+"connectionLost-"+cause);

if (cause != null) {//null表示被关闭

startReconnectTask();

}

}

@Override

public void messageArrived(String topic, MqttMessage message) throws Exception {

String body = new String(message.getPayload());

TVLog.i("messageArrived-"+message.getId()+"-"+body);

}

@Override

public void deliveryComplete(IMqttDeliveryToken token) {

try {

TVLog.i("deliveryComplete-"+token.getMessage().toString());

} catch (MqttException e) {

e.printStackTrace();

}

}

};

private void buildMQTTClient(){

mqttAndroidClient = new MqttAndroidClient(mContext, MQTTCons.Broker, clientId);

mqttAndroidClient.setCallback(mqttCallback);

mqttConnectOptions = new MqttConnectOptions();

mqttConnectOptions.setConnectionTimeout(10);

mqttConnectOptions.setKeepAliveInterval(20);

mqttConnectOptions.setCleanSession(true);

try {

mqttConnectOptions.setUserName("Signature|" + MQTTCons.AcessKey + "|" + MQTTCons.instanceId);

mqttConnectOptions.setPassword(MacSignature.macAndSignature(clientId, MQTTCons.SecretKey).toCharArray());

} catch (Exception e) {

}

doClientConnection();

}

private synchronized void startReconnectTask(){

if (reconnectPool != null)return;

reconnectPool = Executors.newScheduledThreadPool(1);

reconnectPool.scheduleAtFixedRate(new Runnable() {

@Override

public void run() {

doClientConnection();

}

} , 0 , 5*1000 , TimeUnit.MILLISECONDS);

}

private synchronized void closeReconnectTask(){

if (reconnectPool != null) {

reconnectPool.shutdownNow();

reconnectPool = null;

}

}

/**

* 连接MQTT服务器

*/

private synchronized void doClientConnection() {

if (!mqttAndroidClient.isConnected()) {

try {

mqttAndroidClient.connect(mqttConnectOptions, null, iMqttActionListener);

TVLog.d("mqttAndroidClient-connecting-"+mqttAndroidClient.getClientId());

} catch (MqttException e) {

e.printStackTrace();

}

}

}

private void subscribeToTopic() {//订阅之前会取消订阅,避免重连导致重复订阅

try {

String registerTopic = "";//自定义

String controlTopic = "";//自定义

String[] topicFilter=new String[]{registerTopic , controlTopic };

int[] qos={0,0};

mqttAndroidClient.unsubscribe(topicFilter, null, new IMqttActionListener() {

@Override

public void onSuccess(IMqttToken asyncActionToken) {

TVLog.i("unsubscribe-"+"success");

}

@Override

public void onFailure(IMqttToken asyncActionToken, Throwable exception) {

TVLog.i("unsubscribe-"+"failed-"+exception);

}

});

mqttAndroidClient.subscribe(topicFilter, qos, null, new IMqttActionListener() {

@Override

public void onSuccess(IMqttToken asyncActionToken) {//订阅成功

TVLog.i("subscribe-"+"success");

}

@Override

public void onFailure(IMqttToken asyncActionToken, Throwable exception) {

// startReconnectTask();

TVLog.i("subscribe-"+"failed-"+exception);

}

});

} catch (MqttException ex) {

}

}

public void sendMQTT(String topicSep, String msg) {

try {

if (mqttAndroidClient == null)return;

MqttMessage message = new MqttMessage();

message.setPayload(msg.getBytes());

String topic = "";//自定义

mqttAndroidClient.publish(topic, message, null, new IMqttActionListener() {

@Override

public void onSuccess(IMqttToken asyncActionToken) {

// TVLog.i("sendMQTT-"+"success:" + msg);

}

@Override

public void onFailure(IMqttToken asyncActionToken, Throwable exception) {

// startReconnectTask();

TVLog.i("sendMQTT-"+"failed:" + msg);

}

});

} catch (MqttException e) {

}

}

public void closeMQTT(){

closeReconnectTask();

if (mqttAndroidClient != null){

try {

mqttAndroidClient.unregisterResources();

mqttAndroidClient.disconnect();

TVLog.i("closeMQTT-"+mqttAndroidClient.getClientId());

mqttAndroidClient = null;

} catch (MqttException e) {

e.printStackTrace();

}

}

}

}

调用方式如下,想要做线程安全的单例的可以自己封装

if (mqttManager == null)

mqttManager = new MQTTManager(getApplicationContext());

mqttManager.buildClientId();

结语

基于安卓部分也算是完结了,里面也夹杂着一些源码解释,后续会写更多关于源码的解析。

最后

以上就是勤奋电灯胆为你收集整理的android mqtt详解_MQTT基于安卓的使用的全部内容,希望文章能够帮你解决android mqtt详解_MQTT基于安卓的使用所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(46)

评论列表共有 0 条评论

立即
投稿
返回
顶部