概述
1. connect
在libemqtt代码中,客户端的connect代码是调用的mqtt_connect函数,代码如下:
int mqtt_connect(mqtt_broker_handle_t* broker)
{
uint8_t flags = 0x00;
uint16_t clientidlen = strlen(broker->clientid);
uint16_t usernamelen = strlen(broker->username);
uint16_t passwordlen = strlen(broker->password);
uint16_t payload_len = clientidlen + 2;
// Preparing the flags
if(usernamelen) {
payload_len += usernamelen + 2;
flags |= MQTT_USERNAME_FLAG;
}
if(passwordlen) {
payload_len += passwordlen + 2;
flags |= MQTT_PASSWORD_FLAG;
}
if(broker->clean_session) {
flags |= MQTT_CLEAN_SESSION;
}
// Variable header
uint8_t var_header[] = {
0x00,0x06,0x4d,0x51,0x49,0x73,0x64,0x70, // Protocol name: MQIsdp
0x03, // Protocol version
flags, // Connect flags
broker->alive>>8, broker->alive&0xFF, // Keep alive
};
// Fixed header
uint8_t fixedHeaderSize = 2; // Default size = one byte Message Type + one byte Remaining Length
uint8_t remainLen = sizeof(var_header)+payload_len;
if (remainLen > 127) {
fixedHeaderSize++; // add an additional byte for Remaining Length
}
uint8_t fixed_header[fixedHeaderSize];
// Message Type
fixed_header[0] = MQTT_MSG_CONNECT;
// Remaining Length
if (remainLen <= 127) {
fixed_header[1] = remainLen;
} else {
// first byte is remainder (mod) of 128, then set the MSB to indicate more bytes
fixed_header[1] = remainLen % 128;
fixed_header[1] = fixed_header[1] | 0x80;
// second byte is number of 128s
fixed_header[2] = remainLen / 128;
}
uint16_t offset = 0;
uint8_t packet[sizeof(fixed_header)+sizeof(var_header)+payload_len];
memset(packet, 0, sizeof(packet));
memcpy(packet, fixed_header, sizeof(fixed_header));
offset += sizeof(fixed_header);
memcpy(packet+offset, var_header, sizeof(var_header));
offset += sizeof(var_header);
// Client ID - UTF encoded
packet[offset++] = clientidlen>>8;
packet[offset++] = clientidlen&0xFF;
memcpy(packet+offset, broker->clientid, clientidlen);
offset += clientidlen;
if(usernamelen) {
// Username - UTF encoded
packet[offset++] = usernamelen>>8;
packet[offset++] = usernamelen&0xFF;
memcpy(packet+offset, broker->username, usernamelen);
offset += usernamelen;
}
if(passwordlen) {
// Password - UTF encoded
packet[offset++] = passwordlen>>8;
packet[offset++] = passwordlen&0xFF;
memcpy(packet+offset, broker->password, passwordlen);
offset += passwordlen;
}
// Send the packet
if(broker->send(broker->socket_info, packet, sizeof(packet)) < sizeof(packet)) {
return -1;
}
return 1;
}
首先是fixed header,fixed header为两字节,按照协议来说第一个字节应该是0x10,第二字节是剩余数据长度,也就是remain length。然后是variable header,variable header定义如下:
uint8_t var_header[] = {
0x00,0x06,0x4d,0x51,0x49,0x73,0x64,0x70, // Protocol name: MQIsdp
0x03, // Protocol version
flags, // Connect flags
broker->alive>>8, broker->alive&0xFF, // Keep alive
};
首先是协议的名字,长度为6个字节,协议名字这里为"MQIsdp",同mqtt v3.1.1协议上有点区别,在v3.1.1上是"MQTT"。然后是协议版本和flags,这里协议版本是0x03。最后是keep alive,keep alive这里为30。
注意,上面variable header长度总共为12字节。
最后是payload,payload_len为clientidlen + 2(当然这里的username和password都是没有的),而这里的clientid为"client-id",长度为9字节,加上2字节,那么payload_len值就为11,也就是说前面的remain length就为12+11=23,那么payload又是什么呢?payload部分,前两字节为payload的长度,也就是9字节,后面是"client-id",那么packet组成了,调用socket的send函数将数据发送出去。
2. connack
发送了connect数据包之后,自然是等待服务器端向客户端发送connack数据包,这里调用的是read_packet函数,代码如下:
int read_packet(int timeout)
{
if(timeout > 0)
{
fd_set readfds;
struct timeval tmv;
// Initialize the file descriptor set
FD_ZERO (&readfds);
FD_SET (socket_id, &readfds);
// Initialize the timeout data structure
tmv.tv_sec = timeout;
tmv.tv_usec = 0;
// select returns 0 if timeout, 1 if input available, -1 if error
if(select(1, &readfds, NULL, NULL, &tmv))
return -2;
}
int total_bytes = 0, bytes_rcvd, packet_length;
memset(packet_buffer, 0, sizeof(packet_buffer));
if((bytes_rcvd = recv(socket_id, (packet_buffer+total_bytes), RCVBUFSIZE, 0)) <= 0) {
return -1;
}
total_bytes += bytes_rcvd; // Keep tally of total bytes
if (total_bytes < 2)
return -1;
// now we have the full fixed header in packet_buffer
// parse it for remaining length and number of bytes
uint16_t rem_len = mqtt_parse_rem_len(packet_buffer);
uint8_t rem_len_bytes = mqtt_num_rem_len_bytes(packet_buffer);
//packet_length = packet_buffer[1] + 2; // Remaining length + fixed header length
// total packet length = remaining length + byte 1 of fixed header + remaning length part of fixed header
packet_length = rem_len + rem_len_bytes + 1;
while(total_bytes < packet_length) // Reading the packet
{
if((bytes_rcvd = recv(socket_id, (packet_buffer+total_bytes), RCVBUFSIZE, 0)) <= 0)
return -1;
total_bytes += bytes_rcvd; // Keep tally of total bytes
}
return packet_length;
}
这个函数主要是从服务器端读取数据的,核心是socket的recv函数。首先是超时处理,这里使用的是select系统调用,然后调用recv接收数据,解析出remain length和remain length所占字节数,如果还有数据未接收完毕,继续调用recv函数,最终数据是读取到全局变量packet_buffer中的,并返回读取的数据长度。
这里我把接收到的数据给dump出来了,数据是:
20 02 00 00
对照协议呢,第一个字节为packet的类型,这里为CONNACK,然后是remain length,这里为2。然后是variable header,第一个字节flags为0,第二字节为return code,return code有:
通过返回值可以看出,我们这里是连接成功了的。
3. subscribe
既然连接已经成功了,自然是发送订阅消息,这里调用的是mqtt_subscribe函数,代码如下:
int mqtt_subscribe(mqtt_broker_handle_t* broker, const char* topic, uint16_t* message_id) {
uint16_t topiclen = strlen(topic);
// Variable header
uint8_t var_header[2]; // Message ID
var_header[0] = broker->seq>>8;
var_header[1] = broker->seq&0xFF;
if(message_id) { // Returning message id
*message_id = broker->seq;
}
broker->seq++;
// utf topic
uint8_t utf_topic[topiclen+3]; // Topic size (2 bytes), utf-encoded topic, QoS byte
memset(utf_topic, 0, sizeof(utf_topic));
utf_topic[0] = topiclen>>8;
utf_topic[1] = topiclen&0xFF;
memcpy(utf_topic+2, topic, topiclen);
// Fixed header
uint8_t fixed_header[] = {
MQTT_MSG_SUBSCRIBE | MQTT_QOS1_FLAG, // Message Type, DUP flag, QoS level, Retain
sizeof(var_header)+sizeof(utf_topic)
};
uint8_t packet[sizeof(var_header)+sizeof(fixed_header)+sizeof(utf_topic)];
memset(packet, 0, sizeof(packet));
memcpy(packet, fixed_header, sizeof(fixed_header));
memcpy(packet+sizeof(fixed_header), var_header, sizeof(var_header));
memcpy(packet+sizeof(fixed_header)+sizeof(var_header), utf_topic, sizeof(utf_topic));
// Send the packet
if(broker->send(broker->socket_info, packet, sizeof(packet)) < sizeof(packet)) {
return -1;
}
return 1;
}
协议上说了fixed header第一字节为0x82,第二字节为remain length。
variable header这里只占两字节,含义是Packet Identifier,这里是从1开始的,每订阅一条消息这个值会加1(broker->seq++)。
payload部分,前两字节为payload数据的长度,然后是订阅的主题,这里是"public/test/topic",最后一字节是同QoS相关的。那么payload总共就有17+3=20字节,那么fixed header的reamin length就为20+2=22。
4. suback
发送了subscribe消息之后,还需要等待一个ack消息进行确认,这里还是调用的read_packet函数,接收到的数据如下:
90 03 00 01 00 第一字节为fixed header中的包类型,然后是reamin length为3。variable header有两字节,含义是Packet Identifier,即服务器端又把这个数据返回给我们了(从1开始)。payload是一个返回代码,这里为0x00。
在客户端接收到这个数据包之后,首先判断这个包是不是SUBACK类型包,然后判断发送过去的Packet Identifier和接收到的是否相同,如果一切OK,那么客户端这部分有个while死循环,来接收服务器端发送过来的消息,这部分代码如下:
while(1)
{
// <<<<<
packet_length = read_packet(0);
if(packet_length == -1)
{
fprintf(stderr, "Error(%d) on read packet!n", packet_length);
return -1;
}
else if(packet_length > 0)
{
printf("Packet Header: 0x%x...n", packet_buffer[0]);
if(MQTTParseMessageType(packet_buffer) == MQTT_MSG_PUBLISH)
{
uint8_t topic[255], msg[1000];
uint16_t len;
len = mqtt_parse_pub_topic(packet_buffer, topic);
topic[len] = '