Android MQTT的订阅和发布消息
MQTT协议简述
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是基于发布/订阅(Publish/Subscribe)模式的"轻量级"通讯协议,该协议构建于TCP/IP协议上,有IBM在1999年发布.MQTT最大的优点在于:可以以极少的代码和有限的宽带,为连接远程设备提供可靠的消息服务.。作为一种开销、低宽带占用的即时通讯协议,在使其物联网、小型设备、移动应用等方面较广泛的应用。
MQTTMQTT是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。在很多情况下,包括受限的环境中,如:机器与机器(M2M)通信和物联网(IoT)。其在,通过卫星链路通信传感器、偶尔拨号的医疗设备、智能家居、及一些小型化设备中已广泛使用。
特点
1、实现简单
2、提供数据传输的Qos
3、轻量、占用宽带低
4、有三种消息发布服务质量
- 0值:"至多一次",消息发布完成依赖底层TCP/IP网络。会发生消息丢失或重复。这一级可用于如下情况,坏境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送
- 1值:"至少一次",确保消息到达,但消息重复可能会发生。
- 2值:"只有一次",确保消息到达一次,这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果。
5、可传输任意类型的数据
6、可保持的会话(session)
MQTT协议原理
1 MQTT协议实现方式
实现MQTT协议需要客户端和服务器端通讯完成,在通讯过程中,MQTT协议中有三种身份:发布者(Publish)、代理(Broker)(服务器)、订阅者(Subscribe)。其中,消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布者可以同时是订阅者。
MQTT传输的消息分为:主题(Topic)和负载(payload)两部分:
- (1)Topic,可以理解为消息的类型,订阅者订阅(Subscribe)后,就会收到该主题的消息内容(payload);
- (2)payload,可以理解为消息的内容,是指订阅者具体要使用的内容。
2 网络传输与应用消息
MQTT会构建底层网络传输:它将建立客户端到服务器的连接,提供两者之间的一个有序的、无损的、基于字节流的双向传输。
当应用数据通过MQTT网络发送时,MQTT会把与之相关的服务质量(QoS)和主题名(Topic)相关连。
3 MQTT客户端
一个使用MQTT协议的应用程序或者设备,它总是建立到服务器的网络连接。客户端可以:
- (1)发布其他客户端可能会订阅的信息;
- (2)订阅其它客户端发布的消息;
- (3)退订或删除应用程序的消息;
- (4)断开与服务器连接。
4 MQTT服务器
MQTT服务器以称为"消息代理"(Broker),可以是一个应用程序或一台设备。它是位于消息发布者和订阅者之间,它可以:
- (1)接受来自客户的网络连接;
- (2)接受客户发布的应用信息;
- (3)处理来自客户端的订阅和退订请求;
- (4)向订阅的客户转发应用程序消息。
5 MQTT协议中的订阅、主题、会话
一、订阅(Subscription)
订阅包含主题筛选器(Topic Filter)和最大服务质量(QoS)。订阅会与一个会话(Session)关联。一个会话可以包含多个订阅。每一个会话中的每个订阅都有一个不同的主题筛选器。
注意:订阅myTopic话题,当订阅多条频道,需要遍历逐条订阅,否则有可能订阅失败。
二、会话(Session)
每个客户端与服务器建立连接后就是一个会话,客户端和服务器之间有状态交互。会话存在于一个网络之间,也可能在客户端和服务器之间跨越多个连续的网络连接。
三、主题名(Topic Name)
连接到一个应用程序消息的标签,该标签与服务器的订阅相匹配。服务器会将消息发送给订阅所匹配标签的每个客户端。
四、主题筛选器(Topic Filter)
一个对主题名通配符筛选器,在订阅表达式中使用,表示订阅所匹配到的多个主题。
五、负载(Payload)
消息订阅者所具体接收的内容。
6 MQTT协议中的方法
MQTT协议中定义了一些方法(也被称为动作),来于表示对确定资源所进行操作。这个资源可以代表预先存在的数据或动态生成数据,这取决于服务器的实现。通常来说,资源指服务器上的文件或输出。主要方法有:
- (1)Connect。等待与服务器建立连接。
- (2)Disconnect。等待MQTT客户端完成所做的工作,并与服务器断开TCP/IP会话。
- (3)Subscribe。等待完成订阅。
- (4)UnSubscribe。等待服务器取消客户端的一个或多个topics订阅。
- (5)Publish。MQTT客户端发送消息请求,发送完成后返回应用程序线程。
Android 下如何使用MQTT协议
- 导入mqtt包
- 配置MqttConnectOptions
- 调用connect并将配置好的参数写入
- 通过指定的消息进行消息订阅
- 向订阅topic对中发布消息
- 通过mqttCallBack的回调对接收到的消息进行处理
导入mqtt包
1
2
3
4
5
6
7
8
9
10allprojects { repositories { google() jcenter() maven { url "https://repo.eclipse.org/content/repositories/paho-snapshots/" } } }
1
21 implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0' 2 implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161public class MQTTService extends Service { public static final String TAG = MQTTService.class.getSimpleName(); private static MqttAndroidClient client; private MqttConnectOptions conOpt; private String host = "tcp://xxx"; //服务器地址 private String userName = "xxx"; //账号 private String passWord = " xxx"; //密码 private static String myTopic = "topic"; //频道名 private String clientId = "mqtt_client"; //客户端ID @Override public int onStartCommand(Intent intent, int flags, int startId) { init(); return super.onStartCommand(intent, flags, startId); } //发布消息 msg public static void publish(String msg){ String topic = myTopic; Integer qos = 2; Boolean retained = false; try { client.publish(topic, msg.getBytes(), qos.intValue(), retained.booleanValue()); } catch (MqttException e) { e.printStackTrace(); } } private void init() { // 服务器地址(协议+地址+端口号) String uri = host; client = new MqttAndroidClient(this, uri, clientId); // 设置MQTT监听并且接受消息 client.setCallback(mqttCallback); conOpt = new MqttConnectOptions(); // 清除缓存 conOpt.setCleanSession(true); // 设置超时时间,单位:秒 conOpt.setConnectionTimeout(10); // 心跳包发送间隔,单位:秒 conOpt.setKeepAliveInterval(20); // 用户名 conOpt.setUserName(userName); // 密码 conOpt.setPassword(passWord.toCharArray()); // last will message boolean doConnect = true; String message = "{"terminal_uid":"" + clientId + ""}"; String topic = myTopic; Integer qos = 2; Boolean retained = false; if ((!message.equals("")) || (!topic.equals(""))) { // 最后 try { conOpt.setWill(topic, message.getBytes(), qos.intValue(), retained.booleanValue()); } catch (Exception e) { Log.i(TAG, "Exception Occured", e); doConnect = false; iMqttActionListener.onFailure(null, e); } } if (doConnect) { doClientConnection(); } } @Override public void onDestroy() { try { client.disconnect(); //服务销毁,断开连接 } catch (MqttException e) { e.printStackTrace(); } super.onDestroy(); } /** 连接MQTT服务器 */ private void doClientConnection() { if (!client.isConnected() && isConnectIsNomarl()) { try { client.connect(conOpt, null, iMqttActionListener); } catch (MqttException e) { e.printStackTrace(); } } } // MQTT是否连接成功 private IMqttActionListener iMqttActionListener = new IMqttActionListener() { @Override public void onSuccess(IMqttToken arg0) { Log.i(TAG, "连接成功 "); try { // 订阅myTopic话题,当订阅多条频道,需要遍历逐条订阅,否则有可能订阅失败 client.subscribe(myTopic,1); } catch (MqttException e) { e.printStackTrace(); } } @Override public void onFailure(IMqttToken arg0, Throwable arg1) { arg1.printStackTrace(); // 连接失败,重连 } }; // MQTT监听并且接受消息 private MqttCallback mqttCallback = new MqttCallback() { @Override public void messageArrived(String topic, MqttMessage message) throws Exception { String str1 = new String(message.getPayload()); MQTTMessage msg = new MQTTMessage(); //自定义接口 msg.setMessage(str1); //订阅信息,接收的信息message String str2 = topic + ";qos:" + message.getQos() + ";retained:" + message.isRetained(); Log.i(TAG, "messageArrived:" + str1); Log.i(TAG, str2); } @Override public void deliveryComplete(IMqttDeliveryToken arg0) { } @Override public void connectionLost(Throwable arg0) { // 失去连接,重连 } }; /** 判断网络是否连接 */ private boolean isConnectIsNomarl() { ConnectivityManager connectivityManager = (ConnectivityManager) this.getApplicationContext().getSystemService(Context.CONNECTIVITY_SERVICE); NetworkInfo info = connectivityManager.getActiveNetworkInfo(); if (info != null && info.isAvailable()) { String name = info.getTypeName(); Log.i(TAG, "MQTT当前网络名称:" + name); return true; } else { Log.i(TAG, "MQTT 没有可用网络"); return false; } } @Nullable @Override public IBinder onBind(Intent intent) { return null; } }
连接成功后,就可以实现和服务端消息的发送和接收。
最后在AndroidManifest.xml文件
注册Services
1
21 <service android:name="org.eclipse.paho.android.service.MqttService" /> <!--导入包,Android自带Mqtt的服务,需要注册--> 2 <service android:name=".mqtttest.MQService"/> <!--自己开启的服务-->
权限
1
2
3
4
5
61 <uses-permission android:name="android.permission.WAKE_LOCK" /> 2 <uses-permission android:name="android.permission.WRITE_EXTERNAL_STORAGE" /> 3 <uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" /> 4 <uses-permission android:name="android.permission.READ_PHONE_STATE" /> 5 <uses-permission android:name="android.permission.READ_EXTERNAL_STORAGE" /> 6 <uses-permission android:name="android.permission.INTERNET" />
完结!
最后
以上就是默默小天鹅最近收集整理的关于Android MQTT的订阅和发布消息Android MQTT的订阅和发布消息的全部内容,更多相关Android内容请搜索靠谱客的其他文章。
发表评论 取消回复