我是靠谱客的博主 满意小海豚,最近开发中收集的这篇文章主要介绍潘多拉 IOT 开发板学习(RT-Thread)—— 实验19 MQTT 协议通信实验(学习笔记)实验功能代码剖析,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

本文代码参考 RT-Thread 官方 BSP

文章目录

  • 实验功能
  • 代码剖析
    • rt_wlan_register_event_handler()
    • mq_start()
    • mqtt_sub_callback()
    • mqtt_sub_default_callback()
    • mqtt_connect_callback()
    • mqtt_online_callback()
    • mqtt_offline_callback()
    • LOG_D()

实验功能

例程源码:(main函数)

该实验实现的功能:WiFi 初始化后创建 mqtt 客户端,然后开启 WiFi 自动连接(WiFi 底层代码本文不研究)。在网络连接上的情况下,mqtt 客户端会订阅一个主题,同时向主题发送和接收数据。

int main(void)
{
    /* 注册 wlan 回调函数 */
    rt_wlan_register_event_handler(RT_WLAN_EVT_READY, (void (*)(int, struct rt_wlan_buff *, void *))mq_start, RT_NULL);
    /* 初始化自动连接功能 */
    wlan_autoconnect_init();
    /* 使能 wlan 自动连接 */
    rt_wlan_config_autoreconnect(RT_TRUE);
}

代码剖析

rt_wlan_register_event_handler()

Wlan 事件回调注册函数,其实就是给 event_tabe[event] 的 handler() 和 parameter 成员赋值。

rt_err_t rt_wlan_register_event_handler(rt_wlan_event_t event, rt_wlan_event_handler handler, void *parameter)
{
    rt_base_t level;

    if (event >= RT_WLAN_EVT_MAX)
    {
        return RT_EINVAL;
    }
    RT_WLAN_LOG_D("%s is run event:%d", __FUNCTION__, event);

    MGNT_LOCK();
    /* Registering Callbacks */
    level = rt_hw_interrupt_disable();
    event_tab[event].handler = handler;
    event_tab[event].parameter = parameter;
    rt_hw_interrupt_enable(level);
    MGNT_UNLOCK();
    return RT_EOK;
}

mq_start()

mqtt 初始化函数,主要配置了一些参数和回调函数,最后调用 paho_mqtt_start() 启动 mqtt 客户端。

/* 创建与配置 mqtt 客户端 */
static void mq_start(void)
{
    /* 初始 condata 参数 */
    MQTTPacket_connectData condata = MQTTPacket_connectData_initializer;
    static char cid[20] = {0};

    static int is_started = 0;
    if (is_started)
    {
        return;
    }
    /* 配置 MQTT 文本参数 */
    {
        client.isconnected = 0;
        client.uri = MQTT_URI;

        /* 生成随机客户端 ID */
        rt_snprintf(cid, sizeof(cid), "rtthread%d", rt_tick_get());
        rt_snprintf(sup_pub_topic, sizeof(sup_pub_topic), "%s%s", MQTT_PUBTOPIC, cid);
        /* 配置连接参数 */
        memcpy(&client.condata, &condata, sizeof(condata));
        client.condata.clientID.cstring = cid;
        client.condata.keepAliveInterval = 60;
        client.condata.cleansession = 1;
        client.condata.username.cstring = MQTT_USERNAME;
        client.condata.password.cstring = MQTT_PASSWORD;

        /* 配置 mqtt 参数 */
        client.condata.willFlag = 0;
        client.condata.will.qos = 1;
        client.condata.will.retained = 0;
        client.condata.will.topicName.cstring = sup_pub_topic;

        client.buf_size = client.readbuf_size = 1024;
        client.buf = malloc(client.buf_size);
        client.readbuf = malloc(client.readbuf_size);
        if (!(client.buf && client.readbuf))
        {
            LOG_E("no memory for MQTT client buffer!");
            goto _exit;
        }

        /* 设置事件回调 */
        client.connect_callback = mqtt_connect_callback;
        client.online_callback = mqtt_online_callback;
        client.offline_callback = mqtt_offline_callback;
        /* 设置要订阅的 topic 和 topic 对应的回调函数 */
        client.messageHandlers[0].topicFilter = sup_pub_topic;
        client.messageHandlers[0].callback = mqtt_sub_callback;
        client.messageHandlers[0].qos = QOS1;

        /* 设置默认订阅回调函数 */
        client.defaultMessageHandler = mqtt_sub_default_callback;
    }

    /* 启动 MQTT 客户端 */
    LOG_D("Start mqtt client and subscribe topic:%s", sup_pub_topic);
    paho_mqtt_start(&client);
    is_started = 1;

_exit:
    return;
}

paho_mqtt_start()

paho_mqtt 客户端启动函数,该函数创建了一个静态线程,线程启动后,会执行 paho_mqtt_thread() 函数,

int paho_mqtt_start(MQTTClient *client)
{
    rt_err_t result;
    rt_thread_t tid;
    int stack_size = RT_PKG_MQTT_THREAD_STACK_SIZE;
    int priority = RT_THREAD_PRIORITY_MAX / 3;
    char *stack;

    static int is_started = 0;
    if (is_started)
    {
        LOG_D("paho mqtt has already started!");
        return 0;
    }    

    tid = rt_malloc(RT_ALIGN(sizeof(struct rt_thread), 8) + stack_size);
    if (!tid)
    {
        LOG_E("no memory for thread: MQTT");
        return -1;
    }

    stack = (char *)tid + RT_ALIGN(sizeof(struct rt_thread), 8);
    result = rt_thread_init(tid,
                            "MQTT",
                            paho_mqtt_thread, client, // fun, parameter
                            stack, stack_size,        // stack, size
                            priority, 2               //priority, tick
                           );

    if (result == RT_EOK)
    {
        rt_thread_startup(tid);
        is_started = 1;
    }

    return 0;
}

paho_mqtt_thread()

paho_mqtt 线程入口函数,里面会执行 mqtt 连接回调函数 connect_callback()、连接成功回调函数 online_callback() 和 连接断开回调函数 offline_callback(),这些函数都已经在 main.c 中定义。

static void paho_mqtt_thread(void *param)
{
    MQTTClient *c = (MQTTClient *)param;
    int i, rc, len;
    int rc_t = 0;

    /* create publish pipe. */
    if (pipe(c->pub_pipe) != 0)
    {
        LOG_E("creat pipe err");
        goto _mqtt_exit;
    }

_mqtt_start:
    if (c->connect_callback)
    {
        c->connect_callback(c);
    }

    rc = net_connect(c);
    if (rc != 0)
    {
        LOG_E("Net connect error(%d)", rc);
        goto _mqtt_restart;
    }

    rc = MQTTConnect(c);
    if (rc != 0)
    {
        LOG_E("MQTT connect error(%d): %s", rc, MQTTSerialize_connack_string(rc));
        goto _mqtt_restart;
    }

    LOG_I("MQTT server connect success");

    for (i = 0; i < MAX_MESSAGE_HANDLERS; i++)
    {
        const char *topic = c->messageHandlers[i].topicFilter;
        enum QoS qos = c->messageHandlers[i].qos;

        if (topic == RT_NULL)
            continue;

        rc = MQTTSubscribe(c, topic, qos);
        LOG_I("Subscribe #%d %s %s!", i, topic, (rc < 0) || (rc == 0x80) ? ("fail") : ("OK"));

        if (rc != 0)
        {
            if (rc == 0x80)
            {
                LOG_E("QoS config err!");
            }
            goto _mqtt_disconnect;
        }
    }

    if (c->online_callback)
    {
        c->online_callback(c);
    }

    c->tick_ping = rt_tick_get();
    while (1)
    {
        int res;
        rt_tick_t tick_now;
        fd_set readset;
        struct timeval timeout;

        tick_now = rt_tick_get();
        if (((tick_now - c->tick_ping) / RT_TICK_PER_SECOND) > (c->keepAliveInterval - 5))
        {
            timeout.tv_sec = 1;
            //LOG_D("tick close to ping.");
        }
        else
        {
            timeout.tv_sec = c->keepAliveInterval - 10 - (tick_now - c->tick_ping) / RT_TICK_PER_SECOND;
            //LOG_D("timeount for ping: %d", timeout.tv_sec);
        }
        timeout.tv_usec = 0;

        FD_ZERO(&readset);
        FD_SET(c->sock, &readset);
        FD_SET(c->pub_pipe[0], &readset);

        /* int select(maxfdp1, readset, writeset, exceptset, timeout); */
        res = select(((c->pub_pipe[0] > c->sock) ? c->pub_pipe[0] : c->sock) + 1,
                     &readset, RT_NULL, RT_NULL, &timeout);
        if (res == 0)
        {
            len = MQTTSerialize_pingreq(c->buf, c->buf_size);
            rc = sendPacket(c, len);
            if (rc != 0)
            {
                LOG_E("[%d] send ping rc: %d ", rt_tick_get(), rc);
                goto _mqtt_disconnect;
            }

            /* wait Ping Response. */
            timeout.tv_sec = 5;
            timeout.tv_usec = 0;

            FD_ZERO(&readset);
            FD_SET(c->sock, &readset);

            res = select(c->sock + 1, &readset, RT_NULL, RT_NULL, &timeout);
            if (res <= 0)
            {
                LOG_E("[%d] wait Ping Response res: %d", rt_tick_get(), res);
                goto _mqtt_disconnect;
            }
        } /* res == 0: timeount for ping. */

        if (res < 0)
        {
            LOG_E("select res: %d", res);
            goto _mqtt_disconnect;
        }

        if (FD_ISSET(c->sock, &readset))
        {
            //LOG_D("sock FD_ISSET");
            rc_t = MQTT_cycle(c);
            //LOG_D("sock FD_ISSET rc_t : %d", rc_t);
            if (rc_t < 0)    goto _mqtt_disconnect;

            continue;
        }

        if (FD_ISSET(c->pub_pipe[0], &readset))
        {
            MQTTMessage *message;
            MQTTString topic = MQTTString_initializer;

            //LOG_D("pub_sock FD_ISSET");

            len = read(c->pub_pipe[0], c->readbuf, c->readbuf_size);

            if (len < sizeof(MQTTMessage))
            {
                c->readbuf[len] = '';
                LOG_D("pub_sock recv %d byte: %s", len, c->readbuf);

                if (strcmp((const char *)c->readbuf, "DISCONNECT") == 0)
                {
                    LOG_D("DISCONNECT");
                    goto _mqtt_disconnect_exit;
                }

                continue;
            }

            message = (MQTTMessage *)c->readbuf;
            message->payload = c->readbuf + sizeof(MQTTMessage);
            topic.cstring = (char *)c->readbuf + sizeof(MQTTMessage) + message->payloadlen;
            //LOG_D("pub_sock topic:%s, payloadlen:%d", topic.cstring, message->payloadlen);

            len = MQTTSerialize_publish(c->buf, c->buf_size, 0, message->qos, message->retained, message->id,
                                        topic, (unsigned char *)message->payload, message->payloadlen);
            if (len <= 0)
            {
                LOG_D("MQTTSerialize_publish len: %d", len);
                goto _mqtt_disconnect;
            }

            if ((rc = sendPacket(c, len)) != PAHO_SUCCESS) // send the subscribe packet
            {
                LOG_D("MQTTSerialize_publish sendPacket rc: %d", rc);
                goto _mqtt_disconnect;
            }
        } /* pbulish sock handler. */
    } /* while (1) */

_mqtt_disconnect:
    MQTTDisconnect(c);
_mqtt_restart:
    if (c->offline_callback)
    {
        c->offline_callback(c);
    }

    net_disconnect(c);
    rt_thread_delay(RT_TICK_PER_SECOND * 5);
    LOG_D("restart!");
    goto _mqtt_start;

_mqtt_disconnect_exit:
    MQTTDisconnect(c);
    net_disconnect(c);

_mqtt_exit:
    LOG_D("thread exit");

    return;
}

mqtt_sub_callback()

mqtt 订阅回调函数,订阅后进行信息打印。

static void mqtt_sub_default_callback(MQTTClient *c, MessageData *msg_data)
{
    *((char *)msg_data->message->payload + msg_data->message->payloadlen) = '';
    LOG_D("mqtt sub default callback: %.*s %.*s",
          msg_data->topicName->lenstring.len,
          msg_data->topicName->lenstring.data,
          msg_data->message->payloadlen,
          (char *)msg_data->message->payload);
    return;
}

mqtt_sub_default_callback()

订阅默认回调函数,上面的订阅函数没有订阅成功时,会使用默认的订阅,同时调用该回调函数。

static void mqtt_sub_default_callback(MQTTClient *c, MessageData *msg_data)
{
    *((char *)msg_data->message->payload + msg_data->message->payloadlen) = '';
    LOG_D("mqtt sub default callback: %.*s %.*s",
          msg_data->topicName->lenstring.len,
          msg_data->topicName->lenstring.data,
          msg_data->message->payloadlen,
          (char *)msg_data->message->payload);
    return;
}

mqtt_connect_callback()

这是 mqtt 线程会执行到的一个回调函数,当 mqtt 开始尝试连接时会自动调用。

static void mqtt_connect_callback(MQTTClient *c)
{
    LOG_I("Start to connect mqtt server");
}

mqtt_online_callback()

这是 mqtt 线程会执行到的一个回调函数,当 mqtt 成功连接时会自动调用。

static void mqtt_online_callback(MQTTClient *c)
{
    LOG_D("Connect mqtt server success");
    LOG_D("Publish message: Hello,RT-Thread! to topic: %s", sup_pub_topic);
    mq_publish("Hello,RT-Thread!");
}

mq_publish()

在 mqtt_online_callback() 中,调用了 mq_publish() 函数,里面完成了信息的包装和发送,发送到订阅的主题。

static void mq_publish(const char *send_str)
{
    MQTTMessage message;
    const char *msg_str = send_str;
    const char *topic = sup_pub_topic;
    message.qos = QOS1;
    message.retained = 0;
    message.payload = (void *)msg_str;
    message.payloadlen = strlen(message.payload);

    MQTTPublish(&client, topic, &message);

    return;
}

MQTTPublish()

更底层的 mqtt 信息发送函数,对 message 的成员进行封包,然后调用最底层的发送函数(write())发送数据。

/**
 * This function publish message to specified mqtt topic.
 * [MQTTMessage] + [payload] + [topic] + ''
 *
 * @param c the pointer of MQTT context structure
 * @param topicFilter topic filter name
 * @param message the pointer of MQTTMessage structure
 *
 * @return the error code, 0 on subscribe successfully.
 */
int MQTTPublish(MQTTClient *c, const char *topicName, MQTTMessage *message)
{
    int rc = PAHO_FAILURE;
    int len, msg_len;
    char *data = 0;

    if (!c->isconnected)
        goto exit;

    msg_len = sizeof(MQTTMessage) + message->payloadlen + strlen(topicName) + 1;
    data = rt_malloc(msg_len);
    if (!data)
        goto exit;

    memcpy(data, message, sizeof(MQTTMessage));
    memcpy(data + sizeof(MQTTMessage), message->payload, message->payloadlen);
    strcpy(data + sizeof(MQTTMessage) + message->payloadlen, topicName);

    len = MQTT_local_send(c, data, msg_len);
    if (len == msg_len)
    {
        rc = 0;
    }
    //LOG_D("MQTTPublish sendto %d", len);

exit:
    if (data)
        rt_free(data);

    return rc;
}

mqtt_offline_callback()

这是 mqtt 线程会执行到的一个回调函数,当 mqtt 连接断开时会自动调用。

static void mqtt_offline_callback(MQTTClient *c)
{
    LOG_I("Disconnect from mqtt server");
}

LOG_D()

本实验中,我们可以将 LOG_D() 视为 rt_kprintf()

#define dbg_log_line(lvl, color_n, fmt, ...)                
    do                                                      
    {                                                       
        _DBG_LOG_HDR(lvl, color_n);                         
        rt_kprintf(fmt, ##__VA_ARGS__);                     
        _DBG_LOG_X_END;                                     
    }                                                       
    while (0)

LOG_D 是 RT-Thread 内核里的一个日志打印函数,详情可见:《RT-Thread 文档中心——ulog 日志》

RT-Thread 的日志 API 包括:

在这里插入图片描述

最后

以上就是满意小海豚为你收集整理的潘多拉 IOT 开发板学习(RT-Thread)—— 实验19 MQTT 协议通信实验(学习笔记)实验功能代码剖析的全部内容,希望文章能够帮你解决潘多拉 IOT 开发板学习(RT-Thread)—— 实验19 MQTT 协议通信实验(学习笔记)实验功能代码剖析所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(64)

评论列表共有 0 条评论

立即
投稿
返回
顶部