概述
一、MQTT - MQ Telemetry Transport
- 轻量级的 machine-to-machine 通信协议。
- publish/subscribe模式。
- 基于TCP/IP。
- 支持QoS。
- 适合于低带宽、不可靠连接、嵌入式设备、CPU内存资源紧张。
- 是一种比较不错的Android消息推送方案。
- FacebookMessenger采用了MQTT。
- MQTT有可能成为物联网的重要协议。
消息体
MessageType
0x01 Connection Refused: unacceptable protocol version
0x02 Connection Refused: identifier rejected
0x03 Connection Refused: server unavailable
0x04 Connection Refused: bad user name or password
0x05 Connection Refused: not authorized
QoS
Clean Session
二、协议初解
先说一下整个协议的构造,整体上协议可拆分为:
固定头部+可变头部+消息体
协议说白了就是对于双方通信的一个约定,比如传过来一段字符流,第1个字节表示什么,第2个字节表示什么。。。。一个约定。
所以在固定头部的构造如下:
1、MessageType(0和15保留,共占4位)
public $operations=array(
"MQTT_CONNECT"=>1,//请求连接
"MQTT_CONNACK"=>2,//请求应答
"MQTT_PUBLISH"=>3,//发布消息
"MQTT_PUBACK"=>4,//发布应答
"MQTT_PUBREC"=>5,//发布已接收,保证传递1
"MQTT_PUBREL"=>6,//发布释放,保证传递2
"MQTT_PUBCOMP"=>7,//发布完成,保证传递3
"MQTT_SUBSCRIBE"=>8,//订阅请求
"MQTT_SUBACK"=>9,//订阅应答
"MQTT_UNSUBSCRIBE"=>10,//取消订阅
"MQTT_UNSUBACK"=>11,//取消订阅应答
"MQTT_PINGREQ"=>12,//ping请求
"MQTT_PINGRESP"=>13,//ping响应
"MQTT_DISCONNECT"=>14//断开连接
);
2、DUP flag
其是用来在保证消息传输可靠的,如果设置为1,则在下面的变长头部里多加MessageId,并需要回复确认,保证消息传输完成,但不能用于检测消息重复发送。
3、Qos
主要用于PUBLISH(发布态)消息的,保证消息传递的次数。
00表示最多一次 即<=1
01表示至少一次 即>=1
10表示一次,即==1
11保留后用
4、Retain
主要用于PUBLISH(发布态)的消息,表示服务器要保留这次推送的信息,如果有新的订阅者出现,就把这消息推送给它。如果不设那么推送至当前订阅的就释放了。
5、固定头部的byte 2
是用来保存接下去的变长头部+消息体的总大小的。
但是不是并不是直接保存的,同样也是可以扩展的,其机制是,前7位用于保存长度,后一部用做标识。
我举个例了,即如果计算出后面的大小为0<length<=127的,正常保存
如果是127<length<16383的,则需要二个字节保存了,将第一个字节的最大的一位置1,表示未完。然后第二个字节继续存。
拿130来说,第一个字节存10000011,第二个字节存000000001,也就是0x83,0x01,把两个字节连起来看,第二个字节权重从2的8次开始。
同起可以加第3个字节,最多可以加至第4个字节。故MQTT协议最多可以实现268 435 455 (0xFF, 0xFF, 0xFF, 0x7F)将近256M的数据。可谓能伸能缩。
可变头部
这个是可变头部的全貌。
1、首先最上面的8个字节是Protocol Name(编码名),UTF编码的字符“MQIsdp”,头两个是编码名提长为6。
这里多说一些,接下去的协议多采用这种方式组合,即头两个字节表示下一部分的长,然后后面跟上内容。这里头两个字节长为6,下面跟6个字符“MQIsdp”。
2、Protocol Version,协议版本号,v3 也是固定的。
3、Connect Flag,连接标识,有点像固定头部的。8位分别代表不同的标志。第1个字节保留。
Clean Session,Will flag,Will Qos, Will Retain都是相对于CONNECT消息来说的。
Clean Session:0表示如果订阅的客户机断线了,那么要保存其要推送的消息,如果其重新连接时,则将这些消息推送。
1表示消除,表示客户机是第一次连接,消息所以以前的连接信息。
Will Flag,表示如果客户机在不是在发送DISCONNECT消息中断,比如IO错误等,将些置为1,要求重传。并且下且的WillQos和WillRetain也要设置,消息体中的Topic和MessageID也要设置,就是表示发生了错误,要重传。
Will Qos,在CONNECT非正常情况下设置,一般如果标识了WillFlag,那么这个位置也要标识。
Will RETAIN:同样在CONNECT中,如果标识了WillFlag,那么些位也一定要标识
usename flag和passwordflag,用来标识是否在消息体中传递用户和密码,只有标识了,消息体中的用户名和密码才用效,只标记密码而不标记用户名是不合法的。
4、Keep Alive,表示响应时间,如果这个时间内,连接或发送操作未完成,则断开tcp连接,表示离线。
5、Connect Return Code即通常于CONNACK消息中,表示返回的连接情况,我可以通过此检验连接情况。
6、Topic Name,订阅消息标识,MQTT是基于订阅/发布的消息,那么这个就是消息订阅的标识,像新闻客户端里的订阅不同的栏目一样。用于区别消息的推送类别。
主要用于PUBLISH和SUBSCRIBE中。最大可支持32767个字符,即4个字节。
消息体(PayLoad)
只有3种消息有消息体CONNECT,SUBSCRIBE,SUBACK
CONNECT主要是客户机的ClientID,订阅的Topic和Message以及用户名和密码,其于变长头部中的will是对应的。
SUBSCRIBE是包含了一系列的要订阅的主题以及QOS。
SUBACK是用服务器对于SUBSCRIBE所申请的主题及QOS进行确认和回复。
而PUBLISH是消息体中则保存推送的消息,以二进制形式,当然这里的编辑可自定义。
7、Message Identifier
包含于PUBLISH, PUBACK, PUBREC, PUBREL, PUBCOMP, SUBSCRIBE, SUBACK, UNSUBSCRIBE, UNSUBACK.
其为16位字符表示,用于在Qos为1或2时标识Message的,保证Message传输的可靠性。
至于具体的消息例子,我们在后面的代码中慢慢体现。
三、MQTT协议笔记之头部信息
前言
MQTT(Message Queue Telemetry Transport),遥测传输协议,提供订阅/发布模式,更为简约、轻量,易于使用,针对受限环境(带宽低、网络延迟高、网络通信不稳定),可以简单概括为物联网打造,官方总结特点如下:
1.使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合。
2. 对负载内容屏蔽的消息传输。
3. 使用 TCP/IP 提供网络连接。
4. 有三种消息发布服务质量:
“至多一次”,消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。
“至少一次”,确保消息到达,但消息重复可能会发生。
“只有一次”,确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果。
5. 小型传输,开销很小(固定长度的头部是 2 字节),协议交换最小化,以降低网络流量。
6. 使用 Last Will 和 Testament 特性通知有关各方客户端异常中断的机制。
MQTT 3.1协议在线版本: http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html
官方下载地址: http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/MQTT_V3.1_Protocol_Specific.pdf
PDF版本,42页,不算多。
另外,目前MQTT大家都用在了手机推送,可能还有很多的使用方式,有待进一步的探索。
协议方面,以前曾简单实现过一点HTTP协议,基于HTTP上构建若干种通信管道的socket.io协议,不过socket.io 0.9版本的协议才两三页而已。面对领域不同,自然解决的方式也不一样。
阅读完毕MQTT协议,有一个想法,其实可以基于MQTT协议,打造更加私有、精简(协议一些地方,略显多余)的传输协议,比如一个字节的传输开销。有时间,会详细说一下。
固定头部
固定头部,使用两个字节,共16位:
bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 |
---|---|---|---|---|---|---|---|---|
byte 1 | Message Type | DUP flag | QoS level | RETAIN | ||||
byte 2 | Remaining Length |
第一个字节(byte 1)
消息类型(4-7),使用4位二进制表示,可代表16种消息类型:
Mnemonic | Enumeration | Description |
---|---|---|
Reserved | 0 | Reserved |
CONNECT | 1 | Client request to connect to Server |
CONNACK | 2 | Connect Acknowledgment |
PUBLISH | 3 | Publish message |
PUBACK | 4 | Publish Acknowledgment |
PUBREC | 5 | Publish Received (assured delivery part 1) |
PUBREL | 6 | Publish Release (assured delivery part 2) |
PUBCOMP | 7 | Publish Complete (assured delivery part 3) |
SUBSCRIBE | 8 | Client Subscribe request |
SUBACK | 9 | Subscribe Acknowledgment |
UNSUBSCRIBE | 10 | Client Unsubscribe request |
UNSUBACK | 11 | Unsubscribe Acknowledgment |
PINGREQ | 12 | PING Request |
PINGRESP | 13 | PING Response |
DISCONNECT | 14 | Client is Disconnecting |
Reserved | 15 | Reserved |
除去0和15位置属于保留待用,共14种消息事件类型。
DUP flag(打开标志)
保证消息可靠传输,默认为0,只占用一个字节,表示第一次发送。不能用于检测消息重复发送等。只适用于客户端或服务器端尝试重发PUBLISH, PUBREL, SUBSCRIBE 或 UNSUBSCRIBE消息,注意需要满足以下条件:
当QoS > 0
消息需要回复确认
此时,在可变头部需要包含消息ID。当值为1时,表示当前消息先前已经被传送过。
QoS(Quality of Service,服务质量)
使用两个二进制表示PUBLISH类型消息:
QoS value | bit 2 | bit 1 | Description | ||
---|---|---|---|---|---|
0 | 0 | 0 | 至多一次 | 发完即丢弃 | <=1 |
1 | 0 | 1 | 至少一次 | 需要确认回复 | >=1 |
2 | 1 | 0 | 只有一次 | 需要确认回复 | =1 |
3 | 1 | 1 | 待用,保留位置 |
RETAIN(保持)
仅针对PUBLISH消息。不同值,不同含义:
1:表示发送的消息需要一直持久保存(不受服务器重启影响),不但要发送给当前的订阅者,并且以后新来的订阅了此Topic name的订阅者会马上得到推送。
备注:新来乍到的订阅者,只会取出最新的一个RETAIN flag = 1的消息推送。
0:仅仅为当前订阅者推送此消息。
假如服务器收到一个空消息体(zero-length payload)、RETAIN = 1、已存在Topic name的PUBLISH消息,服务器可以删除掉对应的已被持久化的PUBLISH消息。
如何解析
因为java使用有符号(最高位为符号位)数据表示,byte范围:-128-127。该字节的最高位(左边第一位),可能为1。若直接转换为byte类型,会出现负数,这是一个雷区。DataInputStream提供了int readUnsignedByte()读取方式,请注意。下面演示了,如何从一个字节中,获取到所有定义的信息,同时绕过雷区:
public static void main(String[] args) {
byte publishFixHeader = 50;// 0 0 1 1 0 0 1 0
doGetBit(publishFixHeader);
int ori = 224;//1110000,DISCONNECT ,Message Type (14)
byte flag = (byte) ori; //有符号byte
doGetBit(flag);
doGetBit_v2(ori);
}
public static void doGetBit(byte flags) {
boolean retain = (flags & 1) > 0;
int qosLevel = (flags & 0x06) >> 1;
boolean dupFlag = (flags & 8) > 0;
int messageType = (flags >> 4) & 0x0f;
System.out.format(
"Message type:%d, DUP flag:%s, QoS level:%d, RETAIN:%sn",
messageType, dupFlag, qosLevel, retain);
}
public static void doGetBit_v2(int flags) {
boolean retain = (flags & 1) > 0;
int qosLevel = (flags & 0x06) >> 1;
boolean dupFlag = (flags & 8) > 0;
int messageType = flags >> 4;
System.out.format(
"Message type:%d, DUP flag:%s, QoS level:%d, RETAIN:%sn",
messageType, dupFlag, qosLevel, retain);
}
处理Remaining Length(剩余长度)
在当前消息中剩余的byte(字节)数,包含可变头部和负荷(称之为内容/body,更为合适)。单个字节最大值:01111111,16进制:0x7F,10进制为127。单个字节为什么不能是11111111(0xFF)呢?因为MQTT协议规定,第八位(最高位)若为1,则表示还有后续字节存在。同时MQTT协议最多允许4个字节表示剩余长度。那么最大长度为:0xFF,0xFF,0xFF,0x7F,二进制表示为:11111111,11111111,11111111,01111111,十进制:268435455 byte=261120KB=256MB=0.25GB 四个字节之间值的范围:
Digits | From | To |
---|---|---|
1 | 0 (0x00) | 127 (0x7F) |
2 | 128 (0x80, 0x01) | 16 383 (0xFF, 0x7F) |
3 | 16 384 (0x80, 0x80, 0x01) | 2 097 151 (0xFF, 0xFF, 0x7F) |
4 | 2 097 152 (0x80, 0x80, 0x80, 0x01) | 268 435 455 (0xFF, 0xFF, 0xFF, 0x7F) |
如何换算成十进制呢 ? 使用java语言表示如下:
public static void main(String[] args) throws IOException {
// 模拟客户端写入
ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream();
DataOutputStream dataOutputStream = new DataOutputStream(arrayOutputStream);
dataOutputStream.write(0xff);
dataOutputStream.write(0xff);
dataOutputStream.write(0xff);
dataOutputStream.write(0x7f);
InputStream arrayInputStream = new ByteArrayInputStream(arrayOutputStream.toByteArray());
// 模拟服务器/客户端解析
System. out.println( "result is " + bytes2Length(arrayInputStream));
}
/**
* 转化字节为 int类型长度
* @param in
* @return
* @throws IOException
*/
private static int bytes2Length(InputStream in) throws IOException {
int multiplier = 1;
int length = 0;
int digit = 0;
do {
digit = in.read(); //一个字节的有符号或者无符号,转换转换为四个字节有符号 int类型
length += (digit & 0x7f) * multiplier;
multiplier *= 128;
} while ((digit & 0x80) != 0);
return length;
}
一般最后一个字节小于127(01111111),和0x80(10000000)进行&操作,最终结果都为0,因此计算会终止。代理中间件和请求者,中间传递的是字节流Stream,自然要从流中读取,逐一解析出来。
那么如何将int类型长度解析为不确定的字节值呢?
public static void main(String[] args) throws IOException {
// 模拟服务器/客户端写入
ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream();
DataOutputStream dataOutputStream = new DataOutputStream(
arrayOutputStream);
// 模拟服务器/客户端解析
length2Bytes(dataOutputStream, 128);
}
/**
* int类型长度解析为1-4个字节
* @param out
* @param length
* @throws IOException
*/
private static void length2Bytes(OutputStream out, int length)
throws IOException {
int val = length;
do {
int digit = val % 128;
val = val / 128;
if (val > 0)
digit = digit | 0x80;
out.write(digit);
} while (val > 0);
}
digit对val求模,最大值可能是127,一旦127 | 10000000 = 11111111 = 0xff = 255 请注意:剩余长度,只在固定头部中,无论是一个字节,还是四个字节,不能被算作可变头部中。
可变头部
固定头部仅定义了消息类型和一些标志位,一些消息的元数据,需要放入可变头部中。可变头部内容字节长度 + Playload/负荷字节长度 = 剩余长度,这个是需要牢记的。可变头部,包含了协议名称,版本号,连接标志,用户授权,心跳时间等内容,这部分和后面要讲到的CONNECT消息类型,有重复,暂时略过。
Playload/消息体/负荷
消息体主要是为配合固定/可变头部命令(比如CONNECT可变头部User name标记若为1则需要在消息体中附加用户名称字符串)而存在。
CONNECT/SUBSCRIBE/SUBACK/PUBLISH等消息有消息体。PUBLISH的消息体以二进制形式对待。
请记住,MQTT协议只允许在PUBLISH类型消息体中使用自定义特性,在固定/可变头部想加入自定义私有特性,就免了吧。这也是为了协议免于流于形式,变得很分裂也为了兼顾现有客户端等。比如支持压缩等,那就可以在Playload中定义数据支持,在应用中进行读取处理。
这部分会在后面详细论述。
消息标识符/消息ID
固定头中的QoS level标志值为1或2时才会在:PUBLISH,PUBACK,PUBREC,PUBREL,PUBCOMP,SUBSCRIBE,SUBACK,UNSUBSCRIBE,UNSUBACK等消息的可变头中出现。
一个16位无符号位的short类型值(值不能为 0,0做保留作为无效的消息ID),仅仅要求在一个特定方向(服务器发往客户端为一个方向,客户端发送到服务器端为另一个方向)的通信消息中必须唯一。比如客户端发往服务器,有可能存在服务器发往客户端会同时存在重复,但不碍事。
可变头部中,需要两个字节的顺序是MSB(Most Significant Bit) LSB(Last/Least Significant Bit),翻译成中文就是,最高有效位,最低有效位。最高有效位在最低有效位左边/上面,表示这是一个大端字节/网络字节序,符合人的阅读习惯,高位在最左边。
bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 |
---|---|---|---|---|---|---|---|---|
Message Identifier MSB | ||||||||
Message Identifier LSB |
但凡如此表示的,都可以视为一个16位无符号short类型整数,两个字节表示。在JAVA中处理比较简单:
DataInputStream.readUnsignedShort
或者
in.read() * 0xFF + in.read();
最大长度可为: 65535
UTF-8编码
有关字符串,MQTT采用的是修改版的UTF-8编码,一般形式为如下,需要牢记:
bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 |
---|---|---|---|---|---|---|---|---|
byte 1 | String Length MSB | |||||||
byte 2 | String Length LSB | |||||||
bytes 3 ... | Encoded Character Data |
比如AVA,使用writeUTF()方法写入一串文字“OTWP”,头两个字节为一个完整的无符号数字,代表字符串字节长度,后面四个字节才是字符串真正的长度,共六个字节:
bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 |
---|---|---|---|---|---|---|---|---|
byte 1 | Message Length MSB (0x00) | |||||||
0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | |
byte 2 | Message Length LSB (0x04) | |||||||
0 | 0 | 0 | 0 | 0 | 1 | 0 | 0 | |
byte 3 | 'O' (0x4F) | |||||||
0 | 1 | 0 | 0 | 1 | 1 | 1 | 1 | |
byte 4 | 'T' (0x54) | |||||||
0 | 1 | 0 | 1 | 0 | 1 | 0 | 0 | |
byte 5 | 'W' (0x57) | |||||||
0 | 1 | 0 | 1 | 0 | 1 | 1 | 1 | |
byte 6 | 'P' (0x50) | |||||||
0 | 1 | 0 | 1 | 0 | 0 | 0 | 0 |
这点,在程序中,可不用单独处理默认,直接使用readUTF()方法,可自动省去了处理字符串长度的麻烦。当然,可以手动读取字符串:
// 模拟写入
dataOutputStream.writeUTF( "abcd");// 2 + 4 = 6 byte
......
// 模拟读取
int decodedLength = dataInputStream.readUnsignedShort();//2 byte
byte[] decodedString = new byte[decodedLength]; // 4 bytes
dataInputStream.read(decodedString);
String target = new String(decodedString, "UTF-8");
等同于:
String target = dataInputStream.readUTF();
MQTT无论是可变头部还是消息体中,只要是字符串部分,都是采用了修改版的UTF-8编码,读取和写入,借助DataInputStream/DataOutputStream的帮助,一行语句,略去了手动处理的麻烦。
小结
总之,掌握固定头部的QoS level、RETAIN标记、可变头部的Connect flags作用和意义,对总体理解MQTT作用很大。
四、MQTT协议笔记之连接和心跳
前言
本篇会把连接(CONNECT)、心跳(PINGREQ/PINGRESP)、确认(CONNACK)、断开连接(DISCONNECT)和在一起。
CONNECT
像前面所说,MQTT有关字符串部分采用的修改版的UTF-8编码,CONNECT可变头部中协议名称、消息体都是采用修改版的UTF-8编码。前面基本上可变头部内容不多,下面是一个较为完整的CONNECT消息结构:
Description | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 | ||
---|---|---|---|---|---|---|---|---|---|---|
Fixed header/固定头部 | ||||||||||
Message Type(1) | DUP flag | QoS level | RETAIN | |||||||
byte 1
| 0 | 0 | 0 | 1 | x | x | x | x | ||
byte 2 | Remaining Length | |||||||||
Variable header /可变头部 | ||||||||||
Protocol Name | ||||||||||
byte 1 | Length MSB (0) | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | |
byte 2 | Length LSB (6) | 0 | 0 | 0 | 0 | 0 | 1 | 1 | 0 | |
byte 3 | 'M' | 0 | 1 | 0 | 0 | 1 | 1 | 0 | 1 | |
byte 4 | 'Q' | 0 | 1 | 0 | 1 | 0 | 0 | 0 | 1 | |
byte 5 | 'I' | 0 | 1 | 0 | 0 | 1 | 0 | 0 | 1 | |
byte 6 | 's' | 0 | 1 | 1 | 1 | 0 | 0 | 1 | 1 | |
byte 7 | 'd' | 0 | 1 | 1 | 0 | 0 | 1 | 0 | 0 | |
byte 8 | 'p' | 0 | 1 | 1 | 1 | 0 | 0 | 0 | 0 | |
Protocol Version Number | ||||||||||
byte 9 | Version (3) | 0 | 0 | 0 | 0 | 0 | 0 | 1 | 1 | |
Connect Flags | ||||||||||
User Name Flag | Password Flag | Will Retain | Will QoS | Will Flag | Clean Session | Reserved | ||||
byte 10
| 1 | 1 | 0 | 0 | 1 | 1 | 1 | x | ||
Keep Alive timer | ||||||||||
byte 11 | Keep Alive MSB (0) | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | |
byte 12 | Keep Alive LSB (10) | 0 | 0 | 0 | 0 | 1 | 0 | 1 | 0 | |
Playload/消息体 | ||||||||||
Client Identifier(客户端ID) 1-23个字符长度,客户端到服务器的全局唯一标志,如果客户端ID超出23个字符长度,服务器需要返回码为2,标识符被拒绝响应的CONNACK消息。处理QoS级别1和2的消息ID中,可以使用到。 必填项。 | ||||||||||
Will Topic Will Flag值为1,这里便是Will Topic的内容。QoS级别通过Will QoS字段定义,RETAIN值通过Will RETAIN标识,都定义在可变头里面。 | ||||||||||
Will Message Will Flag若设为1,这里便是Will Message定义消息的内容,对应的主题为Will Topic。如果客户端意外的断开触发服务器PUBLISH此消息。长度有可能为0。 在CONNECT消息中的Will Message是UTF-8编码的,当被服务器发布时则作为二进制的消息体。 | ||||||||||
User Name 如果设置User Name标识,可以在此读取用户名称。一般可用于身份验证。协议建议用户名为不多于12个字符,不是必须。 | ||||||||||
Password 如果设置Password标识,便可读取用户密码。建议密码为12个字符或者更少,但不是必须。 |
可变头部
协议名称和协议版本都是固定的。
连接标志(Connect Flags)
一个字节表示,除了第1位是保留未使用,其它7位都具有不同含义。
业务上很重要,对消息总体流程影响很大,需要牢记。
Clean Session
0,表示如果订阅的客户机断线了,要保存为其要推送的消息(QoS为1和QoS为2),若其重新连接时,需将这些消息推送(若客户端长时间不连接,需要设置一个过期值)。 1,断线服务器即清理相关信息,重新连接上来之后,会再次订阅。
Will Flag
定义了客户端(没有主动发送DISCONNECT消息)出现网络异常导致连接中断的情况下,服务器需要做的一些措施。
简而言之,就是客户端预先定义好,在自己异常断开的情况下,所留下的最后遗愿(Last Will),也称之为遗嘱(Testament)。 这个遗嘱就是一个由客户端预先定义好的主题和对应消息,附加在CONNECT的可变头部中,在客户端连接出现异常的情况下,由服务器主动发布此消息。
只有在Will Flag位为1时,Will Qos和Will Retain才会被读取,此时消息体Playload中要出现Will Topic和Will Message具体内容,否则,Will QoS和Will Retain值会被忽略掉。
Will Qos
两位表示,和PUBLISH消息固定头部的QoS level含义一样。这里先掠过,到PUBLISH消息再回过头来看看,会更明白些。
若标识了Will Flag值为1,那么Will QoS就会生效,否则会被忽略掉。
Will RETAIN
如果设置Will Flag,Will Retain标志就是有效的,否则它将被忽略。
当客户端意外断开服务器发布其Will Message之后,服务器是否应该继续保存。这个属性和PUBLISH固定头部的RETAIN标志含义一样,这里先掠过。
User name 和 password Flag:
用于授权,两者要么为0要么为1,否则都是无效。都为0,表示客户端可自由连接/订阅,都为1,表示连接/订阅需要授权。
Playload/消息体
消息体定义的消息顺序(如上表所示),约定俗成,不得更改,否则将可能引起混乱。
若Will Flag值为0,那么在Playload中,Client Identifer后面就不会存在Will Topic和Will Message内容。
若User Name和Password都为0,意味着Playload/消息体中,找不到User Name和password的值,就算有,也是无效。标志决定着是否读取与否。
心跳时间(Keep Alive timer)
以秒为单位,定义服务器端从客户端接收消息的最大时间间隔。一般应用服务会在业务层次检测客户端网络是否连接,不是TCP/IP协议层面的心跳机制(比如开启SOCKET的SO_KEEPALIVE选项)。 一般来讲,在一个心跳间隔内,客户端发送一个PINGREQ消息到服务器,服务器返回PINGRESP消息,完成一次心跳交互,继而等待下一轮。若客户端没有收到心跳反馈,会关闭掉TCP/IP端口连接,离线。 16位两个字节,可看做一个无符号的short类型值。最大值,2^16-1 = 65535秒 = 18小时。最小值可以为0,表示客户端不断开。一般设为几分钟,比如微信心跳周期为300秒。
Will Message编码
Will Message在CONNECT Payload/息体中,使用UTF-8编码。假设内容为“abcd”,大概如下:
Description | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 | |
---|---|---|---|---|---|---|---|---|---|
byte 1 | Length MSB (0) | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
byte 2 | Length LSB (4) | 0 | 0 | 0 | 0 | 0 | 1 | 0 | 0 |
byte 3 | 'a' (0x61) | 0 | 1 | 1 | 0 | 0 | 0 | 0 | 1 |
byte 4 | 'b' (0x62) | 0 | 1 | 1 | 0 | 0 | 0 | 1 | 0 |
byte 5 | 'c' (0x63) | 0 | 1 | 1 | 0 | 0 | 0 | 1 | 1 |
byte 6 | 'd' (0x64) | 0 | 1 | 1 | 0 | 0 | 1 | 0 | 0 |
有一点需要记住,PUBLISH的Payload/消息体中以二进制编码保存。
某刻客户端异常关闭触发服务器会PUBLISH此消息。那么服务器会直接把byte3-byte6之间字符取出,保存为二进制,附加到PUBLISH消息体中,大概存储如下:
Description | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 | |
---|---|---|---|---|---|---|---|---|---|
byte 1 | 'a' (0x61) | 0 | 1 | 1 | 0 | 0 | 0 | 0 | 1 |
byte 2 | 'b' (0x62) | 0 | 1 | 1 | 0 | 0 | 0 | 1 | 0 |
byte 3 | 'c' (0x63) | 0 | 1 | 1 | 0 | 0 | 0 | 1 | 1 |
byte 4 | 'd' (0x64) | 0 | 1 | 1 | 0 | 0 | 1 | 0 | 0 |
另外,MQTT 3.1协议对Will message的说明很容易引起误解,3.1.1草案已经得到修正。
相关说明:
http://mqtt.org/wiki/doku.php/will message utf8_support
https://tools.oasis-open.org/issues/browse/MQTT-2
连接异常中断通知机制
CONNECT消息一旦设置在可变头部设置了Will flag标记,那就启用了Last-Will-And-Testament特性,此特性很赞。
一旦客户端出现异常中断,便会触发服务器发布Will Message消息到Will Topic主题上去,通知Will Topic订阅者,对方因异常退出。
接收CONNECT后的响应动作
接收到CONNECT消息之后,服务器应该返回一个CONNACK消息作为响应:
- 若客户端绕过CONNECT消息直接发送其它类型消息,服务器应关闭此非法连接 若客户端发送CONNECT之后未收到CONNACT,需要关闭当前连接,然后重新连接
- 相同Client ID客户端已连接到服务器,先前客户端必须断开连接后,服务器才能完成新的客户端CONNECT连接 客户端发送无效非法CONNECT消息,服务器需要关闭
CONNACK
一个完整的CONNACK消息大致如下:
Description | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 | |
---|---|---|---|---|---|---|---|---|---|
Fixed header/固定头部 | |||||||||
byte 1 | Message type (2) | DUP flag | QoS flags | RETAIN | |||||
0 | 0 | 1 | 0 | x | x | x | x | ||
byte 2 | Remaining Length (2) | ||||||||
0 | 0 | 0 | 0 | 0 | 0 | 1 | 0 | ||
Variable header /可变头部 | |||||||||
Topic Name Compression Response | |||||||||
byte 1 | Reserved values. Not used. | x | x | x | x | x | x | x | x |
Connect Return Code | |||||||||
byte 2 | Return Code |
可变头部第一个字节为保留,无甚用处。第二个字节为连接握手返回码:
返回值 | 16进制 | 含义 |
0 | 0x00 | Connection Accepted |
1 | 0x01 | Connection Refused: unacceptable protocol version |
2 | 0x02 | Connection Refused: identifier rejected |
3 | 0x03 | Connection Refused: server unavailable |
4 | 0x04 | Connection Refused: bad user name or password |
5 | 0x05 | Connection Refused: not authorized |
6-255 | Reserved for future use |
只有0-5目前被使用到,其他值有待日后使用。一般返回值为0x00,表示连接建立。非法的请求,需要返回相应的数值。
从上面看出,一个CONNACT,四个字节表示。一个正常的CONNACT消息实际内容可能如下: 0x20 0x02 0x00 0x00
若是在私有协议中,两个字节就足够了。
很多时候,客户端和服务器端在没有消息传递时,会一直保持着连接。虽然不能依靠TCP心跳机制(比如SO_KEEPALIVE选项),业务层面定义心跳机制,会让连接状态检测、控制更为直观。
PINGREQ
由客户端发送到服务器端,证明自己还在一直连接着呢。两个字节,固定值。
Description | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 | |
---|---|---|---|---|---|---|---|---|---|
Fixed header/固定头部 | |||||||||
byte 1 | Message type (12) | DUP flag | QoS flags | RETAIN | |||||
1 | 1 | 0 | 0 | x | x | x | x | ||
byte 2 | Remaining Length (0) | ||||||||
0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
客户端会在一个心跳周期内发送一条PINGREQ消息到服务器端。
心跳频率在CONNECT可变头部“Keep Alive timer”中定义时间,单位为秒,无符号16位short表示。
PINGRESP
服务器收到PINGREQ请求之后,会立即响应一个两个字节固定格式的PINGRESP消息。
Description | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 | |
---|---|---|---|---|---|---|---|---|---|
Fixed header/固定头部 | |||||||||
byte 1 | Message type (13) | DUP flag | QoS flags | RETAIN | |||||
1 | 1 | 0 | 1 | x | x | x | x | ||
byte 2 | Remaining Length (0) | ||||||||
0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
服务器一般若在1.5倍的心跳周期内接收不到客户端发送的PINGREQ,可考虑关闭客户端的连接描述符。此时的关闭连接的行为和接收到客户端发送DISCONNECT消息的处理行为一致,但对客户端的订阅不会产生影响(不会清除客户端订阅数据),这个需要牢记。
若客户端发送PINGREQ之后的一个心跳周期内接收不到PINGRESP消息,可考虑关闭TCP/IP套接字连接。
DISCONNECT
客户端主动发送到服务器端,表明即将关闭TCP/IP连接。此时要求服务器要完整、干净的进行断开处理,不能仅仅类似于关闭连接描述符类似草草处理之。 需要两个字节,值固定:
Description | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 | |
---|---|---|---|---|---|---|---|---|---|
Fixed header/固定头部 | |||||||||
byte 1 | Message type (14) | DUP flag | QoS flags | RETAIN | |||||
1 | 1 | 1 | 0 | x | x | x | x | ||
byte 2 | Remaining Length (0) | ||||||||
0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
服务器要根据先前此客户端在发送CONNECT消息可变头部Connect flag中的“Clean session flag”所设置值,再次复习一下:
-
值为0,服务器必须在客户端断开之后继续存储/保持客户端的订阅状态。这些状态包括:
- 存储订阅的消息QoS1和QoS2消息
- 正在发送消息期间连接丢失导致发送失败的消息
- 以便当客户端重新连接时以上消息可以被重新传递。
-
值为1,服务器需要立刻清理连接状态数据。
有一点需要牢记,服务器在接收到客户端发送的DISCONNECT消息之后,需要主动关闭TCP/IP连接。
最后
以上就是可爱马里奥为你收集整理的MQTT-新一代物联网协议 四、MQTT协议笔记之连接和心跳的全部内容,希望文章能够帮你解决MQTT-新一代物联网协议 四、MQTT协议笔记之连接和心跳所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复