我是靠谱客的博主 妩媚玉米,最近开发中收集的这篇文章主要介绍ESP8266_MQTT协议1、了解下MQTT协议2、连接相关的报文3、发布相关的报文4、订阅相关的报文5、MQTT的代码,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

1、了解下MQTT协议

虽然上一篇用起来了MQTT,但是并不十分了解,基本就局限于,发布主题是发送数据,订阅主题是接收数据,今天就再好好了解一下吧。

分享下网页版的“MQTT协议中文版”链接:Introduction · MQTT协议中文版

1.1、基本的概念

MQTT 是一个 客户端/服务端 架构的 发布/订阅模式 的消息传输协议;
  • 客户端:可以发布和订阅某主题的消息;
  • 服务器:接收消息、转发消息给订阅该主题的客户端;

1.2、主题的表示

1.2.1、主题名用UTF-8编码

UTF-8编码兼容ASCII码,所以自己定义主题名的时候就用字母、数字、下划线就行了;

1.2.2、层级分隔符"/"

主题名之间可以层级分隔符"/"隔开,这样主题名是层次化的,方便管理;

1.2.3、主题通配符

主题通配符,是为了可以一次性订阅多个主题,所以只使用在订阅主题,不能用在发布主题;

  • 多层通配符"#"
    • 在末尾分隔符后,匹配当前层级,以及所有子层级
    • "xxx/#"可匹配:"xxx","xxx/xx1","xxx/xx2","xxx/xx1/xx3";
  • 单层通配符"+"
    • 任意位置,匹配当前层级的下一子层级
    • "xxx/+"可匹配:"xxx/xx1","xxx/xx2";

1.2.4、以$开头的主题

$开头的主题名 不能匹配 通配符"#""+"开头的主题;

P.S. 比如百度云的主题名形式就为:"$iot/device/msg";关于主题名更细节或特别使用方式的说明见文档;


1.3、关于UTF-8编码

非常详细的字符编码讲解,ASCII、GB2312、GBK、Unicode、UTF-8等知识点都有_哔哩哔哩_bilibili

不知道UTF-8编码是啥,所以去瞅了瞅,这个视频主要内容:

  • 最初的ASCII编码,以及衍生出ASCII扩展码;
  • 各国家文字有自己的编码,比如中国的GB2312编码、GBK编码;
  • 为了同一各国字符编码,Unicode标准提出了USC-2、USC-4字符集;
  • UTF-8编码就是根据USC-4字符集而来的编码方式,将字符集划分为4个区间,根据实际字符选择对应的x字节格式,并以二进制填充到此格式中;

比如,王的Unicode编码:U+738B,属于3字节的UTF-8编码范围,将0x738B转换成2进制,填充到1110xxxx 10xxxxxx 10xxxxxx;

为了避免UP主糊弄我,啊不是我开玩笑的,狗头狗头,为了追本溯源我又找到了个查询Unicode的网站,网站很友好,除了Unicode编码还给出了HTML代码、CSS代码、处于哪个编码区域、UTF-8、UTF-16、UTF-32等;

王 - 中日韩象形文字: U+738B - Unicode 字符百科 (unicode-table.com)


1.4、服务质量等级QoS

QoS = 0:至多发送一次消息;

QoS = 1:至少发送一次消息;

QoS = 2:仅发送一次消息;

昨天还是前天刚接触到MQTT,随便搜索的时候看到的例子,链接不知道了,TA说明了共享单车在不同情境下的QoS设置。

  • 当骑行的时候,车辆定期上报数据,所以偶尔丢失也没关系,此时QoS = 0;
  • 当开锁的时候,因为要保证开锁成功,所以至少要有一次收到开锁指令,QoS  = 1;
  • 当骑行完成缴费的时候,就要求有且仅有一次支付,这时QoS = 2;

1.5、MQTT报文格式(固定报头、可变报头、有效载荷)

1.5.1、固定报头(所有报文都有)

 分类下你可以发现,其实控制报文的类型就三大类,连接、订阅、发布

 除发布PUBLISH外,其余报文类型标志位都是与报文类型直接一一对应的;

剩余长度:可变报头和有效载荷的字节数;

采用变长编码方式,每个字节的低7位表示数据,最高位表示有无下一字节,最多4个字节;

也就是128进制,举个例子,比如剩余长度编码为两个字节,第一个字节1111 1111,第二个字节0111 1111;

可以看做1 127、0 127,也就是127*128^1 + 127*128^0 = 16383;


1.5.2、可变报头(部分报文有)

根据报文类型的不同,可变报头存在多个字段,这里只介绍其中的一个“报文标识符”,其余字段根据具体报文类型来看;


1.5.3、有效载荷(部分报文有)


1.6、MQTT协议中字符串的表示

前两个字节表示后面字符串的长度,后面是字符串的UTF-8格式的字符数据;


2、连接相关的报文

2.1、 CONNECT 连接服务端

2.1.1、思维导图

2.1.2、涉及到的概念——会话

 


2.1.2、涉及到的概念——遗嘱

客户端发送DISCONNECT断开连接,服务端会删除遗嘱


2.2、CONNACK 确认连接请求


2.3、DISCONNECT 断开连接

客户端发送完DISCONNECT,断开连接;

服务端发送完,如有则删除遗嘱,如客户端未关闭连接则关闭连接;

2.4、PINGREQ 心跳请求、PINGRESP 心跳响应


3、发布相关的报文

3.1、PUBLISH 发布消息

3.2、PUBACK 发布确认(对QoS=1的PUBLISH响应)


4、订阅相关的报文

4.1、SUBSCRIBE 订阅主题

4.2、SUBACK 订阅确认

4.3、UNSUBSCRIBE 取消订阅、UNSUBACK 取消订阅确认


5、MQTT的代码

5.1、config 加载或更新系统参数

config.h定义了一个系统参数的结构体,用来存放WiFi信息和MQTT服务端域名、端口、本机的用户名密码等;

配置这些系统参数的宏定义在mqtt_config.h;

这个config文件的作用就是,从flash中加载系统参数,代码中利用了两个扇区存放系统参数,一个扇区存放现在的系统参数,一个扇区存放之前的系统参数,算是做一个备份。每次修改cfg_holder的时候,就会读出不一致,然后把默认的参数及新cfg_holder写入另一个扇区

// 系统参数---------------------------------------------------------------
typedef struct{
	uint32_t cfg_holder;		// 持有人标识(只有更新此数值,系统参数才会更新)
	uint8_t device_id[64];		// 客户端ID[64]		【官方例程中是[32],将其改为[64]】

	uint8_t sta_ssid[64];		// WIFI名[64]
	uint8_t sta_pwd[64];		// WIFI密码[64]
	uint32_t sta_type;			// STA类型

	uint8_t mqtt_host[64];		// MQTT服务端域名[64]
	uint32_t mqtt_port;			// MQTT端口

	uint8_t mqtt_user[64];		// MQTT用户名[64]		【官方例程中是[32],将其改为[64]】
	uint8_t mqtt_pass[64];		// MQTT密码[64]		【官方例程中是[32],将其改为[64]】

	uint32_t mqtt_keepalive;	// 保持连接时长
	uint8_t security;			// 安全类型
} SYSCFG;
// 加载/更新系统参数【WIFI参数、MQTT参数】
void ICACHE_FLASH_ATTR CFG_Load()
{
	spi_flash_read,读扇区0x7C的标志位saveFlag.flag

	if (saveFlag.flag == 0){
		spi_flash_read,读出扇区0x79的系统参数
	}else{
		spi_flash_read,读出扇区0x7A的系统参数
	}

	if(sysCfg.cfg_holder != CFG_HOLDER){			// 持有人标识不同
		sysCfg.cfg_holder = CFG_HOLDER;				// 更新持有人标识
		写入默认参数;
		CFG_Save();	将更新后的系统参数烧录到另一扇区,并更新saveFlag.flag
	}
}

5.2、wifi、sntp、开启或断开MQTT连接

WIFI_Connect函数中,设置STA模式、WiFi名和密码,并开启1s的定时器WiFiLinker;

WiFiLinker回调函数中检查连接状态、是否获取到IP

  • 如果获取到IP则定时器改为2s;
  • 如果未获取到IP则定时器改为0.5s;
  • 此外,如果连接状态改变,调用user_main.c中的wifiConnectCb

wifiConnectCb函数:

  • 如果更新的状态为STATION_GOT_IP,意味着WiFi连接成功,则开启SNTP服务,并启动1s的定时器sntp_timer;
  • 如果更新的状态为其他,则是断开了网络连接,调用MQTT_Disconnect(&mqttClient)断开MQTT连接;

sntp_timer回调函数:

  • 如果获取网络时间失败,则打印错误信息、继续保持这个1s的定时器;
  • 如果获取网络时间成功,则打印正确的时间、关闭这个定时器,调用MQTT_Connect(&mqttClient)开启MQTT连接;

前面大概的启动流程就是这样,下面看看有关MQTT的部分了;


5.3、MQTT_Client结构体、user_init中MQTT初始化

// MQTT客户端
typedef struct
{
	struct espconn *pCon;				// TCP连接结构体指针
	uint8_t security;					// 安全类型
	uint8_t* host;						// 服务端域名/地址
	uint32_t port;						// 网络连接端口号
	ip_addr_t ip;						// 32位IP地址

	mqtt_state_t  mqtt_state;			// MQTT状态

	mqtt_connect_info_t connect_info;	// MQTT【CONNECT】报文的连接参数

	MqttCallback connectedCb;			// MQTT连接成功_回调
	MqttCallback disconnectedCb;		// MQTT断开连接_回调
	MqttCallback publishedCb;			// MQTT发布成功_回调
	MqttCallback timeoutCb;				// MQTT超时_回调
	MqttDataCallback dataCb;			// MQTT接收数据_回调

	ETSTimer mqttTimer;					// MQTT定时器

	uint32_t keepAliveTick;				// MQTT客户端(ESP8266)心跳计数
	uint32_t reconnectTick;				// 重连等待计时
	uint32_t sendTimeout;				// 报文发送超时时间

	tConnState connState;				// ESP8266运行状态

	QUEUE msgQueue;						// 消息队列

	void* user_data;					// 用户数据(预留给用户的指针)

} MQTT_Client;
void user_init(void)
{
    .......

    // 网络连接参数赋值:服务端域名、端口【1883】、安全类型【0:NO_TLS】
	MQTT_InitConnection(&mqttClient, sysCfg.mqtt_host, sysCfg.mqtt_port, sysCfg.security);

	// MQTT连接参数赋值:客户端标识符、MQTT用户名、MQTT密钥、保持连接时长【120s】、清除会话【1:clean_session】
	MQTT_InitClient(&mqttClient, sysCfg.device_id, sysCfg.mqtt_user, sysCfg.mqtt_pass, sysCfg.mqtt_keepalive, 1);

	// 设置遗嘱参数(如果云端没有对应的遗嘱主题,则MQTT连接会被拒绝)
//	MQTT_InitLWT(&mqttClient, "Will", "ESP8266_offline", 0, 0);

	// 设置MQTT相关函数
	MQTT_OnConnected(&mqttClient, mqttConnectedCb);			// 设置【MQTT成功连接】函数的另一种调用方式
	MQTT_OnDisconnected(&mqttClient, mqttDisconnectedCb);	// 设置【MQTT成功断开】函数的另一种调用方式
	MQTT_OnPublished(&mqttClient, mqttPublishedCb);			// 设置【MQTT成功发布】函数的另一种调用方式
	MQTT_OnData(&mqttClient, mqttDataCb);					// 设置【接收MQTT数据】函数的另一种调用方式

	.......
}
  • MQTT_InitConnection
    • 为mqttClient申请内存;
    • 赋值服务器域名、端口、安全类型;
  • MQTT_InitClient
    • mqttClient赋值,除了参数列表中的还有mqtt_state、connect_info、msgQueue等信息;
    • 安排任务MQTT_Task;
  • MQTT_Onxxx,这部分是设置mqttClient中的回调函数;

5.4、MQTT_Connect、DNS域名解析、mqttTimer定时器

5.4.1、MQTT_Connect

MQTT_Connect 是在WiFi已连接、SNTP获取成功后调用的,下面的代码省略了很多,主要内容:

  • 设置TCP连接的参数,如服务器和ESP8266的端口号;
  • 注册TCP相关的回调函数:mqtt_tcpclient_connect_cb、mqtt_tcpclient_recon_cb;
  • 开启mqttTimer定时器;
  • 设置DNS域名解析,解析MQTT服务器的域名;
void ICACHE_FLASH_ATTR MQTT_Connect(MQTT_Client *mqttClient)
{
	// 开始MQTT连接前,判断是否存在MQTT的TCP连接。如果有,则清除之前的TCP连接-
    if (mqttClient->pCon){
        mqtt_tcpclient_delete(mqttClient);	// 删除TCP连接、释放pCon内存、清除TCP连接指针
    }

    // TCP连接设置---------------------------------------------------------
    mqttClient->pCon->proto.tcp->local_port = espconn_port();					// 获取ESP8266可用端口
    mqttClient->pCon->proto.tcp->remote_port = mqttClient->port;				// 设置端口号

    espconn_regist_connectcb(mqttClient->pCon, mqtt_tcpclient_connect_cb);		// 注册TCP连接成功的回调函数
    espconn_regist_reconcb(mqttClient->pCon, mqtt_tcpclient_recon_cb);			// 注册TCP异常中断的回调函数

    mqttClient->keepAliveTick = 0;	// MQTT客户端(ESP8266)心跳计数
    mqttClient->reconnectTick = 0;	// 重连等待计时:当进入重连请求状态后,需等待5秒,之后进行重新连接

    // 设置MQTT定时(1秒重复)【功能:心跳计时、重连计时、TCP发送计时】
    os_timer_setfn(&mqttClient->mqttTimer, (os_timer_func_t *)mqtt_timer, mqttClient);										

    // 解析域名----------------------------------------------------
    espconn_gethostbyname(mqttClient->pCon, mqttClient->host, &mqttClient->ip, mqtt_dns_found);

    mqttClient->connState = TCP_CONNECTING;		// TCP正在连接
}

5.4.2、DNS域名解析回调

  • 失败
    • 状态设为 TCP_RECONNECT_REQ,表示TCP重连请求;
  • 成功:
    • 调用espconn_connect 连接TCP;
    • 状态TCP_CONNECTING,表示TCP正在连接;
    • 安排任务MQTT_Task;

TCP_CONNECTING状态,任务里并没有执行什么操作,所以下一步5.5中是看TCP是否连接成功

LOCAL void ICACHE_FLASH_ATTR mqtt_dns_found(const char *name, ip_addr_t *ipaddr, void *arg)
{
    if (ipaddr == NULL){		// 域名解析失败
        client->connState = TCP_RECONNECT_REQ;	// TCP重连请求(等待5秒)
        return;
    }
    // 判断IP地址是否正确(?=0)
    if (client->ip.addr == 0 && ipaddr->addr != 0){
        espconn_connect(client->pCon);		    // TCP连接(作为Client连接Server)
        client->connState = TCP_CONNECTING;		// TCP正在连接
    }
    system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client);		// 安排任务MQTT_Task
}

5.4.3、mqttTimer定时器

现在看上面蓝色的,DNS解析失败的后续操作;

mqttTimer定时器回调函数中,如果状态为TCP_RECONNECT_REQ,每秒自增,到5s后切换状态为TCP_RECONNECT,运行任务;

任务中再次调用MQTT_Connect,重新设置TCP连接相关参数,设置DNS域名解析,见5.4.1;

    else if (client->connState == TCP_RECONNECT_REQ){	        // TCP重连请求(等待5秒)
        client->reconnectTick ++;	                            // 重连计时++
        if (client->reconnectTick > MQTT_RECONNECT_TIMEOUT){    // 重连请求超过5秒
            client->reconnectTick = 0;	                        // 重连计时 = 0
            client->connState = TCP_RECONNECT;	                // TCP重新连接
            system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client);// 安排任务
        }
    }
    	case TCP_RECONNECT_REQ:		
            break;

    	case TCP_RECONNECT:
    		mqtt_tcpclient_delete(client);	// 删除TCP连接、释放pCon内存、清除TCP连接指针
    		MQTT_Connect(client);			// MQTT连接准备:TCP连接、域名解析等
    		INFO("TCP: Reconnect to: %s:%drn", client->host, client->port);
    		client->connState = TCP_CONNECTING;		// TCP正在连接
    		break;

5.5、TCP连接的相关操作(CONNECT、CONNACK)

当前面DNS域名解析成功,得知MQTT服务器的IP地址后,就会开始TCP连接了,有关注册TCP连接成功、连接失败的回调函数是在MQTT_Connect中;

5.5.1、TCP连接失败 

mqtt_tcpclient_recon_cb,切换状态为TCP_RECONNECT_REQ,进入TCP重连请求状态,上面刚说过,就是在mqttTimer定时器中自增,到5s后切换状态为TCP_RECONNECT,在任务中重新调用MQTT_Connect;

void ICACHE_FLASH_ATTR mqtt_tcpclient_recon_cb(void *arg, sint8 errType)
{
    struct espconn *pCon = (struct espconn *)arg;		// 获取TCP连接指针
    MQTT_Client* client = (MQTT_Client *)pCon->reverse;	// 获取mqttClient指针

    INFO("TCP: Reconnect to %s:%drn", client->host, client->port);

    client->connState = TCP_RECONNECT_REQ;		        // TCP重连请求(等待5秒)

    system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client);	// 安排任务MQTT_Task
}

5.5.2、TCP连接成功、开始建立MQTT连接

  • 注册TCP正常断开、收发数据的回调函数;
  • 发送CONNECT报文;

发送CONNECT报文,就是按照MQTT协议,一点点配置固定报头、可变报头、有效载荷的部分,太细节了,这又涉及到了mqtt_msg,好家伙,又是个死长的代码,先不看了,等我写出bug再研究,哈哈哈哈~

void ICACHE_FLASH_ATTR mqtt_tcpclient_connect_cb(void *arg)
{
    struct espconn *pCon = (struct espconn *)arg;		 // 获取TCP连接指针
    MQTT_Client* client = (MQTT_Client *)(pCon->reverse);// 获取mqttClient指针

    // 注册回调函数--------------------------------------------------------------
    espconn_regist_disconcb(client->pCon, mqtt_tcpclient_discon_cb);	// TCP断开成功_回调
    espconn_regist_recvcb(client->pCon, mqtt_tcpclient_recv);			// TCP接收成功_回调
    espconn_regist_sentcb(client->pCon, mqtt_tcpclient_sent_cb);		// TCP发送成功_回调

    INFO("MQTT: Connected to broker %s:%drn", client->host, client->port);

    // 【CONNECT】报文发送准备--------------------------------------------------------------
    // 初始化MQTT报文缓存区
	mqtt_msg_init(&client->mqtt_state.mqtt_connection, client->mqtt_state.out_buffer, client->mqtt_state.out_buffer_length);

	// 配置【CONNECT】控制报文,并获取【CONNECT】报文[指针]、[长度]
    client->mqtt_state.outbound_message = mqtt_msg_connect(&client->mqtt_state.mqtt_connection, client->mqtt_state.connect_info);

    // 获取待发送的报文类型(此处是【CONNECT】报文)
    client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data);

    // 获取待发送报文中的【报文标识符】(【CONNECT】报文中没有)
    client->mqtt_state.pending_msg_id = mqtt_get_id(client->mqtt_state.outbound_message->data,client->mqtt_state.outbound_message->length);

    // TCP发送成功/报文发送5秒计时结束 => 报文发送结束(sendTimeout=0)
    client->sendTimeout = MQTT_SEND_TIMOUT;	// 发送MQTT报文时,sendTimeout=5

    INFO("MQTT: Sending, type: %d, id: %04Xrn", client->mqtt_state.pending_msg_type, client->mqtt_state.pending_msg_id);

    // TCP:发送【CONNECT】报文----------------------------------------------------------
    espconn_send(client->pCon, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length);
   
    // 报文发送完--------------------------------------------------------------
    client->mqtt_state.outbound_message = NULL;		//清除出站报文指针
    client->connState = MQTT_CONNECT_SENDING;		//更新状态
    system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client);	// 安排任务MQTT_Task
}

5.5.3、TCP接收数据回调中有关CONNACK连接确认

上面已经发送了CONNECT报文,服务器要返回CONNACK报文,所以看下TCP接收数据的回调,同样这里只截取有关CONNACK的内容;

  • 打印接收数据的长度、前4字节的内容;(长度为4、前4字节为:32、2、0、0)
  • 判断是MQTT_CONNECT_SENDING状态,这一状态在发送CONNECT报文中被设置;
    • 判断当前报文是CONNACK;
      • 修改状态为MQTT_DATA;
      • 执行MQTT连接成功的回调函数:mqttConnectedCb
void ICACHE_FLASH_ATTR mqtt_tcpclient_recv(void *arg, char *pdata, unsigned short len)
{
    INFO("TCP: data received %d bytesrn",len);
    INFO("TCP: data received %d,%d,%d,%d rn", *pdata,*(pdata+1),*(pdata+2),*(pdata+3));

    if (len<MQTT_BUF_SIZE && len>0){				// 接收到的数据长度在允许范围内
        os_memcpy(client->mqtt_state.in_buffer, pdata, len);	// 获取接收数据,存入【入站报文缓存区】
        msg_type = mqtt_get_type(client->mqtt_state.in_buffer);	// 获取【报文类型】
        // 根据ESP8266运行状态,执行相应操作---------------------------------------------
        switch (client->connState){
        	case MQTT_CONNECT_SENDING:				// 【MQTT_CONNECT_SENDING】
                if (msg_type == MQTT_MSG_TYPE_CONNACK){ 	// 判断消息类型!=【CONNACK】
                    // ESP8266发送 == 【CONNECT】报文---------------------------------------
                   INFO("MQTT: Connected to %s:%drn", client->host, client->port);
                   client->connState = MQTT_DATA;	// ESP8266状态改变:【MQTT_DATA】
                   if (client->connectedCb)		// 执行[mqttConnectedCb]函数(MQTT连接成功函数)
                        client->connectedCb((uint32_t*)client);	// 参数 = mqttClient
                }
                break;
            .......
        }
    }
    system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client);	// 安排任务MQTT_Task
}

5.6、建立MQTT连接之后

在建立MQTT连接之后会调用这个函数,可以看到:

  • 订阅了主题"$iot/esp8266/user/both";
  • 向主题"$iot/esp8266/user/both"发布"ESP8266_Online"的消息;
// MQTT已成功连接:ESP8266发送【CONNECT】,并接收到【CONNACK】========================
void mqttConnectedCb(uint32_t *args)
{
    MQTT_Client* client = (MQTT_Client*)args;	// 获取mqttClient指针

    INFO("MQTT: Connectedrn");

    // 订阅主题【参数2:主题过滤器 / 参数3:订阅Qos】
	MQTT_Subscribe(client, "$iot/esp8266/user/both", 0);

	// 发布主题【参数2:主题名 / 参数3:发布消息的有效载荷 / 参数4:有效载荷长度 / 参数5:发布Qos / 参数6:Retain】
	MQTT_Publish(client, "$iot/esp8266/user/both", "ESP8266_Online", strlen("ESP8266_Online"), 0, 0);
}

5.7、订阅主题、向主题发布、解析收到的PUBLISH数据

终于到这里了,其实使用的话,只知道怎么订阅、发布,解析就可以,只是我不想。

5.7.1、订阅主题,使用MQTT_Subscribe

  • 根据主题过滤器、订阅Qos,配置SUBSCRIBE报文内容,其实调用的是mqtt_msg_subscribe;
  • 然后将SUBSCRIBE报文写入队列;
  • 发送SUBSCRIBE报文在MQTT_Task的case MQTT_DATA中;
// ESP8266订阅主题【参数2:主题过滤器 / 参数3:订阅Qos】
BOOL ICACHE_FLASH_ATTR MQTT_Subscribe(MQTT_Client *client, char* topic, uint8_t qos)
{
    uint8_t dataBuffer[MQTT_BUF_SIZE];		// 解析后报文缓存(1204字节)
    uint16_t dataLen;						// 解析后报文长度

    // 配置【SUBSCRIBE】报文,并获取【SUBSCRIBE】报文[指针]、[长度]
    client->mqtt_state.outbound_message=mqtt_msg_subscribe(&client->mqtt_state.mqtt_connection,topic, qos,&client->mqtt_state.pending_msg_id);
    INFO("MQTT: queue subscribe, topic"%s", id: %drn", topic, client->mqtt_state.pending_msg_id);

    // 将报文写入队列,并返回写入字节数(包括特殊码)
    while(QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1){
    	INFO("MQTT: Queue fullrn");
        // 解析队列中的报文
        if (QUEUE_Gets(&client->msgQueue, dataBuffer, &dataLen, MQTT_BUF_SIZE) == -1){
        	INFO("MQTT: Serious buffer errorrn");
            return FALSE;
        }
    }
    system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client);	// 安排任务MQTT_Task
    return TRUE;
}

5.7.2、发布主题,使用MQTT_Publish

  • 根据主题、有效载荷及长度、订阅Qos,retain,配置PUBLISH报文内容,其实调用的是mqtt_msg_publish;
  • 然后将PUBLISH报文写入队列;
  • 发送PUBLISH报文在MQTT_Task的case MQTT_DATA中;
// ESP8266向主题发布消息:【参数2:主题名 / 参数3:发布消息的有效载荷 / 参数4:有效载荷长度 / 参数5:发布Qos / 参数6:Retain】
BOOL ICACHE_FLASH_ATTR MQTT_Publish(MQTT_Client *client, const char* topic, const char* data, int data_length, int qos, int retain)
{
    uint8_t dataBuffer[MQTT_BUF_SIZE];	// 解析后报文缓存(1204字节)
    uint16_t dataLen;					// 解析后报文长度

    // 配置【PUBLISH】报文,并获取【PUBLISH】报文[指针]、[长度]
    client->mqtt_state.outbound_message = mqtt_msg_publish(&client->mqtt_state.mqtt_connection,
                                          topic, data, data_length,
                                          qos, retain,
                                          &client->mqtt_state.pending_msg_id);

    if (client->mqtt_state.outbound_message->length == 0){	// 判断报文是否正确
    	INFO("MQTT: Queuing publish failedrn");
        return FALSE;
    }

    // 串口打印:【PUBLISH】报文长度,(队列装填数量/队列大小)
    INFO("MQTT: queuing publish, length: %d, queue size(%d/%d)rn", client->mqtt_state.outbound_message->length, client->msgQueue.rb.fill_cnt, client->msgQueue.rb.size);

    // 将报文写入队列,并返回写入字节数(包括特殊码)
    while (QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1)
    {
    	INFO("MQTT: Queue fullrn");	// 队列已满
        // 解析队列中的数据包
        if (QUEUE_Gets(&client->msgQueue, dataBuffer, &dataLen, MQTT_BUF_SIZE) == -1){
        	INFO("MQTT: Serious buffer errorrn");
            return FALSE;
        }
    }
    system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client);	// 安排任务
    return TRUE;
}

5.7.3、解析收到的PUBLISH数据,调用user_main.c中的mqttDataCb

  • 首先,TCP接收数据的回调函数mqtt_tcpclient_recv中会收到这个报文;
  • 然后判断收到的是PUBLISH报文,执行有关应答的操作,PUBLISH数据内容处理调用deliver_publish
  • deliver_publish中调用的则是user_main.c中的mqttDataCb
         	// ESP8266接收到【PUBLISH】报文:发布消息
            case MQTT_MSG_TYPE_PUBLISH:
                if (msg_qos == 1)	    // 【服务端->客户端】发布消息 Qos=1
                    client->mqtt_state.outbound_message = mqtt_msg_puback(&client->mqtt_state.mqtt_connection, msg_id);	// 配置【PUBACK】报文
                else if (msg_qos == 2)	// 【服务端->客户端】发布消息 Qos=2
                    client->mqtt_state.outbound_message = mqtt_msg_pubrec(&client->mqtt_state.mqtt_connection, msg_id);	// 配置【PUBREC】报文

                if (msg_qos == 1 || msg_qos == 2){
                	INFO("MQTT: Queue response QoS: %drn", msg_qos);
                    // 将ESP8266应答报文(【PUBACK】或【PUBREC】),写入队列
                    if (QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1){
                    	INFO("MQTT: Queue fullrn");
                    }
                }
                // 获取服务端【PUBLISH】报文的【主题】、【有效载荷】
                deliver_publish(client, client->mqtt_state.in_buffer, client->mqtt_state.message_length_read);
                break;
// ESP8266获取服务端【PUBLISH】报文的【主题】、【有效载荷】
LOCAL void ICACHE_FLASH_ATTR deliver_publish(MQTT_Client* client, uint8_t* message, int length)
{
    mqtt_event_data_t event_data;

    event_data.topic_length = length;	    // 主题名长度初始化
    event_data.topic = mqtt_get_publish_topic(message, &event_data.topic_length);// 获取【PUBLISH】报文的主题名(指针)、主题名长度
    event_data.data_length = length;	    // 有效载荷长度初始化
    event_data.data = mqtt_get_publish_data(message, &event_data.data_length);	 // 获取【PUBLISH】报文的载荷(指针)、载荷长度

    // 进入【接收MQTT的[PUBLISH]数据】函数
    if (client->dataCb)
        client->dataCb((uint32_t*)client, event_data.topic, event_data.topic_length, event_data.data, event_data.data_length);
}

在mqttDataCb这个函数里,就可以根据收到的主题名,有效载荷的内容执行你想要的操作了; 

在INFO打印的信息里,有Topic_len和Data_len,就可以发现这个长度是不包括字符串末尾的''的;

os_strcmp是比较两个字符串的内容是否一致,比较一致的结束是以''为条件的,所以要进行缓存添加''的相关操作;

// 【接收MQTT的[PUBLISH]数据】函数		【参数1:主题 / 参数2:主题长度 / 参数3:有效载荷 / 参数4:有效载荷长度】
void mqttDataCb(uint32_t *args, const char* topic, uint32_t topic_len, const char *data, uint32_t data_len)
{
    char *topicBuf = (char*)os_zalloc(topic_len+1);		// 申请【主题】空间
    char *dataBuf  = (char*)os_zalloc(data_len+1);		// 申请【有效载荷】空间
    MQTT_Client* client = (MQTT_Client*)args;	        // 获取MQTT_Client指针

    os_memcpy(topicBuf, topic, topic_len);	            // 缓存主题
    topicBuf[topic_len] = 0;				            // 最后添''
    os_memcpy(dataBuf, data, data_len);		            // 缓存有效载荷
    dataBuf[data_len] = 0;					            // 最后添''

    INFO("Receive topic: %s, data: %s rn", topicBuf, dataBuf);	// 串口打印【主题】【有效载荷】
    INFO("Topic_len = %d, Data_len = %drn", topic_len, data_len);	// 串口打印【主题长度】【有效载荷长度】

    // 根据接收到的主题名/有效载荷,控制LED的亮/灭
    if( os_strcmp(topicBuf,"$iot/esp8266/user/both") == 0){		    // 主题 == "SW_LED"
    	if( os_strcmp(dataBuf,"LED_ON") == 0){		    // 有效载荷 == "LED_ON"
    		GPIO_OUTPUT_SET(GPIO_ID_PIN(4),0);		    // LED亮
    	}else if( os_strcmp(dataBuf,"LED_OFF") == 0 ){	// 有效载荷 == "LED_OFF"
    		GPIO_OUTPUT_SET(GPIO_ID_PIN(4),1);			// LED灭
    	}
    }
    os_free(topicBuf);	// 释放【主题】空间
    os_free(dataBuf);	// 释放【有效载荷】空间
}

写完了,真开心,虽然才下午三点,我也要去床上躺平!!!我要看电影,玩游戏,吃辣条,吃冰棍,睡大觉!!!溜了溜了。

最后

以上就是妩媚玉米为你收集整理的ESP8266_MQTT协议1、了解下MQTT协议2、连接相关的报文3、发布相关的报文4、订阅相关的报文5、MQTT的代码的全部内容,希望文章能够帮你解决ESP8266_MQTT协议1、了解下MQTT协议2、连接相关的报文3、发布相关的报文4、订阅相关的报文5、MQTT的代码所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(55)

评论列表共有 0 条评论

立即
投稿
返回
顶部