概述
mqtt是轻量级基于代理的发布/订阅的消息传输协议。
作为安卓开发而言,我们一般将其用来作为推送协议或者将其作为与嵌入式通信的协议。
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是IBM开发的一个即时通讯协议,有可能成为物联网的重要组成部分。该协议支持所有平台,几乎可以把所有联网物品和外部连接起来,被用来当做传感器和致动器(比如通过Twitter让房屋联网)的通信协议。
1.轻量级的 machine-to-machine 通信协议。
2.publish/subscribe模式。
3.基于TCP/IP。
4.支持QoS。QoS(Quality of Service,服务质量)
5.适合于低带宽、不可靠连接、嵌入式设备、CPU内存资源紧张。
无意中看到个Demo的地址下载Demo.
上面这个解释有些我也是不懂,今天讲的主要还是怎么使用这个MQTT来接收推送信息
接收MQTT消息分几步,就跟把大象放进冰箱里一样
第一步:与MQTT服务端建立一个心跳的长连接,这个连接是干啥的呢,他就是维持连接用的,主要用于PUBLISH(发布态)消息的
代码有点多哈,其实没多少是有用的,createConnectAsync就是创建连接用的,这里的MyMqttCallback回调基本没什么太大的用,继续向下看实际业务中的处理吧
public class HeartbeatMqttManager {
private static final String TAG = HeartbeatMqttManager.class.getName();
private static HeartbeatMqttManager mInstance = null;
private MqttConfig mConfig;
private String mGroupId;
private String mClientId;
private String mTopic;
private MqttAsyncClient mClient;
private MyMqttCallback mCallback;
private ScheduledExecutorService mScheduledExecutorService;
private HeartbeatMqttManager() {
this.mConfig = new MqttConfig();
mGroupId = mConfig.getHeartbeatGroupId();
mTopic = mConfig.getHeartbeatTopic();
String deviceId = MainApplication.getInstance().getDeviceId();
mClientId = "GID_" + mGroupId + "@@@ClientID_" + deviceId;
Log.d(TAG, "mClientId = " + mClientId);
Log.d(TAG, "mTopic = " + mTopic);
mScheduledExecutorService = new ScheduledThreadPoolExecutor(1, new BasicThreadFactory.Builder().namingPattern("MQTT-Heartbeat-thread-%d").daemon(true).build());
mScheduledExecutorService.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
try {
if (MainApplication.getInstance().getUserCurrent() == null || mClient == null || !mClient.isConnected()) {
return;
}
ReqHeartBeat bean = new ReqHeartBeat();
bean.setSpeed("0.0");
bean.setAppType("B");
bean.setAppVersion(MainApplication.getInstance().getPackageVersionName());
bean.setDeviceId(MainApplication.getInstance().getDeviceId());
bean.setDeviceVersion("2.0.0.0");
bean.setEmpName(MainApplication.getInstance().getUserCurrent().getPerson_name());
bean.setEmpNo(MainApplication.getInstance().getUserCurrent().getErp_person_no());
bean.setOrgId(MainApplication.getInstance().getUserCurrent().getOrg_no());
bean.setLatitude(MainApplication.getInstance().getLocationLatitude());
//bean.setLatitude("4.9E-324");
bean.setHeight("0");
//bean.setHeight("4.9E-324");
bean.setLongitude(MainApplication.getInstance().getLocationLongitude());
// bean.setLongitude("4.9E-324");
bean.setImeiNo(MainApplication.getInstance().getDeviceId());
bean.setAllocaterFlag(3);
//bean.setGmtModified(DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss"));
bean.setGmtModified("");
bean.setCreateUserId(MainApplication.getInstance().getUserCurrent().getPerson_id());
bean.setEndRetType(4);
//String msg = "{jsonBody:" + JsonUtils.toJson(bean) + "}";
String paramAnonymousMessage = JsonUtils.toJson(bean);
HashMap<String, String> localHashMap = new HashMap<String, String>();
localHashMap.put("jsonBody", paramAnonymousMessage);
paramAnonymousMessage = JsonUtils.toJson(localHashMap);
Log.d(TAG, paramAnonymousMessage);
publish(mTopic, 2, paramAnonymousMessage.getBytes());
} catch (Exception e) {
Log.e(TAG, e.getMessage(), e);
}
}
}, 5, 60, TimeUnit.SECONDS);
}
public static HeartbeatMqttManager getInstance() {
if (mInstance == null) {
mInstance = new HeartbeatMqttManager();
}
return mInstance;
}
public static void release() {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
try {
if (mInstance != null) {
mInstance.disConnect();
mInstance = null;
Log.e(TAG, "release:" + TAG);
}
} catch (Exception ex) {
Log.e(TAG, "release:" + ex.getMessage());
}
}
});
thread.start();
}
public void createConnectAsync() {
AsyncTask task =new AsyncTask() {
@Override
protected Object doInBackground(Object[] objects) {
MemoryPersistence localMemoryPersistence = new MemoryPersistence();
try {
mClient = new MqttAsyncClient(mConfig.getBroker(), mClientId, localMemoryPersistence);
MqttConnectOptions connOpt = new MqttConnectOptions();
connOpt.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);
connOpt.setUserName(mConfig.getAcessKey());
connOpt.setServerURIs(new String[]{mConfig.getBroker()});
connOpt.setPassword(MacSignature.macSignature("GID_" + mGroupId, mConfig.getSecretKey()).toCharArray());
connOpt.setCleanSession(false);
connOpt.setKeepAliveInterval(100);
connOpt.setAutomaticReconnect(true);
mCallback = new MyMqttCallback();
mClient.setCallback(mCallback);
mClient.connect(connOpt);
Log.i(TAG, "createConnect: " + mConfig.getBroker());
} catch (Exception ex) {
Log.e(TAG, ex.getMessage(), ex);
}
return null;
}
};
task.execute();
}
public void disConnect() throws MqttException {
if (mScheduledExecutorService != null) {
mScheduledExecutorService.shutdown();
mScheduledExecutorService = null;
}
if ((this.mClient != null)) {
try {
this.mClient.disconnect();
} catch (Exception e) {
this.mClient.disconnectForcibly();
e.printStackTrace();
} finally {
this.mClient.close();
}
}
}
public boolean isConnected() {
return this.mClient != null && this.mClient.isConnected();
}
public boolean publish(String topic, int qos, byte[] paramArrayOfByte) {
if (this.mClient == null) {
return false;
}
try {
MqttMessage publicMsg = new MqttMessage(paramArrayOfByte);
publicMsg.setQos(qos);
this.mClient.publish(topic, publicMsg);
Log.d(TAG, "Publishing to topic "" + topic + "" qos " + qos);
return true;
} catch (MqttException ex) {
Log.e(TAG, ex.getMessage(), ex);
}
return false;
}
public boolean subscribe(String topic, int qos) {
if (this.mClient == null) {
return false;
}
try {
this.mClient.subscribe(topic, qos);
Log.d(TAG, "Subscribing to topic "" + topic + "" qos " + qos);
return true;
} catch (MqttException ex) {
Log.e(TAG, ex.getMessage(), ex);
}
return false;
}
private class MyMqttCallback implements MqttCallbackExtended {
@Override
public void connectComplete(boolean reconnect, String serverURI) {
Log.d(TAG, "connectComplete[" + reconnect + "]:" + serverURI);
}
@Override
public void connectionLost(Throwable cause) {
Log.d(TAG, "connectionLost:" + cause.getMessage());
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
Log.d(TAG, "messageArrived[" + topic + "]:" + message.toString());
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
Log.d(TAG, "deliveryComplete[" + token.getTopics()[0] + "]:" + token.getMessageId());
}
}
}
下面来看下实际业务中的处理代码,实际业务的代码我都给删掉了
这里的MyMqttCallback就是MQTT到达我们程序后的接收地点了,里边的 messageArrived便是数据的到达地,对数据加以处理即可
public class QuickReceiveMqttManager {
private static final String TAG = QuickReceiveMqttManager.class.getName();
private static QuickReceiveMqttManager mInstance = null;
private MqttConfig mConfig;
private String mGroupId;
private String mTopic;
private String mClientId;
private MqttAsyncClient mClient;
private MyMqttCallback mCallback;
private QuickReceiveMqttManager() {
this.mConfig = new MqttConfig();
mGroupId = mConfig.getQuickReceiveGroupId();
mTopic = mConfig.getQuickReceiveTopic();
String deviceId = MainApplication.getInstance().getDeviceId();
mClientId = "GID_" + mGroupId + "@@@ClientID_" + deviceId;
Log.d(TAG, "mClientId = " + mClientId);
Log.d(TAG, "mTopic = " + mTopic);
}
public static QuickReceiveMqttManager getInstance() {
if (mInstance == null) {
mInstance = new QuickReceiveMqttManager();
}
return mInstance;
}
public static void release() {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
try {
if (mInstance != null) {
mInstance.disConnect();
mInstance = null;
Log.e(TAG, "release:" + TAG);
}
} catch (Exception ex) {
Log.e(TAG, "release:" + ex.getMessage());
}
}
});
thread.start();
}
public void createConnectAsync() {
AsyncTask task =new AsyncTask() {
@Override
protected Object doInBackground(Object[] objects) {
MemoryPersistence localMemoryPersistence = new MemoryPersistence();
try {
mClient = new MqttAsyncClient(mConfig.getBroker(), mClientId, localMemoryPersistence);
MqttConnectOptions connOpt = new MqttConnectOptions();
connOpt.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);
connOpt.setUserName(mConfig.getAcessKey());
connOpt.setServerURIs(new String[]{mConfig.getBroker()});
connOpt.setPassword(MacSignature.macSignature("GID_" + mGroupId, mConfig.getSecretKey()).toCharArray());
connOpt.setCleanSession(false);
connOpt.setKeepAliveInterval(100);
connOpt.setAutomaticReconnect(true);
mCallback = new MyMqttCallback();
mClient.setCallback(mCallback);
mClient.connect(connOpt);
Log.i(TAG, "createConnect: " + mConfig.getBroker());
} catch (Exception ex) {
Log.e(TAG, ex.getMessage(), ex);
}
return null;
}
};
task.execute();
}
public void disConnect() throws MqttException {
if ((this.mClient != null)) {
try {
this.mClient.disconnect();
} catch (Exception e) {
this.mClient.disconnectForcibly();
e.printStackTrace();
} finally {
this.mClient.close();
}
}
}
public boolean isConnected() {
return this.mClient != null && this.mClient.isConnected();
}
public boolean publish(String topic, int qos, byte[] paramArrayOfByte) {
if (this.mClient == null) {
return false;
}
try {
MqttMessage publicMsg = new MqttMessage(paramArrayOfByte);
publicMsg.setQos(qos);
this.mClient.publish(topic, publicMsg);
Log.d(TAG, "Publishing to topic "" + topic + "" qos " + qos);
return true;
} catch (MqttException ex) {
Log.e(TAG, ex.getMessage(), ex);
}
return false;
}
public boolean subscribe(String topic, int qos) {
if (this.mClient == null) {
return false;
}
try {
this.mClient.subscribe(topic, qos);
Log.d(TAG, "Subscribing to topic "" + topic + "" qos " + qos);
return true;
} catch (MqttException ex) {
Log.e(TAG, ex.getMessage(), ex);
}
return false;
}
private class MyMqttCallback implements MqttCallbackExtended {
@Override
public void connectComplete(boolean reconnect, String serverURI) {
Log.d(TAG, "connectComplete[" + reconnect + "]:" + serverURI);
if (mClient != null && mClient.isConnected()) {
subscribe(mTopic, 2);
}
}
@Override
public void connectionLost(Throwable cause) {
Log.d(TAG, "connectionLost:" + cause.getMessage());
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
Log.d(TAG, "messageArrived[" + topic + "]:" + message.toString());
// if (mTopic.equals(topic)) {
CommonUtils.writeLog("mqtt", "QuickReceiveMqttManager -->>> messageArrived-topic:[" + topic + "]-mytopic:[" + mTopic + "]n" + message + "n");
saveData(message);
// }
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
Log.d(TAG, "deliveryComplete[" + token.getTopics()[0] + "]:" + token.getMessageId());
}
}
private void saveData(MqttMessage message) {
// 实际业务中的处理
}
最后
以上就是花痴鸵鸟为你收集整理的Android中MQTT的使用(MQTT的使用大同小异)的全部内容,希望文章能够帮你解决Android中MQTT的使用(MQTT的使用大同小异)所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复