概述
最近在一款内存资源非常紧张的芯片上进行开发,由于比较嫌弃mqtt c库的代码比较多余,所以自己写了一个简单框架,实现mqtt的连接、订阅和发布。
对于一款物联网设备(比如车载tbox),就需要3个功能:连接mqtt服务器,订阅接收topic,周期性发布消息到服务器。
目录
先了解需要用到mqtt协议的哪些部分
1、MQTT报文格式
1.1、固定头
1.2、可变头
1.3、有效载荷
2、实现服务器连接
2.1 最简单的方法
2.2 服务器响应
2.3 高级一点的方法
3、订阅服务器Topic
3.1 订阅确认
4、消息发布
4.1 固定头
4.2 固定报文
4.3 有效载荷
4.4 封包和解包的方法
先了解需要用到mqtt协议的哪些部分
如图,我们就只需要这三种报文(5个)。
至于心跳包12、13,一般也是不需要的,,我们只需要设置为最大值,,本身大部分应用场景都需要嵌入式设备周期性上传设备信息(如GPS数据、自定义心跳数据)。
1、MQTT报文格式
前提要说明一下MQTT的协议固定格式(可以跳过)。
MQTT协议,数据包格式,分为3部分:固定头 + 可变头 + 有效载荷。
1.1、固定头
固定头,第一个字节:
前4个bit表示类型,后4个bit作为标志位。
长话短说就是第一个字节是固定不变的(3.1.1协议版本)。
第二个字节表示剩余长度,
并且具有扩展性,即每个字节最高bit表示是否有更多的字节,后7bit用于编码数据长度,剩余长度字段最大4byte,即256MB。
具体含义和用法在后面有说明。
1.2、可变头
第一部分就是报文标识符,用于交换传递,有些报文需要,有些则不需要,具体请查看上面两张图。
1.3、有效载荷
第二部分就是有效载荷的控制报文,我更愿意理解为是扩展的控制报文,PUBLISH除外(因为其有效载荷就是纯数据,与MQTT无关)
2、实现服务器连接
参考报文如下(原始hex报文):
102400044D51545404C20078001454424F585F38363538303030343233383733363600000000
注释如下:
0x10, //head
0x24, //packet length
0x00,0x04,0x4D,0x51,0x54,0x54, //mqtt
0x04, //protocol_level
0xC2, //connect_flag
0x00,0x78, //heartbeat interval
0x00,0x14, //Length of the client ID
0x54,0x42,0x4F,0x58,0x5F,0x38,0x36,0x35,0x38,0x30,0x30,0x30,0x34,0x32,0x33,0x38,0x37,0x33,0x36,0x36, //client ID
0x00,0x00, //username
0x00,0x00, //password
其中字符MQTT和protocol_level基本固定,不同的mqtt协议版本会有改动。
心跳间隔为0X0078,该数值以秒为单位,即120s。
connect_flag是连接标志位,0xC2即1100 0010,①bit7和bit6 分别表示有效载荷包含用户名和密码,②bit5 表示不保留遗嘱,bit4和3遗嘱消息的qos等级,③bit3表示异常时是否发送遗嘱消息,④ bit1 为1时表示必须要开始1个新的会话,bit0保留。
A. username和password,由于其起始字节是2byte的长度位,此处为0表示不需要用户名和密码。
B. client ID部分,就是进行MQTT连接的客户端id。
以上,需要修改的就A和B两部分,以及对应的长度位和剩余长度。
2.1 最简单的方法
参数不变的情况下,比如用户名密码不需要进行更改,客户端ID固定如”TBOX_${IMEI}“格式。
就可以自己做一个报文出来,不同的设备只需要更改1次信息。如下:
uint8_t ConnectPacket[] = {0x10,0x22,0x00,0x04,0x4D,0x51,0x54,0x54,0x04,0xC2,0x00,0x78,0x00,0x12,0x46,0x43,0x5F,0x38,0x36,0x35,0x38,0x30,0x30,0x30,0x34,0x32,0x33,0x38,0x37,0x33,0x36,0x36,0x00,0x00,0x00,0x00};
//每次连接之前进行组包时,修改ID值
//或者在首次上电时,修改ID值并将之写入Flash,后续每次只需要读取
memcpy(ConnectPacket+17,IMEI,15);
//调用TCP/IP 数据发送接口将数据包发出去 注意一定是HEX数据发送
TcpPacket_Transport(ConnectPacket,sizeof(ConnectPacket));
//在TCP/IP接收处判断服务器响应数据
OSSemPend(ConnectSem,1000,&byErr);
2.2 服务器响应
更简单,如下只有4byte。
20020000
注释如下:
0x20, //head
0x02, //len
0x00, //ACK
0x00 //return OK
收到该正确响应,就可以进入下一步了。
有异常,要么是TCP/IP交互处理有问题,要么是MQTT服务器有问题。
2.3 高级一点的方法
自己手动写两个mqtt接口,函数设计如下。
/*
* 函数名: MqttPacket_Connect
* 功能: 输入参数,得到mqtt连接报文
* 入参: CilentID,username,password 字符串形式的客户端ID,用户名,密码
* 返回: 返回数据指针,外部需要进行内存释放
*/
uint8_t *MqttPacket_Connect(char *CilentID, char *username, char *password, uint16_t *pLen);
/*
* 函数名: MqttAck_Connect
* 功能: 确认连接报文的响应是否OK
* 入参: buf,len 数据指针和长度
* 返回: 参数错误返回-1,成功返回0,其它错误返回正数
*/
int MqttAck_Connect(uint8_t *buf, uint16_t len);
3、订阅服务器Topic
在步骤2中能实现连接,到这一步就非常简单了。
参考报文如下:
82210001001C2F504D542F5332432F3836353830303034323338373336362F4F424400
注释如下:
0x82, //head
0x21, //packet length
0x00,0x01, //packet id_flag
0x00,0x1C, //topic length
0x2F,0x50,0x4D,0x54,0x2F,0x53,0x32,0x43,0x2F,0x38,0x36,0x35,0x38,0x30,0x30,0x30,0x34,0x32,0x33,0x38,0x37,0x33,0x36,0x36,0x2F,0x4F,0x42,0x44, //topic
0x00 //qos
第3和4个字节就是上面说到的”报文标识符“,用于交换确认。
最后一个字节表示QoS服务等级,一般填0(既不需要,也能较低交互复杂度和服务器负载)。
跟步骤2一样,即可以①用固定报文去修改,亦可以②写一个方法生成报文。
3.1 订阅确认
参考报文:
9003000100
注释如下:
0x90, //head
0x03, //len
0x00, 0x01, //packet id_flag
0x00 //qos0 OK
报文标识符与下发的相对应,qos0 OK与下发的最后一位QoS等级对应且表示成功。
其它回复类型说明:0x80表示失败,0x01表示Qos1等级成功,0x02表示Qos2等级成功。
4、消息发布
PUBLISH报文的结构,是固定头+有效载荷。
4.1 固定头
特别的,其中DUP标志位表示是否是重发,即为1时表示该报文是重发,但只对于QoS等级1和2才生效,且QoS为0时后4bit必须全部置0。
4.2 固定报文
必要内容1:topic主题的长度+主题名
可选内容2:2byte的报文标识符
具体参考如下:
4.3 有效载荷
有效载荷就是纯数据了。
这里还要重复说一句,能用到这个手动实现mqtt协议的应用,一般都是4G模块本身不支持mqtt协议栈,那么一定是在TCP的基础上进行实现的。所以一定要注意,模块默认发送的是string类型数据还是hex格式数据,亦或者是hex字符串数据。
4.4 封包和解包的方法
封包的关键在于剩余长度的编码,解包的关键在于计算剩余长度对应的有效长度。
二者都可以从随便一个mqtt官方库的代码中,把函数拉出来改造一下。
参考函数如下:
/*
* 函数名: MqttPacket_Publish
* 功能: 对发布的消息进行封包
* 入参: pData unDataLen 数据指针和长度
* 返回: 返回数据指针和长度,外部需要用掉之后释放
* 说明: 1、本API只适合短包传输
* 2、返回指针不为空时需要手动释放内存
*/
uint8_t *MqttPacket_Publish(char *topic, uint8_t *pData, uint16_t unDataLen, uint16_t *pLen)
{
unsigned char *buf = NULL;
int packet_len = 0,strLen = 0,lenLen = 0,offset = 0;
strLen = strlen(topic);
packet_len = 2 + strLen + unDataLen;
if(pData == NULL || unDataLen == 0)
return NULL;
if(packet_len < 128)
lenLen = 1;
else if (packet_len < 16384)
lenLen = 2;
else
return NULL;
buf = malloc(packet_len + 1 + lenLen);
if(buf == NULL)
return NULL;
buf[offset++] = 0x30; //head
if(lenLen == 1)
buf[offset++] = packet_len; //len type1
else
{
buf[offset++] = packet_len%128 + 128;
buf[offset++] = (packet_len/128)%128; //len type2
}
buf[offset++] = 0x00;
buf[offset++] = strLen; //topic len
memcpy(&buf[offset], topic, strLen); //topic
offset += strLen;
memcpy(&buf[offset], pData, unDataLen); //data
offset += unDataLen;
*pLen = offset;
return buf;
}
/*
* 函数名: MqttUnpack_Publish
* 功能: 对发布的消息进行解包
* 入参: pData unDataLen 数据指针和长度
* 返回: 返回有效载荷的数据指针和长度
* 说明: 1、本API只适合短包传输,且无内存申请操作
* 2、返回指针为空时应当丢弃该包数据
*/
uint8_t *MqttUnpack_Publish(uint8_t *pData, uint16_t unDataLen, uint16_t *pLen)
{
unsigned char *buf = NULL;
int packet_len = 0,topic_len = 0,remain_len = 0;
if(pData == NULL || unDataLen == 0 || pData[0] != 0x30 || unDataLen >= 16384)
return NULL;
//30 83 02 00 1D
if( (pData[1] & 0x80) == 1)
{
packet_len = (pData[1]&0x7F) + pData[2]*128;
remain_len = unDataLen - 3;
}
else
{
packet_len = pData[1];
remain_len = unDataLen - 2;
}
topic_len = pData[4];
buf = &pData[4+topic_len];
remain_len = unDataLen - topic_len - 2;
*pLen = packet_len - topic_len - 2;
if(*pLen > remain_len)
{
*pLen = 0;
buf = NULL;
}
return buf;
}
备注,这两个函数本身做了限制:
1、封包函数要求输入的数据长度不超过16384,即剩余长度最多编码2个字节。
2、解包函数要求输入的数据为完整数据,若0x30包的尾部数据丢了一部分则失效
另外,事实上也可以抛弃解包函数。
比如有效数据设置一个0x9876的固定头,一个0x5432的固定尾,
再设立一个专门的线程或函数去专门处理字节流数据,
这样就可以直接把收到的整包数据扔过去处理。
因为对于mqtt发布的消息的固定头,只有长度位的数据会变,但是一般的数据长度小于16K,基本不会与固定头和固定尾重合。
事实上测试下来,也没有什么问题。
但是,但是,如果4G或GPRS模块本身夹杂了一下数据,则必须要进行处理,将真正的有效数据剥离处理,,就比如有些操蛋模块,开头结尾都有一些随机数据提示。
最后
以上就是快乐世界为你收集整理的不用MQTT C库就能实现MQTT连接、订阅和发布先了解需要用到mqtt协议的哪些部分1、MQTT报文格式2、实现服务器连接3、订阅服务器Topic4、消息发布的全部内容,希望文章能够帮你解决不用MQTT C库就能实现MQTT连接、订阅和发布先了解需要用到mqtt协议的哪些部分1、MQTT报文格式2、实现服务器连接3、订阅服务器Topic4、消息发布所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复