概述
基于MQTTnet做了个MQTT协议测试程序,本程序包括服务的和两个客户端,方便在不引入外部支撑的情况下测试MQTT协议。
测试软件界面如下
1:启动MQTT服务
在Server区域,
启动服务可以选择MQTT服务使用的端口、用户名、密码。
如有有客户端需要连接服务,需要把这些信息告诉他们。
右边tab页有启动工程的详细日志。
2:MQTT客户端
在该测试程序中实现了两个客户端,方便测试使用,两个客户端功能万千一致,以客户端1为例介绍配置过程中的相关参数。
首先是需要连接的MQTT服务器IP和端口,这些信息一般有服务端提供,本程序直接连接本机的服务端,因此IP配置为127.0.0.1,端口和服务器端口一致12345。
其次,根据MQTT协议,连接服务器时需要提供用户名、密码用于鉴权认证。
最后,需要一个终端唯一标识,即客户标识,该标识在一个服务器下需要保证唯一。
到此和MQTT服务器连接相关参数就完备了,可以连接服务器了。
Qos是遗嘱消息服务质量等级,取值即含义如下
RETAIN: 保留标志位,如果为1,服务器存储最新一条RETAIN消息,以便分发给新的订阅者
MQTT是发布订阅模式的,客户端如果要收到消息需要先订阅对应的主题,订阅主题数量是没有限制的。
例如:client1订阅主题client1/sub、client2/sub
client2订阅主题client2/sub
然后在已订主题的下拉框中就可以看到本客户端订阅的主题。
如何要取消对应的主题,在下拉列表中先择,然后点击取消,就会取消订阅。
发布主题需要设定主题和发布内容。
例如用client1发布:
主题:client2/sub
内容:新信息。
例如用client2发布:
主题:client1/sub
内容:test
根据刚才设置的订阅信息,两个客户端收到的数据如下:
我们可以看到主题:client2/sub在两个客户端都收到了。
自此,测试功能基本就介绍完了。
3:程序功能实现
该程序是基于MQTTnet库最的。
因为要实现服务端和客户端。因此创建连个类:MQTTServer、MQTTClients,这部分代码也是在网上找的,但功能实现比较完备,所有我就直接使用了。
服务端MQTTServer
public class MQTTServer
{
public StringBuilder sb = new StringBuilder("");
public MqttServer mqttServer = null;
public bool MqttServer_runflag = false;
public async void StartMqttServer(int port = 12345, string user = "123", string passwd = "123")
{
if (mqttServer == null)
{
var optionsBuilder = new MqttServerOptionsBuilder()
.WithDefaultEndpoint().WithDefaultEndpointPort(port).WithConnectionValidator(
c =>
{
var currentUser = user;
var currentPWD = passwd;
if (currentUser == null || currentPWD == null)
{
c.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
return;
}
if (c.Username != currentUser)
{
c.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
return;
}
if (c.Password != currentPWD)
{
c.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
return;
}
c.ReasonCode = MqttConnectReasonCode.Success;
}).WithSubscriptionInterceptor(
c =>
{
c.AcceptSubscription = true;
}).WithApplicationMessageInterceptor(
c =>
{
c.AcceptPublish = true;
});
mqttServer = new MqttFactory().CreateMqttServer() as MqttServer;
mqttServer.StartedHandler = new MqttServerStartedHandlerDelegate(OnMqttServerStarted);
mqttServer.StoppedHandler = new MqttServerStoppedHandlerDelegate(OnMqttServerStopped);
mqttServer.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(OnMqttServerClientConnected);
mqttServer.ClientDisconnectedHandler = new MqttServerClientDisconnectedHandlerDelegate(OnMqttServerClientDisconnected);
mqttServer.ClientSubscribedTopicHandler = new MqttServerClientSubscribedHandlerDelegate(OnMqttServerClientSubscribedTopic);
mqttServer.ClientUnsubscribedTopicHandler = new MqttServerClientUnsubscribedTopicHandlerDelegate(OnMqttServerClientUnsubscribedTopic);
mqttServer.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(OnMqttServer_ApplicationMessageReceived);
await mqttServer.StartAsync(optionsBuilder.Build());
}
}
public async void StopMqttServer()
{
await mqttServer?.StopAsync();
}
public async void PublishMqttTopic(string topic, string payload)
{
var message = new MqttApplicationMessage()
{
Topic = topic,
Payload = Encoding.UTF8.GetBytes(payload)
};
await mqttServer.PublishAsync(message);
}
public void OnMqttServerStarted(EventArgs e)
{
sb.Append("MQTT服务启动完成!rn");
}
public void OnMqttServerStopped(EventArgs e)
{
sb.Append("MQTT服务停止完成!rn");
}
public void OnMqttServerClientConnected(MqttServerClientConnectedEventArgs e)
{
sb.Append($"客户端[{e.ClientId}]已连接rn");
}
public void OnMqttServerClientDisconnected(MqttServerClientDisconnectedEventArgs e)
{
sb.Append($"客户端[{e.ClientId}]已断开连接!rn");
//PublishMqttTopic("client/Disconnected", $"客户端[{ e.ClientId}]已断开连接");
}
public void OnMqttServerClientSubscribedTopic(MqttServerClientSubscribedTopicEventArgs e)
{
//sb.Append($"客户端[{e.ClientId}]已成功订阅主题[{e.TopicFilter}]!rn");
sb.Append($"客户端[{e.ClientId}]已成功订阅主题[{e.TopicFilter.Topic}]!rn");
}
public void OnMqttServerClientUnsubscribedTopic(MqttServerClientUnsubscribedTopicEventArgs e)
{
sb.Append($"客户端[{e.ClientId}]已成功取消订阅主题[{e.TopicFilter}]!rn");
}
public void OnMqttServer_ApplicationMessageReceived(MqttApplicationMessageReceivedEventArgs e)
{
sb.Append($"客户端[{e.ClientId}]>> 主题:{e.ApplicationMessage.Topic} rn");
//Console.WriteLine($"客户端[{e.ClientId}]>> 主题:{e.ApplicationMessage.Topic} 负荷:{Encoding.UTF8.GetString(e.ApplicationMessage.Payload)} Qos:{e.ApplicationMessage.QualityOfServiceLevel} 保留:{e.ApplicationMessage.Retain}");
}
}
客户端MQTTClients
public class MQTTClients
{
private MqttClient mqttClient = null;
public StringBuilder sb = new StringBuilder("");
public async Task<MqttClientConnectResultCode> ClientStart(string tcpServer, int tcpPort, string mqttUser, string mqttPassword,
String ClientId = "01001")
{
try
{
var mqttFactory = new MqttFactory();
//https://www.cnblogs.com/zhaoqm999/p/12960677.html
var options = new MqttClientOptions
{
ClientId = ClientId,
ProtocolVersion = MQTTnet.Formatter.MqttProtocolVersion.V311,
ChannelOptions = new MqttClientTcpOptions
{
Server = tcpServer,
Port = tcpPort
},
WillDelayInterval = 10,
WillMessage = new MqttApplicationMessage()
{
Topic = $"LastWill/121313",
Payload = Encoding.UTF8.GetBytes("I Lost the connection!" + ClientId),
QualityOfServiceLevel = MqttQualityOfServiceLevel.ExactlyOnce
}
};
if (options.ChannelOptions == null)
{
throw new InvalidOperationException();
}
if (!string.IsNullOrEmpty(mqttUser))
{
options.Credentials = new MqttClientCredentials
{
Username = mqttUser,
Password = Encoding.UTF8.GetBytes(mqttPassword)
};
}
options.CleanSession = true;
options.KeepAlivePeriod = TimeSpan.FromSeconds(5);
mqttClient = mqttFactory.CreateMqttClient() as MqttClient;
mqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(OnMqttClientConnected);
mqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(OnMqttClientDisConnected);
mqttClient.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(OnSubscriberMessageReceived);
MqttClientAuthenticateResult result = await mqttClient.ConnectAsync(options);
return result.ResultCode;
}
catch (Exception ex)
{
return MqttClientConnectResultCode.ServerBusy;
//lbxMonitor.BeginInvoke(_updateMonitorAction,
// Logger.TraceLog(Logger.Level.Fatal, $"客户端尝试连接出错.>{ex.Message}"));
}
}
public async Task<int> CLientConnect(string tcpServer, int tcpPort, string mqttUser, string mqttPassword,
String ClientId = "01001")
{
MqttClientConnectResultCode resultCode = await ClientStart(tcpServer, tcpPort, mqttUser, mqttPassword, ClientId);
if(resultCode == MqttClientConnectResultCode.Success)
{
return 0;
}
else
{
return 1;
}
}
public async Task ClientStop()
{
try
{
if (mqttClient == null) return;
await mqttClient.DisconnectAsync();
mqttClient = null;
}
catch (Exception ex)
{
}
}
public void OnMqttClientConnected(MqttClientConnectedEventArgs e)
{
//Console.WriteLine($"客户端[{e.ClientId}]已断开连接!");
//btnConnect.Text = "Connected"; ;
//btnConnect.BackColor = Color.LightGreen;
//btnConnect.Tag = 1;
}
public void OnMqttClientDisConnected(MqttClientDisconnectedEventArgs e)
{
//Console.WriteLine($"客户端[{e.ClientId}]已断开连接!");
}
public async Task ClientPublishMqttTopic(string topic, string payload, int qos = 1, bool retain = false)
{
try
{
var message = new MqttApplicationMessage()
{
Topic = topic,
Payload = Encoding.UTF8.GetBytes(DateTime.Now.ToString("HH:mm:ss:ffff") + ":" + payload),
QualityOfServiceLevel = (MqttQualityOfServiceLevel)qos,// cmbQos.SelectedIndex,
Retain = retain//bool.Parse(cmbRetain.SelectedItem.ToString())
};
await mqttClient.PublishAsync(message);
//lbxMonitor.BeginInvoke(_updateMonitorAction,
// Logger.TraceLog(Logger.Level.Info, string.Format("客户端[{0}]发布主题[{1}]成功!", mqttClient.Options.ClientId, topic)));
}
catch (Exception ex)
{
//lbxMonitor.BeginInvoke(_updateMonitorAction,
// Logger.TraceLog(Logger.Level.Fatal, string.Format("客户端[{0}]发布主题[{1}]异常!>{2}", mqttClient.Options.ClientId, topic, ex.Message)));
}
}
public async Task ClientSubscribeTopic(string topic)
{
await mqttClient.SubscribeAsync(topic);
//lbxMonitor.BeginInvoke(_updateMonitorAction,
// Logger.TraceLog(Logger.Level.Info, string.Format("客户端[{0}]订阅主题[{1}]成功!", mqttClient.Options.ClientId, topic)));
}
public async Task ClientUnSubscribeTopic(string topic)
{
await mqttClient.UnsubscribeAsync(topic);
//lbxMonitor.BeginInvoke(_updateMonitorAction,
// Logger.TraceLog(Logger.Level.Info, string.Format("客户端[{0}]取消主题[{1}]成功!", mqttClient.Options.ClientId, topic)));
}
/// <summary>
/// 当客户端接收到所订阅的主题消息时
/// </summary>
/// <param name="e"></param>
private void OnSubscriberMessageReceived(MqttApplicationMessageReceivedEventArgs e)
{
string text = Encoding.UTF8.GetString(e.ApplicationMessage.Payload);
string Topic = e.ApplicationMessage.Topic;
string QoS = e.ApplicationMessage.QualityOfServiceLevel.ToString();
string Retained = e.ApplicationMessage.Retain.ToString();
Console.WriteLine(Topic +" " +text);
sb.Append(DateTime.Now.ToString("HH:mm:ss:ffff") + ">>" + Topic + " " + text);
//lbxMonitor.BeginInvoke(_updateMonitorAction,
// Logger.TraceLog(Logger.Level.Info, "MessageReceived >>Topic:" + Topic + "; QoS: " + QoS + "; Retained: " + Retained));
//lbxMonitor.BeginInvoke(_updateMonitorAction,
// Logger.TraceLog(Logger.Level.Info, "MessageReceived >>Msg: " + text));
}
}
然后在Winform的界面程序中定义使用。
最后
以上就是耍酷手套为你收集整理的MQTT协议测试工具及核心代码的全部内容,希望文章能够帮你解决MQTT协议测试工具及核心代码所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复