我是靠谱客的博主 小巧秋天,最近开发中收集的这篇文章主要介绍STM32连接--OneNET,阿里云(MQTT协议)详细教程关于如何连接OneNET关于如何连接阿里云补充信息,觉得挺不错的,现在分享给大家,希望可以做个参考。
概述
????对于物联网工程,不可或缺的必然是连接上云,今天本人总结了上云经验,希望对大家起到帮助哦·~~~·有用的话记得点赞收藏哦❤️
关于如何连接OneNET
OneNEThttps://open.iot.10086.cn/console/
1.搭建云平台设备
1.1.协议选择
1.2.添加产品
1.3.添加设备
到这里我们就基本快完成云平台的搭建了
到此云平台已搭建完成
2.查看设备参数(MQTT连接使用)
产品id
设备id
设备鉴权信息
3.下位机代码编写
3.1.ESP8266.h
#ifndef _ESP8266_H_
#define _ESP8266_H_
#include "sys.h"
#define REV_OK
0 //接收完成标志
#define REV_WAIT 1 //接收未完成标志
void ESP8266_Init(void);
void Usart2_Init(unsigned int baud);
void ESP8266_Clear(void);
void ESP8266_SendData(unsigned char *data, unsigned short len);
unsigned char *ESP8266_GetIPD(unsigned short timeOut);
void Usart2_SendString(unsigned char *str, unsigned short len);
#endif
3.2.ESP8266.c
//单片机头文件
#include "stm32f10x.h"
//网络设备驱动
#include "esp8266.h"
/* FreeRTOS头文件 */
#include "bsp_SysTick.h"
#include "usart.h"
//C库
#include <string.h>
#include <stdio.h>
#define ESP8266_WIFI_INFO
"AT+CWJAP="hhh","12345678"rn"
#define ESP8266_ONENET_INFO
"AT+CIPSTART="TCP","183.230.40.39",6002rn"
unsigned char esp8266_buf[128];
unsigned short esp8266_cnt = 0, esp8266_cntPre = 0;
//==========================================================
// 函数名称: ESP8266_Clear
//
// 函数功能: 清空缓存
//
// 入口参数: 无
//
// 返回参数: 无
//
// 说明:
//==========================================================
void ESP8266_Clear(void)
{
memset(esp8266_buf, 0, sizeof(esp8266_buf));
esp8266_cnt = 0;
}
//==========================================================
// 函数名称: ESP8266_WaitRecive
//
// 函数功能: 等待接收完成
//
// 入口参数: 无
//
// 返回参数: REV_OK-接收完成
REV_WAIT-接收超时未完成
//
// 说明:
循环调用检测是否接收完成
//==========================================================
_Bool ESP8266_WaitRecive(void)
{
if(esp8266_cnt == 0)
//如果接收计数为0 则说明没有处于接收数据中,所以直接跳出,结束函数
return REV_WAIT;
if(esp8266_cnt == esp8266_cntPre)
//如果上一次的值和这次相同,则说明接收完毕
{
esp8266_cnt = 0;
//清0接收计数
return REV_OK;
//返回接收完成标志
}
esp8266_cntPre = esp8266_cnt;
//置为相同
return REV_WAIT;
//返回接收未完成标志
}
//==========================================================
// 函数名称: ESP8266_SendCmd
//
// 函数功能: 发送命令
//
// 入口参数: cmd:命令
//
res:需要检查的返回指令
//
// 返回参数: 0-成功 1-失败
//
// 说明:
//==========================================================
_Bool ESP8266_SendCmd(char *cmd, char *res, u16 time)
{
Usart2_SendString((unsigned char *)cmd, strlen((const char *)cmd));
while(time--)
{
if(ESP8266_WaitRecive() == REV_OK)
//如果收到数据
{
if(strstr((const char *)esp8266_buf, res) != NULL)
//如果检索到关键词
{
ESP8266_Clear();
//清空缓存
return 0;
}
}
Delay_ms(10);
}
return 1;
}
//==========================================================
// 函数名称: ESP8266_SendData
//
// 函数功能: 发送数据
//
// 入口参数: data:数据
//
len:长度
//
// 返回参数: 无
//
// 说明:
//==========================================================
void ESP8266_SendData(unsigned char *data, unsigned short len)
{
char cmdBuf[32];
ESP8266_Clear();
//清空接收缓存
sprintf(cmdBuf, "AT+CIPSEND=%drn", len);
//发送命令
if(!ESP8266_SendCmd(cmdBuf, ">", 200))
//收到‘>’时可以发送数据
{
Usart2_SendString(data, len);
//发送设备连接请求数据
}
}
//==========================================================
// 函数名称: ESP8266_GetIPD
//
// 函数功能: 获取平台返回的数据
//
// 入口参数: 等待的时间(乘以10ms)
//
// 返回参数: 平台返回的原始数据
//
// 说明:
不同网络设备返回的格式不同,需要去调试
//
如ESP8266的返回格式为 "+IPD,x:yyy" x代表数据长度,yyy是数据内容
//==========================================================
unsigned char *ESP8266_GetIPD(unsigned short timeOut)
{
char *ptrIPD = NULL;
do
{
if(ESP8266_WaitRecive() == REV_OK)
//如果接收完成
{
ptrIPD = strstr((char *)esp8266_buf, "IPD,");
//搜索“IPD”头
if(ptrIPD == NULL)
//如果没找到,可能是IPD头的延迟,还是需要等待一会,但不会超过设定的时间
{
//printf(""IPD" not foundrn");
}
else
{
ptrIPD = strchr(ptrIPD, ':');
//找到':'
if(ptrIPD != NULL)
{
ptrIPD++;
return (unsigned char *)(ptrIPD);
}
else
return NULL;
}
}
Delay_ms(5);
//延时等待
} while(timeOut--);
return NULL;
//超时还未找到,返回空指针
}
//==========================================================
// 函数名称: USART2_IRQHandler
//
// 函数功能: 串口2收发中断
//
// 入口参数: 无
//
// 返回参数: 无
//
// 说明:
//==========================================================
void USART2_IRQHandler(void)
{
if(USART_GetITStatus(USART2, USART_IT_RXNE) != RESET) //接收中断
{
if(esp8266_cnt >= sizeof(esp8266_buf)) esp8266_cnt = 0; //防止串口被刷爆
esp8266_buf[esp8266_cnt++] = USART2->DR;
USART_ClearFlag(USART2, USART_FLAG_RXNE);
}
}
//==========================================================
// 函数名称: ESP8266_Init
//
// 函数功能: 初始化ESP8266
//
// 入口参数: 无
//
// 返回参数: 无
//
// 说明:
//==========================================================
void ESP8266_Init(void)
{
GPIO_InitTypeDef GPIO_Initure;
RCC_APB2PeriphClockCmd(RCC_APB2Periph_GPIOB, ENABLE);
//ESP8266复位引脚
GPIO_Initure.GPIO_Mode = GPIO_Mode_Out_PP;
GPIO_Initure.GPIO_Pin = GPIO_Pin_9;
//GPIOB1-复位
GPIO_Initure.GPIO_Speed = GPIO_Speed_50MHz;
GPIO_Init(GPIOB, &GPIO_Initure);
GPIO_WriteBit(GPIOB, GPIO_Pin_9, Bit_RESET);
Delay_ms(100);
GPIO_WriteBit(GPIOB, GPIO_Pin_9, Bit_SET);
Delay_ms(100);
ESP8266_Clear();
printf("ATrn");
while(ESP8266_SendCmd("ATrnr", "OK", 200))
Delay_ms(500);
printf("CWMODErn");
while(ESP8266_SendCmd("AT+CWMODE=1rn", "OK", 200))
Delay_ms(500);
printf("AT+CWDHCPrn");
while(ESP8266_SendCmd("AT+CWDHCP=1,1rn", "OK", 200))
Delay_ms(500);
printf("CWJAPrn");
while(ESP8266_SendCmd(ESP8266_WIFI_INFO, "GOT IP", 200))
Delay_ms(500);
printf("CIPSTARTrn");
while(ESP8266_SendCmd(ESP8266_ONENET_INFO, "CONNECT", 200))
Delay_ms(500);
printf("ESP8266 Init OKrn");
}
/*
************************************************************
* 函数名称: Usart2_Init
*
* 函数功能: 串口2初始化
*
* 入口参数: baud:设定的波特率
*
* 返回参数: 无
*
* 说明:
TX-PA2
RX-PA3
************************************************************
*/
void Usart2_Init(unsigned int baud)
{
GPIO_InitTypeDef gpio_initstruct;
USART_InitTypeDef usart_initstruct;
NVIC_InitTypeDef nvic_initstruct;
RCC_APB2PeriphClockCmd(RCC_APB2Periph_GPIOA, ENABLE);
RCC_APB1PeriphClockCmd(RCC_APB1Periph_USART2, ENABLE);
//PA2 TXD
gpio_initstruct.GPIO_Mode = GPIO_Mode_AF_PP;
gpio_initstruct.GPIO_Pin = GPIO_Pin_2;
gpio_initstruct.GPIO_Speed = GPIO_Speed_50MHz;
GPIO_Init(GPIOA, &gpio_initstruct);
//PA3 RXD
gpio_initstruct.GPIO_Mode = GPIO_Mode_IN_FLOATING;
gpio_initstruct.GPIO_Pin = GPIO_Pin_3;
gpio_initstruct.GPIO_Speed = GPIO_Speed_50MHz;
GPIO_Init(GPIOA, &gpio_initstruct);
usart_initstruct.USART_BaudRate = baud;
usart_initstruct.USART_HardwareFlowControl = USART_HardwareFlowControl_None;
//无硬件流控
usart_initstruct.USART_Mode = USART_Mode_Rx | USART_Mode_Tx;
//接收和发送
usart_initstruct.USART_Parity = USART_Parity_No;
//无校验
usart_initstruct.USART_StopBits = USART_StopBits_1;
//1位停止位
usart_initstruct.USART_WordLength = USART_WordLength_8b;
//8位数据位
USART_Init(USART2, &usart_initstruct);
USART_Cmd(USART2, ENABLE);
//使能串口
USART_ITConfig(USART2, USART_IT_RXNE, ENABLE);
//使能接收中断
nvic_initstruct.NVIC_IRQChannel = USART2_IRQn;
nvic_initstruct.NVIC_IRQChannelCmd = ENABLE;
nvic_initstruct.NVIC_IRQChannelPreemptionPriority = 0;
nvic_initstruct.NVIC_IRQChannelSubPriority = 0;
NVIC_Init(&nvic_initstruct);
}
/*
************************************************************
* 函数名称: Usart_SendString
*
* 函数功能: 串口数据发送
*
* 入口参数: USARTx:串口组
*
str:要发送的数据
*
len:数据长度
*
* 返回参数: 无
*
* 说明:
************************************************************
*/
void Usart2_SendString(unsigned char *str, unsigned short len)
{
unsigned short count = 0;
for(; count < len; count++)
{
USART_SendData(USART2, *str++);
//发送数据
while(USART_GetFlagStatus(USART2, USART_FLAG_TC) == RESET);
//等待发送完成
}
}
3.3.onenet.h
#ifndef _ONENET_H_
#define _ONENET_H_
_Bool OneNet_DevLink(char *Tip);
void OneNet_SendData(char *Tip);
void OneNet_SendCmd(void);
void OneNet_RevPro(unsigned char *cmd);
_Bool OneNet_Subscribe(const char *topics[], unsigned char topic_cnt);
void OneNet_PacketPing(void);
_Bool OneNet_Publish(const char *topic, const char *msg);
void OneNet_HeartBeat(void);
#endif
3.4.onenet.c
//单片机头文件
#include "stm32f10x.h"
//网络设备
#include "esp8266.h"
//协议文件
#include "onenet.h"
#include "mqttkit.h"
#include "./SysTick/bsp_SysTick.h"
//硬件驱动
#include "usart.h"
//C库
#include <string.h>
#include <stdio.h>
//全局变量
extern char *Tips ; //主题
#define PROID
"515377"
//产品ID
#define AUTH_INFO "123"
//鉴权信息
#define DEVID
"943294381" //设备ID
extern unsigned char esp8266_buf[128];
extern unsigned char esp8266_buf[128];
//==========================================================
// 函数名称: OneNet_DevLink
//
// 函数功能: 与onenet创建连接
//
// 入口参数: 无
//
// 返回参数: 1-成功 0-失败
//
// 说明:
与onenet平台建立连接
//==========================================================
_Bool OneNet_DevLink(char *Tip)
{
MQTT_PACKET_STRUCTURE mqttPacket = {NULL, 0, 0, 0};
//协议包
unsigned char *dataPtr;
_Bool status = 1;
printf("OneNet_DevLinkrnPROID: %s, AUIF: %s, DEVID:%srn", PROID, AUTH_INFO, DEVID);
if(MQTT_PacketConnect(PROID, AUTH_INFO, DEVID, 256, 0, MQTT_QOS_LEVEL0, NULL, NULL, 0, &mqttPacket) == 0)
{
ESP8266_SendData(mqttPacket._data, mqttPacket._len);
//上传平台
dataPtr = ESP8266_GetIPD(250);
//等待平台响应
if(dataPtr != NULL)
{
if(MQTT_UnPacketRecv(dataPtr) == MQTT_PKT_CONNACK)
{
switch(MQTT_UnPacketConnectAck(dataPtr))
{
case 0:printf("Tips:%srn",Tip);
status = 0;
//LED2_ON;
//入网成功
;break;
case 1:printf("WARN: 连接失败:协议错误rn");break;
case 2:printf("WARN: 连接失败:非法的clientidrn");break;
case 3:printf("WARN: 连接失败:服务器失败rn");break;
case 4:printf("WARN: 连接失败:用户名或密码错误rn");break;
case 5:printf("WARN: 连接失败:非法链接(比如token非法)rn");break;
default:printf("ERR: 连接失败:未知错误rn");break;
}
}
}
MQTT_DeleteBuffer(&mqttPacket);
//删包
}
else
printf("WARN: MQTT_PacketConnect Failedrn");
return status;
}
u8 velue0 = 0;
u8 velue1 = 0;
unsigned char OneNet_FillBuf(char *buf)
{
char text[32];
memset(text, 0, sizeof(text));
strcpy(buf, ",;");
memset(text, 0, sizeof(text));
sprintf(text, "Blue_Led,%d;", velue0);
strcat(buf, text);
memset(text, 0, sizeof(text));
sprintf(text, "Yellow_Led,%d;", velue1);
strcat(buf, text);
return strlen(buf);
}
//==========================================================
// 函数名称: OneNet_SendData
//
// 函数功能: 上传数据到平台
//
// 入口参数: type:发送数据的格式
//
// 返回参数: 无
//
// 说明:
//==========================================================
void OneNet_SendData(char *Tip)
{
MQTT_PACKET_STRUCTURE mqttPacket = {NULL, 0, 0, 0};
//协议包
char buf[128];
short body_len = 0, i = 0;
//printf("Tips:%srn",Tip);
memset(buf, 0, sizeof(buf));
body_len = OneNet_FillBuf(buf);
//获取当前需要发送的数据流的总长度
if(body_len)
{
if(MQTT_PacketSaveData(DEVID, body_len, NULL, 5, &mqttPacket) == 0)
//封包
{
for(; i < body_len; i++)
mqttPacket._data[mqttPacket._len++] = buf[i];
ESP8266_SendData(mqttPacket._data, mqttPacket._len);
//上传数据到平台
//printf("Send %d Bytesrn", mqttPacket._data);
MQTT_DeleteBuffer(&mqttPacket);
//删包
}
else
printf("WARN: EDP_NewBuffer Failedrn");
}
}
//==========================================================
// 函数名称: OneNet_Publish
//
// 函数功能: 发布消息
//
// 入口参数: topic:发布的主题
//
msg:消息内容
//
// 返回参数: 0-成功 1-需要重送
//
// 说明:
//==========================================================
_Bool OneNet_Publish(const char *topic, const char *msg)
{
//Delay_ms(100);
MQTT_PACKET_STRUCTURE mqttPacket = {NULL, 0, 0, 0};
//协议包
//printf( "Publish Topic: %s, Msg: %srn", topic, msg);
if(MQTT_PacketPublish(MQTT_PUBLISH_ID, topic, msg, strlen(msg), MQTT_QOS_LEVEL2, 0, 1, &mqttPacket) != 1)
{
ESP8266_SendData(mqttPacket._data, mqttPacket._len);
//向平台发送订阅请求
MQTT_DeleteBuffer(&mqttPacket);
//删包
}
return 0;
}
//==========================================================
// 函数名称: OneNet_Subscribe
//
// 函数功能: 订阅
//
// 入口参数: topics:订阅的topic
//
topic_cnt:topic个数
//
// 返回参数: SEND_TYPE_OK-成功 SEND_TYPE_SUBSCRIBE-需要重发
//
// 说明:
//==========================================================
_Bool OneNet_Subscribe(const char *topics[], unsigned char topic_cnt)
{
unsigned char i = 0;
MQTT_PACKET_STRUCTURE mqttPacket = {NULL, 0, 0, 0};
//协议包
for(; i < topic_cnt; i++)
printf( "Subscribe Topic: %srn", topics[i]);
if(MQTT_PacketSubscribe(MQTT_SUBSCRIBE_ID, MQTT_QOS_LEVEL2, topics, topic_cnt, &mqttPacket) == 0)
{
ESP8266_SendData(mqttPacket._data, mqttPacket._len);
//向平台发送订阅请求
MQTT_DeleteBuffer(&mqttPacket);
//删包
}
return 0;
}
//==========================================================
// 函数名称: OneNet_HeartBeat
//
// 函数功能: 心跳检测
//
// 入口参数: 无
//
// 返回参数: 无
//
// 说明:
//==========================================================
void OneNet_HeartBeat(void)
{
MQTT_PACKET_STRUCTURE mqttPacket = {NULL, 0, 0, 0};
unsigned char sCount = 3;
//---------------------------------------------步骤一:组包---------------------------------------------
if(MQTT_PacketPing(&mqttPacket))
return;
while(sCount--)
{
//---------------------------------------------步骤二:发送数据-----------------------------------------
ESP8266_SendData(mqttPacket._data, mqttPacket._len);
//while(OneNet_DevLink(Tips))
---------------------------------------------步骤三:解析返回数据-------------------------------------
//
if(MQTT_UnPacketRecv(cmd) == MQTT_PKT_PINGRESP)
//
{
//
printf( "Tips: HeartBeat OKrn");
//
//
break;
//
}
//
else
//
{
//
//ESP8266_Init();
//初始化ESP8266
//
printf("Check Devicern");
//
}
Delay_ms(10);
}
//---------------------------------------------步骤四:删包---------------------------------------------
MQTT_DeleteBuffer(&mqttPacket);
}
/**
* @brief
检测字符长度并提取透传
* @param
无
* @retval 无
*/
char my_strlen(int8 **payload)//?const?????
{
static char cmd[200];
int i=0;
while(**payload)
{
cmd[i] = **payload;
i++;
payload++;
printf("%dn",cmd[i]);
}
printf("%cn",cmd[100]);
printf("%c%c%c%cn",cmd[111],cmd[112],cmd[113],cmd[114]);
return cmd[11];
}
//==========================================================
// 函数名称: OneNet_RevPro
//
// 函数功能: 平台返回数据检测
//
// 入口参数: dataPtr:平台返回的数据
//
// 返回参数: 无
//
// 说明:
//==========================================================
void OneNet_RevPro(unsigned char *cmd)
{
MQTT_PACKET_STRUCTURE mqttPacket = {NULL, 0, 0, 0};
//协议包
char *req_payload = NULL;
char *cmdid_topic = NULL;
unsigned short topic_len = 0;
unsigned short req_len = 0;
unsigned char type = 0;
unsigned char qos = 0;
static unsigned short pkt_id = 0;
short result = 0;
char *dataPtr = NULL;
char numBuf[10];
int num = 0;
uint16_t ID = 0;
type = MQTT_UnPacketRecv(cmd);
switch(type)
{
case MQTT_PKT_CMD:
//命令下发
result = MQTT_UnPacketCmd(cmd, &cmdid_topic, &req_payload, &req_len); //解出topic和消息体
if(result == 0)
{
printf( "cmdid: %s, req: %s, req_len: %drn", cmdid_topic, req_payload, req_len);
if(MQTT_PacketCmdResp(cmdid_topic, req_payload, &mqttPacket) == 0) //命令回复组包
{
printf( "Tips: Send CmdResprn");
ESP8266_SendData(mqttPacket._data, mqttPacket._len);
//回复命令
MQTT_DeleteBuffer(&mqttPacket);
//删包
}
}
break;
case MQTT_PKT_PUBLISH:
//接收的Publish消息
result = MQTT_UnPacketPublish(cmd, &cmdid_topic, &topic_len, &req_payload, &req_len, &qos, &pkt_id);
if(result == 0)
{
printf( "topic: %s, topic_len: %d, payload: %s, payload_len: %drn",
cmdid_topic, topic_len, req_payload, req_len);
switch(qos)
{
case 1:
//收到publish的qos为1,设备需要回复Ack
if(MQTT_PacketPublishAck(pkt_id, &mqttPacket) == 0)
{
printf( "Tips: Send PublishAckrn");
ESP8266_SendData(mqttPacket._data, mqttPacket._len);
MQTT_DeleteBuffer(&mqttPacket);
}
break;
case 2:
//收到publish的qos为2,设备先回复Rec
//平台回复Rel,设备再回复Comp
if(MQTT_PacketPublishRec(pkt_id, &mqttPacket) == 0)
{
printf( "Tips: Send PublishRecrn");
ESP8266_SendData(mqttPacket._data, mqttPacket._len);
MQTT_DeleteBuffer(&mqttPacket);
}
break;
default:
break;
}
}
break;
case MQTT_PKT_PUBACK:
//发送Publish消息,平台回复的Ack
if(MQTT_UnPacketPublishAck(cmd) == 0)
printf( "Tips: MQTT Publish Send OKrn");
break;
case MQTT_PKT_PUBREC:
//发送Publish消息,平台回复的Rec,设备需回复Rel消息
if(MQTT_UnPacketPublishRec(cmd) == 0)
{
printf( "Tips: Rev PublishRecrn");
if(MQTT_PacketPublishRel(MQTT_PUBLISH_ID, &mqttPacket) == 0)
{
printf( "Tips: Send PublishRelrn");
ESP8266_SendData(mqttPacket._data, mqttPacket._len);
MQTT_DeleteBuffer(&mqttPacket);
}
}
break;
case MQTT_PKT_PUBREL:
//收到Publish消息,设备回复Rec后,平台回复的Rel,设备需再回复Comp
if(MQTT_UnPacketPublishRel(cmd, pkt_id) == 0)
{
printf( "Tips: Rev PublishRelrn");
if(MQTT_PacketPublishComp(MQTT_PUBLISH_ID, &mqttPacket) == 0)
{
printf( "Tips: Send PublishComprn");
ESP8266_SendData(mqttPacket._data, mqttPacket._len);
MQTT_DeleteBuffer(&mqttPacket);
}
}
break;
case MQTT_PKT_PUBCOMP:
//发送Publish消息,平台返回Rec,设备回复Rel,平台再返回的Comp
if(MQTT_UnPacketPublishComp(cmd) == 0)
{
printf( "Tips: Rev PublishComprn");
}
break;
case MQTT_PKT_SUBACK:
//发送Subscribe消息的Ack
if(MQTT_UnPacketSubscribe(cmd) == 0)
printf( "Tips: MQTT Subscribe OKrn");
else
printf( "Tips: MQTT Subscribe Errrn");
break;
case MQTT_PKT_UNSUBACK:
//发送UnSubscribe消息的Ack
if(MQTT_UnPacketUnSubscribe(cmd) == 0)
printf( "Tips: MQTT UnSubscribe OKrn");
else
printf( "Tips: MQTT UnSubscribe Errrn");
break;
default:
result = -1;
break;
}
ESP8266_Clear();
//清空缓存
if(result == -1)
return;
dataPtr = strchr(req_payload, ':');
//搜索':'
if(dataPtr != NULL && result != -1)
//如果找到了
{
dataPtr++;
printf("%s",Tips);
while(*dataPtr >= '0' && *dataPtr <= '9')
//判断是否是下发的命令控制数据
{
numBuf[num++] = *dataPtr++;
}
numBuf[num] = 0;
num = atoi((const char *)numBuf);
//转为数值形式
printf( "num:%drn",num);
if(strstr((char *)req_payload, "LED"))
//搜索"redled"
{
//LED
//Mqtt_LED(Tips,num);
}
else if(strstr((char *)req_payload, "TOUCH"))
{
//指纹
dataPtr = strchr(req_payload, '-');
//搜索'-'
if(dataPtr != NULL && result != -1)
//如果找到了
{
dataPtr++;
while(*dataPtr >= '0' && *dataPtr <= '9')
//判断是否是下发的命令控制数据
{
numBuf[ID++] = *dataPtr++;
}
numBuf[ID] = 0;
ID = atoi((const char *)numBuf);
//转为数值形式
printf("ID:%drn",ID);
//Mqtt_Task( Tips, num, ID);
}else{
//Mqtt_Task(Tips, num, ID);
}
}
else if(strstr((char *)req_payload, "DOOR"))
{
//步进电机
//Door(Tips,num);
}
}
if(type == MQTT_PKT_CMD || type == MQTT_PKT_PUBLISH)
{
MQTT_FreeBuffer(cmdid_topic);
MQTT_FreeBuffer(req_payload);
}
}
最后MQTT协议使用引用他人
3.5.MqttKit.h
#ifndef _MQTTKIT_H_
#define _MQTTKIT_H_
#include "Common.h"
//=============================配置==============================
//===========可以提供RTOS的内存管理方案,也可以使用C库的=========
//RTOS
#include <stdlib.h>
#define MQTT_MallocBuffer malloc
#define MQTT_FreeBuffer
free
//==========================================================
#define MOSQ_MSB(A)
(uint8)((A & 0xFF00) >> 8)
#define MOSQ_LSB(A)
(uint8)(A & 0x00FF)
/*--------------------------------内存分配方案标志--------------------------------*/
#define MEM_FLAG_NULL
0
#define MEM_FLAG_ALLOC
1
#define MEM_FLAG_STATIC
2
typedef struct Buffer
{
uint8 *_data;
//协议数据
uint32 _len;
//写入的数据长度
uint32 _size;
//缓存总大小
uint8 _memFlag; //内存使用的方案:0-未分配 1-使用的动态分配
2-使用的固定内存
} MQTT_PACKET_STRUCTURE;
/*--------------------------------固定头部消息类型--------------------------------*/
enum MqttPacketType
{
MQTT_PKT_CONNECT = 1, /**< 连接请求数据包 */
MQTT_PKT_CONNACK,
/**< 连接确认数据包 */
MQTT_PKT_PUBLISH,
/**< 发布数据数据包 */
MQTT_PKT_PUBACK,
/**< 发布确认数据包 */
MQTT_PKT_PUBREC,
/**< 发布数据已接收数据包,Qos 2时,回复MQTT_PKT_PUBLISH */
MQTT_PKT_PUBREL,
/**< 发布数据释放数据包, Qos 2时,回复MQTT_PKT_PUBREC */
MQTT_PKT_PUBCOMP,
/**< 发布完成数据包, Qos 2时,回复MQTT_PKT_PUBREL */
MQTT_PKT_SUBSCRIBE,
/**< 订阅数据包 */
MQTT_PKT_SUBACK,
/**< 订阅确认数据包 */
MQTT_PKT_UNSUBSCRIBE, /**< 取消订阅数据包 */
MQTT_PKT_UNSUBACK,
/**< 取消订阅确认数据包 */
MQTT_PKT_PINGREQ,
/**< ping 数据包 */
MQTT_PKT_PINGRESP,
/**< ping 响应数据包 */
MQTT_PKT_DISCONNECT,
/**< 断开连接数据包 */
//新增
MQTT_PKT_CMD
/**< 命令下发数据包 */
};
/*--------------------------------MQTT QOS等级--------------------------------*/
enum MqttQosLevel
{
MQTT_QOS_LEVEL0,
/**< 最多发送一次 */
MQTT_QOS_LEVEL1,
/**< 最少发送一次
*/
MQTT_QOS_LEVEL2
/**< 只发送一次 */
};
/*--------------------------------MQTT 连接请求标志位,内部使用--------------------------------*/
enum MqttConnectFlag
{
MQTT_CONNECT_CLEAN_SESSION
= 0x02,
MQTT_CONNECT_WILL_FLAG
= 0x04,
MQTT_CONNECT_WILL_QOS0
= 0x00,
MQTT_CONNECT_WILL_QOS1
= 0x08,
MQTT_CONNECT_WILL_QOS2
= 0x10,
MQTT_CONNECT_WILL_RETAIN
= 0x20,
MQTT_CONNECT_PASSORD
= 0x40,
MQTT_CONNECT_USER_NAME
= 0x80
};
/*--------------------------------消息的packet ID,可自定义--------------------------------*/
#define MQTT_PUBLISH_ID
10
#define MQTT_SUBSCRIBE_ID
20
#define MQTT_UNSUBSCRIBE_ID
30
/*--------------------------------删包--------------------------------*/
void MQTT_DeleteBuffer(MQTT_PACKET_STRUCTURE *mqttPacket);
/*--------------------------------解包--------------------------------*/
uint8 MQTT_UnPacketRecv(uint8 *dataPtr);
/*--------------------------------登录组包--------------------------------*/
uint8 MQTT_PacketConnect(const int8 *user, const int8 *password, const int8 *devid,
uint16 cTime, uint1 clean_session, uint1 qos,
const int8 *will_topic, const int8 *will_msg, int32 will_retain,
MQTT_PACKET_STRUCTURE *mqttPacket);
/*--------------------------------断开连接组包--------------------------------*/
uint1 MQTT_PacketDisConnect(MQTT_PACKET_STRUCTURE *mqttPacket);
/*--------------------------------连接响应解包--------------------------------*/
uint8 MQTT_UnPacketConnectAck(uint8 *rev_data);
/*--------------------------------数据点上传组包--------------------------------*/
uint1 MQTT_PacketSaveData(const int8 *devid, int16 send_len, int8 *type_bin_head, uint8 type, MQTT_PACKET_STRUCTURE *mqttPacket);
/*--------------------------------二进制文件上传组包--------------------------------*/
uint1 MQTT_PacketSaveBinData(const int8 *name, int16 file_len, MQTT_PACKET_STRUCTURE *mqttPacket);
/*--------------------------------命令下发解包--------------------------------*/
uint8 MQTT_UnPacketCmd(uint8 *rev_data, int8 **cmdid, int8 **req, uint16 *req_len);
/*--------------------------------命令回复组包--------------------------------*/
uint1 MQTT_PacketCmdResp(const int8 *cmdid, const int8 *req, MQTT_PACKET_STRUCTURE *mqttPacket);
/*--------------------------------订阅主题组包--------------------------------*/
uint8 MQTT_PacketSubscribe(uint16 pkt_id, enum MqttQosLevel qos, const int8 *topics[], uint8 topics_cnt, MQTT_PACKET_STRUCTURE *mqttPacket);
/*--------------------------------订阅主题回复解包--------------------------------*/
uint8 MQTT_UnPacketSubscribe(uint8 *rev_data);
/*--------------------------------取消订阅组包--------------------------------*/
uint8 MQTT_PacketUnSubscribe(uint16 pkt_id, const int8 *topics[], uint8 topics_cnt, MQTT_PACKET_STRUCTURE *mqttPacket);
/*--------------------------------取消订阅回复解包--------------------------------*/
uint1 MQTT_UnPacketUnSubscribe(uint8 *rev_data);
/*--------------------------------发布主题组包--------------------------------*/
uint8 MQTT_PacketPublish(uint16 pkt_id, const int8 *topic,
const int8 *payload, uint32 payload_len,
enum MqttQosLevel qos, int32 retain, int32 own,
MQTT_PACKET_STRUCTURE *mqttPacket);
/*--------------------------------发布消息回复解包--------------------------------*/
uint8 MQTT_UnPacketPublish(uint8 *rev_data, int8 **topic, uint16 *topic_len, int8 **payload, uint16 *payload_len, uint8 *qos, uint16 *pkt_id);
/*--------------------------------发布消息的Ack组包--------------------------------*/
uint1 MQTT_PacketPublishAck(uint16 pkt_id, MQTT_PACKET_STRUCTURE *mqttPacket);
/*--------------------------------发布消息的Ack解包--------------------------------*/
uint1 MQTT_UnPacketPublishAck(uint8 *rev_data);
/*--------------------------------发布消息的Rec组包--------------------------------*/
uint1 MQTT_PacketPublishRec(uint16 pkt_id, MQTT_PACKET_STRUCTURE *mqttPacket);
/*--------------------------------发布消息的Rec解包--------------------------------*/
uint1 MQTT_UnPacketPublishRec(uint8 *rev_data);
/*--------------------------------发布消息的Rel组包--------------------------------*/
uint1 MQTT_PacketPublishRel(uint16 pkt_id, MQTT_PACKET_STRUCTURE *mqttPacket);
/*--------------------------------发布消息的Rel解包--------------------------------*/
uint1 MQTT_UnPacketPublishRel(uint8 *rev_data, uint16 pkt_id);
/*--------------------------------发布消息的Comp组包--------------------------------*/
uint1 MQTT_PacketPublishComp(uint16 pkt_id, MQTT_PACKET_STRUCTURE *mqttPacket);
/*--------------------------------发布消息的Comp解包--------------------------------*/
uint1 MQTT_UnPacketPublishComp(uint8 *rev_data);
/*--------------------------------心跳请求组包--------------------------------*/
uint1 MQTT_PacketPing(MQTT_PACKET_STRUCTURE *mqttPacket);
#endif
3.6.MqttKit.c
/**
************************************************************
************************************************************
************************************************************
* 文件名:
MqttKit.c
*
* 作者:
张继瑞
*
* 日期:
2018-04-27
*
* 版本:
V1.6
*
* 说明:
MQTT协议
*
* 修改记录: V1.1:解决MQTT_PacketSubscribe订阅不为2个topic
*
个数时协议错误的bug
*
V1.2:修复MQTT_PacketCmdResp的bug
*
V1.3:将strncpy替换为memcpy,解决潜在bug
*
V1.4:修复 MQTT_PacketPublishAck
*
MQTT_PacketPublishRel
*
函数封包错误的bug
*
V1.5:增加 MQTT_UnPacketCmd
*
MQTT_UnPacketPublish
*
接口对消息内容长度的提取参数
*
V1.6:增加二进制文件上传接口
************************************************************
************************************************************
************************************************************
**/
//协议头文件
#include "MqttKit.h"
//C库
#include <string.h>
#include <stdio.h>
#define CMD_TOPIC_PREFIX
"$creq"
//==========================================================
// 函数名称: EDP_NewBuffer
//
// 函数功能: 申请内存
//
// 入口参数: edpPacket:包结构体
//
size:大小
//
// 返回参数: 无
//
// 说明:
1.可使用动态分配来分配内存
//
2.可使用局部或全局数组来指定内存
//==========================================================
void MQTT_NewBuffer(MQTT_PACKET_STRUCTURE *mqttPacket, uint32 size)
{
uint32 i = 0;
if(mqttPacket->_data == NULL)
{
mqttPacket->_memFlag = MEM_FLAG_ALLOC;
mqttPacket->_data = (uint8 *)MQTT_MallocBuffer(size);
if(mqttPacket->_data != NULL)
{
mqttPacket->_len = 0;
mqttPacket->_size = size;
for(; i < mqttPacket->_size; i++)
mqttPacket->_data[i] = 0;
}
}
else
{
mqttPacket->_memFlag = MEM_FLAG_STATIC;
for(; i < mqttPacket->_size; i++)
mqttPacket->_data[i] = 0;
mqttPacket->_len = 0;
if(mqttPacket->_size < size)
mqttPacket->_data = NULL;
}
}
//==========================================================
// 函数名称: MQTT_DeleteBuffer
//
// 函数功能: 释放数据内存
//
// 入口参数: edpPacket:包结构体
//
// 返回参数: 无
//
// 说明:
//==========================================================
void MQTT_DeleteBuffer(MQTT_PACKET_STRUCTURE *mqttPacket)
{
if(mqttPacket->_memFlag == MEM_FLAG_ALLOC)
MQTT_FreeBuffer(mqttPacket->_data);
mqttPacket->_data = NULL;
mqttPacket->_len = 0;
mqttPacket->_size = 0;
mqttPacket->_memFlag = MEM_FLAG_NULL;
}
int32 MQTT_DumpLength(size_t len, uint8 *buf)
{
int32 i = 0;
for(i = 1; i <= 4; ++i)
{
*buf = len % 128;
len >>= 7;
if(len > 0)
{
*buf |= 128;
++buf;
}
else
{
return i;
}
}
return -1;
}
int32 MQTT_ReadLength(const uint8 *stream, int32 size, uint32 *len)
{
int32 i;
const uint8 *in = stream;
uint32 multiplier = 1;
*len = 0;
for(i = 0; i < size; ++i)
{
*len += (in[i] & 0x7f) * multiplier;
if(!(in[i] & 0x80))
{
return i + 1;
}
multiplier <<= 7;
if(multiplier >= 2097152)
//128 * *128 * *128
{
return -2;
// error, out of range
}
}
return -1;
// not complete
}
//==========================================================
// 函数名称: MQTT_UnPacketRecv
//
// 函数功能: MQTT数据接收类型判断
//
// 入口参数: dataPtr:接收的数据指针
//
// 返回参数: 0-成功
其他-失败原因
//
// 说明:
//==========================================================
uint8 MQTT_UnPacketRecv(uint8 *dataPtr)
{
uint8 status = 255;
uint8 type = dataPtr[0] >> 4;
//类型检查
if(type < 1 || type > 14)
return status;
if(type == MQTT_PKT_PUBLISH)
{
uint8 *msgPtr;
uint32 remain_len = 0;
msgPtr = dataPtr + MQTT_ReadLength(dataPtr + 1, 4, &remain_len) + 1;
if(remain_len < 2 || dataPtr[0] & 0x01)
//retain
return 255;
if(remain_len < ((uint16)msgPtr[0] << 8 | msgPtr[1]) + 2)
return 255;
if(strstr((int8 *)msgPtr + 2, CMD_TOPIC_PREFIX) != NULL) //如果是命令下发
status = MQTT_PKT_CMD;
else
status = MQTT_PKT_PUBLISH;
}
else
status = type;
return status;
}
//==========================================================
// 函数名称: MQTT_PacketConnect
//
// 函数功能: 连接消息组包
//
// 入口参数: user:用户名:产品ID
//
password:密码:鉴权信息或apikey
//
devid:设备ID
//
cTime:连接保持时间
//
clean_session:离线消息清除标志
//
qos:重发标志
//
will_topic:异常离线topic
//
will_msg:异常离线消息
//
will_retain:消息推送标志
//
mqttPacket:包指针
//
// 返回参数: 0-成功
其他-失败
//
// 说明:
//==========================================================
uint8 MQTT_PacketConnect(const int8 *user, const int8 *password, const int8 *devid,
uint16 cTime, uint1 clean_session, uint1 qos,
const int8 *will_topic, const int8 *will_msg, int32 will_retain,
MQTT_PACKET_STRUCTURE *mqttPacket)
{
uint8 flags = 0;
uint8 will_topic_len = 0;
uint16 total_len = 15;
int16 len = 0, devid_len = strlen(devid);
if(!devid)
return 1;
total_len += devid_len + 2;
//断线后,是否清理离线消息:1-清理 0-不清理--------------------------------------------
if(clean_session)
{
flags |= MQTT_CONNECT_CLEAN_SESSION;
}
//异常掉线情况下,服务器发布的topic------------------------------------------------------
if(will_topic)
{
flags |= MQTT_CONNECT_WILL_FLAG;
will_topic_len = strlen(will_topic);
total_len += 4 + will_topic_len + strlen(will_msg);
}
//qos级别--主要用于PUBLISH(发布态)消息的,保证消息传递的次数-----------------------------
switch((unsigned char)qos)
{
case MQTT_QOS_LEVEL0:
flags |= MQTT_CONNECT_WILL_QOS0;
//最多一次
break;
case MQTT_QOS_LEVEL1:
flags |= (MQTT_CONNECT_WILL_FLAG | MQTT_CONNECT_WILL_QOS1); //最少一次
break;
case MQTT_QOS_LEVEL2:
flags |= (MQTT_CONNECT_WILL_FLAG | MQTT_CONNECT_WILL_QOS2); //只有一次
break;
default:
return 2;
}
//主要用于PUBLISH(发布态)的消息,表示服务器要保留这次推送的信息,如果有新的订阅者出现,就把这消息推送给它。如果不设那么推送至当前订阅的就释放了
if(will_retain)
{
flags |= (MQTT_CONNECT_WILL_FLAG | MQTT_CONNECT_WILL_RETAIN);
}
//账号为空 密码为空---------------------------------------------------------------------
if(!user || !password)
{
return 3;
}
flags |= MQTT_CONNECT_USER_NAME | MQTT_CONNECT_PASSORD;
total_len += strlen(user) + strlen(password) + 4;
//分配内存-----------------------------------------------------------------------------
MQTT_NewBuffer(mqttPacket, total_len);
if(mqttPacket->_data == NULL)
return 4;
memset(mqttPacket->_data, 0, total_len);
/*************************************固定头部***********************************************/
//固定头部----------------------连接请求类型---------------------------------------------
mqttPacket->_data[mqttPacket->_len++] = MQTT_PKT_CONNECT << 4;
//固定头部----------------------剩余长度值-----------------------------------------------
len = MQTT_DumpLength(total_len - 5, mqttPacket->_data + mqttPacket->_len);
if(len < 0)
{
MQTT_DeleteBuffer(mqttPacket);
return 5;
}
else
mqttPacket->_len += len;
/*************************************可变头部***********************************************/
//可变头部----------------------协议名长度 和 协议名--------------------------------------
mqttPacket->_data[mqttPacket->_len++] = 0;
mqttPacket->_data[mqttPacket->_len++] = 4;
mqttPacket->_data[mqttPacket->_len++] = 'M';
mqttPacket->_data[mqttPacket->_len++] = 'Q';
mqttPacket->_data[mqttPacket->_len++] = 'T';
mqttPacket->_data[mqttPacket->_len++] = 'T';
//可变头部----------------------protocol level 4-----------------------------------------
mqttPacket->_data[mqttPacket->_len++] = 4;
//可变头部----------------------连接标志(该函数开头处理的数据)-----------------------------
mqttPacket->_data[mqttPacket->_len++] = flags;
//可变头部----------------------保持连接的时间(秒)----------------------------------------
mqttPacket->_data[mqttPacket->_len++] = MOSQ_MSB(cTime);
mqttPacket->_data[mqttPacket->_len++] = MOSQ_LSB(cTime);
/*************************************消息体************************************************/
//消息体----------------------------devid长度、devid-------------------------------------
mqttPacket->_data[mqttPacket->_len++] = MOSQ_MSB(devid_len);
mqttPacket->_data[mqttPacket->_len++] = MOSQ_LSB(devid_len);
strncat((int8 *)mqttPacket->_data + mqttPacket->_len, devid, devid_len);
mqttPacket->_len += devid_len;
//消息体----------------------------will_flag 和 will_msg---------------------------------
if(flags & MQTT_CONNECT_WILL_FLAG)
{
unsigned short mLen = 0;
if(!will_msg)
will_msg = "";
mLen = strlen(will_topic);
mqttPacket->_data[mqttPacket->_len++] = MOSQ_MSB(mLen);
mqttPacket->_data[mqttPacket->_len++] = MOSQ_LSB(mLen);
strncat((int8 *)mqttPacket->_data + mqttPacket->_len, will_topic, mLen);
mqttPacket->_len += mLen;
mLen = strlen(will_msg);
mqttPacket->_data[mqttPacket->_len++] = MOSQ_MSB(mLen);
mqttPacket->_data[mqttPacket->_len++] = MOSQ_LSB(mLen);
strncat((int8 *)mqttPacket->_data + mqttPacket->_len, will_msg, mLen);
mqttPacket->_len += mLen;
}
//消息体----------------------------use---------------------------------------------------
if(flags & MQTT_CONNECT_USER_NAME)
{
unsigned short user_len = strlen(user);
mqttPacket->_data[mqttPacket->_len++] = MOSQ_MSB(user_len);
mqttPacket->_data[mqttPacket->_len++] = MOSQ_LSB(user_len);
strncat((int8 *)mqttPacket->_data + mqttPacket->_len, user, user_len);
mqttPacket->_len += user_len;
}
//消息体----------------------------password----------------------------------------------
if(flags & MQTT_CONNECT_PASSORD)
{
unsigned short psw_len = strlen(password);
mqttPacket->_data[mqttPacket->_len++] = MOSQ_MSB(psw_len);
mqttPacket->_data[mqttPacket->_len++] = MOSQ_LSB(psw_len);
strncat((int8 *)mqttPacket->_data + mqttPacket->_len, password, psw_len);
mqttPacket->_len += psw_len;
}
return 0;
}
//==========================================================
// 函数名称: MQTT_PacketDisConnect
//
// 函数功能: 断开连接消息组包
//
// 入口参数: mqttPacket:包指针
//
// 返回参数: 0-成功
1-失败
//
// 说明:
//==========================================================
uint1 MQTT_PacketDisConnect(MQTT_PACKET_STRUCTURE *mqttPacket)
{
MQTT_NewBuffer(mqttPacket, 2);
if(mqttPacket->_data == NULL)
return 1;
/*************************************固定头部***********************************************/
//固定头部----------------------头部消息-------------------------------------------------
mqttPacket->_data[mqttPacket->_len++] = MQTT_PKT_DISCONNECT << 4;
//固定头部----------------------剩余长度值-----------------------------------------------
mqttPacket->_data[mqttPacket->_len++] = 0;
return 0;
}
//==========================================================
// 函数名称: MQTT_UnPacketConnectAck
//
// 函数功能: 连接消息解包
//
// 入口参数: rev_data:接收的数据
//
// 返回参数: 1、255-失败
其他-平台的返回码
//
// 说明:
//==========================================================
uint8 MQTT_UnPacketConnectAck(uint8 *rev_data)
{
if(rev_data[1] != 2)
return 1;
if(rev_data[2] == 0 || rev_data[2] == 1)
return rev_data[3];
else
return 255;
}
//==========================================================
// 函数名称: MQTT_PacketSaveData
//
// 函数功能: 数据点上传组包
//
// 入口参数: devid:设备ID(可为空)
//
send_buf:json缓存buf
//
send_len:json总长
//
type_bin_head:bin文件的消息头
//
type:类型
//
// 返回参数: 0-成功
1-失败
//
// 说明:
//==========================================================
uint1 MQTT_PacketSaveData(const int8 *devid, int16 send_len, int8 *type_bin_head, uint8 type, MQTT_PACKET_STRUCTURE *mqttPacket)
{
if(MQTT_PacketPublish(MQTT_PUBLISH_ID, "$dp", NULL, send_len + 3, MQTT_QOS_LEVEL1, 0, 1, mqttPacket) == 0)
{
mqttPacket->_data[mqttPacket->_len++] = type;
//类型
mqttPacket->_data[mqttPacket->_len++] = MOSQ_MSB(send_len);
mqttPacket->_data[mqttPacket->_len++] = MOSQ_LSB(send_len);
}
else
return 1;
return 0;
}
//==========================================================
// 函数名称: MQTT_PacketSaveBinData
//
// 函数功能: 为禁止文件上传组包
//
// 入口参数: name:数据流名字
//
file_len:文件长度
//
mqttPacket:包指针
//
// 返回参数: 0-成功
1-失败
//
// 说明:
//==========================================================
uint1 MQTT_PacketSaveBinData(const int8 *name, int16 file_len, MQTT_PACKET_STRUCTURE *mqttPacket)
{
uint1 result = 1;
int8 *bin_head = NULL;
uint8 bin_head_len = 0;
int8 *payload = NULL;
int32 payload_size = 0;
bin_head = (int8 *)MQTT_MallocBuffer(13 + strlen(name));
if(bin_head == NULL)
return result;
sprintf(bin_head, "{"ds_id":"%s"}", name);
bin_head_len = strlen(bin_head);
payload_size = 7 + bin_head_len + file_len;
payload = (int8 *)MQTT_MallocBuffer(payload_size - file_len);
if(payload == NULL)
{
MQTT_FreeBuffer(bin_head);
return result;
}
payload[0] = 2;
//类型
payload[1] = MOSQ_MSB(bin_head_len);
payload[2] = MOSQ_LSB(bin_head_len);
memcpy(payload + 3, bin_head, bin_head_len);
payload[bin_head_len + 3] = (file_len >> 24) & 0xFF;
payload[bin_head_len + 4] = (file_len >> 16) & 0xFF;
payload[bin_head_len + 5] = (file_len >> 8) & 0xFF;
payload[bin_head_len + 6] = file_len & 0xFF;
if(MQTT_PacketPublish(MQTT_PUBLISH_ID, "$dp", payload, payload_size, MQTT_QOS_LEVEL1, 0, 1, mqttPacket) == 0)
result = 0;
MQTT_FreeBuffer(bin_head);
MQTT_FreeBuffer(payload);
return result;
}
//==========================================================
// 函数名称: MQTT_UnPacketCmd
//
// 函数功能: 命令下发解包
//
// 入口参数: rev_data:接收的数据指针
//
cmdid:cmdid-uuid
//
req:命令
//
// 返回参数: 0-成功
其他-失败原因
//
// 说明:
//==========================================================
uint8 MQTT_UnPacketCmd(uint8 *rev_data, int8 **cmdid, int8 **req, uint16 *req_len)
{
int8 *dataPtr = strchr((int8 *)rev_data + 6, '/'); //加6是跳过头信息
uint32 remain_len = 0;
if(dataPtr == NULL)
//未找到'/'
return 1;
dataPtr++;
//跳过'/'
MQTT_ReadLength(rev_data + 1, 4, &remain_len);
//读取剩余字节
*cmdid = (int8 *)MQTT_MallocBuffer(37);
//cmdid固定36字节,多分配一个结束符的位置
if(*cmdid == NULL)
return 2;
memset(*cmdid, 0, 37);
//全部清零
memcpy(*cmdid, (const int8 *)dataPtr, 36);
//复制cmdid
dataPtr += 36;
*req_len = remain_len - 44;
//命令长度 = 剩余长度(remain_len) - 2 - 5($creq) - 1() - cmdid长度
*req = (int8 *)MQTT_MallocBuffer(*req_len + 1);
//分配命令长度+1
if(*req == NULL)
{
MQTT_FreeBuffer(*cmdid);
return 3;
}
memset(*req, 0, *req_len + 1);
//清零
memcpy(*req, (const int8 *)dataPtr, *req_len);
//复制命令
return 0;
}
//==========================================================
// 函数名称: MQTT_PacketCmdResp
//
// 函数功能: 命令回复组包
//
// 入口参数: cmdid:cmdid
//
req:命令
//
mqttPacket:包指针
//
// 返回参数: 0-成功
1-失败
//
// 说明:
//==========================================================
uint1 MQTT_PacketCmdResp(const int8 *cmdid, const int8 *req, MQTT_PACKET_STRUCTURE *mqttPacket)
{
uint16 cmdid_len = strlen(cmdid);
uint16 req_len = strlen(req);
_Bool status = 0;
int8 *payload = MQTT_MallocBuffer(cmdid_len + 6);
if(payload == NULL)
return 1;
memset(payload, 0, cmdid_len + 6);
memcpy(payload, "$crsp/", 6);
strncat(payload, cmdid, cmdid_len);
if(MQTT_PacketPublish(MQTT_PUBLISH_ID, payload, req, strlen(req), MQTT_QOS_LEVEL0, 0, 1, mqttPacket) == 0)
status = 0;
else
status = 1;
MQTT_FreeBuffer(payload);
return status;
}
//==========================================================
// 函数名称: MQTT_PacketSubscribe
//
// 函数功能: Subscribe消息组包
//
// 入口参数: pkt_id:pkt_id
//
qos:消息重发次数
//
topics:订阅的消息
//
topics_cnt:订阅的消息个数
//
mqttPacket:包指针
//
// 返回参数: 0-成功
其他-失败
//
// 说明:
//==========================================================
uint8 MQTT_PacketSubscribe(uint16 pkt_id, enum MqttQosLevel qos, const int8 *topics[], uint8 topics_cnt, MQTT_PACKET_STRUCTURE *mqttPacket)
{
uint32 topic_len = 0, remain_len = 0;
int16 len = 0;
uint8 i = 0;
if(pkt_id == 0)
return 1;
//计算topic长度-------------------------------------------------------------------------
for(; i < topics_cnt; i++)
{
if(topics[i] == NULL)
return 2;
topic_len += strlen(topics[i]);
}
//2 bytes packet id + topic filter(2 bytes topic + topic length + 1 byte reserve)------
remain_len = 2 + 3 * topics_cnt + topic_len;
//分配内存------------------------------------------------------------------------------
MQTT_NewBuffer(mqttPacket, remain_len + 5);
if(mqttPacket->_data == NULL)
return 3;
/*************************************固定头部***********************************************/
//固定头部----------------------头部消息-------------------------------------------------
mqttPacket->_data[mqttPacket->_len++] = MQTT_PKT_SUBSCRIBE << 4 | 0x02;
//固定头部----------------------剩余长度值-----------------------------------------------
len = MQTT_DumpLength(remain_len, mqttPacket->_data + mqttPacket->_len);
if(len < 0)
{
MQTT_DeleteBuffer(mqttPacket);
return 4;
}
else
mqttPacket->_len += len;
/*************************************payload***********************************************/
//payload----------------------pkt_id---------------------------------------------------
mqttPacket->_data[mqttPacket->_len++] = MOSQ_MSB(pkt_id);
mqttPacket->_data[mqttPacket->_len++] = MOSQ_LSB(pkt_id);
//payload----------------------topic_name-----------------------------------------------
for(i = 0; i < topics_cnt; i++)
{
topic_len = strlen(topics[i]);
mqttPacket->_data[mqttPacket->_len++] = MOSQ_MSB(topic_len);
mqttPacket->_data[mqttPacket->_len++] = MOSQ_LSB(topic_len);
strncat((int8 *)mqttPacket->_data + mqttPacket->_len, topics[i], topic_len);
mqttPacket->_len += topic_len;
mqttPacket->_data[mqttPacket->_len++] = qos & 0xFF;
}
return 0;
}
//==========================================================
// 函数名称: MQTT_UnPacketSubscrebe
//
// 函数功能: Subscribe的回复消息解包
//
// 入口参数: rev_data:接收到的信息
//
// 返回参数: 0-成功
其他-失败
//
// 说明:
//==========================================================
uint8 MQTT_UnPacketSubscribe(uint8 *rev_data)
{
uint8 result = 255;
if(rev_data[2] == MOSQ_MSB(MQTT_SUBSCRIBE_ID) && rev_data[3] == MOSQ_LSB(MQTT_SUBSCRIBE_ID))
{
switch(rev_data[4])
{
case 0x00:
case 0x01:
case 0x02:
//MQTT Subscribe OK
result = 0;
break;
case 0x80:
//MQTT Subscribe Failed
result = 1;
break;
default:
//MQTT Subscribe UnKnown Err
result = 2;
break;
}
}
return result;
}
//==========================================================
// 函数名称: MQTT_PacketUnSubscribe
//
// 函数功能: UnSubscribe消息组包
//
// 入口参数: pkt_id:pkt_id
//
qos:消息重发次数
//
topics:订阅的消息
//
topics_cnt:订阅的消息个数
//
mqttPacket:包指针
//
// 返回参数: 0-成功
其他-失败
//
// 说明:
//==========================================================
uint8 MQTT_PacketUnSubscribe(uint16 pkt_id, const int8 *topics[], uint8 topics_cnt, MQTT_PACKET_STRUCTURE *mqttPacket)
{
uint32 topic_len = 0, remain_len = 0;
int16 len = 0;
uint8 i = 0;
if(pkt_id == 0)
return 1;
//计算topic长度-------------------------------------------------------------------------
for(; i < topics_cnt; i++)
{
if(topics[i] == NULL)
return 2;
topic_len += strlen(topics[i]);
}
//2 bytes packet id, 2 bytes topic length + topic + 1 byte reserve---------------------
remain_len = 2 + (topics_cnt << 1) + topic_len;
//分配内存------------------------------------------------------------------------------
MQTT_NewBuffer(mqttPacket, remain_len + 5);
if(mqttPacket->_data == NULL)
return 3;
/*************************************固定头部***********************************************/
//固定头部----------------------头部消息-------------------------------------------------
mqttPacket->_data[mqttPacket->_len++] = MQTT_PKT_UNSUBSCRIBE << 4 | 0x02;
//固定头部----------------------剩余长度值-----------------------------------------------
len = MQTT_DumpLength(remain_len, mqttPacket->_data + mqttPacket->_len);
if(len < 0)
{
MQTT_DeleteBuffer(mqttPacket);
return 4;
}
else
mqttPacket->_len += len;
/*************************************payload***********************************************/
//payload----------------------pkt_id---------------------------------------------------
mqttPacket->_data[mqttPacket->_len++] = MOSQ_MSB(pkt_id);
mqttPacket->_data[mqttPacket->_len++] = MOSQ_LSB(pkt_id);
//payload----------------------topic_name-----------------------------------------------
for(i = 0; i < topics_cnt; i++)
{
topic_len = strlen(topics[i]);
mqttPacket->_data[mqttPacket->_len++] = MOSQ_MSB(topic_len);
mqttPacket->_data[mqttPacket->_len++] = MOSQ_LSB(topic_len);
strncat((int8 *)mqttPacket->_data + mqttPacket->_len, topics[i], topic_len);
mqttPacket->_len += topic_len;
}
return 0;
}
//==========================================================
// 函数名称: MQTT_UnPacketUnSubscribe
//
// 函数功能: UnSubscribe的回复消息解包
//
// 入口参数: rev_data:接收到的信息
//
// 返回参数: 0-成功
其他-失败
//
// 说明:
//==========================================================
uint1 MQTT_UnPacketUnSubscribe(uint8 *rev_data)
{
uint1 result = 1;
if(rev_data[2] == MOSQ_MSB(MQTT_UNSUBSCRIBE_ID) && rev_data[3] == MOSQ_LSB(MQTT_UNSUBSCRIBE_ID))
{
result = 0;
}
return result;
}
//==========================================================
// 函数名称: MQTT_PacketPublish
//
// 函数功能: Pulish消息组包
//
// 入口参数: pkt_id:pkt_id
//
topic:发布的topic
//
payload:消息体
//
payload_len:消息体长度
//
qos:重发次数
//
retain:离线消息推送
//
own:
//
mqttPacket:包指针
//
// 返回参数: 0-成功
其他-失败
//
// 说明:
//==========================================================
uint8 MQTT_PacketPublish(uint16 pkt_id, const int8 *topic,
const int8 *payload, uint32 payload_len,
enum MqttQosLevel qos, int32 retain, int32 own,
MQTT_PACKET_STRUCTURE *mqttPacket)
{
uint32 total_len = 0, topic_len = 0;
uint32 data_len = 0;
int32 len = 0;
uint8 flags = 0;
//pkt_id检查----------------------------------------------------------------------------
if(pkt_id == 0)
return 1;
//$dp为系统上传数据点的指令--------------------------------------------------------------
for(topic_len = 0; topic[topic_len] != '