概述
玩转RT-Thread系列教程(13)–MQTT协议通信
一、了解一下MQTT
1.MQTT介绍
客户端 Client
使用MQTT的程序或设备。客户端总是通过网络连接到服务端。它可以
-
发布应用消息给其它相关的客户端。
-
订阅以请求接受相关的应用消息。
-
取消订阅以移除接受应用消息的请求。
-
从服务端断开连接。
服务端 Server
一个程序或设备,作为发送消息的客户端和请求订阅的客户端之间的中介。服务端 -
接受来自客户端的网络连接。
-
接受客户端发布的应用消息。
-
处理客户端的订阅和取消订阅请求。
-
转发应用消息给符合条件的已订阅客户端。
说白了一方为供应商,一方为消费者(Client),供应商(Server)一旦和消费者产生了联系,那么供应商(Server)就会提供商品给消费者,同时消费者(Client)也可以向供应商提供意见。
订阅 Subscription
订阅包含一个主题过滤器(Topic Filter)和一个最大的服务质量(QoS)等级。订阅与单个会话(Session)关联。会话可以包含多于一个的订阅。会话的每个订阅都有一个不同的主题过滤器。
- QoS0,At most once,至多一次;Sender 发送的一条消息,Receiver 最多能收到一次,如果发送失败,也就算了。
- QoS1,At least once,至少一次;Sender 发送的一条消息,Receiver 至少能收到一次,如果发送失败,会继续重试,直到 Receiver 收到消息为止,但Receiver 有可能会收到重复的消息
- QoS2,Exactly once,确保只有一次。Sender 尽力向 Receiver 发送消息,如果发送失败,会继续重试,直到 Receiver 收到消息为止,同时保证 Receiver 不会因为消息重传而收到重复的消息。
2.MQTT协议数据包结构
一个MQTT数据包由:固定头(Fixed header)、可变头(Variable header)、有效载荷(payload)三部分构成。
- (1)固定头(Fixed header)。存在于所有MQTT数据包中,表示数据包类型及数据包的分组类标识。
- (2)可变头(Variable header)。存在于部分MQTT数据包中,数据包类型决定了可变头是否存在及其具体内容。
- (3)有效载荷(Payload)。存在于部分MQTT数据包中,表示客户端收到的具体内容。
固定报头 |
---|
byte1 | MQTT报文类型(1) | Reserved 保留位 | |||||||
---|---|---|---|---|---|---|---|---|---|
10 | 0 | 0 | 0 | 1 | 0 | 0 | 0 | 0 | |
byte2 | 剩余长度 | ||||||||
27 | 0 | 0 | 1 | 0 | 0 | 1 | 1 | 1 |
可变报头 |
---|
说明 | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 | |
---|---|---|---|---|---|---|---|---|---|
协议名 | |||||||||
byte 1 | 长度 MSB (0) | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
byte 2 | 长度 LSB (4) | 0 | 0 | 0 | 0 | 0 | 1 | 0 | 0 |
byte 3 | ‘M’ | 0 | 1 | 0 | 0 | 1 | 1 | 0 | 1 |
byte 4 | ‘Q’ | 0 | 1 | 0 | 1 | 0 | 0 | 0 | 1 |
byte 5 | ‘T’ | 0 | 1 | 0 | 1 | 0 | 1 | 0 | 0 |
byte 6 | ‘T’ | 0 | 1 | 0 | 1 | 0 | 1 | 0 | 0 |
这里我用Onenet的连接报文示例,其他的同理
二、RTT中MQTT组件
1.menuconfig进入env,选择IOT组件
2.选择Paho MQTT组件以及CJSON解析组件
Eclipse paho是eclipse基金会下面的一个开源项目,基于MQTT协议的客户端
json是一种轻量级的数据交换格式,简洁和清晰的层次结构使得 JSON 成为理想的数据交换语言,易于人阅读和编写
cjson组件则是可以对服务器端发来的json数据进行解析,对比于常规的使用C库解析字符串方式,cjson为我们封装好了解析方法,调用更加灵活方便。
3.使用pkgs --update更新、下载软件包
4.使用scons --target=md5生成mdk工程
5.打开mdk工程
可以看见,我们选择的paho以及cjson组件已经添加到了我们的工程中
三、MQTT的使用
3.1、编写MQTT客户端
1.宏定义连接mqtt服务器需要的参数:
#define MQTT_Uri "tcp://xxx.xxx.xxx:1883" // MQTT服务器的地址和端口号
#define ClientId "751061401" // ClientId需要唯一
#define UserName "rb" // 用户名
#define PassWord "123456" // 用户名对应的密码
2.定义一个mqtt客户端结构体变量
/* 定义一个MQTT客户端结构体 */
static MQTTClient client;
3.对MQTT进行配置
/* 用以存放邮件 */
rt_ubase_t* buf;
/* 对MQTT客户端结构体变量进行配置 */
client.isconnected = 0;
client.uri = MQTT_Uri;
/* 配置MQTT的连接参数 */
MQTTPacket_connectData condata = MQTTPacket_connectData_initializer;
memcpy(&client.condata, &condata, sizeof(condata));
client.condata.clientID.cstring = ClientId;
client.condata.keepAliveInterval = 30;
client.condata.cleansession = 1;
client.condata.username.cstring = UserName;
client.condata.password.cstring = PassWord;
4.为MQTT的消息缓存申请内存
/* 为mqtt申请内存 */
client.buf_size = client.readbuf_size = 1024;
client.buf = rt_calloc(1, client.buf_size);
client.readbuf = rt_calloc(1, client.readbuf_size);
if (!(client.buf && client.readbuf))
{
rt_kprintf("no memory for MQTT client buffer!rn");
return;
}
5.设置回调函数,以及订阅主题;设置默认的回调函数,是在如果有订阅的 Topic 没有设置回调函数时,则使用该默认回调函数
/* 设置回调函数 */
client.connect_callback = mqtt_connect_callback;
client.online_callback = mqtt_online_callback;
client.offline_callback = mqtt_offline_callback;
/* 订阅一个主题,并设置其回调函数 */
client.messageHandlers[0].topicFilter = rt_strdup(ONENET_MQTT_SUBTOPIC);
client.messageHandlers[0].callback = mqtt_sub_callback;
client.messageHandlers[0].qos = QOS1;
/* 设置默认的回调函数 */
client.defaultMessageHandler = mqtt_sub_default_callback;
6.启动 mqtt client
/* 启动 mqtt client */
paho_mqtt_start(&client);
7.实现各个回调函数
/* 默认的订阅回调函数,如果有订阅的 Topic 没有设置回调函数,则使用该默认回调函数 */
static void mqtt_sub_default_callback(MQTTClient *c, MessageData *msg_data)
{
*((char *) msg_data->message->payload + msg_data->message->payloadlen) = '