继上个星期之后,这个星期有花了两三天的时间来搞emqtt的ssl/tls加密。
主流程还是参考了:https://blog.csdn.net/a704397849/article/details/88885198#commentsedit, 到最后一步用mosquitto_sub订阅消息的时候始终有问题:
联系了参考文章的作者,并且加了QQ,想不到是个很热情的人。他重新走了一下流程并且确定是没有问题的,给了我一份客户端的代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164package com.zkong.mqttssl; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import javax.net.SocketFactory; import javax.net.ssl.SSLContext; import javax.net.ssl.TrustManager; import javax.net.ssl.X509TrustManager; import java.security.KeyManagementException; import java.security.NoSuchAlgorithmException; import java.security.cert.CertificateException; import java.security.cert.X509Certificate; public class MqttTLSTest { static MqttClientCallback mqttClientCallback = new MqttClientCallback(); static MqttAsyncClient mqttClient = null; static String username = "stan"; //注意这里 填你自己的mqtt账号密码 static String password = "123456"; //注意这里 填你自己的mqtt账号密码 //String broker = "tcp://xxx.xx.xxx.xxx:1883"; //注意这里要填自己mqtt服务器所在地址 static String broker = "ssl://192.168.100.117:8883"; //注意这里要填自己mqtt服务器所在地址 public static void main(String[] args) throws InterruptedException { start(); while (true){ Thread.sleep(10000); } } public static void start() { String clientId = "mqttserver" + String.valueOf(System.currentTimeMillis()); try { mqttClient = new MqttAsyncClient(broker, clientId, new MemoryPersistence()); mqttClient.setCallback(mqttClientCallback); //订阅 连接mqtt服务器 subscribeConnect(); //发布 连接mqtt服务器 //... 略 } catch (MqttException me) { System.out.println("reason " + me.getReasonCode()); System.out.println("msg " + me.getMessage()); System.out.println("loc " + me.getLocalizedMessage()); System.out.println("cause " + me.getCause()); System.out.println("excep " + me); me.printStackTrace(); } } public static void subscribeConnect() { System.out.println("订阅连接"); if (mqttClient != null) { try { MqttConnectOptions connOpts = new MqttConnectOptions(); connOpts.setCleanSession(true); connOpts.setMaxInflight(100000); connOpts.setUserName(username); connOpts.setPassword(password.toCharArray()); //ssl 连接 , 这里的 TrustManager 是自己实现的,没有去校验服务端的证书 TrustManager[] trustAllCerts = new TrustManager[1]; TrustManager tm = new MyTM(); trustAllCerts[0] = tm; SSLContext sc = SSLContext.getInstance("SSL"); sc.init(null, trustAllCerts, null); SocketFactory factory = sc.getSocketFactory(); connOpts.setSocketFactory(factory); // mqttClient.connect(connOpts, null, new IMqttActionListener() { @Override public void onSuccess(IMqttToken asyncActionToken) { try { //订阅 topic 为test 的消息,消息质量1 mqttClient.subscribe("test", 1); } catch (MqttException me) { System.out.println("reason " + me.getReasonCode()); System.out.println("msg " + me.getMessage()); System.out.println("loc " + me.getLocalizedMessage()); System.out.println("cause " + me.getCause()); System.out.println("excep " + me); me.printStackTrace(); } } @Override public void onFailure(IMqttToken asyncActionToken, Throwable exception) { System.out.println("mqtt 没有连接上:" + exception.getMessage()); exception.printStackTrace(); } }); } catch (MqttException me) { System.out.println("reason " + me.getReasonCode()); System.out.println("msg " + me.getMessage()); System.out.println("loc " + me.getLocalizedMessage()); System.out.println("cause " + me.getCause()); System.out.println("excep " + me); me.printStackTrace(); } catch (NoSuchAlgorithmException e) { e.printStackTrace(); } catch (KeyManagementException e) { e.printStackTrace(); } } } //MyTM 是自己实现的认证管理类,里面并有校验服务端的证书就返回true,永久成功! static class MyTM implements TrustManager, X509TrustManager { @Override public X509Certificate[] getAcceptedIssuers() { return null; } public boolean isServerTrusted(X509Certificate[] certs) { return true; } public boolean isClientTrusted(X509Certificate[] certs) { return true; } @Override public void checkServerTrusted(X509Certificate[] certs, String authType) throws CertificateException { return; } @Override public void checkClientTrusted(X509Certificate[] certs, String authType) throws CertificateException { return; } } public static class MqttClientCallback implements MqttCallback{ @Override public void connectionLost(Throwable arg0) { System.out.println("mqtt 失去了连接"); } @Override public void deliveryComplete(IMqttDeliveryToken arg0) { System.out.println("mqtt 发送完成!"); } @Override public void messageArrived(String topic, MqttMessage message) throws Exception { String content = new String(message.getPayload(), "utf-8"); System.out.println("收到mqtt消息,topic: "+topic+" ,content: "+content); } } }
用这份客户端代码订阅消息,是没有问题的,因为我的服务器开启了基于mysql的认证,所以必须提供username和password。
要彻底走通的,估计还得找一台干净的机器重新安装emqtt。
有些问题在此记录一下:
关于mosquitto
它跟emqtt一样,本身也是一个mqtt的broker,在windows上安装完之后作为一个服务存在,当它处于启动状态时,会占掉1883端口导致emqtt无法完全启动,可以在Windows的服务管理中将其关闭:
Centos7上安装openssl
https://www.cnblogs.com/rxbook/p/9367725.html
Centos7上安装mosquitto
https://blog.csdn.net/xj178926426/article/details/78832296
未走通流程:
双向认证:https://blog.csdn.net/zljintan/article/details/83619309
自带证书验证(使用MQTTBox作为客户端):https://www.cnblogs.com/lexiaofei/p/8403995.html
转载于:https://my.oschina.net/u/4042451/blog/3103012
最后
以上就是兴奋人生最近收集整理的关于Emqtt的ssl/tls加密的全部内容,更多相关Emqtt内容请搜索靠谱客的其他文章。
发表评论 取消回复