概述
1、了解下MQTT协议
虽然上一篇用起来了MQTT,但是并不十分了解,基本就局限于,发布主题是发送数据,订阅主题是接收数据,今天就再好好了解一下吧。
分享下网页版的“MQTT协议中文版”链接:Introduction · MQTT协议中文版
1.1、基本的概念
- 客户端:可以发布和订阅某主题的消息;
- 服务器:接收消息、转发消息给订阅该主题的客户端;
1.2、主题的表示
1.2.1、主题名用UTF-8编码
UTF-8编码兼容ASCII码,所以自己定义主题名的时候就用字母、数字、下划线就行了;
1.2.2、层级分隔符"/"
主题名之间可以层级分隔符"/"隔开,这样主题名是层次化的,方便管理;
1.2.3、主题通配符
主题通配符,是为了可以一次性订阅多个主题,所以只使用在订阅主题,不能用在发布主题;
- 多层通配符"#"
- 在末尾分隔符后,匹配当前层级,以及所有子层级;
- "xxx/#"可匹配:"xxx","xxx/xx1","xxx/xx2","xxx/xx1/xx3";
- 单层通配符"+"
- 任意位置,匹配当前层级的下一子层级;
- "xxx/+"可匹配:"xxx/xx1","xxx/xx2";
1.2.4、以$开头的主题
$开头的主题名 不能匹配 通配符"#""+"开头的主题;
P.S. 比如百度云的主题名形式就为:"$iot/device/msg";关于主题名更细节或特别使用方式的说明见文档;
1.3、关于UTF-8编码
非常详细的字符编码讲解,ASCII、GB2312、GBK、Unicode、UTF-8等知识点都有_哔哩哔哩_bilibili
不知道UTF-8编码是啥,所以去瞅了瞅,这个视频主要内容:
- 最初的ASCII编码,以及衍生出ASCII扩展码;
- 各国家文字有自己的编码,比如中国的GB2312编码、GBK编码;
- 为了同一各国字符编码,Unicode标准提出了USC-2、USC-4字符集;
- UTF-8编码就是根据USC-4字符集而来的编码方式,将字符集划分为4个区间,根据实际字符选择对应的x字节格式,并以二进制填充到此格式中;
比如,王的Unicode编码:U+738B,属于3字节的UTF-8编码范围,将0x738B转换成2进制,填充到1110xxxx 10xxxxxx 10xxxxxx;
为了避免UP主糊弄我,啊不是我开玩笑的,狗头狗头,为了追本溯源我又找到了个查询Unicode的网站,网站很友好,除了Unicode编码还给出了HTML代码、CSS代码、处于哪个编码区域、UTF-8、UTF-16、UTF-32等;
王 - 中日韩象形文字: U+738B - Unicode 字符百科 (unicode-table.com)
1.4、服务质量等级QoS
QoS = 0:至多发送一次消息;
QoS = 1:至少发送一次消息;
QoS = 2:仅发送一次消息;
昨天还是前天刚接触到MQTT,随便搜索的时候看到的例子,链接不知道了,TA说明了共享单车在不同情境下的QoS设置。
- 当骑行的时候,车辆定期上报数据,所以偶尔丢失也没关系,此时QoS = 0;
- 当开锁的时候,因为要保证开锁成功,所以至少要有一次收到开锁指令,QoS = 1;
- 当骑行完成缴费的时候,就要求有且仅有一次支付,这时QoS = 2;
1.5、MQTT报文格式(固定报头、可变报头、有效载荷)
1.5.1、固定报头(所有报文都有)
分类下你可以发现,其实控制报文的类型就三大类,连接、订阅、发布;
除发布PUBLISH外,其余报文类型标志位都是与报文类型直接一一对应的;
剩余长度:可变报头和有效载荷的字节数;
采用变长编码方式,每个字节的低7位表示数据,最高位表示有无下一字节,最多4个字节;
也就是128进制,举个例子,比如剩余长度编码为两个字节,第一个字节1111 1111,第二个字节0111 1111;
可以看做1 127、0 127,也就是127*128^1 + 127*128^0 = 16383;
1.5.2、可变报头(部分报文有)
根据报文类型的不同,可变报头存在多个字段,这里只介绍其中的一个“报文标识符”,其余字段根据具体报文类型来看;
1.5.3、有效载荷(部分报文有)
1.6、MQTT协议中字符串的表示
前两个字节表示后面字符串的长度,后面是字符串的UTF-8格式的字符数据;
2、连接相关的报文
2.1、 CONNECT 连接服务端
2.1.1、思维导图
2.1.2、涉及到的概念——会话
2.1.2、涉及到的概念——遗嘱
客户端发送DISCONNECT断开连接,服务端会删除遗嘱;
2.2、CONNACK 确认连接请求
2.3、DISCONNECT 断开连接
客户端发送完DISCONNECT,断开连接;
服务端发送完,如有则删除遗嘱,如客户端未关闭连接则关闭连接;
2.4、PINGREQ 心跳请求、PINGRESP 心跳响应
3、发布相关的报文
3.1、PUBLISH 发布消息
3.2、PUBACK 发布确认(对QoS=1的PUBLISH响应)
4、订阅相关的报文
4.1、SUBSCRIBE 订阅主题
4.2、SUBACK 订阅确认
4.3、UNSUBSCRIBE 取消订阅、UNSUBACK 取消订阅确认
5、MQTT的代码
5.1、config 加载或更新系统参数
config.h定义了一个系统参数的结构体,用来存放WiFi信息和MQTT服务端域名、端口、本机的用户名密码等;
配置这些系统参数的宏定义在mqtt_config.h;
这个config文件的作用就是,从flash中加载系统参数,代码中利用了两个扇区存放系统参数,一个扇区存放现在的系统参数,一个扇区存放之前的系统参数,算是做一个备份。每次修改cfg_holder的时候,就会读出不一致,然后把默认的参数及新cfg_holder写入另一个扇区;
// 系统参数---------------------------------------------------------------
typedef struct{
uint32_t cfg_holder; // 持有人标识(只有更新此数值,系统参数才会更新)
uint8_t device_id[64]; // 客户端ID[64] 【官方例程中是[32],将其改为[64]】
uint8_t sta_ssid[64]; // WIFI名[64]
uint8_t sta_pwd[64]; // WIFI密码[64]
uint32_t sta_type; // STA类型
uint8_t mqtt_host[64]; // MQTT服务端域名[64]
uint32_t mqtt_port; // MQTT端口
uint8_t mqtt_user[64]; // MQTT用户名[64] 【官方例程中是[32],将其改为[64]】
uint8_t mqtt_pass[64]; // MQTT密码[64] 【官方例程中是[32],将其改为[64]】
uint32_t mqtt_keepalive; // 保持连接时长
uint8_t security; // 安全类型
} SYSCFG;
// 加载/更新系统参数【WIFI参数、MQTT参数】
void ICACHE_FLASH_ATTR CFG_Load()
{
spi_flash_read,读扇区0x7C的标志位saveFlag.flag
if (saveFlag.flag == 0){
spi_flash_read,读出扇区0x79的系统参数
}else{
spi_flash_read,读出扇区0x7A的系统参数
}
if(sysCfg.cfg_holder != CFG_HOLDER){ // 持有人标识不同
sysCfg.cfg_holder = CFG_HOLDER; // 更新持有人标识
写入默认参数;
CFG_Save(); 将更新后的系统参数烧录到另一扇区,并更新saveFlag.flag
}
}
5.2、wifi、sntp、开启或断开MQTT连接
WIFI_Connect函数中,设置STA模式、WiFi名和密码,并开启1s的定时器WiFiLinker;
WiFiLinker回调函数中检查连接状态、是否获取到IP:
- 如果获取到IP则定时器改为2s;
- 如果未获取到IP则定时器改为0.5s;
- 此外,如果连接状态改变,调用user_main.c中的wifiConnectCb;
wifiConnectCb函数:
- 如果更新的状态为STATION_GOT_IP,意味着WiFi连接成功,则开启SNTP服务,并启动1s的定时器sntp_timer;
- 如果更新的状态为其他,则是断开了网络连接,调用MQTT_Disconnect(&mqttClient)断开MQTT连接;
sntp_timer回调函数:
- 如果获取网络时间失败,则打印错误信息、继续保持这个1s的定时器;
- 如果获取网络时间成功,则打印正确的时间、关闭这个定时器,调用MQTT_Connect(&mqttClient)开启MQTT连接;
前面大概的启动流程就是这样,下面看看有关MQTT的部分了;
5.3、MQTT_Client结构体、user_init中MQTT初始化
// MQTT客户端
typedef struct
{
struct espconn *pCon; // TCP连接结构体指针
uint8_t security; // 安全类型
uint8_t* host; // 服务端域名/地址
uint32_t port; // 网络连接端口号
ip_addr_t ip; // 32位IP地址
mqtt_state_t mqtt_state; // MQTT状态
mqtt_connect_info_t connect_info; // MQTT【CONNECT】报文的连接参数
MqttCallback connectedCb; // MQTT连接成功_回调
MqttCallback disconnectedCb; // MQTT断开连接_回调
MqttCallback publishedCb; // MQTT发布成功_回调
MqttCallback timeoutCb; // MQTT超时_回调
MqttDataCallback dataCb; // MQTT接收数据_回调
ETSTimer mqttTimer; // MQTT定时器
uint32_t keepAliveTick; // MQTT客户端(ESP8266)心跳计数
uint32_t reconnectTick; // 重连等待计时
uint32_t sendTimeout; // 报文发送超时时间
tConnState connState; // ESP8266运行状态
QUEUE msgQueue; // 消息队列
void* user_data; // 用户数据(预留给用户的指针)
} MQTT_Client;
void user_init(void)
{
.......
// 网络连接参数赋值:服务端域名、端口【1883】、安全类型【0:NO_TLS】
MQTT_InitConnection(&mqttClient, sysCfg.mqtt_host, sysCfg.mqtt_port, sysCfg.security);
// MQTT连接参数赋值:客户端标识符、MQTT用户名、MQTT密钥、保持连接时长【120s】、清除会话【1:clean_session】
MQTT_InitClient(&mqttClient, sysCfg.device_id, sysCfg.mqtt_user, sysCfg.mqtt_pass, sysCfg.mqtt_keepalive, 1);
// 设置遗嘱参数(如果云端没有对应的遗嘱主题,则MQTT连接会被拒绝)
// MQTT_InitLWT(&mqttClient, "Will", "ESP8266_offline", 0, 0);
// 设置MQTT相关函数
MQTT_OnConnected(&mqttClient, mqttConnectedCb); // 设置【MQTT成功连接】函数的另一种调用方式
MQTT_OnDisconnected(&mqttClient, mqttDisconnectedCb); // 设置【MQTT成功断开】函数的另一种调用方式
MQTT_OnPublished(&mqttClient, mqttPublishedCb); // 设置【MQTT成功发布】函数的另一种调用方式
MQTT_OnData(&mqttClient, mqttDataCb); // 设置【接收MQTT数据】函数的另一种调用方式
.......
}
- MQTT_InitConnection
- 为mqttClient申请内存;
- 赋值服务器域名、端口、安全类型;
- MQTT_InitClient
- mqttClient赋值,除了参数列表中的还有mqtt_state、connect_info、msgQueue等信息;
- 安排任务MQTT_Task;
- MQTT_Onxxx,这部分是设置mqttClient中的回调函数;
5.4、MQTT_Connect、DNS域名解析、mqttTimer定时器
5.4.1、MQTT_Connect
MQTT_Connect 是在WiFi已连接、SNTP获取成功后调用的,下面的代码省略了很多,主要内容:
- 设置TCP连接的参数,如服务器和ESP8266的端口号;
- 注册TCP相关的回调函数:mqtt_tcpclient_connect_cb、mqtt_tcpclient_recon_cb;
- 开启mqttTimer定时器;
- 设置DNS域名解析,解析MQTT服务器的域名;
void ICACHE_FLASH_ATTR MQTT_Connect(MQTT_Client *mqttClient)
{
// 开始MQTT连接前,判断是否存在MQTT的TCP连接。如果有,则清除之前的TCP连接-
if (mqttClient->pCon){
mqtt_tcpclient_delete(mqttClient); // 删除TCP连接、释放pCon内存、清除TCP连接指针
}
// TCP连接设置---------------------------------------------------------
mqttClient->pCon->proto.tcp->local_port = espconn_port(); // 获取ESP8266可用端口
mqttClient->pCon->proto.tcp->remote_port = mqttClient->port; // 设置端口号
espconn_regist_connectcb(mqttClient->pCon, mqtt_tcpclient_connect_cb); // 注册TCP连接成功的回调函数
espconn_regist_reconcb(mqttClient->pCon, mqtt_tcpclient_recon_cb); // 注册TCP异常中断的回调函数
mqttClient->keepAliveTick = 0; // MQTT客户端(ESP8266)心跳计数
mqttClient->reconnectTick = 0; // 重连等待计时:当进入重连请求状态后,需等待5秒,之后进行重新连接
// 设置MQTT定时(1秒重复)【功能:心跳计时、重连计时、TCP发送计时】
os_timer_setfn(&mqttClient->mqttTimer, (os_timer_func_t *)mqtt_timer, mqttClient);
// 解析域名----------------------------------------------------
espconn_gethostbyname(mqttClient->pCon, mqttClient->host, &mqttClient->ip, mqtt_dns_found);
mqttClient->connState = TCP_CONNECTING; // TCP正在连接
}
5.4.2、DNS域名解析回调
- 失败:
- 状态设为 TCP_RECONNECT_REQ,表示TCP重连请求;
- 成功:
- 调用espconn_connect 连接TCP;
- 状态TCP_CONNECTING,表示TCP正在连接;
- 安排任务MQTT_Task;
TCP_CONNECTING状态,任务里并没有执行什么操作,所以下一步5.5中是看TCP是否连接成功
LOCAL void ICACHE_FLASH_ATTR mqtt_dns_found(const char *name, ip_addr_t *ipaddr, void *arg)
{
if (ipaddr == NULL){ // 域名解析失败
client->connState = TCP_RECONNECT_REQ; // TCP重连请求(等待5秒)
return;
}
// 判断IP地址是否正确(?=0)
if (client->ip.addr == 0 && ipaddr->addr != 0){
espconn_connect(client->pCon); // TCP连接(作为Client连接Server)
client->connState = TCP_CONNECTING; // TCP正在连接
}
system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client); // 安排任务MQTT_Task
}
5.4.3、mqttTimer定时器
现在看上面蓝色的,DNS解析失败的后续操作;
mqttTimer定时器回调函数中,如果状态为TCP_RECONNECT_REQ,每秒自增,到5s后切换状态为TCP_RECONNECT,运行任务;
任务中再次调用MQTT_Connect,重新设置TCP连接相关参数,设置DNS域名解析,见5.4.1;
else if (client->connState == TCP_RECONNECT_REQ){ // TCP重连请求(等待5秒)
client->reconnectTick ++; // 重连计时++
if (client->reconnectTick > MQTT_RECONNECT_TIMEOUT){ // 重连请求超过5秒
client->reconnectTick = 0; // 重连计时 = 0
client->connState = TCP_RECONNECT; // TCP重新连接
system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client);// 安排任务
}
}
case TCP_RECONNECT_REQ:
break;
case TCP_RECONNECT:
mqtt_tcpclient_delete(client); // 删除TCP连接、释放pCon内存、清除TCP连接指针
MQTT_Connect(client); // MQTT连接准备:TCP连接、域名解析等
INFO("TCP: Reconnect to: %s:%drn", client->host, client->port);
client->connState = TCP_CONNECTING; // TCP正在连接
break;
5.5、TCP连接的相关操作(CONNECT、CONNACK)
当前面DNS域名解析成功,得知MQTT服务器的IP地址后,就会开始TCP连接了,有关注册TCP连接成功、连接失败的回调函数是在MQTT_Connect中;
5.5.1、TCP连接失败
mqtt_tcpclient_recon_cb,切换状态为TCP_RECONNECT_REQ,进入TCP重连请求状态,上面刚说过,就是在mqttTimer定时器中自增,到5s后切换状态为TCP_RECONNECT,在任务中重新调用MQTT_Connect;
void ICACHE_FLASH_ATTR mqtt_tcpclient_recon_cb(void *arg, sint8 errType)
{
struct espconn *pCon = (struct espconn *)arg; // 获取TCP连接指针
MQTT_Client* client = (MQTT_Client *)pCon->reverse; // 获取mqttClient指针
INFO("TCP: Reconnect to %s:%drn", client->host, client->port);
client->connState = TCP_RECONNECT_REQ; // TCP重连请求(等待5秒)
system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client); // 安排任务MQTT_Task
}
5.5.2、TCP连接成功、开始建立MQTT连接
- 注册TCP正常断开、收发数据的回调函数;
- 发送CONNECT报文;
发送CONNECT报文,就是按照MQTT协议,一点点配置固定报头、可变报头、有效载荷的部分,太细节了,这又涉及到了mqtt_msg,好家伙,又是个死长的代码,先不看了,等我写出bug再研究,哈哈哈哈~
void ICACHE_FLASH_ATTR mqtt_tcpclient_connect_cb(void *arg)
{
struct espconn *pCon = (struct espconn *)arg; // 获取TCP连接指针
MQTT_Client* client = (MQTT_Client *)(pCon->reverse);// 获取mqttClient指针
// 注册回调函数--------------------------------------------------------------
espconn_regist_disconcb(client->pCon, mqtt_tcpclient_discon_cb); // TCP断开成功_回调
espconn_regist_recvcb(client->pCon, mqtt_tcpclient_recv); // TCP接收成功_回调
espconn_regist_sentcb(client->pCon, mqtt_tcpclient_sent_cb); // TCP发送成功_回调
INFO("MQTT: Connected to broker %s:%drn", client->host, client->port);
// 【CONNECT】报文发送准备--------------------------------------------------------------
// 初始化MQTT报文缓存区
mqtt_msg_init(&client->mqtt_state.mqtt_connection, client->mqtt_state.out_buffer, client->mqtt_state.out_buffer_length);
// 配置【CONNECT】控制报文,并获取【CONNECT】报文[指针]、[长度]
client->mqtt_state.outbound_message = mqtt_msg_connect(&client->mqtt_state.mqtt_connection, client->mqtt_state.connect_info);
// 获取待发送的报文类型(此处是【CONNECT】报文)
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data);
// 获取待发送报文中的【报文标识符】(【CONNECT】报文中没有)
client->mqtt_state.pending_msg_id = mqtt_get_id(client->mqtt_state.outbound_message->data,client->mqtt_state.outbound_message->length);
// TCP发送成功/报文发送5秒计时结束 => 报文发送结束(sendTimeout=0)
client->sendTimeout = MQTT_SEND_TIMOUT; // 发送MQTT报文时,sendTimeout=5
INFO("MQTT: Sending, type: %d, id: %04Xrn", client->mqtt_state.pending_msg_type, client->mqtt_state.pending_msg_id);
// TCP:发送【CONNECT】报文----------------------------------------------------------
espconn_send(client->pCon, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length);
// 报文发送完--------------------------------------------------------------
client->mqtt_state.outbound_message = NULL; //清除出站报文指针
client->connState = MQTT_CONNECT_SENDING; //更新状态
system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client); // 安排任务MQTT_Task
}
5.5.3、TCP接收数据回调中有关CONNACK连接确认
上面已经发送了CONNECT报文,服务器要返回CONNACK报文,所以看下TCP接收数据的回调,同样这里只截取有关CONNACK的内容;
- 打印接收数据的长度、前4字节的内容;(长度为4、前4字节为:32、2、0、0)
- 判断是MQTT_CONNECT_SENDING状态,这一状态在发送CONNECT报文中被设置;
- 判断当前报文是CONNACK;
- 修改状态为MQTT_DATA;
- 执行MQTT连接成功的回调函数:mqttConnectedCb
- 判断当前报文是CONNACK;
void ICACHE_FLASH_ATTR mqtt_tcpclient_recv(void *arg, char *pdata, unsigned short len)
{
INFO("TCP: data received %d bytesrn",len);
INFO("TCP: data received %d,%d,%d,%d rn", *pdata,*(pdata+1),*(pdata+2),*(pdata+3));
if (len<MQTT_BUF_SIZE && len>0){ // 接收到的数据长度在允许范围内
os_memcpy(client->mqtt_state.in_buffer, pdata, len); // 获取接收数据,存入【入站报文缓存区】
msg_type = mqtt_get_type(client->mqtt_state.in_buffer); // 获取【报文类型】
// 根据ESP8266运行状态,执行相应操作---------------------------------------------
switch (client->connState){
case MQTT_CONNECT_SENDING: // 【MQTT_CONNECT_SENDING】
if (msg_type == MQTT_MSG_TYPE_CONNACK){ // 判断消息类型!=【CONNACK】
// ESP8266发送 == 【CONNECT】报文---------------------------------------
INFO("MQTT: Connected to %s:%drn", client->host, client->port);
client->connState = MQTT_DATA; // ESP8266状态改变:【MQTT_DATA】
if (client->connectedCb) // 执行[mqttConnectedCb]函数(MQTT连接成功函数)
client->connectedCb((uint32_t*)client); // 参数 = mqttClient
}
break;
.......
}
}
system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client); // 安排任务MQTT_Task
}
5.6、建立MQTT连接之后
在建立MQTT连接之后会调用这个函数,可以看到:
- 订阅了主题"$iot/esp8266/user/both";
- 向主题"$iot/esp8266/user/both"发布"ESP8266_Online"的消息;
// MQTT已成功连接:ESP8266发送【CONNECT】,并接收到【CONNACK】========================
void mqttConnectedCb(uint32_t *args)
{
MQTT_Client* client = (MQTT_Client*)args; // 获取mqttClient指针
INFO("MQTT: Connectedrn");
// 订阅主题【参数2:主题过滤器 / 参数3:订阅Qos】
MQTT_Subscribe(client, "$iot/esp8266/user/both", 0);
// 发布主题【参数2:主题名 / 参数3:发布消息的有效载荷 / 参数4:有效载荷长度 / 参数5:发布Qos / 参数6:Retain】
MQTT_Publish(client, "$iot/esp8266/user/both", "ESP8266_Online", strlen("ESP8266_Online"), 0, 0);
}
5.7、订阅主题、向主题发布、解析收到的PUBLISH数据
终于到这里了,其实使用的话,只知道怎么订阅、发布,解析就可以,只是我不想。
5.7.1、订阅主题,使用MQTT_Subscribe
- 根据主题过滤器、订阅Qos,配置SUBSCRIBE报文内容,其实调用的是mqtt_msg_subscribe;
- 然后将SUBSCRIBE报文写入队列;
- 发送SUBSCRIBE报文在MQTT_Task的case MQTT_DATA中;
// ESP8266订阅主题【参数2:主题过滤器 / 参数3:订阅Qos】
BOOL ICACHE_FLASH_ATTR MQTT_Subscribe(MQTT_Client *client, char* topic, uint8_t qos)
{
uint8_t dataBuffer[MQTT_BUF_SIZE]; // 解析后报文缓存(1204字节)
uint16_t dataLen; // 解析后报文长度
// 配置【SUBSCRIBE】报文,并获取【SUBSCRIBE】报文[指针]、[长度]
client->mqtt_state.outbound_message=mqtt_msg_subscribe(&client->mqtt_state.mqtt_connection,topic, qos,&client->mqtt_state.pending_msg_id);
INFO("MQTT: queue subscribe, topic"%s", id: %drn", topic, client->mqtt_state.pending_msg_id);
// 将报文写入队列,并返回写入字节数(包括特殊码)
while(QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1){
INFO("MQTT: Queue fullrn");
// 解析队列中的报文
if (QUEUE_Gets(&client->msgQueue, dataBuffer, &dataLen, MQTT_BUF_SIZE) == -1){
INFO("MQTT: Serious buffer errorrn");
return FALSE;
}
}
system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client); // 安排任务MQTT_Task
return TRUE;
}
5.7.2、发布主题,使用MQTT_Publish
- 根据主题、有效载荷及长度、订阅Qos,retain,配置PUBLISH报文内容,其实调用的是mqtt_msg_publish;
- 然后将PUBLISH报文写入队列;
- 发送PUBLISH报文在MQTT_Task的case MQTT_DATA中;
// ESP8266向主题发布消息:【参数2:主题名 / 参数3:发布消息的有效载荷 / 参数4:有效载荷长度 / 参数5:发布Qos / 参数6:Retain】
BOOL ICACHE_FLASH_ATTR MQTT_Publish(MQTT_Client *client, const char* topic, const char* data, int data_length, int qos, int retain)
{
uint8_t dataBuffer[MQTT_BUF_SIZE]; // 解析后报文缓存(1204字节)
uint16_t dataLen; // 解析后报文长度
// 配置【PUBLISH】报文,并获取【PUBLISH】报文[指针]、[长度]
client->mqtt_state.outbound_message = mqtt_msg_publish(&client->mqtt_state.mqtt_connection,
topic, data, data_length,
qos, retain,
&client->mqtt_state.pending_msg_id);
if (client->mqtt_state.outbound_message->length == 0){ // 判断报文是否正确
INFO("MQTT: Queuing publish failedrn");
return FALSE;
}
// 串口打印:【PUBLISH】报文长度,(队列装填数量/队列大小)
INFO("MQTT: queuing publish, length: %d, queue size(%d/%d)rn", client->mqtt_state.outbound_message->length, client->msgQueue.rb.fill_cnt, client->msgQueue.rb.size);
// 将报文写入队列,并返回写入字节数(包括特殊码)
while (QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1)
{
INFO("MQTT: Queue fullrn"); // 队列已满
// 解析队列中的数据包
if (QUEUE_Gets(&client->msgQueue, dataBuffer, &dataLen, MQTT_BUF_SIZE) == -1){
INFO("MQTT: Serious buffer errorrn");
return FALSE;
}
}
system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client); // 安排任务
return TRUE;
}
5.7.3、解析收到的PUBLISH数据,调用user_main.c中的mqttDataCb
- 首先,TCP接收数据的回调函数mqtt_tcpclient_recv中会收到这个报文;
- 然后判断收到的是PUBLISH报文,执行有关应答的操作,PUBLISH数据内容处理调用deliver_publish;
- deliver_publish中调用的则是user_main.c中的mqttDataCb;
// ESP8266接收到【PUBLISH】报文:发布消息
case MQTT_MSG_TYPE_PUBLISH:
if (msg_qos == 1) // 【服务端->客户端】发布消息 Qos=1
client->mqtt_state.outbound_message = mqtt_msg_puback(&client->mqtt_state.mqtt_connection, msg_id); // 配置【PUBACK】报文
else if (msg_qos == 2) // 【服务端->客户端】发布消息 Qos=2
client->mqtt_state.outbound_message = mqtt_msg_pubrec(&client->mqtt_state.mqtt_connection, msg_id); // 配置【PUBREC】报文
if (msg_qos == 1 || msg_qos == 2){
INFO("MQTT: Queue response QoS: %drn", msg_qos);
// 将ESP8266应答报文(【PUBACK】或【PUBREC】),写入队列
if (QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1){
INFO("MQTT: Queue fullrn");
}
}
// 获取服务端【PUBLISH】报文的【主题】、【有效载荷】
deliver_publish(client, client->mqtt_state.in_buffer, client->mqtt_state.message_length_read);
break;
// ESP8266获取服务端【PUBLISH】报文的【主题】、【有效载荷】
LOCAL void ICACHE_FLASH_ATTR deliver_publish(MQTT_Client* client, uint8_t* message, int length)
{
mqtt_event_data_t event_data;
event_data.topic_length = length; // 主题名长度初始化
event_data.topic = mqtt_get_publish_topic(message, &event_data.topic_length);// 获取【PUBLISH】报文的主题名(指针)、主题名长度
event_data.data_length = length; // 有效载荷长度初始化
event_data.data = mqtt_get_publish_data(message, &event_data.data_length); // 获取【PUBLISH】报文的载荷(指针)、载荷长度
// 进入【接收MQTT的[PUBLISH]数据】函数
if (client->dataCb)
client->dataCb((uint32_t*)client, event_data.topic, event_data.topic_length, event_data.data, event_data.data_length);
}
在mqttDataCb这个函数里,就可以根据收到的主题名,有效载荷的内容执行你想要的操作了;
在INFO打印的信息里,有Topic_len和Data_len,就可以发现这个长度是不包括字符串末尾的'