mqtt协议介绍:
mqtt的客户端需要订阅一个主题(简单来说就是监听一个事件)client.subscribe,但是客户端订阅主题时也会触发服务器端的published事件(也就是发送消息事件,可以理解为需要先打个招呼),如果客户端需要发送消息给服务器端,需要触发client.send函数,服务器端就会触发published事件。服务器端需要给客户端广播消息时(广播就是给全部的用户发送消息)使用MqttServer.publish事件,客户端需要自定义一个事件给client事件
安装 mqtt 服务器必要依赖项 mosca
npm install mosca
服务器端mqtt
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28const mosca = require("mosca"); const MqttServer = new mosca.Server({ port: 1883 }); MqttServer.on("clientConnected", function(client) { //当有客户端连接时的回调. console.log("client connected", client.id); }); /** * 监听MQTT主题消息 * 当客户端有连接发布主题消息时 **/ MqttServer.on("published", function(packet, client) { var topic = packet.topic; switch (topic) { case "temperature": conso.log('监听到来自客户端的tempeture消息'); break; case "other": console.log("message-123", packet.payload.toString()); break; } }); MqttServer.on("ready", function() { //当服务开启时的回调 console.log("mqtt is running..."); });
客户端node代码
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15const mqtt = require("mqtt"); // const mqtt = require('./node_modules/mqtt/dist/mqtt.min.js') const client = mqtt.connect("mqtt://127.0.0.1:1883"); //指定服务端地址和端口 client.on("connect", function() { console.log("服务器连接成功"); // connected = client.connected client.subscribe("temperature", { qos: 1 }); //订阅主题为tempertrue的消息(监听该消息 //发布消息 client.send('temperature'); }); client.on("message", function(top, message) { console.log("当前topic:", top); console.log("当前温度:", message.toString()); });
服务端代码:
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55var mosca = require('mosca'); var fs = require('fs'); //构建自带服务器 var dirname = "."; var MqttServer = new mosca.Server({ // port: 1883, interfaces: [{ type: "mqtt", port: 1883 }, { type: "http", port: 3000, bundle: true, static: "./public" }, ], }); //对服务器端口进行配置, 在此端口进行监听 MqttServer.on('clientConnected', function(client) { //监听连接 console.log('client connected', client.id); }); /** * 监听MQTT主题消息 **/ MqttServer.on('published', function(packet, client, username) { console.log('客户端有连接发布主题消息'); //当客户端有连接发布主题消息 //console.log('主题为:'+packet.payload); var topic = packet.topic; console.log("===================="); console.log(topic); console.log("===================="); switch (topic) { case 'wenshideng': console.log('后台监听到前台发来的温室灯消息'); MqttServer.publish({ topic: 'wenshidengclient', payload: 'sssss' }); break; } console.log('发布结束'); }); MqttServer.on('ready', function() { MqttServer.authenticate = authenticate; //当服务开启时 console.log('mqtt is running...'); }); var authenticate = function(client, username, password, callback) { console.log(2132121) var authorized = (username === '525' && password.toString() === '525'); if (authorized) client.user = username; callback(null, authorized); }
html代码:
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177//定义连接对象 var client = null; //定义一个变量判断是否连接 var connected = false; //定义一个开关 var flag = false; //调用连接函数 connect(); //发送消息函数 logMessage("INFO", "开始运行"); // document.getElementById("publishMessageInput").value = "Hello"; //连接 function onConnect(){ console.log("正在连接"); var list = ['humidity', 'Temp','wenshidengclient'];//订阅包括两个主题 for (var i = 0; i < list.length; i++){ console.log("发布第" + i + "个主题"); console.log(list[i]); topic = list[i]; //发布主题 client.subscribe(topic); } send(); // client.subscribe(topic); } //发送 function send() { console.log('消息已经发送'); // var s = document.getElementById("msg").value; var s = "wengshideng"; if (s) { message = new Paho.Message(s); //设置主题 message.destinationName = "wenshideng";//topic //发送消息 client.send(message); } } //停止 function stop() { //清除定时器 window.clearInterval(window.tester); } //连接失败 function onFail(context) { //发送消息 logMessage("ERROR", "Failed to connect. [Error Message: ", context.errorMessage, "]"); //连接状态设置为false connected = false; console.log("onFail"); //??? connect(); } //连接 function connect(){ //配置连接的参数 var hostname = '49.122.47.230', //'192.168.1.2', port = 3000, clientId = "asd" + new Date().getTime(), timeout = 5, keepAlive = 100, cleanSession = false, user = '525', pass = '525', path = ''; if (path.length > 0) { client = new Paho.Client(hostname, Number(port), path, clientId); } else { client = new Paho.Client(hostname, Number(port), clientId); } // 配置连接的参数 var options = { invocationContext: { host: hostname, port: port, path: path, clientId: clientId }, timeout: timeout, keepAliveInterval: keepAlive, cleanSession: cleanSession, useSSL: false, userName: user, password: pass, reconnect: true, onSuccess: onConnect, onFailure: onFail }; //保存配置项的账号密码 if (user.length > 0) { options.userName = user; } if (pass.length > 0) { options.password = pass; } //??? client.onConnectionLost = onConnectionLost; client.onMessageArrived = onMessageArrived; client.connect(options); console.log('连接服务器端成功'); } //连接丢失 function onConnectionLost(responseObject) { console.log(responseObject); if (responseObject.errorCode !== 0) { console.log("onConnectionLost:" + responseObject.errorMessage); console.log("连接已断开"); } } //接收消息 function onMessageArrived(message) { if (message.destinationName == "wenshidengclient") {//订阅的主题如果是zdk1 // xinxi.temp = message.payloadString + "℃"; console.log("收到消息:" + message.payloadString); } else { // xinxi.zdk2 = message.payloadString + "%"; } } //断开链接 function disconnect() { logMessage("INFO", "Disconnecting from Server."); //断开连接 client.disconnect(); // var statusSpan = document.getElementById("connectionStatus"); // statusSpan.innerHTML = "Connection - Disconnected."; connected = false; setFormEnabledState(false); } //订阅 function subscribe() { var topic = "zdk"; console.log(topic) var qos = 0; logMessage("INFO", "Subscribing to: [Topic: ", topic, ", QoS: ", qos, "]"); //订阅主题 client.subscribe(topic, { qos: Number(qos) }); } //取消订阅 function unsubscribe() { var topic = "zdk"; logMessage("INFO", "Unsubscribing: [Topic: ", topic, "]"); client.unsubscribe(topic, { onSuccess: unsubscribeSuccess, onFailure: unsubscribeFailure, invocationContext: { topic: topic } }); } //未发送成功 function unsubscribeSuccess(context) { logMessage("INFO", "Unsubscribed. [Topic: ", context.invocationContext.topic, "]"); } //发送错误 function unsubscribeFailure(context) { logMessage("ERROR", "Failed to unsubscribe. [Topic: ", context.invocationContext.topic, ", Error: ", context.errorMessage, "]"); } function safeTagsRegex(str) { return str.replace(/&/g, "&").replace(/</g, "<").replace(/>/g, ">"); } function makeid() { var text = ""; var possible = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"; for (var i = 0; i < 5; i++) text += possible.charAt(Math.floor(Math.random() * possible.length)); return "_" + text; } function logMessage(type, ...content) { // var consolePre = document.getElementById("consolePre"); var date = new Date(); var timeString = date.toUTCString(); var logMessage = timeString + " - " + type + " - " + content.join(""); // consolePre.innerHTML += logMessage + "n"; if (type === "INFO") { console.info(logMessage); } else if (type === "ERROR") { console.log();(logMessage); } else { console.log(logMessage); } }
最后
以上就是彩色过客最近收集整理的关于使用node开启一个mqtt服务器实现与网页的双向通信的全部内容,更多相关使用node开启一个mqtt服务器实现与网页内容请搜索靠谱客的其他文章。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复