概述
使用STM32 W5500做MQTT Client,使得数据上传broker,并接收broker传来的消息,并支持断网/拔网线再插入网线能够重新连接broker这样的功能,需要具备以下条件:
1、STM32 W5500基础入网配置,使能PC电脑端可以PING通W5500。
2、STM32 W5500的TCP Client收发数据的回环测试没有问题。
3、了解MQTT协议。
关于MQTT的介绍,本文不做重点。需要了解的是MQTT协议是基于TCP协议之上封装的协议。
关于MQTT Client依赖的MQTT支持库函数,下载地址 《MQTT C语言库函数》
这些库函数是干嘛的?
MQTT协议在STM32 W5500中使用的前提,首先通过TCP连接到broker指定的IP和端口。
然后需要发送MQTT连接的指令,这个指令内容是通过 "MQTTConnectClient.c"文件中的
int MQTTSerialize_connect(unsigned char* buf, int buflen, MQTTPacket_connectData* options)
这个方法来实现组装的,返回值大于0表示组装后的数组有效内容长度,在通过W5500的send方法,发送给broker。
broker接收到Client端发来的MQTT连接请求后,会返回一组数据,判断是否连接成功,或者各种失败(协议版本错误,用户名密码错误等)。
MQTT Client如果想要接收到broker发来的消息,需要先订阅主题,订阅主题的指令内容是通过"MQTTSubscribeClient.c"文件中的
int MQTTSerialize_subscribe(unsigned char* buf, int buflen, unsigned char dup, unsigned short packetid, int count,
MQTTString topicFilters[], int requestedQoSs[])
这个方法来实现组装的,同样返回值大于0表示组装后的数组有效内容长度,在通过W5500的send方法,发送给broker。
以上两个举例都是组装指令内容。
那么接收到broker发来消息,如何解析?
"MQTTDeserializePublish.c"这个文件的
int MQTTDeserialize_publish(unsigned char* dup, int* qos, unsigned char* retained, unsigned short* packetid, MQTTString* topicName,
unsigned char** payload, int* payloadlen, unsigned char* buf, int buflen)
这个方法可以实现消息内容的解析。
总之,依赖的MQTT支持库函数几乎可以使我们不用在乎协议的具体内容,就可以实现MQTT Client的功能。
STM32 W5500 MQTT Client端,我通过枚举类型给它定义三种状态
enum MQTT_STATE {MQTT_INIT, MQTT_CONNOK, MQTT_SUBOK};
MQTT_INIT - 初始状态(MQTT未连接,未订阅,注意是MQTT的,而不是TCP连接了没有)
MQTT_CONNOK - MQTT连接成功(MQTT Client端发起MQTT连接,并接收到了broker返回连接成功)
MQTT_SUBOK - MQTT订阅成功(MQTT Client端想broker订阅消息,并受到了broker返回订阅成功)
这几种状态的关连,在程序开始执行时,MQTT Client端处于MQTT_INIT状态,或者程序执行一段时间后,MQTT PING指令发几次broker没有回复,认为MQTT Client端处于MQTT_INIT状态。
MQTT Client端处于MQTT_CONNOK 状态时可以发布数据到broker,但是无法接收来自broker的消息。
MQTT Client端处于MQTT_SUBOK 状态时可以发布数据到broker,也可以接收来自broker的消息。
如果TCP Client处于CLOSE的状态,那么MQTT Client端将处于MQTT_INIT 状态。
做好MQTT Client端的难点在于维系 TCP socket的状态与MQTT Client的状态的关系。
贴出我实现MQTT Client的c代码:
impl_mqtt.c
#ifndef __IMPL_MQTT_H
#define __IMPL_MQTT_H
#include "impl_mqtt.h"
#endif
int mqttstate = MQTT_INIT;
int cnt_ping_not_response = 0;
int cnt_sock_init = 0;
u8 buf_pub[1024];
u32 ping_timestamp, now_timestamp;
int func_tcp_sock_send(u8 sockno, u8 *buf_mqsend, u16 len_mqsend)
{
if(getSn_SR(sockno) == SOCK_ESTABLISHED)
{
return send(sockno, buf_mqsend, len_mqsend);
}
return -1;
}
int func_tcp_sock_read(u8 sockno, u8 *buf_mqrecv, u16 len_mqrecv)
{
if((getSn_SR(sockno) == SOCK_ESTABLISHED))
{
len_mqrecv = getSn_RX_RSR(sockno);
if(len_mqrecv > 0)
{
return recv(sockno, buf_mqrecv, len_mqrecv);
}
}
return -1;
}
void func_judge_timeout_ms(u32 *timespan)
{
delay_ms(1);
*timespan = *timespan + 1;
}
u8 func_judge_mqtt_recvmsg_package_type(u8 *buf_mqrecv, u16 len_mqrecv)
{
MQTTHeader header = {0};
if(len_mqrecv > 0)
{
header.byte = buf_mqrecv[0];
return header.bits.type;
}
return 0;
}
void func_mqtt_client_dealwith_recvmsg(u8 sockno, u8 *buf_mqrecv, u16 len_mqbuf, u16 len_mqrecv)
{
if(len_mqrecv > 0)
{
ping_timestamp = get_systick_timestamp();
// package type to deal
switch(func_judge_mqtt_recvmsg_package_type(buf_mqrecv, len_mqrecv))
{
case CONNACK:
break;
case PUBLISH://analysis msg
{
int rc;
u8 buf_recv[1024];
u8* payload;
int len_payload;
unsigned char retained, dup;
int qos;
unsigned short packetid;
MQTTString topicrecv;
MQTTString topicpub;
payload = buf_recv;
rc = MQTTDeserialize_publish(&dup, &qos, &retained, &packetid, &topicrecv, &payload, &len_payload, buf_mqrecv, len_mqrecv);
if(rc == 1)
{
//TODO ...
//TEST code
topicpub.cstring = (char*)"mytopic";
memset(buf_pub, 0, sizeof(buf_pub));
func_run_mqtt_publish(sockno, buf_pub, sizeof(buf_pub), topicpub, payload, len_payload);
}
}
break;
case PUBACK:
break;
case PUBREC://Qos2 msg receipt
case PUBREL://Qos2 msg receipt
case PUBCOMP://Qos2 msg receipt
{
unsigned char packettype, dup;
unsigned short packetid;
if (MQTTDeserialize_ack(&packettype, &dup, &packetid, buf_mqrecv, len_mqbuf) == 1)
{
memset(buf_mqrecv, 0, len_mqbuf);
len_mqrecv = MQTTSerialize_ack(buf_mqrecv, len_mqbuf, packettype, dup, packetid);
if(len_mqrecv > 0)
{
func_tcp_sock_send(sockno, buf_mqrecv, len_mqrecv);
}
}
}
break;
case SUBACK:
case UNSUBACK:
case PINGREQ:
case PINGRESP:
case DISCONNECT:
break;
default:
break;
}
}
}
void func_mqtt_client_connect_broker(int *state, u8 sockno, u8 *buf_mqsend, u16 len_mqsend, MQTTPacket_connectData *conn_mqtt)
{
u32 timespan;
int len_cont;
int res;
len_cont = MQTTSerialize_connect(buf_mqsend, len_mqsend, conn_mqtt);
if(len_cont > 0)
{
res = func_tcp_sock_send(sockno, buf_mqsend, len_cont);
if(res > 0)
{
timespan = 0;
memset(buf_mqsend, 0, len_mqsend);//reuse buffer
for(;;)
{
if((len_cont = func_tcp_sock_read(sockno, buf_mqsend, len_mqsend)) > 3)
{
if(func_judge_mqtt_recvmsg_package_type(buf_mqsend, len_cont) == CONNACK)
{
*state = MQTT_CONNOK;
ping_timestamp = get_systick_timestamp();
}
break;
}
func_judge_timeout_ms(×pan);
if(timespan > 500)
{
break;
}
}
}
}
}
void func_mqtt_client_ping_broker(int *state, u8 sockno, u8 *buf_mqsend, u16 len_mqsend)
{
u32 timespan;
int len_cont;
int res;
if(get_systick_timestamp() - ping_timestamp > 5*1000)
{
len_cont = MQTTSerialize_pingreq(buf_mqsend, len_mqsend);
if(len_cont > 0)
{
res = func_tcp_sock_send(sockno, buf_mqsend, len_cont);
if(res > 0)
{
timespan = 0;
memset(buf_mqsend, 0, len_mqsend);//reuse buffer
for(;;)
{
len_cont = func_tcp_sock_read(sockno, buf_mqsend, len_mqsend);
// pingrsp or others' published msg
if(len_cont > 0)
{
//recv pingrsp
ping_timestamp = get_systick_timestamp();
cnt_ping_not_response = 0;
// other type msg to deal with
func_mqtt_client_dealwith_recvmsg(sockno, buf_mqsend, len_mqsend, len_cont);
break;
}
func_judge_timeout_ms(×pan);
if(timespan > 10)
{
cnt_ping_not_response ++;
if(cnt_ping_not_response > 1)
{
*state = MQTT_INIT;
close(sockno);
cnt_ping_not_response = 0;
}
break;
}
}
}
else
{
cnt_ping_not_response ++;
if(cnt_ping_not_response > 2)
{
*state = MQTT_INIT;
close(sockno);
cnt_ping_not_response = 0;
}
}
}
}
}
int func_run_mqtt_publish(u8 sockno, u8 *buf_mqsend, u16 len_mqsend, MQTTString topicName, u8* payload, int payloadlen)
{
int len;
int rc;
if(mqttstate >= MQTT_CONNOK)
{
len = MQTTSerialize_publish(buf_mqsend, len_mqsend, 0, 0, 0, 0, topicName, payload, payloadlen);
if(len > 0)
{
memcpy(buf_pub, buf_mqsend, len);
rc = func_tcp_sock_send(sockno, buf_pub, len);
if(rc > 0)
{
ping_timestamp = get_systick_timestamp();
}
return rc;
}
}
return 0;
}
int func_mqtt_client_subtopic_from_broker(u8 sockno, u8 *buf_mqsend, u16 len_mqsend, int count,
MQTTString topicFilters[], int requestedQoSs[])
{
u32 timespan;
int len_cont;
int res;
if(mqttstate != MQTT_SUBOK)
{
len_cont = MQTTSerialize_subscribe(buf_mqsend, len_mqsend, 0, SUBSCRIBE, count, topicFilters, requestedQoSs);
if(len_cont > 0)
{
res = func_tcp_sock_send(sockno, buf_mqsend, len_cont);
if(res > 0)
{
timespan = 0;
memset(buf_mqsend, 0, len_mqsend);//reuse buffer
for(;;)
{
if((len_cont = func_tcp_sock_read(sockno, buf_mqsend, len_mqsend)) > 0 && func_judge_mqtt_recvmsg_package_type(buf_mqsend, len_cont) == SUBACK)//nowtime, ignore other type msg
{
mqttstate = MQTT_SUBOK;
ping_timestamp = get_systick_timestamp();
return 0;
}
func_judge_timeout_ms(×pan);
if(timespan > 500)
{
return -2;
}
}
}
}
}
return -1;
}
void func_mqtt_client_recvmsg_from_broker(u8 sockno, u8 *buf_mqsend, u16 len_mqsend)
{
int len_cont;
memset(buf_mqsend, 0, len_mqsend);
len_cont = func_tcp_sock_read(sockno, buf_mqsend, len_mqsend);
if(len_cont > 0)
{
func_mqtt_client_dealwith_recvmsg(sockno, buf_mqsend, len_mqsend, len_cont);
}
}
u8 func_run_mqtt_tcpsock(u8 sockno, u8 *broker_ip, u16 broker_port, u8 *buf_mqsend, u16 len_mqsend, MQTTPacket_connectData *conn_mqtt)
{
static u16 any_port = 50000;
u8 res;
switch(getSn_SR(sockno))
{
case SOCK_CLOSED:
{
close(sockno);
socket(sockno, Sn_MR_TCP, any_port++, 0x00);
cnt_sock_init++;
if(cnt_sock_init > 30)
{
cnt_sock_init = 0;
close(sockno);
mqttstate = MQTT_INIT;
}
if(any_port > 64000)
{
any_port =50000;
}
}
break;
case SOCK_INIT:
{
res = connect(sockno, broker_ip, broker_port);
if(res)
{
//mqtt connect request
mqttstate = func_run_mqtt_progress(mqttstate, sockno, buf_mqsend, len_mqsend, conn_mqtt);
}
else
{
if(cnt_ping_not_response > 0)
{
mqttstate = MQTT_INIT;
}
}
}
break;
case SOCK_ESTABLISHED:
{
//run mqtt progress
mqttstate = func_run_mqtt_progress(mqttstate, sockno, buf_mqsend, len_mqsend, conn_mqtt);
}
break;
case SOCK_CLOSE_WAIT:
{
mqttstate = MQTT_INIT;
close(sockno);
}
break;
default:
break;
}
return mqttstate;
}
u8 func_run_mqtt_progress(int state, u8 sockno, u8 *buf_mqsend, u16 len_mqsend, MQTTPacket_connectData *conn_mqtt)
{
switch(state)
{
case MQTT_INIT:
{
func_mqtt_client_connect_broker(&state, sockno, buf_mqsend, len_mqsend, conn_mqtt);
}
break;
case MQTT_CONNOK:
{
func_mqtt_client_ping_broker(&state, sockno, buf_mqsend, len_mqsend);
if(state > MQTT_INIT)
{
// func_mqtt_client_subtopic_from_broker(&state, sockno, buf_mqsend, len_mqsend);
}
}
break;
case MQTT_SUBOK:
{
func_mqtt_client_ping_broker(&state, sockno, buf_mqsend, len_mqsend);
if(state != MQTT_INIT)
{
func_mqtt_client_recvmsg_from_broker(sockno, buf_mqsend, len_mqsend);
}
}
break;
default:
break;
}
return state;
}
可能不是十分完美,但是一般工程上使用应该问题不大,我也测试了好久。
测试的主函数,是做了MQTT的回环测试,MQTT Client端连接到broker后,发起订阅主题,并一次性订阅多个主题,分别是字符串subtopic、subtopic2、subtopic3、subtopic4。当其他客户端连接到broker后,向这四个主题发布消息,STM32 W5500 MQTT Client端接收后,会向 mytopic发布一条消息。
#ifndef __STM32F10X_H
#define __STM32F10X_H
#include "stm32f10x.h"
#endif
#ifndef __Z_UTIL_TIME_H
#define __Z_UTIL_TIME_H
#include "z_util_time.h"
#endif
#ifndef __Z_HARDWARE_LED_H
#define __Z_HARDWARE_LED_H
#include "z_hardware_led.h"
#endif
#ifndef __Z_HARDWARE_SPI_H
#define __Z_HARDWARE_SPI_H
#include "z_hardware_spi.h"
#endif
#ifndef __W5500_H
#define __W5500_H
#include "w5500.h"
#endif
#ifndef __SOCKET_H
#define __SOCKET_H
#include "socket.h"
#endif
#ifndef __W5500_CONF_H
#define __W5500_CONF_H
#include "w5500_conf.h"
#endif
#ifndef __DHCP_H
#define __DHCP_H
#include "dhcp.h"
#endif
#ifndef __Z_HARDWARE_USART2_H
#define __Z_HARDWARE_USART2_H
#include "z_hardware_usart2.h"
#endif
#include "MQTTPacket.h"
#ifndef __IMPL_MQTT_H
#define __IMPL_MQTT_H
#include "impl_mqtt.h"
#endif
int main(void)
{
u32 dhcp_timestamp;
u8 ip_broker[] = {192, 168, 1, 127};
u16 port_broker = 1883;
u8 buf_mqtt_send[1024];
u8 mac[6]={0, };
DHCP_Get dhcp_get;
int mqtt_stat;
MQTTString sub_topic = MQTTString_initializer;
MQTTString sub_topic2 = MQTTString_initializer;
MQTTString sub_topic3 = MQTTString_initializer;
MQTTString sub_topic4 = MQTTString_initializer;
MQTTString sub_topics[4];
int nums_sub_topic_qoss[4] = {0, };
char stpc_str[64] = {'t', 'c'};
MQTTPacket_connectData conn_mqtt = MQTTPacket_connectData_initializer;
conn_mqtt.willFlag = 0;
conn_mqtt.MQTTVersion = 3;
conn_mqtt.clientID.cstring = (char*)"dev_abcdef";
conn_mqtt.username.cstring = (char*)"abcdef";
conn_mqtt.password.cstring = (char*)"123456";
conn_mqtt.keepAliveInterval = 60;
conn_mqtt.cleansession = 1;
systick_configuration();
init_led();
init_system_spi();
func_w5500_reset();
init_hardware_usart2_dma(9600);
getMacByLockCode(mac);
setSHAR(mac);
sysinit(txsize, rxsize);
setRTR(2000);
setRCR(3);
//DHCP
for(;func_dhcp_get_ip_sub_gw(1, mac, &dhcp_get, 500) != 0;);
if(func_dhcp_get_ip_sub_gw(1, mac, &dhcp_get, 500) == 0)
{
setSUBR(dhcp_get.sub);
setGAR(dhcp_get.gw);
setSIPR(dhcp_get.lip);
close(1);
}
dhcp_timestamp = get_systick_timestamp();
memcpy(stpc_str, (char*)"subtopic", strlen("subtopic"));
sub_topic.cstring = stpc_str;
sub_topics[0] = sub_topic;
sub_topic2.cstring = "subtopic2";
sub_topics[1] = sub_topic2;
sub_topic3.cstring = "subtopic3";
sub_topics[2] = sub_topic3;
sub_topic4.cstring = "subtopic4";
sub_topics[3] = sub_topic4;
for(;;)
{
if(get_systick_timestamp() - dhcp_timestamp > 59*1000)// 1 min dhcp
{
dhcp_timestamp = get_systick_timestamp();
if(func_dhcp_get_ip_sub_gw(1, mac, &dhcp_get, 500) == 0)
{
setSUBR(dhcp_get.sub);
setGAR(dhcp_get.gw);
setSIPR(dhcp_get.lip);
close(1);
}
}
mqtt_stat = func_run_mqtt_tcpsock(2, ip_broker, port_broker, buf_mqtt_send, sizeof(buf_mqtt_send), &conn_mqtt);
if(mqtt_stat >= MQTT_CONNOK)
{
if(mqtt_stat == MQTT_CONNOK)
{
memset(buf_mqtt_send, 0, sizeof(buf_mqtt_send));
func_mqtt_client_subtopic_from_broker(2, buf_mqtt_send, sizeof(buf_mqtt_send), 4, sub_topics, nums_sub_topic_qoss);
}
func_led1_toggle();
}
delay_ms(500);
}
}
电脑端使用MQTT.fx工具进行测试,测试效果
目前测试还比较稳定,支持热插拔网线,以及路由器断网后再次联网,MQTT Client仍可继续连接broker。
最后
以上就是体贴斑马为你收集整理的STM32 W5500 MQTT Client 发布订阅及断线重连的全部内容,希望文章能够帮你解决STM32 W5500 MQTT Client 发布订阅及断线重连所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复