我是靠谱客的博主 满意小海豚,最近开发中收集的这篇文章主要介绍潘多拉 IOT 开发板学习(RT-Thread)—— 实验19 MQTT 协议通信实验(学习笔记),内容包括实验功能和代码剖析两部分,觉得挺不错的,现在分享给大家,希望可以做个参考。
概述
本文代码参考 RT-Thread 官方 BSP
文章目录
- 实验功能
- 代码剖析
- rt_wlan_register_event_handler()
- mq_start()
- mqtt_sub_callback()
- mqtt_sub_default_callback()
- mqtt_connect_callback()
- mqtt_online_callback()
- mqtt_offline_callback()
- LOG_D()
实验功能
例程源码:(main函数)
该实验实现的功能:main 函数先注册 WiFi 就绪事件的回调函数 mq_start(),然后初始化并开启 WiFi 自动连接(WiFi 底层代码本文不研究)。网络连接就绪后,回调函数会创建并启动 mqtt 客户端;客户端会订阅一个主题,并向该主题发布数据、从该主题接收数据。
/**
 * Application entry point.
 *
 * Registers mq_start() as the handler for RT_WLAN_EVT_READY, then
 * initializes and enables the WLAN auto-connect feature.
 */
int main(void)
{
    /*
     * Register the WLAN callback. mq_start is cast to the three-argument
     * handler pointer type before being passed to the registration call;
     * no user parameter is supplied (RT_NULL).
     */
    rt_wlan_register_event_handler(
        RT_WLAN_EVT_READY,
        (void (*)(int, struct rt_wlan_buff *, void *))mq_start,
        RT_NULL);

    /* Initialize the auto-connect feature */
    wlan_autoconnect_init();

    /* Enable WLAN auto-reconnect */
    rt_wlan_config_autoreconnect(RT_TRUE);
}
代码剖析
rt_wlan_register_event_handler()
Wlan 事件回调注册函数,其实就是给 event_tab[event] 的 handler 和 parameter 成员赋值。
/**
 * Register a callback for a WLAN event.
 *
 * Writes handler and parameter into the event_tab entry selected by event.
 *
 * @param event      index of the event_tab entry; must be below RT_WLAN_EVT_MAX
 * @param handler    callback stored in the entry
 * @param parameter  pointer stored next to the callback in the same entry
 *
 * @return RT_EOK after the entry was written, RT_EINVAL when event is
 *         RT_WLAN_EVT_MAX or above (nothing is written in that case)
 */
rt_err_t rt_wlan_register_event_handler(rt_wlan_event_t event, rt_wlan_event_handler handler, void *parameter)
{
    rt_base_t irq_state;

    if (event < RT_WLAN_EVT_MAX)
    {
        RT_WLAN_LOG_D("%s is run event:%d", __FUNCTION__, event);

        MGNT_LOCK();

        /* Both fields are written between the interrupt disable/enable pair */
        irq_state = rt_hw_interrupt_disable();
        event_tab[event].handler = handler;
        event_tab[event].parameter = parameter;
        rt_hw_interrupt_enable(irq_state);

        MGNT_UNLOCK();

        return RT_EOK;
    }

    /* Event index is outside the table */
    return RT_EINVAL;
}
mq_start()
mqtt 初始化函数,主要配置了一些参数和回调函数,最后调用 paho_mqtt_start() 启动 mqtt 客户端。
/**
 * Create and configure the MQTT client, then start it.
 *
 * Fills in the file-level `client` object (URI, connect options, buffers,
 * event callbacks, subscription table) and hands it to paho_mqtt_start().
 * A static flag makes every call after the first successful start a no-op.
 *
 * On any failure (buffer allocation or paho_mqtt_start() returning non-zero)
 * both client buffers are released and the flag stays clear, so a later call
 * can try again without leaking memory.
 */
static void mq_start(void)
{
    /* Default connect options; copied into client.condata below */
    MQTTPacket_connectData condata = MQTTPacket_connectData_initializer;
    static char cid[20] = {0};
    static int is_started = 0;

    if (is_started)
    {
        return;
    }

    /* Basic client parameters */
    client.isconnected = 0;
    client.uri = MQTT_URI;

    /* Build the client ID from the current tick count, and derive the topic from it */
    rt_snprintf(cid, sizeof(cid), "rtthread%d", rt_tick_get());
    rt_snprintf(sup_pub_topic, sizeof(sup_pub_topic), "%s%s", MQTT_PUBTOPIC, cid);

    /* Connect options */
    memcpy(&client.condata, &condata, sizeof(condata));
    client.condata.clientID.cstring = cid;
    client.condata.keepAliveInterval = 60;
    client.condata.cleansession = 1;
    client.condata.username.cstring = MQTT_USERNAME;
    client.condata.password.cstring = MQTT_PASSWORD;

    /* Will settings (willFlag is 0, the remaining will fields are still filled in) */
    client.condata.willFlag = 0;
    client.condata.will.qos = 1;
    client.condata.will.retained = 0;
    client.condata.will.topicName.cstring = sup_pub_topic;

    /* Send and receive buffers */
    client.buf_size = client.readbuf_size = 1024;
    client.buf = malloc(client.buf_size);
    client.readbuf = malloc(client.readbuf_size);
    if (!(client.buf && client.readbuf))
    {
        LOG_E("no memory for MQTT client buffer!");
        goto fail;
    }

    /* Event callbacks */
    client.connect_callback = mqtt_connect_callback;
    client.online_callback = mqtt_online_callback;
    client.offline_callback = mqtt_offline_callback;

    /* Topic to subscribe to and the callback bound to it */
    client.messageHandlers[0].topicFilter = sup_pub_topic;
    client.messageHandlers[0].callback = mqtt_sub_callback;
    client.messageHandlers[0].qos = QOS1;

    /* Default subscription callback */
    client.defaultMessageHandler = mqtt_sub_default_callback;

    /* Start the MQTT client */
    LOG_D("Start mqtt client and subscribe topic:%s", sup_pub_topic);
    if (paho_mqtt_start(&client) != 0)
    {
        LOG_E("start MQTT client failed!");
        goto fail;
    }

    is_started = 1;
    return;

fail:
    /* free(NULL) is a no-op, so this also covers the case where only one malloc succeeded */
    free(client.buf);
    free(client.readbuf);
    client.buf = NULL;
    client.readbuf = NULL;
}
paho_mqtt_start()
paho_mqtt 客户端启动函数。该函数先用 rt_malloc() 一次性申请线程控制块和线程栈的内存,再调用 rt_thread_init() 初始化线程;线程启动后,会执行 paho_mqtt_thread() 函数。
/**
 * Start the MQTT worker thread for a client.
 *
 * Allocates one memory block that holds the thread control block followed by
 * the thread stack, initializes a thread named "MQTT" with paho_mqtt_thread
 * as entry and client as its argument, and starts it. A static flag prevents
 * a second thread from being created once one has been started.
 *
 * @param client  MQTT client passed to paho_mqtt_thread as its parameter
 *
 * @return 0 when the thread was started or had already been started,
 *         -1 when the memory could not be allocated or the thread could not
 *         be initialized
 */
int paho_mqtt_start(MQTTClient *client)
{
    rt_err_t result;
    rt_thread_t tid;
    int stack_size = RT_PKG_MQTT_THREAD_STACK_SIZE;
    int priority = RT_THREAD_PRIORITY_MAX / 3;
    char *stack;
    static int is_started = 0;

    if (is_started)
    {
        LOG_D("paho mqtt has already started!");
        return 0;
    }

    /* One allocation: 8-byte aligned control block, then the stack */
    tid = rt_malloc(RT_ALIGN(sizeof(struct rt_thread), 8) + stack_size);
    if (!tid)
    {
        LOG_E("no memory for thread: MQTT");
        return -1;
    }

    /* The stack begins right after the aligned control block */
    stack = (char *)tid + RT_ALIGN(sizeof(struct rt_thread), 8);
    result = rt_thread_init(tid,
                            "MQTT",
                            paho_mqtt_thread, client, /* entry, parameter */
                            stack, stack_size,        /* stack, size */
                            priority, 2               /* priority, tick */
                           );
    if (result != RT_EOK)
    {
        /* Release the block and report the failure instead of returning success */
        LOG_E("init thread MQTT failed(%d)", (int)result);
        rt_free(tid);
        return -1;
    }

    rt_thread_startup(tid);
    is_started = 1;

    return 0;
}
paho_mqtt_thread()
paho_mqtt 线程入口函数,里面会执行 mqtt 连接回调函数 connect_callback()、连接成功回调函数 online_callback() 和 连接断开回调函数 offline_callback(),这些函数都已经在 main.c 中定义。
static void paho_mqtt_thread(void *param)
{
MQTTClient *c = (MQTTClient *)param;
int i, rc, len;
int rc_t = 0;
/* create publish pipe. */
if (pipe(c->pub_pipe) != 0)
{
LOG_E("creat pipe err");
goto _mqtt_exit;
}
_mqtt_start:
if (c->connect_callback)
{
c->connect_callback(c);
}
rc = net_connect(c);
if (rc != 0)
{
LOG_E("Net connect error(%d)", rc);
goto _mqtt_restart;
}
rc = MQTTConnect(c);
if (rc != 0)
{
LOG_E("MQTT connect error(%d): %s", rc, MQTTSerialize_connack_string(rc));
goto _mqtt_restart;
}
LOG_I("MQTT server connect success");
for (i = 0; i < MAX_MESSAGE_HANDLERS; i++)
{
const char *topic = c->messageHandlers[i].topicFilter;
enum QoS qos = c->messageHandlers[i].qos;
if (topic == RT_NULL)
continue;
rc = MQTTSubscribe(c, topic, qos);
LOG_I("Subscribe #%d %s %s!", i, topic, (rc < 0) || (rc == 0x80) ? ("fail") : ("OK"));
if (rc != 0)
{
if (rc == 0x80)
{
LOG_E("QoS config err!");
}
goto _mqtt_disconnect;
}
}
if (c->online_callback)
{
c->online_callback(c);
}
c->tick_ping = rt_tick_get();
while (1)
{
int res;
rt_tick_t tick_now;
fd_set readset;
struct timeval timeout;
tick_now = rt_tick_get();
if (((tick_now - c->tick_ping) / RT_TICK_PER_SECOND) > (c->keepAliveInterval - 5))
{
timeout.tv_sec = 1;
//LOG_D("tick close to ping.");
}
else
{
timeout.tv_sec = c->keepAliveInterval - 10 - (tick_now - c->tick_ping) / RT_TICK_PER_SECOND;
//LOG_D("timeount for ping: %d", timeout.tv_sec);
}
timeout.tv_usec = 0;
FD_ZERO(&readset);
FD_SET(c->sock, &readset);
FD_SET(c->pub_pipe[0], &readset);
/* int select(maxfdp1, readset, writeset, exceptset, timeout); */
res = select(((c->pub_pipe[0] > c->sock) ? c->pub_pipe[0] : c->sock) + 1,
&readset, RT_NULL, RT_NULL, &timeout);
if (res == 0)
{
len = MQTTSerialize_pingreq(c->buf, c->buf_size);
rc = sendPacket(c, len);
if (rc != 0)
{
LOG_E("[%d] send ping rc: %d ", rt_tick_get(), rc);
goto _mqtt_disconnect;
}
/* wait Ping Response. */
timeout.tv_sec = 5;
timeout.tv_usec = 0;
FD_ZERO(&readset);
FD_SET(c->sock, &readset);
res = select(c->sock + 1, &readset, RT_NULL, RT_NULL, &timeout);
if (res <= 0)
{
LOG_E("[%d] wait Ping Response res: %d", rt_tick_get(), res);
goto _mqtt_disconnect;
}
} /* res == 0: timeount for ping. */
if (res < 0)
{
LOG_E("select res: %d", res);
goto _mqtt_disconnect;
}
if (FD_ISSET(c->sock, &readset))
{
//LOG_D("sock FD_ISSET");
rc_t = MQTT_cycle(c);
//LOG_D("sock FD_ISSET rc_t : %d", rc_t);
if (rc_t < 0) goto _mqtt_disconnect;
continue;
}
if (FD_ISSET(c->pub_pipe[0], &readset))
{
MQTTMessage *message;
MQTTString topic = MQTTString_initializer;
//LOG_D("pub_sock FD_ISSET");
len = read(c->pub_pipe[0], c->readbuf, c->readbuf_size);
if (len < sizeof(MQTTMessage))
{
c->readbuf[len] = '