概述
mqtt协议介绍:
mqtt的客户端需要订阅一个主题(简单来说就是监听一个事件)client.subscribe,但是客户端订阅主题时也会触发服务器端的published事件(也就是发送消息事件,可以理解为需要先打个招呼),如果客户端需要发送消息给服务器端,需要触发client.send函数,服务器端就会触发published事件。服务器端需要给客户端广播消息时(广播就是给全部的用户发送消息)使用MqttServer.publish事件,客户端需要自定义一个事件给client事件
安装 mqtt 服务器必要依赖项 mosca
npm install mosca
服务器端mqtt
const mosca = require("mosca");
const MqttServer = new mosca.Server({
port: 1883
});
MqttServer.on("clientConnected", function(client) {
//当有客户端连接时的回调.
console.log("client connected", client.id);
});
/**
* 监听MQTT主题消息
* 当客户端有连接发布主题消息时
**/
MqttServer.on("published", function(packet, client) {
var topic = packet.topic;
switch (topic) {
case "temperature":
conso.log('监听到来自客户端的tempeture消息');
break;
case "other":
console.log("message-123", packet.payload.toString());
break;
}
});
MqttServer.on("ready", function() {
//当服务开启时的回调
console.log("mqtt is running...");
});
客户端node代码
const mqtt = require("mqtt");
// const mqtt = require('./node_modules/mqtt/dist/mqtt.min.js')
const client = mqtt.connect("mqtt://127.0.0.1:1883"); //指定服务端地址和端口
client.on("connect", function() {
console.log("服务器连接成功");
// connected = client.connected
client.subscribe("temperature", { qos: 1 }); //订阅主题为tempertrue的消息(监听该消息
//发布消息
client.send('temperature');
});
client.on("message", function(top, message) {
console.log("当前topic:", top);
console.log("当前温度:", message.toString());
});
服务端代码:
var mosca = require('mosca');
var fs = require('fs');
//构建自带服务器
var dirname = ".";
var MqttServer = new mosca.Server({
//
port: 1883,
interfaces: [{
type: "mqtt",
port: 1883
},
{
type: "http",
port: 3000,
bundle: true,
static: "./public"
},
],
});
//对服务器端口进行配置, 在此端口进行监听
MqttServer.on('clientConnected', function(client) {
//监听连接
console.log('client connected', client.id);
});
/**
* 监听MQTT主题消息
**/
MqttServer.on('published', function(packet, client, username) {
console.log('客户端有连接发布主题消息');
//当客户端有连接发布主题消息
//console.log('主题为:'+packet.payload);
var topic = packet.topic;
console.log("====================");
console.log(topic);
console.log("====================");
switch (topic) {
case 'wenshideng':
console.log('后台监听到前台发来的温室灯消息');
MqttServer.publish({ topic: 'wenshidengclient', payload: 'sssss' });
break;
}
console.log('发布结束');
});
MqttServer.on('ready', function() {
MqttServer.authenticate = authenticate;
//当服务开启时
console.log('mqtt is running...');
});
var authenticate = function(client, username, password, callback) {
console.log(2132121)
var authorized = (username === '525' && password.toString() === '525');
if (authorized) client.user = username;
callback(null, authorized);
}
html代码:
//定义连接对象
var client = null;
//定义一个变量判断是否连接
var connected = false;
//定义一个开关
var flag = false;
//调用连接函数
connect();
//发送消息函数
logMessage("INFO", "开始运行");
// document.getElementById("publishMessageInput").value = "Hello";
//连接
function onConnect(){
console.log("正在连接");
var list = ['humidity', 'Temp','wenshidengclient'];//订阅包括两个主题
for (var i = 0; i < list.length; i++){
console.log("发布第" + i + "个主题");
console.log(list[i]);
topic = list[i];
//发布主题
client.subscribe(topic);
}
send();
// client.subscribe(topic);
}
//发送
function send() {
console.log('消息已经发送');
// var s = document.getElementById("msg").value;
var s = "wengshideng";
if (s) {
message = new Paho.Message(s);
//设置主题
message.destinationName = "wenshideng";//topic
//发送消息
client.send(message);
}
}
//停止
function stop() {
//清除定时器
window.clearInterval(window.tester);
}
//连接失败
function onFail(context) {
//发送消息
logMessage("ERROR", "Failed to connect. [Error Message: ", context.errorMessage, "]");
//连接状态设置为false
connected = false;
console.log("onFail");
//???
connect();
}
//连接
function connect(){
//配置连接的参数
var hostname = '49.122.47.230', //'192.168.1.2',
port = 3000,
clientId = "asd" + new Date().getTime(),
timeout = 5,
keepAlive = 100,
cleanSession = false,
user = '525',
pass = '525',
path = '';
if (path.length > 0) {
client = new Paho.Client(hostname, Number(port), path, clientId);
} else {
client = new Paho.Client(hostname, Number(port), clientId);
}
// 配置连接的参数
var options = {
invocationContext: { host: hostname, port: port, path: path, clientId: clientId },
timeout: timeout,
keepAliveInterval: keepAlive,
cleanSession: cleanSession,
useSSL: false,
userName: user,
password: pass,
reconnect: true,
onSuccess: onConnect,
onFailure: onFail
};
//保存配置项的账号密码
if (user.length > 0) {
options.userName = user;
}
if (pass.length > 0) {
options.password = pass;
}
//???
client.onConnectionLost = onConnectionLost;
client.onMessageArrived = onMessageArrived;
client.connect(options);
console.log('连接服务器端成功');
}
//连接丢失
function onConnectionLost(responseObject) {
console.log(responseObject);
if (responseObject.errorCode !== 0) {
console.log("onConnectionLost:" + responseObject.errorMessage);
console.log("连接已断开");
}
}
//接收消息
function onMessageArrived(message) {
if (message.destinationName == "wenshidengclient") {//订阅的主题如果是zdk1
// xinxi.temp = message.payloadString + "℃";
console.log("收到消息:" + message.payloadString);
}
else {
// xinxi.zdk2 = message.payloadString + "%";
}
}
//断开链接
function disconnect() {
logMessage("INFO", "Disconnecting from Server.");
//断开连接
client.disconnect();
// var statusSpan = document.getElementById("connectionStatus");
// statusSpan.innerHTML = "Connection - Disconnected.";
connected = false;
setFormEnabledState(false);
}
//订阅
function subscribe() {
var topic = "zdk";
console.log(topic)
var qos = 0;
logMessage("INFO", "Subscribing to: [Topic: ", topic, ", QoS: ", qos, "]");
//订阅主题
client.subscribe(topic, { qos: Number(qos) });
}
//取消订阅
function unsubscribe() {
var topic = "zdk";
logMessage("INFO", "Unsubscribing: [Topic: ", topic, "]");
client.unsubscribe(topic, {
onSuccess: unsubscribeSuccess,
onFailure: unsubscribeFailure,
invocationContext: { topic: topic }
});
}
//未发送成功
function unsubscribeSuccess(context) {
logMessage("INFO", "Unsubscribed. [Topic: ", context.invocationContext.topic, "]");
}
//发送错误
function unsubscribeFailure(context) {
logMessage("ERROR", "Failed to unsubscribe. [Topic: ", context.invocationContext.topic, ", Error: ", context.errorMessage, "]");
}
function safeTagsRegex(str) {
return str.replace(/&/g, "&").replace(/</g, "<").replace(/>/g, ">");
}
function makeid() {
var text = "";
var possible = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";
for (var i = 0; i < 5; i++)
text += possible.charAt(Math.floor(Math.random() * possible.length));
return "_" + text;
}
function logMessage(type, ...content) {
// var consolePre = document.getElementById("consolePre");
var date = new Date();
var timeString = date.toUTCString();
var logMessage = timeString + " - " + type + " - " + content.join("");
// consolePre.innerHTML += logMessage + "n";
if (type === "INFO") {
console.info(logMessage);
} else if (type === "ERROR") {
console.log();(logMessage);
} else {
console.log(logMessage);
}
}
最后
以上就是彩色过客为你收集整理的使用node开启一个mqtt服务器实现与网页的双向通信的全部内容,希望文章能够帮你解决使用node开启一个mqtt服务器实现与网页的双向通信所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复