我是靠谱客的博主 不安往事,最近开发中收集的这篇文章主要介绍Mqtt协议无法接收到离线消息,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

Mqtt协议的实现,网上有很多文章、源码,可以下载来慢慢研究。比如《自己动手实现MQTT协议》,我参考的是这篇文章,按协议一点一点实现的。但对于离线消息的接收,所说的不多,一般的文章都是说到,要将ClearSession的标识设置为0,而用QoS要为1或者2,然后重新连接就可以收到离线消息了。

但在用C#实现的过程中,连接已经正常了,QoS也设置为2了,就是收不到离线的消息。后来发现,问题不是出现在协议上,而是在数据的接收上。在接收数据时,用socket.Receive来接收,在连接的时候,会返回数据,而接收的buff开的较大,则会在接收到服务端对connect回应的数据包外,也会把离线的数据也一起收起来。这时就要按协议来作解包处理了。如果没有作解包处理,则对收到的connect的conack命令处理,那后面的离线数据就会丢失了。

namespace MqttSDK
{
    /// <summary>
    /// Mqtt包的接收器
    /// </summary>
    public class MqttPacketReceiver
    {
        #region 属性

        #region 固定头
        /// <summary>
        /// 包类型。占第一个字节的4-7bit
        /// </summary>
        public PacketTypeEnum PacketType { get; set; }

        /// <summary>
        /// 标识。占第一个字节的0-3bit
        /// 包类型为PUBLISH时,对于MQTT 3.1.1,Bit 0:RETAIN3,Bit 1:QoS2,Bit 2:QoS2,Bit 3:DUP1。
        /// 包类型为PUBREL、SUBSCRIBE、UNSUBSCRIBE时,Bit 1:1,Bit 0、Bit 2、Bit 3均为0。
        /// 其他包类型,Bit 0、Bit 1、Bit 2、Bit 3均为0。
        /// </summary>
        public int Flags { get; set; }

        /// <summary>
        /// 剩余长度。可变头部和载荷的字节数之和。
        /// </summary>
        public int RemainingLength { get; set; }

        public byte FirstByte { get; set; }

        /// <summary>
        /// 剩余长度对应的字节
        /// </summary>
        public List<byte> RemainingLengthBytes { get; set; }
        #endregion

        #region 可变头和载荷的数据
        /// <summary>
        /// 可变头和载荷的数据
        /// </summary>
        public List<byte> BytesOfVariableHeaderAndPayload { get; set; }
        #endregion

        #endregion

        #region Read
        /// <summary>
        /// 从socket读取数据
        /// </summary>
        /// <param name="socket"></param>
        public IMqttPacket Read(Socket socket)
        {
            if (socket == null)
            {
                return null;
            }

            var byteOne = new byte[1];
            var len = socket.Receive(byteOne);
            if (len <= 0)
            {
                return null;
            }
            BytesOfVariableHeaderAndPayload = new List<byte>();
            RemainingLengthBytes = new List<byte>();

            //读取第1个字节
            FirstByte = byteOne[0];
            Flags = FirstByte & 0b00001111;
            PacketType = (PacketTypeEnum)(FirstByte >> 4);

            //读取第2个字节
            byteOne[0] = 0;
            len = socket.Receive(byteOne);
            if (len <= 0)
            {
                throw new MqttPacketException("请取第2个字节异常");
            }
            var second_byte = byteOne[0];

            //===计算【剩余长度】===
            //【剩余长度】取7bit来计算,最高位的bit7作为标识,
            // bit7为1则说明下一个字节也是剩余长度的一部分,
            // bit7为0则读取剩余长度完毕。
            int remaining_length = second_byte & 0b01111111;
            var flag_bit = (second_byte & 0b10000000);
            int multiplier = 1;
            while (true)
            {
                if (flag_bit == 0)
                {
                    break;
                }
                len = socket.Receive(byteOne);
                if (len <= 0)
                {
                    throw new MqttPacketException("请取剩余长度的字节异常");
                }
                var next_byte = byteOne[0];
                flag_bit = next_byte & 0b10000000;
                remaining_length += (next_byte & 0b01111111) * multiplier;
                RemainingLengthBytes.Add(next_byte);
            }
            RemainingLength = remaining_length;

            //读取可变头和载荷的数据
            var blockSize = 4096;
            var buff = new byte[blockSize];
            int readSize = blockSize;
            while (remaining_length > 0)
            {
                if (remaining_length < blockSize)
                {//读取长度超过了【RemainingLength】,会把后续数据包也读进来,导致不必要的错误。在断开再重连后,要接收离线消息时最可能发生。
                    readSize = remaining_length;
                }
                else
                {
                    readSize = blockSize;
                }
                len = socket.Receive(buff, 0, readSize, SocketFlags.None);
                if (len <= 0)
                {
                    throw new MqttPacketException("固定头后面的数据异常");
                }
                remaining_length -= len;
                var tempBuff = new byte[len];
                Array.Copy(buff, tempBuff, len);
                BytesOfVariableHeaderAndPayload.AddRange(tempBuff);
            }
            var packet = CreatePacket();
            return packet;
        }
        #endregion

        #region CreatePacket
        private IMqttPacket CreatePacket()
        {
            IMqttPacket packet = null;//IMqttPacket是对Mqtt各个命令响应协议包的接口。
            switch (PacketType)
            {
                case PacketTypeEnum.CONNACK:
                    {//对连接的响应
                        packet = new MqttConnackPacket(this);
                        break;
                    }
                case PacketTypeEnum.SUBACK:
                    {//对订阅的响应
                        packet = new MqttSubackPacket(this);
                        break;
                    }
                case PacketTypeEnum.PUBACK:
                    {//对QoS=1的Publis的响应
                        packet = new MqttPubackPacket(this);
                        break;
                    }
                case PacketTypeEnum.PUBREC:
                    {//对QoS=2的Publis的响应,是QoS=2的第2个协议包,第1个是Publish。
                        packet = new MqttPubrecPacket(this);
                        break;
                    }
                case PacketTypeEnum.PUBREL:
                    {//对QoS=2的Publis的响应,是QoS=2的第3个协议包。
                        packet = new MqttPubrelPacket(this);
                        break;
                    }
                case PacketTypeEnum.PUBCOMP:
                    {//对QoS=2的Publis的响应,是QoS=2的第4个协议包。
                        packet = new MqttPubcompPacket(this);
                        break;
                    }
                case PacketTypeEnum.PUBLISH:
                    {//收到服务端发来的Publish包
                        packet = new MqttPublishPacket(this);
                        break;
                    }
                case PacketTypeEnum.UNSUBACK:
                    {//【取消订阅】确认
                        packet = new MqttUnsubackPacket(this);
                        break;
                    }
                case PacketTypeEnum.PINGRESP:
                    {//【Ping请求】的响应
                        packet = new MqttPingrespPacket(this);
                        break;
                    }
            }
            return packet;
        }
        #endregion

        #region GetTotalBytes
        /// <summary>
        /// 获取所有字节
        /// </summary>
        /// <returns></returns>
        public byte[] GetTotalBytes()
        {
            var list = new List<byte>();
            list.Add(FirstByte);
            if (!ListHelper.IsEmpty(RemainingLengthBytes))
            {
                list.AddRange(RemainingLengthBytes);
            }
            if (!ListHelper.IsEmpty(BytesOfVariableHeaderAndPayload))
            {
                list.AddRange(BytesOfVariableHeaderAndPayload);
            }
            var buff = list.ToArray();
            return buff;
        }
        #endregion
    }
}

其中MqttxxxPacket是对每个命令包的实现,可以参照协议格式自行封装。

附上协议包类型的枚举值,加上一些个人理解的注释,仅供参考。

namespace MqttSDK
{
    /// <summary>
    /// Mqtt的包类型
    /// </summary>
    public enum PacketTypeEnum
    {
        /// <summary>
        /// 保留位1
        /// Reserved
        /// </summary>
        Reserved1 = 0,

        /// <summary>
        /// 客户端请求连接到服务端(broken)
        /// Client request to connect to Server
        /// </summary>
        CONNECT = 1,

        /// <summary>
        /// 连接确认
        /// Connect acknowledgment
        /// </summary>
        CONNACK = 2,

        /// <summary>
        /// 由【发出Publish信息端】向【接收Publish信息端】发布信息。
        /// Publish message
        /// 【发出Publish信息端】可以是【信息发布者Publish】,也可以是【中间代理Broken】
        /// 【接收Publish信息端】可以是【中间代表Broken】,也可以是【订阅者Subscribe】
        /// </summary>
        PUBLISH = 3,

        /// <summary>
        /// 由【接收Publish信息端】向【发出Publish信息端】的确认(针对QoS=1)
        /// Publish acknowledgment。
        /// </summary>
        PUBACK = 4,

        /// <summary>
        /// 由【接收Publish信息端】向【发出Publish信息端】的确认,表示收到了信息(针对QoS=2的第2个协议包,第1个是Publish)
        /// Publish received (assured delivery part 1)
        /// </summary>
        PUBREC = 5,

        /// <summary>
        /// 由【发出Publish信息端】向【接收Publish信息端】的确认,表示信息已经释放(针对QoS=2的第3个协议包)
        /// Publish release (assured delivery part 2)
        /// </summary>
        PUBREL = 6,

        /// <summary>
        /// 由【接收Publish信息端】向【发出Publish信息端】的确认,表示信息已经处理完成,比如信息已缓存起来。(针对QoS=2的第4个协议包)
        /// Publish complete (assured delivery part 3)
        /// </summary>
        PUBCOMP = 7,

        /// <summary>
        /// 由客户端向服务端请求【订阅】
        /// Client subscribe request
        /// </summary>
        SUBSCRIBE = 8,

        /// <summary>
        /// 由客户端向服务端请求【订阅】确认
        /// Subscribe acknowledgment
        /// </summary>
        SUBACK = 9,

        /// <summary>
        /// 由客户端向服务端请求【取消订阅】
        /// Unsubscribe request
        /// </summary>
        UNSUBSCRIBE = 10,

        /// <summary>
        /// 由客户端向服务端请求【取消订阅】确认
        /// Unsubscribe acknowledgment
        /// </summary>
        UNSUBACK = 11,

        /// <summary>
        /// 由客户端向服务端的Ping请求
        /// PING request
        /// </summary>
        PINGREQ = 12,

        /// <summary>
        /// 由服务端向客户端的Ping响应
        /// PING response
        /// </summary>
        PINGRESP = 13,

        /// <summary>
        /// 由客户端向服务端的断开连接
        /// Client is disconnecting
        /// </summary>
        DISCONNECT = 14,

        /// <summary>
        /// 保留位2
        /// Reserved
        /// </summary>
        Reserved2 = 15 
    }
}

测试:先用Publish端发布消息。

测试:再开始Subcribe端接收消息(离线消息)

 

最后

以上就是不安往事为你收集整理的Mqtt协议无法接收到离线消息的全部内容,希望文章能够帮你解决Mqtt协议无法接收到离线消息所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(66)

评论列表共有 0 条评论

立即
投稿
返回
顶部