最近项目需要用到消息的订阅发布,因此使用了MQTT方式实现。
MQTT介绍:MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的轻量级协议,该协议构建于TCP/IP协议之上,MQTT最大优点在于,可以以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有较广泛的应用。
实现MQTT协议需要客户端和服务器端通讯完成,在通讯过程中,MQTT协议中有三种身份:发布者(Publish)、代理(Broker)(服务器)、订阅者(Subscribe)。其中,消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布者可以同时是订阅者。
MQTT传输的消息分为:主题(Topic)和负载(payload)两部分:
(1)Topic,可以理解为消息的类型,订阅者订阅(Subscribe)后,就会收到该主题的消息内容(payload);
(2)payload,可以理解为消息的内容,是指订阅者具体要使用的内容。
MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。在很多情况下,包括受限的环境中,如:机器与机器(M2M)通信和物联网(IoT)。其在,通过卫星链路通信传感器、偶尔拨号的医疗设备、智能家居、及一些小型化设备中已广泛使用。
首先,在项目中引入MMQTT包:
1.在官网下载MMQTT包(此处用的4.3.0.0版本),将包copy到项目的libs文件夹中
2.在项目的“引用”那里右键,“添加引用”,将包添加进项目中。
本项目拿文件订阅发布为例,其中,消息发布部分:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55#region MQTT发布 string serverIp = ConfigHelper.GetAppSettingsValue("ServerIP"); string serverPort = ConfigHelper.GetAppSettingsValue("ServerPort"); //创建客户端实例 MqttClient client = new MqttClient(serverIp, Convert.ToInt32(serverPort), false, null, null, MqttSslProtocols.TLSv1_2); string clientId = Guid.NewGuid().ToString(); //输入用户名 string username = "admin"; //输入密码 string password = "admin"; //建立连接 client.Connect(clientId, username, password); //判断文件是否存在 if (File.Exists(req.PhysicalPath + @"" + req.StorageName)) { //使用文件流下形式读写文件 using (FileStream fsRead = new FileStream(req.PhysicalPath + @"" + req.StorageName, FileMode.Open)) { int fsLen = (int)fsRead.Length; byte[] heByte = new byte[fsLen]; int r = fsRead.Read(heByte, 0, heByte.Length); string myStr = System.Text.Encoding.UTF8.GetString(heByte); //文件名 string fileName = req.StorageName; //发布主题 client.Publish(Const.PRE_TRANSFER + device.SerialNumber, System.Text.Encoding.UTF8.GetBytes(string.Format("{0};{1}", myStr, fileName)), MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE, false); } } #endregion ConfigHelper部分(获取appsettings中的value值): public class ConfigHelper { /// <summary> /// 获取AppSettings里的Value值 /// </summary> /// <param name="keyName"></param> /// <returns></returns> public static string GetAppSettingValue(string keyName) { string result = string.Empty; try { result = ConfigurationManager.AppSettings[keyName]; } catch (Exception ex) { LogHelper.Error(string.Format("获取配置文件出错:{0}", ex.Message)); } return result; } }
消息订阅部分:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58引入M2MQTT using uPLibrary.Networking.M2Mqtt; using uPLibrary.Networking.M2Mqtt.Messages; public class SubscribeFileTransfer { private string serverIp = ConfigHelper.GetAppSettingValue("ServerIP"); private string serverPort = ConfigHelper.GetAppSettingValue("ServerPort"); private string serialNumber = ConfigHelper.GetAppSettingValue("SerialNumber"); /// <summary> /// 订阅传输文件 /// </summary> public void SubscribeTransfer() { //创建客户端实例 MqttClient client = new MqttClient(serverIp, Convert.ToInt32(serverPort), false, null, null, MqttSslProtocols.TLSv1_2); string clientId = Guid.NewGuid().ToString(); string username = "admin"; string password = "admin"; //建立连接 client.Connect(clientId, username, password); //订阅主题 client.Subscribe(new string[] {Const.PRE_TRANSFER + serialNumber}, new byte[] {MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE}); //+=可以存储多个事件方法,这些方法称为订阅者 client.MqttMsgPublishReceived += client_MqttMsgPublishTransferReceived; } /// <summary> /// 传输文件订阅相应 /// </summary> /// <param name="sender"></param> /// <param name="e"></param> void client_MqttMsgPublishTransferReceived(object sender, MqttMsgPublishEventArgs e) { //处理接收到的消息 string msg = System.Text.Encoding.Default.GetString(e.Message); //分割出文件名 string fileName = msg.Substring(msg.LastIndexOf(";") + 1, msg.Length - msg.LastIndexOf(";") - 1); //分割出文件内容 byte[] fileContent = Encoding.UTF8.GetBytes(msg.Substring(0, msg.LastIndexOf(";"))); //文件路径 string filePath = @"D:" + fileName; //写入文件 using (FileStream fsWrite = new FileStream(filePath, FileMode.Append)) { fsWrite.Write(fileContent, 0, fileContent.Length); } SqlHelper sqlHelper = new SqlHelper(AppSettings.CenterDatabaseConnStr); //更新数据库,修改字段issend为true string sql = string.Format("UPDATE DeviceFileTransfer set IsSend = 'true', SendTime = '{0}' where SerialNumber = '{1}'", DateTime.Now.ToString(), serialNumber); int count = sqlHelper.ExecuteSql(sql); if (count > 0) { LogHelper.Info("文件传输成功!"); } } }
注:订阅和发布可以分别放在两个不同的程序中
调试过程中可能要用到mqtt.fx软件,用来调试程序。如下:
最后
以上就是威武荔枝最近收集整理的关于C#使用M2MQTT快速实现MQTT通信的全部内容,更多相关C#使用M2MQTT快速实现MQTT通信内容请搜索靠谱客的其他文章。
发表评论 取消回复