概述
Mqtt协议的实现,网上有很多文章、源码,可以下载来慢慢研究。比如《自己动手实现MQTT协议》,我参考的是这篇文章,按协议一点一点实现的。但对于离线消息的接收,所说的不多,一般的文章都是说到,要将ClearSession的标识设置为0,而用QoS要为1或者2,然后重新连接就可以收到离线消息了。
但在用C#实现的过程中,连接已经正常了,QoS也设置为2了,就是收不到离线的消息。后来发现,问题不是出现在协议上,而是在数据的接收上。在接收数据时,用socket.Receive来接收,在连接的时候,会返回数据,而接收的buff开的较大,则会在接收到服务端对connect回应的数据包外,也会把离线的数据也一起收起来。这时就要按协议来作解包处理了。如果没有作解包处理,则对收到的connect的conack命令处理,那后面的离线数据就会丢失了。
namespace MqttSDK
{
/// <summary>
/// Mqtt包的接收器
/// </summary>
public class MqttPacketReceiver
{
#region 属性
#region 固定头
/// <summary>
/// 包类型。占第一个字节的4-7bit
/// </summary>
public PacketTypeEnum PacketType { get; set; }
/// <summary>
/// 标识。占第一个字节的0-3bit
/// 包类型为PUBLISH时,对于MQTT 3.1.1,Bit 0:RETAIN3,Bit 1:QoS2,Bit 2:QoS2,Bit 3:DUP1。
/// 包类型为PUBREL、SUBSCRIBE、UNSUBSCRIBE时,Bit 1:1,Bit 0、Bit 2、Bit 3均为0。
/// 其他包类型,Bit 0、Bit 1、Bit 2、Bit 3均为0。
/// </summary>
public int Flags { get; set; }
/// <summary>
/// 剩余长度。可变头部和载荷的字节数之和。
/// </summary>
public int RemainingLength { get; set; }
public byte FirstByte { get; set; }
/// <summary>
/// 剩余长度对应的字节
/// </summary>
public List<byte> RemainingLengthBytes { get; set; }
#endregion
#region 可变头和载荷的数据
/// <summary>
/// 可变头和载荷的数据
/// </summary>
public List<byte> BytesOfVariableHeaderAndPayload { get; set; }
#endregion
#endregion
#region Read
/// <summary>
/// 从socket读取数据
/// </summary>
/// <param name="socket"></param>
public IMqttPacket Read(Socket socket)
{
if (socket == null)
{
return null;
}
var byteOne = new byte[1];
var len = socket.Receive(byteOne);
if (len <= 0)
{
return null;
}
BytesOfVariableHeaderAndPayload = new List<byte>();
RemainingLengthBytes = new List<byte>();
//读取第1个字节
FirstByte = byteOne[0];
Flags = FirstByte & 0b00001111;
PacketType = (PacketTypeEnum)(FirstByte >> 4);
//读取第2个字节
byteOne[0] = 0;
len = socket.Receive(byteOne);
if (len <= 0)
{
throw new MqttPacketException("请取第2个字节异常");
}
var second_byte = byteOne[0];
//===计算【剩余长度】===
//【剩余长度】取7bit来计算,最高位的bit7作为标识,
// bit7为1则说明下一个字节也是剩余长度的一部分,
// bit7为0则读取剩余长度完毕。
int remaining_length = second_byte & 0b01111111;
var flag_bit = (second_byte & 0b10000000);
int multiplier = 1;
while (true)
{
if (flag_bit == 0)
{
break;
}
len = socket.Receive(byteOne);
if (len <= 0)
{
throw new MqttPacketException("请取剩余长度的字节异常");
}
var next_byte = byteOne[0];
flag_bit = next_byte & 0b10000000;
remaining_length += (next_byte & 0b01111111) * multiplier;
RemainingLengthBytes.Add(next_byte);
}
RemainingLength = remaining_length;
//读取可变头和载荷的数据
var blockSize = 4096;
var buff = new byte[blockSize];
int readSize = blockSize;
while (remaining_length > 0)
{
if (remaining_length < blockSize)
{//读取长度超过了【RemainingLength】,会把后续数据包也读进来,导致不必要的错误。在断开再重连后,要接收离线消息时最可能发生。
readSize = remaining_length;
}
else
{
readSize = blockSize;
}
len = socket.Receive(buff, 0, readSize, SocketFlags.None);
if (len <= 0)
{
throw new MqttPacketException("固定头后面的数据异常");
}
remaining_length -= len;
var tempBuff = new byte[len];
Array.Copy(buff, tempBuff, len);
BytesOfVariableHeaderAndPayload.AddRange(tempBuff);
}
var packet = CreatePacket();
return packet;
}
#endregion
#region CreatePacket
private IMqttPacket CreatePacket()
{
IMqttPacket packet = null;//IMqttPacket是对Mqtt各个命令响应协议包的接口。
switch (PacketType)
{
case PacketTypeEnum.CONNACK:
{//对连接的响应
packet = new MqttConnackPacket(this);
break;
}
case PacketTypeEnum.SUBACK:
{//对订阅的响应
packet = new MqttSubackPacket(this);
break;
}
case PacketTypeEnum.PUBACK:
{//对QoS=1的Publis的响应
packet = new MqttPubackPacket(this);
break;
}
case PacketTypeEnum.PUBREC:
{//对QoS=2的Publis的响应,是QoS=2的第2个协议包,第1个是Publish。
packet = new MqttPubrecPacket(this);
break;
}
case PacketTypeEnum.PUBREL:
{//对QoS=2的Publis的响应,是QoS=2的第3个协议包。
packet = new MqttPubrelPacket(this);
break;
}
case PacketTypeEnum.PUBCOMP:
{//对QoS=2的Publis的响应,是QoS=2的第4个协议包。
packet = new MqttPubcompPacket(this);
break;
}
case PacketTypeEnum.PUBLISH:
{//收到服务端发来的Publish包
packet = new MqttPublishPacket(this);
break;
}
case PacketTypeEnum.UNSUBACK:
{//【取消订阅】确认
packet = new MqttUnsubackPacket(this);
break;
}
case PacketTypeEnum.PINGRESP:
{//【Ping请求】的响应
packet = new MqttPingrespPacket(this);
break;
}
}
return packet;
}
#endregion
#region GetTotalBytes
/// <summary>
/// 获取所有字节
/// </summary>
/// <returns></returns>
public byte[] GetTotalBytes()
{
var list = new List<byte>();
list.Add(FirstByte);
if (!ListHelper.IsEmpty(RemainingLengthBytes))
{
list.AddRange(RemainingLengthBytes);
}
if (!ListHelper.IsEmpty(BytesOfVariableHeaderAndPayload))
{
list.AddRange(BytesOfVariableHeaderAndPayload);
}
var buff = list.ToArray();
return buff;
}
#endregion
}
}
其中MqttxxxPacket是对每个命令包的实现,可以参照协议格式自行封装。
附上协议包类型的枚举值,加上一些个人理解的注释,仅供参考。
namespace MqttSDK
{
/// <summary>
/// Mqtt的包类型
/// </summary>
public enum PacketTypeEnum
{
/// <summary>
/// 保留位1
/// Reserved
/// </summary>
Reserved1 = 0,
/// <summary>
/// 客户端请求连接到服务端(broken)
/// Client request to connect to Server
/// </summary>
CONNECT = 1,
/// <summary>
/// 连接确认
/// Connect acknowledgment
/// </summary>
CONNACK = 2,
/// <summary>
/// 由【发出Publish信息端】向【接收Publish信息端】发布信息。
/// Publish message
/// 【发出Publish信息端】可以是【信息发布者Publish】,也可以是【中间代理Broken】
/// 【接收Publish信息端】可以是【中间代表Broken】,也可以是【订阅者Subscribe】
/// </summary>
PUBLISH = 3,
/// <summary>
/// 由【接收Publish信息端】向【发出Publish信息端】的确认(针对QoS=1)
/// Publish acknowledgment。
/// </summary>
PUBACK = 4,
/// <summary>
/// 由【接收Publish信息端】向【发出Publish信息端】的确认,表示收到了信息(针对QoS=2的第2个协议包,第1个是Publish)
/// Publish received (assured delivery part 1)
/// </summary>
PUBREC = 5,
/// <summary>
/// 由【发出Publish信息端】向【接收Publish信息端】的确认,表示信息已经释放(针对QoS=2的第3个协议包)
/// Publish release (assured delivery part 2)
/// </summary>
PUBREL = 6,
/// <summary>
/// 由【接收Publish信息端】向【发出Publish信息端】的确认,表示信息已经处理完成,比如信息已缓存起来。(针对QoS=2的第4个协议包)
/// Publish complete (assured delivery part 3)
/// </summary>
PUBCOMP = 7,
/// <summary>
/// 由客户端向服务端请求【订阅】
/// Client subscribe request
/// </summary>
SUBSCRIBE = 8,
/// <summary>
/// 由客户端向服务端请求【订阅】确认
/// Subscribe acknowledgment
/// </summary>
SUBACK = 9,
/// <summary>
/// 由客户端向服务端请求【取消订阅】
/// Unsubscribe request
/// </summary>
UNSUBSCRIBE = 10,
/// <summary>
/// 由客户端向服务端请求【取消订阅】确认
/// Unsubscribe acknowledgment
/// </summary>
UNSUBACK = 11,
/// <summary>
/// 由客户端向服务端的Ping请求
/// PING request
/// </summary>
PINGREQ = 12,
/// <summary>
/// 由服务端向客户端的Ping响应
/// PING response
/// </summary>
PINGRESP = 13,
/// <summary>
/// 由客户端向服务端的断开连接
/// Client is disconnecting
/// </summary>
DISCONNECT = 14,
/// <summary>
/// 保留位2
/// Reserved
/// </summary>
Reserved2 = 15
}
}
测试:先用Publish端发布消息。
测试:再开始Subcribe端接收消息(离线消息)
最后
以上就是不安往事为你收集整理的Mqtt协议无法接收到离线消息的全部内容,希望文章能够帮你解决Mqtt协议无法接收到离线消息所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复