添加权限
复制代码
1
2
3
4
5
6
7
8<uses-permission android:name="android.permission.INTERNET" /> <uses-permission android:name="android.permission.WAKE_LOCK" /> <!-- 允许程序访问WiFi网络信息 --> <uses-permission android:name="android.permission.ACCESS_WIFI_STATE" /> <uses-permission android:name="android.permission.CHANGE_WIFI_STATE" /> <!-- 允许程序获取网络状态 --> <uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" />
添加依赖
复制代码
1
2
3
4
5
6
7/** MQTT 通信 */ implementation group: 'org.eclipse.paho', name: 'org.eclipse.paho.client.mqttv3', version: '1.2.2' implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1' implementation 'androidx.localbroadcastmanager:localbroadcastmanager:1.0.0' implementation 'com.blankj:utilcode:1.30.0'
MQTTService
类
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255public class MQTTService extends Service { public static final String TAG = MQTTService.class.getSimpleName(); private static MqttAndroidClient client; private MqttConnectOptions conOpt; private boolean reConnectedCode = true; private String host = "tcp://xxx.xxx.xxx.xxx:1883"; private String userName = "username"; private String passWord = "password"; private static String myTopic = "/XXX/xxx";//要订阅的主题 private static String sendTopic = "/XXX/xxx";//要发布控制主题 private String clientId = "clientId_" + System.currentTimeMillis();//客户端标识 private IGetMessageCallBack mGetMessageCallBack; static IGetMessageCallBack mcallBack; @Override public void onCreate() { super.onCreate(); Log.e(getClass().getName(), "onCreate"); init(); } public static void pubSatelish(String msg) { String topic = sendTopic; Integer qos = 0; Boolean retained = false; try { if (client != null) { client.publish(topic, msg.getBytes(), qos.intValue(), retained.booleanValue()); } } catch (MqttException e) { e.printStackTrace(); } } public static void publish(String msg, IGetMessageCallBack callBack) { mcallBack = callBack; String topic = sendTopic; Integer qos = 0; Boolean retained = false; if (client != null) { try { client.publish(topic, msg.getBytes(), qos.intValue(), retained.booleanValue()); } catch (MqttException e) { e.printStackTrace(); } } } private void init() { // 服务器地址(协议+地址+端口号) String uri = host; client = new MqttAndroidClient(this, uri, clientId); // 设置MQTT监听并且接受消息 client.setCallback(mqttCallback); conOpt = new MqttConnectOptions(); //设置为false可以在服务器断开后不用再手动连接 conOpt.setCleanSession(true); // 设置超时时间 单位为秒 //设置为0,防止 ERROR o.e.p.c.mqttv3.internal.ClientState - Timed out as no activity 错误 conOpt.setConnectionTimeout(10); // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制 conOpt.setKeepAliveInterval(20); //设置自动重连 conOpt.setAutomaticReconnect(true); // 用户名 conOpt.setUserName(userName); // 密码 conOpt.setPassword(passWord.toCharArray()); //将字符串转换为字符串数组 // last will message boolean doConnect = true; String message = "{"terminal_uid":"" + clientId + ""}"; Log.e(getClass().getName(), "message是:" + message); String topic = myTopic; Integer qos = 0; Boolean retained = false; if ((!message.equals("")) || (!topic.equals(""))) { // 最后的遗嘱 // MQTT本身就是为信号不稳定的网络设计的,所以难免一些客户端会无故的和Broker断开连接。 //当客户端连接到Broker时,可以指定LWT,Broker会定期检测客户端是否有异常。 //当客户端异常掉线时,Broker就往连接时指定的topic里推送当时指定的LWT消息。 try { conOpt.setWill(topic, message.getBytes(), qos.intValue(), retained.booleanValue()); } catch (Exception e) { Log.i(TAG, "Exception Occured", e); doConnect = false; iMqttActionListener.onFailure(null, e); } } if (doConnect) { doClientConnection(); } } @Override public void onDestroy() { reConnectedCode = false; // 此处解决MQTT退出异常问题 if (client != null) { client.unregisterResources(); client.close(); } super.onDestroy(); } /** * 连接MQTT服务器 */ private void doClientConnection() { Log.i(TAG, "连接................... "); if (!getClientConnected() && isConnectIsNormal()) { try { Log.i(TAG, "连接中................... "); client.connect(conOpt, null, iMqttActionListener); } catch (MqttException e) { e.printStackTrace(); } } } private boolean getClientConnected() { try { return client.isConnected(); } catch (Exception e) { e.printStackTrace(); return false; } } // MQTT是否连接成功 private IMqttActionListener iMqttActionListener = new IMqttActionListener() { @Override public void onSuccess(IMqttToken arg0) { Log.i(TAG, "连接成功 "); try { // 订阅myTopic话题 Log.i(TAG, "SUB:" + myTopic); client.subscribe(myTopic, 0); } catch (MqttException e) { e.printStackTrace(); Log.e(TAG, "sube:" + e.getMessage()); } } @Override public void onFailure(IMqttToken arg0, Throwable arg1) { arg1.printStackTrace(); // 连接失败,重连 } }; // MQTT监听并且接受消息 private MqttCallback mqttCallback = new MqttCallbackExtended() { @Override public void connectComplete(boolean reconnect, String serverURI) { Log.i(TAG, "连接成功 "); try { // 订阅myTopic话题 Log.i(TAG, "SUB:" + myTopic); client.subscribe(myTopic, 0); } catch (MqttException e) { e.printStackTrace(); Log.e(TAG, "sube:" + e.getMessage()); } } @Override public void messageArrived(String topic, MqttMessage message) throws Exception { String str1 = new String(message.getPayload()); Log.i(TAG, "mGetMessageCallBack:" + mGetMessageCallBack); if (mGetMessageCallBack != null) { mGetMessageCallBack.setMessage(str1); } if (mcallBack != null) { mcallBack.setMessage(str1); } String str2 = topic + ";qos:" + message.getQos() + ";retained:" + message.isRetained(); Log.i(TAG, "messageArrived:" + str1); Log.i(TAG, str2); } @Override public void deliveryComplete(IMqttDeliveryToken arg0) { } @Override public void connectionLost(Throwable arg0) { // 失去连接,重连 Log.i(TAG, "失去连接,重连................."); // reConnecte(); } }; /** * 判断网络是否连接 */ private boolean isConnectIsNormal() { ConnectivityManager connectivityManager = (ConnectivityManager) this.getApplicationContext() .getSystemService(Context.CONNECTIVITY_SERVICE); NetworkInfo info = connectivityManager.getActiveNetworkInfo(); if (info != null && info.isAvailable()) { String name = info.getTypeName(); Log.i(TAG, "MQTT当前网络名称:" + name); return true; } else { Log.i(TAG, "MQTT 没有可用网络"); return false; } } @Override public IBinder onBind(Intent intent) { Log.e(getClass().getName(), "onBind"); return new CustomBinder(); } public void setIGetMessageCallBack(IGetMessageCallBack IGetMessageCallBack) { this.mGetMessageCallBack = IGetMessageCallBack; } public class CustomBinder extends Binder { public MQTTService getService() { return MQTTService.this; } } @Override public boolean onUnbind(Intent intent) { return super.onUnbind(intent); } }
消息回调接口
复制代码
1
2
3
4public interface IGetMessageCallBack { public void setMessage(String message); }
MQTT 连接服务类 MyServiceConnection
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26public class MyServiceConnection implements ServiceConnection { private MQTTService mqttService; private IGetMessageCallBack mIGetMessageCallBack; @Override public void onServiceConnected(ComponentName componentName, IBinder iBinder) { mqttService = ((MQTTService.CustomBinder) iBinder).getService(); mqttService.setIGetMessageCallBack(mIGetMessageCallBack); } @Override public void onServiceDisconnected(ComponentName componentName) { } public MQTTService getMqttService() { return mqttService; } public void setIGetMessageCallBack(IGetMessageCallBack IGetMessageCallBack) { mIGetMessageCallBack = IGetMessageCallBack; } }
初始化MQTT 绑定服务 到 Activity
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13// 此处申请绑定 private void initConnectTion() { serviceConnection = new MyServiceConnection(); serviceConnection.setIGetMessageCallBack(new IGetMessageCallBack() { @Override public void setMessage(String message) { //接收到消息回调 } }); Intent intent = new Intent(this, MQTTService.class); bindService(intent, serviceConnection, Context.BIND_AUTO_CREATE); }
销毁服务
复制代码
1
2
3
4
5
6
7
8
9
10@Override protected void onDestroy() { // 要在此处进行解绑 if (serviceConnection != null) { unbindService(serviceConnection); } super.onDestroy(); }
注册service
复制代码
1
2
3<service android:name="org.eclipse.paho.android.service.MqttService" /> <service android:name=".service.MQTTService" android:enabled="true" android:exported="true"/>
最后
以上就是长情天空最近收集整理的关于android 实现mqtt消息推送,以及断线重连的问题解决的全部内容,更多相关android内容请搜索靠谱客的其他文章。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复