概述
前言
MQTT(Message Queue Telemetry Transport)遥测传输协议,提供订阅/发布模式,更为简约、轻量,易于使用,针对受限环
境(带宽低、网络延迟高、网络通信不稳定),可以简单概括为物联网打造,官方总结特点如下:
1.使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合。
2.对负载内容屏蔽的消息传输。
3.使用 TCP/IP 提供网络连接。
4.有三种消息发布服务质量:
“至多一次”,消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,
丢失一次读记录无所谓,因为不久后还会有第二次发送。
“至少一次”,确保消息到达,但消息重复可能会发生。
“只有一次”,确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果。
5. 小型传输,开销很小(固定长度的头部是 2 字节),协议交换最小化,以降低网络流量。
6. 使用 Last Will 和 Testament 特性通知有关各方客户端异常中断的机制。
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
固定头部
固定头部,使用两个字节,共16位:
bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 |
---|---|---|---|---|---|---|---|---|
byte 1 | Message Type | DUP flag | QoS level | RETAIN | ||||
byte 2 | Remaining Length |
第一个字节(byte 1)
消息类型(4-7),使用4位二进制表示,可代表16种消息类型:
Mnemonic | Enumeration | Description |
---|---|---|
Reserved | 0 | Reserved |
CONNECT | 1 | Client request to connect to Server |
CONNACK | 2 | Connect Acknowledgment |
PUBLISH | 3 | Publish message |
PUBACK | 4 | Publish Acknowledgment |
PUBREC | 5 | Publish Received (assured delivery part 1) |
PUBREL | 6 | Publish Release (assured delivery part 2) |
PUBCOMP | 7 | Publish Complete (assured delivery part 3) |
SUBSCRIBE | 8 | Client Subscribe request |
SUBACK | 9 | Subscribe Acknowledgment |
UNSUBSCRIBE | 10 | Client Unsubscribe request |
UNSUBACK | 11 | Unsubscribe Acknowledgment |
PINGREQ | 12 | PING Request |
PINGRESP | 13 | PING Response |
DISCONNECT | 14 | Client is Disconnecting |
Reserved | 15 | Reserved |
除去0和15位置属于保留待用,共14种消息事件类型。
DUP flag(打开标志)
保证消息可靠传输,默认为0,只占用一个字节,表示第一次发送。不能用于检测消息重复发送等。
只适用于客户端或服务器端尝试重发PUBLISH, PUBREL, SUBSCRIBE 或 UNSUBSCRIBE消息,
注意需要满足以下条件:当QoS > 0,消息需要回复确认。此时,在可变头部需要包含消息ID。
当值为1时,表示当前消息先前已经被传送过。
QoS(Quality of Service,服务质量)
使用两个二进制表示PUBLISH类型消息:
QoS value | bit 2 | bit 1 | Description | ||
---|---|---|---|---|---|
0 | 0 | 0 | 至多一次 | 发完即丢弃 | <=1 |
1 | 0 | 1 | 至少一次 | 需要确认回复 | >=1 |
2 | 1 | 0 | 只有一次 | 需要确认回复 | =1 |
3 | 1 | 1 | 待用,保留位置 |
RETAIN(保持)
仅针对PUBLISH消息。不同值,不同含义:
1:表示发送的消息需要一直持久保存(不受服务器重启影响),不但要发送给当前的订阅者,
并且以后新来的订阅了此Topic name的订阅者会马上得到推送。
备注:新来乍到的订阅者,只会取出最新的一个RETAIN flag = 1的消息推送。
0:仅仅为当前订阅者推送此消息。
假如服务器收到一个空消息体(zero-length payload)、RETAIN = 1、已存在Topic name的
PUBLISH消息,服务器可以删除掉对应的已被持久化的PUBLISH消息。
处理Remaining Length(剩余长度)
在当前消息中剩余的byte(字节)数,包含可变头部和负荷(称之为内容/body,更为合适)。
单个字节最大值:01111111,16进制:0x7F,10进制为127。
单个字节为什么不能是11111111(0xFF)呢?
因为MQTT协议规定,第八位(最高位)若为1,则表示还有后续字节存在。
同时MQTT协议最多允许4个字节表示剩余长度。
那么最大长度为:0xFF,0xFF,0xFF,0x7F,
二进制表示为:11111111,11111111,11111111,01111111,
十进制:268435455 byte=261120KB=256MB=0.25GB 四个字节之间值的范围:
Digits | From | To |
---|---|---|
1 | 0 (0x00) | 127 (0x7F) |
2 | 128 (0x80, 0x01) | 16 383 (0xFF, 0x7F) |
3 | 16 384 (0x80, 0x80, 0x01) | 2 097 151 (0xFF, 0xFF, 0x7F) |
4 | 2 097 152 (0x80, 0x80, 0x80, 0x01) | 268 435 455 (0xFF, 0xFF, 0xFF, 0x7F) |
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
可变头部
固定头部仅定义了消息类型和一些标志位,一些消息的原数据,需要放入可变头部中。
可变头部内容字节长度 + Payload字节长度 = 剩余长度。
可变头部,包含了协议名称,版本号,连接标志,用户授权,心跳时间等内容。
Payload/消息体/负荷
消息体主要是为配合固定/可变头部命令(比如CONNECT可变头部User name标记若为1,则需要在
消息体中附加用户名称字符串)而存在。
CONNECT/SUBSCRIBE/SUBACK/PUBLISH等消息有消息体。PUBLISH的消息体以二进制形式对待。
MQTT协议只允许在PUBLISH类型消息体中使用自定义特性。这也是为了协议免于流于形式,变得很分
裂也为了兼顾现有客户端等。比如支持压缩等,那就可以在Payload中定义数据支持,在应用中进行读取处理。
消息标识符/消息ID
固定头中的QoS level标志值为1或2时才会在:PUBLISH,PUBACK,PUBREC,PUBREL,PUBCOMP,
SUBSCRIBE,SUBACK,UNSUBSCRIBE,UNSUBACK等消息的可变头中出现。
一个16位无符号位的short类型值(值不能为 0,0做保留作为无效的消息ID),仅仅要求在一个特定方
向(服务器发往客户端为一个方向,客户端发送到服务器端为另一个方向)的通信消息中必须唯一。比
如客户端发往服务器,有可能存在服务器发往客户端会同时存在重复,但不碍事。
可变头部中,需要两个字节的顺序是MSB、LSB(最高有效位,最低有效位)。
最高有效位在最低有效位左边/上面,表示这是一个大端字节/网络字节序,高位在最左边。
bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 |
---|---|---|---|---|---|---|---|---|
Message Identifier MSB | ||||||||
Message Identifier LSB |
但凡如此表示的,都可以视为一个16位无符号short类型整数,两个字节表示。
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
UTF-8编码
有关字符串,MQTT采用的是修改版的UTF-8编码,一般形式为如下,需要牢记:
bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 |
---|---|---|---|---|---|---|---|---|
byte 1 | String Length MSB | |||||||
byte 2 | String Length LSB | |||||||
bytes 3 ... | Encoded Character Data |
比如使用writeUTF()方法写入一串文字“OTWP”,头两个字节为一个完整的无符号数字,代表字符串字节长度,后面四个字节才是字符串真正的长度,共六个字节:
bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 |
---|---|---|---|---|---|---|---|---|
byte 1 | Message Length MSB (0x00) | |||||||
0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | |
byte 2 | Message Length LSB (0x04) | |||||||
0 | 0 | 0 | 0 | 0 | 1 | 0 | 0 | |
byte 3 | 'O' (0x4F) | |||||||
0 | 1 | 0 | 0 | 1 | 1 | 1 | 1 | |
byte 4 | 'T' (0x54) | |||||||
0 | 1 | 0 | 1 | 0 | 1 | 0 | 0 | |
byte 5 | 'W' (0x57) | |||||||
0 | 1 | 0 | 1 | 0 | 1 | 1 | 1 | |
byte 6 | 'P' (0x50) | |||||||
0 | 1 | 0 | 1 | 0 | 0 | 0 | 0 |
MQTT无论是可变头部还是消息体中,只要是字符串部分,都是采用了修改版的UTF-8编码。
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
CONNECT
CONNECT可变头部中协议名称,消息体都是采用修改版的UTF-8编码。下面是一个较为完整的CONNECT消息结构:
Description | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 | ||
---|---|---|---|---|---|---|---|---|---|---|
Fixed header/固定头部 | ||||||||||
| Message Type(1) | DUP flag | QoS level | RETAIN | ||||||
byte 1 | 0 | 0 | 0 | 1 | x | x | x | x | ||
byte 2 | Remaining Length | |||||||||
Variable header/可变头部 | ||||||||||
Protocol Name 协议名称 | ||||||||||
byte 1 | Length MSB (0) | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | |
byte 2 | Length LSB (6) | 0 | 0 | 0 | 0 | 0 | 1 | 1 | 0 | |
byte 3 | 'M' | 0 | 1 | 0 | 0 | 1 | 1 | 0 | 1 | |
byte 4 | 'Q' | 0 | 1 | 0 | 1 | 0 | 0 | 0 | 1 | |
byte 5 | 'I' | 0 | 1 | 0 | 0 | 1 | 0 | 0 | 1 | |
byte 6 | 's' | 0 | 1 | 1 | 1 | 0 | 0 | 1 | 1 | |
byte 7 | 'd' | 0 | 1 | 1 | 0 | 0 | 1 | 0 | 0 | |
byte 8 | 'p' | 0 | 1 | 1 | 1 | 0 | 0 | 0 | 0 | |
Protocol Version Number 协议版本号 | ||||||||||
byte 9 | Version (3) | 0 | 0 | 0 | 0 | 0 | 0 | 1 | 1 | |
Connect Flags 连接标志 | ||||||||||
| User Name Flag | Password Flag | Will Retain | Will QoS | Will Flag | Clean Session | Reserved | |||
byte 10 | 1 | 1 | 0 | 0 | 1 | 1 | 1 | x | ||
Keep Alive timer 心跳时间 | ||||||||||
byte 11 | Keep Alive MSB (0) | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | |
byte 12 | Keep Alive LSB (10) | 0 | 0 | 0 | 0 | 1 | 0 | 1 | 0 | |
Payload/消息体 | ||||||||||
Client Identifier(客户端ID) 1-23个字符长度,客户端到服务器的全局唯一标志,如果客户端ID超出23个字符长度,服务器需要返回码为2,标识符被拒绝响应的CONNACK消息。处理QoS级别1和2的消息ID中,可以使用到。必填项。 | ||||||||||
Will Topic Will Flag值为1,这里便是Will Topic的内容。QoS级别通过Will QoS字段定义,RETAIN值通过Will RETAIN标识,都定义在可变头里面。 | ||||||||||
Will Message Will Flag若设为1,这里便是Will Message定义消息的内容,对应的主题为Will Topic。如果客户端意外的断开触发服务器PUBLISH此消息。长度有可能为0。 在CONNECT消息中的Will Message是UTF-8编码的,当被服务器发布时则作为二进制的消息体。 | ||||||||||
User Name 如果设置User Name标识,可以在此读取用户名称。一般可用于身份验证。协议建议用户名为不多于12个字符,不是必须。 | ||||||||||
Password 如果设置Password标识,便可读取用户密码。建议密码为12个字符或者更少,但不是必须。 |
可变头部
协议名称和协议版本都是固定的。
连接标志(Connect Flags)
一个字节表示,除了第1位是保留未使用,其它7位都具有不同含义。业务上很重要,对消息总体流程影响很大,需要牢记。
Clean Session
0,表示如果订阅的客户机断线了,要保存为其要推送的消息(QoS为1和QoS为2),若其重新连接时,需将这些消息推
送(若客户端长时间不连接,需要设置一个过期值)。
1,断线服务器即清理相关信息,重新连接上来之后,会再次订阅。
Will Flag
定义了客户端(没有主动发送DISCONNECT消息)出现网络异常导致连接中断的情况下,服务器需要做的一些措施。
简而言之,就是客户端预先定义好,在自己异常断开的情况下,所留下的最后遗愿(Last Will),也称之为遗嘱(Testament)。
这个遗嘱就是一个由客户端预先定义好的主题和对应消息,附加在CONNECT的可变头部中,在客户端连接出现异常的
情况下,由服务器主动发布此消息。
只有在Will Flag位为1时,Will Qos和Will Retain才会被读取,此时消息体payload中要出现Will Topic和Will Message具
体内容,否则,Will QoS和Will Retain值会被忽略掉。
Will Qos
两位表示,和PUBLISH消息固定头部的QoS level含义一样。若标识Will Flag值为1,那么Will QoS就会生效,否则被忽略。
Will RETAIN
如果设置Will Flag,Will Retain标志就是有效的,否则它将被忽略。
当客户端意外断开服务器发布其Will Message之后,服务器是否应该继续保存。这个属性和PUBLISH固定头部的RETAIN标志含义一样,这里先掠过。
User name 和 password Flag:
用于授权,两者要么为0要么为1,否则都是无效。都为0,表示客户端可自由连接/订阅,都为1,表示连接/订阅需要授权。
Payload/消息体
消息体定义的消息顺序(如上表所示),约定俗成,不得更改,否则将可能引起混乱。
若Will Flag值为0,那么在payload中,Client Identifer后面就不会存在Will Topic和Will Message内容。
若User Name和Password都为0,意味着Payload/消息体中,找不到User Name和password的值,就算有,也是无效。标志决定着是否读取与否。
心跳时间(Keep Alive timer)
以秒为单位,定义服务器端从客户端接收消息的最大时间间隔。
一般应用服务会在业务层次检测客户端网络是否连接,不是TCP/IP协议层面的心跳机制(比如开启SOCKET的SO_KEEPALIVE选项)。
一般来讲,在一个心跳间隔内,客户端发送一个PINGREQ消息到服务器,服务器返回PINGRESP消息,完成一次心跳交互,继而
等待下一轮。若客户端没有收到心跳反馈,会关闭掉TCP/IP端口连接,离线。
16位两个字节,可看做一个无符号的short类型值。最大值,2^16-1 = 65535秒 = 18小时。最小值可以为0,表示客户端不断开。
一般设为几分钟,比如微信心跳周期为300秒。
Will Message编码
Will Message在CONNECT Payload中,使用UTF-8编码。假设内容为“abcd”,大概如下:
Description | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 | |
---|---|---|---|---|---|---|---|---|---|
byte 1 | Length MSB (0) | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
byte 2 | Length LSB (4) | 0 | 0 | 0 | 0 | 0 | 1 | 0 | 0 |
byte 3 | 'a' (0x61) | 0 | 1 | 1 | 0 | 0 | 0 | 0 | 1 |
byte 4 | 'b' (0x62) | 0 | 1 | 1 | 0 | 0 | 0 | 1 | 0 |
byte 5 | 'c' (0x63) | 0 | 1 | 1 | 0 | 0 | 0 | 1 | 1 |
byte 6 | 'd' (0x64) | 0 | 1 | 1 | 0 | 0 | 1 | 0 | 0 |
有一点需要记住,PUBLISH的Payload/消息体中以二进制编码保存。
某刻客户端异常关闭触发服务器会PUBLISH此消息。服务器会直接把byte3-byte6之间字符取出,保存为二进制,附加到PUBLISH消息体中,大概存储如下:
Description | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 | |
---|---|---|---|---|---|---|---|---|---|
byte 1 | 'a' (0x61) | 0 | 1 | 1 | 0 | 0 | 0 | 0 | 1 |
byte 2 | 'b' (0x62) | 0 | 1 | 1 | 0 | 0 | 0 | 1 | 0 |
byte 3 | 'c' (0x63) | 0 | 1 | 1 | 0 | 0 | 0 | 1 | 1 |
byte 4 | 'd' (0x64) | 0 | 1 | 1 | 0 | 0 | 1 | 0 | 0 |
连接异常中断通知机制
CONNECT消息一旦设置在可变头部设置了Will flag标记,那就启用了Last-Will-And-Testament特性。
一旦客户端出现异常中断,便会触发服务器发布Will Message消息到Will Topic主题上去,通知Will Topic订阅者,对方因异常退出。
接收CONNECT后的响应动作
接收到CONNECT消息之后,服务器应该返回一个CONNACK消息作为响应:
- 若客户端绕过CONNECT消息直接发送其它类型消息,服务器应关闭此非法连接 。
- 若客户端发送CONNECT之后未收到CONNACT,需要关闭当前连接,然后重新连接。
- 相同Client ID客户端已连接到服务器,先前客户端必须断开连接后,服务器才能完成新的客户端CONNECT连接 。
- 客户端发送无效非法CONNECT消息,服务器需要关闭。
CONNACK
一个完整的CONNACK消息大致如下:
Description | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 | |
---|---|---|---|---|---|---|---|---|---|
Fixed header/固定头部 | |||||||||
byte 1 | Message type (2) | DUP flag | QoS flags | RETAIN | |||||
0 | 0 | 1 | 0 | x | x | x | x | ||
byte 2 | Remaining Length (2) | ||||||||
0 | 0 | 0 | 0 | 0 | 0 | 1 | 0 | ||
Variable header/可变头部 | |||||||||
Topic Name Compression Response | |||||||||
byte 1 | Reserved values. Not used. | x | x | x | x | x | x | x | x |
Connect Return Code | |||||||||
byte 2 | Return Code |
可变头部第一个字节为保留。第二个字节为连接握手返回码:
返回值 | 16进制 | 含义 |
0 | 0x00 | Connection Accepted |
1 | 0x01 | Connection Refused: unacceptable protocol version |
2 | 0x02 | Connection Refused: identifier rejected |
3 | 0x03 | Connection Refused: server unavailable |
4 | 0x04 | Connection Refused: bad user name or password |
5 | 0x05 | Connection Refused: not authorized |
6-255 | Reserved for future use |
只有0-5目前被使用到,其他值有待日后使用。一般返回值为0x00,表示连接建立。非法的请求,需要返回相应的数值。
从上面看出,一个CONNACT,四个字节表示。一个正常的CONNACT消息实际内容可能如下: 0x20 0x02 0x00 0x00
很多时候,客户端和服务器端在没有消息传递时,会一直保持着连接。
虽然不能依靠TCP心跳机制(比如SO_KEEPALIVE选项),业务层面定义心跳机制,会让连接状态检测、控制更为直观。
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
PINGREQ
由客户端发送到服务器端,证明自己还在一直连接着。两个字节,固定值。
Description | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 | |
---|---|---|---|---|---|---|---|---|---|
Fixed header/固定头部 | |||||||||
byte 1 | Message type (12) | DUP flag | QoS flags | RETAIN | |||||
1 | 1 | 0 | 0 | x | x | x | x | ||
byte 2 | Remaining Length (0) | ||||||||
0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
客户端会在一个心跳周期内发送一条PINGREQ消息到服务器端。
心跳频率在CONNECT可变头部“Keep Alive timer”中定义时间,单位为秒,无符号16位short表示。
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
PINGRESP
服务器收到PINGREQ请求之后,会立即响应一个两个字节固定格式的PINGRESP消息。
Description | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 | |
---|---|---|---|---|---|---|---|---|---|
Fixed header/固定头部 | |||||||||
byte 1 | Message type (13) | DUP flag | QoS flags | RETAIN | |||||
1 | 1 | 0 | 1 | x | x | x | x | ||
byte 2 | Remaining Length (0) | ||||||||
0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
服务器一般若在1.5倍的心跳周期内接收不到客户端发送的PINGREQ,可考虑关闭客户端的连接描述符。
此时的关闭连接的行为和接收到客户端发送DISCONNECT消息的处理行为一致,但对客户端的订阅不会产生影响(不会清除客户端订阅数据),这个需要牢记。
若客户端发送PINGREQ之后的一个心跳周期内接收不到PINGRESP消息,可考虑关闭TCP/IP套接字连接。
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
DISCONNECT
客户端主动发送到服务器端,表明即将关闭TCP/IP连接。
此时要求服务器要完整、干净的进行断开处理,不能仅仅类似于关闭连接描述符类似草草处理之。 需要两个字节,值固定:
Description | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 | |
---|---|---|---|---|---|---|---|---|---|
Fixed header/固定头部 | |||||||||
byte 1 | Message type (14) | DUP flag | QoS flags | RETAIN | |||||
1 | 1 | 1 | 0 | x | x | x | x | ||
byte 2 | Remaining Length (0) | ||||||||
0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
服务器要根据先前此客户端在发送CONNECT消息可变头部Connect flag中的“Clean session flag”所设置值:
-
值为0,服务器必须在客户端断开之后继续存储/保持客户端的订阅状态。这些状态包括:
- 存储订阅的消息QoS1和QoS2消息
- 正在发送消息期间连接丢失导致发送失败的消息,以便当客户端重新连接时以上消息可以被重新传递。
-
值为1,服务器需要立刻清理连接状态数据。
有一点需要牢记,服务器在接收到客户端发送的DISCONNECT消息之后,需要主动关闭TCP/IP连接。
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
PUBLISH
客户端发布消息经由服务器分发到所有对应的订阅者那里。
一个订阅者可以订阅若干个主题(Topic name),但一个PUBLISH消息只能拥有一个主题。
消息架构一览:
Description | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 | ||
---|---|---|---|---|---|---|---|---|---|---|
Fixed header/固定头部 | ||||||||||
byte 1 | Message Type(3) | DUP flag | QoS level | RETAIN | ||||||
0 | 0 | 1 | 1 | 0 | 0 | 1 | 0 | |||
byte 2 | Remaining Length | |||||||||
Variable header/可变头部 | ||||||||||
Topic name | ||||||||||
byte 1 | Length MSB (0) | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | |
byte 2 | Length LSB (3) | 0 | 0 | 0 | 0 | 0 | 0 | 1 | 1 | |
byte 3 | 'a' (0x61) | 0 | 1 | 1 | 0 | 0 | 0 | 0 | 1 | |
byte 4 | '/' (0x2F) | 0 | 0 | 1 | 0 | 1 | 1 | 1 | 1 | |
byte 5 | 'b' (0x62) | 0 | 1 | 1 | 0 | 0 | 0 | 1 | 0 | |
Message Identifier | ||||||||||
byte 6 | Message ID MSB (0) | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | |
byte 7 | Message ID LSB (10) | 0 | 0 | 0 | 0 | 1 | 0 | 1 | 0 | |
Payload/消息体 | ||||||||||
BLOB,二进制对象形式。二进制具体包含的内容和格式,可有应用程序自身定义。若消息体为空(0长度)也是可能的。 |
固定头部
DUP flag,设为0,表示当前为第一次发送。
RETAIN flag,只有在PUBLISH消息中才有效。
- 1:表示发送的消息需要一直持久保存,不但要发送给当前的订阅者,并且以后新来的订阅了此Topic name的订阅者会马上得到推送。
- 备注:新来乍到的订阅者,只会取出最新的一个RETAIN flag = 1的消息推送,不是所有。
- 0:仅仅为当前订阅者推送此消息。
可变头部
Topic name,UTF-8编码字符串形式,不支持通配符!
消息体
一般作为UTF-8编码写入接口,但不排除自定义的消息格式。
空的消息体(zero-length)的PUBLISH消息也可以是合法的。
当服务器接收到空消息体(zero-length payload)、retain = 1、具有topic name的一个PUBLISH特殊消息,
表示同时满足retain = 1、相同topic name的这两个特征的被持久化PUBLISH消息,可被删除。
Response/响应
固定头部QoS level决定了消息中间件针对发布者具体需要响应的内容:
QoS Level | Expected response |
QoS 0 | None |
QoS 1 | PUBACK |
QoS 2 | PUBREC |
备注:仅仅针对发布PUBLISH消息的发布者。
Actions:
无论是订阅者还是服务器接收到PUBLISH消息之后,需要根据QoS level执行不同动作。
QoS Level | Expected Action |
QoS 0 | 发送到所有感兴趣者 |
QoS 1 | 持久化记录下来,发送到所有感兴趣的参与者,返回一个PUBACK消息给发送者 |
QoS 2 | 持久化记录下来,暂时不发送所有感兴趣的参与者,返回一个PUBREC消息给发送者 |
如果服务器收到PUBLISH消息,参与者指的是订阅者。
如果订阅者收到PUBLISH消息,参与者就是服务器。
需要注意:
- 发布者发布的PUBLISH消息发送到服务器,在payload/消息体处可能夹带有私货,可能含有自定义的数据格式
- 若兼容MQTT客户端,经由服务器分发到所有对应订阅者处只能是规规矩矩的PUBLISH消息,并且固定头部的RETAIN标志不能被设置成有效值1
授权
未经授权的发布者提交的PUBLISH消息,服务器会忽略掉,客户端不会被通知。
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
PUBACK
作为订阅者/服务器接收(QoS level = 1)PUBLISH消息之后对发送者的响应,整个消息不复杂。
Description | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 | |
---|---|---|---|---|---|---|---|---|---|
Fixed header/固定头部 | |||||||||
byte 1 | Message type (4) | DUP flag | QoS flags | RETAIN | |||||
0 | 1 | 0 | 0 | x | x | x | x | ||
byte 2 | Remaining Length (2) | ||||||||
0 | 0 | 0 | 0 | 0 | 0 | 1 | 0 | ||
Variable header/可变头部 | |||||||||
Message Identifier | |||||||||
byte 1 | Message ID MSB (0) | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
byte 2 | Message ID LSB (10) | 0 | 0 | 0 | 0 | 1 | 0 | 1 | 0 |
虽没有消息体,但可变头部附加一个16位的无符号short类型。
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
PUBREC
Assured publish received,作为订阅者/服务器对QoS level = 2的发布PUBLISH消息的发送方的响应,确认已经收到,
为QoS level = 2消息流的第二个消息。 和PUBACK相比,除了消息类型不同外,其它都是一样。
Description | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 | |
---|---|---|---|---|---|---|---|---|---|
Fixed header/固定头部 | |||||||||
byte 1 | Message type (5) | DUP flag | QoS flags | RETAIN | |||||
0 | 1 | 0 | 1 | x | x | x | x | ||
byte 2 | Remaining Length (2) | ||||||||
0 | 0 | 0 | 0 | 0 | 0 | 1 | 0 | ||
Variable header/可变头部 | |||||||||
Message Identifier | |||||||||
byte 1 | Message ID MSB (0) | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
byte 2 | Message ID LSB (10) | 0 | 0 | 0 | 0 | 1 | 0 | 1 | 0 |
无论是订阅者还是服务器,在收到PUBREC消息之后需要发送一个PUBREL消息给发送者(和PUBREC具有同样的消息ID),确认已收到。
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
PUBREL
Qos level = 2的协议流的第三个消息,有PUBLISH消息的发布者发送,参与方接收。完整示范如下:
Description | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 | |
---|---|---|---|---|---|---|---|---|---|
Fixed header/固定头部 | |||||||||
byte 1 | Message type (6) | DUP flag | QoS flags | RETAIN | |||||
0 | 1 | 1 | 0 | 0 | 0 | 1 | x | ||
byte 2 | Remaining Length (2) | ||||||||
0 | 0 | 0 | 0 | 0 | 0 | 1 | 0 | ||
Variable header/可变头部 | |||||||||
Message Identifier | |||||||||
byte 1 | Message ID MSB (0) | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
byte 2 | Message ID LSB (10) | 0 | 0 | 0 | 0 | 1 | 0 | 1 | 0 |
QoS level 1,PUBREL消息要求如此。
DUP flag 为0,表示消息第一次被发送。
可变头部中,消息ID和发布者接收到的PUBREC所包含的消息ID是一致的。
动作:
1.服务器接收到发布者(a)的PUBREL消息,此时服务器让发布者(a)刚才发布PUBLISH消息可用,
发送此PUBLISH消息给所有订阅此主题的订阅者,然后发送PUBCOMP消息给发布者(a)。
2.可变头部包含消息ID和服务器接收的PUBREL消息ID是一致的。
一个订阅者接收到PUBREL消息,订阅者使PUBLISH消息可用,然后反馈一个PUBCOMP消息给服务器
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
PUBCOMP
作为Qo个S level = 2消息流第四个,也是最后一消息,由收到PUBREL的一方向另一方做出的响应消息。
完整的消息一览,和PUBREL一致,除了消息类型。
Description | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 | |
---|---|---|---|---|---|---|---|---|---|
Fixed header/固定头部 | |||||||||
byte 1 | Message type (7) | DUP flag | QoS flags | RETAIN | |||||
0 | 1 | 1 | 1 | x | x | x | x | ||
byte 2 | Remaining Length (2) | ||||||||
0 | 0 | 0 | 0 | 0 | 0 | 1 | 0 | ||
Variable header/可变头部 | |||||||||
Message Identifier | |||||||||
byte 1 | Message ID MSB (0) | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
byte 2 | Message ID LSB (10) | 0 | 0 | 0 | 0 | 1 | 0 | 1 | 0 |
当客户端接收一个PUBCOMP消息时,客户端摒弃原来的消息,因为它已经成功发送消息到服务器。
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
小结
消息的发布和确认等一些流程,主要是跟消息发布者所设定的QoS level有关;
稍加整理,绘制了下面一张图,理解起来可能会更清晰些:
上图针对的是客户端发布消息到服务器端的方向。
为了确保消息已经成功传递过去,只有收到了确认,才会让人特别放心。
在QoS level = 2时,通信双方都需要知道各自的确认流程以及所处阶段等,交互很多,数据量大的情况下,可能会造成数据线路传递拥塞。
服务器选择QoS = 0/1,大部分情况都是可以应对的。 比如重要消息,就要确保对方都要收到,然后彼此确认,OK,这个消息是真实、有效的。
无论Qos level为0、1,还是2,服务器(具备所有条件都满足之后)总要把收到的具体内容和topic组装成一个新的PUBLISH Message(也
不一定要重新构造,但要求推送的PUBLISH消息,一定要具有明确的主题和内容,RETAIN标志不能设置为1)推送到所有感兴趣的订阅者。
若是设置了Will flag标记,消息的发布来源还有可能来自CONNECT消息中的Will Topic和Will Message。
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
前面已把所有消息类型讲了一遍,这里从消息流的角度解读一下。
网络故障
在任何网络环境下,都会出现一方连接失败,比如离开公司大门那一刻没有了WIFI信号。但持续连接的另一端-服务器可能不能立即知道对方已断开。
类似网络异常情况,都有可能在消息发送的过程中出现,消息发送出去,就丢失了。
MQTT协议假定客户端和服务器端稳定情况一般,彼此之通信管道不可靠,一旦客户端网络断开,情况就会很严重,很难恢复原状。
但别忘记,很多客户端会有永久性存储设备支持,比如闪存ROM、存储卡等,在通信出现异常的情况下可以用于保存关键数据或状态信息等。
总之,异常网络情况很复杂,只能小心处理之。
消息重发策略
在QoS > 0情况下,PUBLISH、PUBREL、SUBSCRIBE、UNSUBSCRIBE等类型消息在发送者发送完之后,需要等待一个响应消息,
若在一个指定时间段内没有收到,发送者可能需要重试。重发的消息,要求DUP标记要设置为1。等待响应的超时应该在消息成功发送之
后开始算起,并且等待超时应该是可以配置选项,以便在下一次重试的时候,适当加大。比如第一次重试超时10秒,下一次可能为20秒,
再一次重试可能为60秒呢。当然,还要有一个重试次数限制的。
还有一种情况,客户端重新连接,但未在可变头部中设置clean session标记,但双方(客户端和服务器端)都应该重试先前未发送的动态
消息(in-flight messages)。客户端不被强制要求发送未被确认的消息,但服务器端就得需要重发那些未被去确认的消息。
QoS level决定的消息流
QoS level为Quality of Service level的缩写,服务质量等级。
MQTT 3.1协议在"4.1 Quality of Service levels and flows"章节中,仅仅讨论了客户端到服务器的发布流程,不太完整。
因为决定消息到达率,能够提升发送质量的,应该是服务器发布PUBLISH消息到订阅者这一消息流方向。
QoS level 0
至多发送一次,发送即丢弃。没有确认消息,也不知道对方是否收到。
Client | Message and direction | Server |
---|---|---|
QoS = 0 | PUBLISH ----------> | Action: Publish message to subscribers then Forget Reception: <=1 |
针对的消息不重要,丢失也无所谓。网络层面,传输压力小。
QoS level 1
所有QoS level 1都要在可变头部中附加一个16位的消息ID。
SUBSCRIBE和UNSUBSCRIBE消息使用QoS level 1。
针对消息的发布,Qos level 1,意味着消息至少被传输一次。
发送者若在一段时间内接收不到PUBACK消息,发送者需要打开DUB标记为1,然后重新发送PUBLISH消息。因此会导致接收方可能会
收到两次PUBLISH消息。
针对客户端发布消息到服务器的消息流:
Client | Message and direction | Server |
---|---|---|
QoS = 1 DUP = 0 Message ID = x Action: Store message | PUBLISH ----------> |
Actions:
Reception: >=1
|
Action: Discard message | PUBACK <---------- | Message ID = x |
针对服务器发布到订阅者的消息流:
Server | Message and direction | Subscriber |
---|---|---|
QoS = 1 DUP = 0 Message ID = x | PUBLISH ----------> |
Actions:
Reception: >=1
|
| PUBACK <---------- | Message ID = x |
发布者(客户端/服务器)若因种种异常接收不到PUBACK消息,会再次重新发送PUBLISH消息,同时设置DUP标记为1。
接收者以服务器为例,这可能会导致服务器收到重复消息,按照流程,broker(服务器)发布消息到订阅者(会导致订阅者接收到重复消息),
然后发送一条PUBACK确认消息到发布者。
在业务层面,或许可以弥补MQTT协议的不足之处:重试的消息ID一定要一致接收方一定判断当前接收的消息ID是否已经接收过。
但一样不能够完全确保,消息一定到达了。
QoS level 2
仅仅在PUBLISH类型消息中出现,要求在可变头部中要附加消息ID。
级别高,通信压力稍大些,但确保了仅仅传输接收一次。
先看协议中流程图:
Subscriber->Server :
Client | Message and direction | Server |
---|---|---|
QoS = 2 DUP = 0 Message ID = x Action: Store message | PUBLISH ----------> | Action(a) Store message or Actions(b):
|
PUBREC <---------- | Message ID = x | |
Message ID = x | PUBREL ----------> | Actions(a):
or Action(b): Delete message ID |
Action: Discard message | PUBCOMP <---------- | Message ID = x |
Server -> Subscriber:
Server | Message and direction | Subscriber |
---|---|---|
QoS = 2 DUP = 0 Message ID = x | PUBLISH ----------> | Action: Store message |
PUBREC <---------- | Message ID = x | |
Message ID = x | PUBREL ----------> | Actions:
|
| PUBCOMP <---------- | Message ID = x |
Server端采取的方案a和b,都包含了何时消息有效,何时处理消息。两个方案二选一,Server端自己决定。
但无论采取哪一种方式,都是在QoS level 2协议范畴下,不受影响。
若一方没有接收到对应的确认消息,会从最近一次需要确认的消息重试,以便整个(QoS level 2)流程打通。
消息顺序
消息顺序会受许多因素的影响,但对于服务器程序,必须保证消息传递流程的每个阶段要和开始的顺序一致。
例如,在QoS level 2定义的消息流中,PUBREL流必须和PUBLISH流具有相同的顺序发送:
Client | Message and direction | Server |
---|---|---|
PUBLISH 1----------> PUBLISH 2 ----------> PUBLISH 3 ----------> | ||
PUBREC 1<---------- PUBREC 2 <---------- | ||
PUBREL 1----------> | ||
PUBREC 3<---------- | ||
PUBREL 2----------> | ||
PUBCOMP 1<---------- | ||
PUBREL 3----------> | ||
PUBCOMP 2<---------- PUBCOMP 3 <---------- |
流动消息(in-flight messages)数量允许有一个可保证的效果:
- 在流动消息(in-flight)窗口1中,每个传递流在下一个流开始之前完成,这保证消息以提交的顺序传递。
- 在流动消息(in-flight)大于1的窗口,只能在QoS level内被保证消息的顺序。
消息的持久化
在MQTT协议中,PUBLISH消息固定头部RETAIN标记,只有为1才要求服务器需要持久保存此消息,除非新的PUBLISH覆盖。
对于持久的、最新一条PUBLISH消息,服务器不但要发送给当前的订阅者,并且新的订阅者(同样需要订阅了此消息对应的Topic name)会马上得到推送。
Tip:新来乍到的订阅者,只会取出最新的一个RETAIN flag = 1的消息推送,不是所有。
消息流的编码/解码
MQTT协议中,由目前定义的14种类型消息在客户端和服务器端之间数据进行交互。
若以JAVA语言构建MQTT服务器,可选择Netty作为基础。
在Netty中,数据的进入和流出,代表了一次完整的交互。
无论是要进入的还是要流出的数据(单独以服务器为例),都可看做字节流。
若把每种类型消息抽象为一个具体对象,那么处理起来就不难了。
客户端->服务器,进入的字节流,逐个字节/单位读取,可还原成一个具体的消息对象(解码的过程)。
要发送到客户端的消息对象,转换(编码)成字节流,然后由TCP通道流转到接收者。
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
SUBSCRIBE
一般来讲,客户端在成功建立TCP连接之后,发送CONNECT消息,在得到服务器端授权允许建立彼此连接的CONNACK消息之后,
客户端会发送SUBSCRIBE消息,订阅感兴趣的Topic主题列表(至少一个主题),一个完整示范如下:
| Description | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 | |
---|---|---|---|---|---|---|---|---|---|---|
Fixed header/固定头部 | ||||||||||
byte 1 | Message Type(8) | DUP flag | QoS level | RETAIN | ||||||
1 | 0 | 0 | 0 | 0 | 0 | 1 | x | |||
byte 2 | Remaining Length | |||||||||
Variable header/可变头部 | ||||||||||
Message Identifier | ||||||||||
byte 1 | Message ID MSB (0) | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | |
byte 2 | Message ID LSB (10) | 0 | 0 | 0 | 0 | 1 | 0 | 1 | 0 | |
Playload/消息体 | ||||||||||
Topic name | ||||||||||
byte 1 | Length MSB (0) | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | |
byte 2 | Length LSB (3) | 0 | 0 | 0 | 0 | 0 | 0 | 1 | 1 | |
byte 3 | 'a' (0x61) | 0 | 1 | 1 | 0 | 0 | 0 | 0 | 1 | |
byte 4 | '/' (0x2F) | 0 | 0 | 1 | 0 | 1 | 1 | 1 | 1 | |
byte 5 | 'b' (0x62) | 0 | 1 | 1 | 0 | 0 | 0 | 1 | 0 | |
Requested QoS | ||||||||||
byte 6 | Requested QoS (1) | x | x | x | x | x | x | 0 | 1 | |
Topic Name | ||||||||||
byte 7 | Length MSB (0) | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | |
byte 8 | Length LSB (3) | 0 | 0 | 0 | 0 | 0 | 0 | 1 | 1 | |
byte 9 | 'c' (0x63) | 0 | 1 | 1 | 0 | 0 | 0 | 1 | 1 | |
byte 10 | '/' (0x2F) | 0 | 0 | 1 | 0 | 1 | 1 | 1 | 1 | |
byte 11 | 'd' (0x64) | 0 | 1 | 1 | 0 | 0 | 1 | 0 | 0 | |
Requested QoS | ||||||||||
byte 12 | Requested QoS (2) | x | x | x | x | x | x | 1 | 0 |
固定头部
Qos Level,可根据实际情况进行调整为0/1/2等。一般设为0表示最多一次。客户端可设置OoS Level值。
DUP flag,值为0表示第一次发送。
可变头部
因为上面示范QoS level值为1,因此需要客户端传递消息ID,16位,无符号的short类型。
消息体
订阅的主题名称采用修改版UTF-8编码,然后紧跟着对应的QoS值。下面的次序,可能更为形象:
Topic name | "a/b" |
Requested QoS | 1 |
Topic name | "c/d" |
Requested QoS | 2 |
订阅者的Topic name支持通配符#和+ :
- #支持一个主题内任意级别话题
- +只匹配一个主题级别的通配符
例如:
finance/stock/#
finance/sotkc/ibm/+
都是有效,更具体规则,请参阅协议附加部分。
在服务器接收处理时,按照顺序读取即可:
String topicName = readUTF();
int qosVal = read();
服务器可以发送QoS不大于客户端设置OoS的消息,尤其是服务器不提供良好的持久化机制的时候。
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
SUBACK
服务器会对发出SUBSCRIBE的消息返回一个确认消息。
| Description | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 |
---|---|---|---|---|---|---|---|---|---|
Fixed header/固定头部 | |||||||||
byte 1 | Message type (9) | DUP flag | QoS flags | RETAIN | |||||
1 | 0 | 0 | 1 | x | x | x | x | ||
byte 2 | Remaining Length | ||||||||
Variable header/可变头部 | |||||||||
Message Identifier | |||||||||
byte 1 | Message ID MSB (0) | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
byte 2 | Message ID LSB (10) | 0 | 0 | 0 | 0 | 1 | 0 | 1 | 0 |
Playload/消息体 | |||||||||
byte 1 | Granted QoS (0) | x | x | x | x | x | x | 0 | 0 |
byte 1 | Granted QoS (2) | x | x | x | x | x | x | 1 | 0 |
可变头部
Message Identifier,服务器需要附加,客户端需要处理。
消息体
QoS,为服务器根据实际情况授予的QoS级别列表,和客户端发送的SUBSCRIBE的订阅Topic Name顺序完全一致。
客户端订阅几个TOPIC,服务器端一一给出各个TOPIC的QoS具体值。
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
UNSUBSCRIBE
服务器需要支持客户端取消订阅功能,UNSUBSCRIBE消息格式和SUBSCRIBE消息格式差不多,除了消息类型不同,消息体中没有了QoS字节,其它没有区别。
| Description | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 | |
---|---|---|---|---|---|---|---|---|---|---|
Fixed header/固定头部 | ||||||||||
byte 1 | Message Type(10) | DUP flag | QoS level | RETAIN | ||||||
1 | 0 | 1 | 0 | 0 | 0 | 1 | x | |||
byte 2 | Remaining Length | |||||||||
Variable header/可变头部 | ||||||||||
Message Identifier | ||||||||||
byte 1 | Message ID MSB (0) | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | |
byte 2 | Message ID LSB (10) | 0 | 0 | 0 | 0 | 1 | 0 | 1 | 0 | |
Playload/消息体 | ||||||||||
Topic name | ||||||||||
byte 1 | Length MSB (0) | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | |
byte 2 | Length LSB (3) | 0 | 0 | 0 | 0 | 0 | 0 | 1 | 1 | |
byte 3 | 'a' (0x61) | 0 | 1 | 1 | 0 | 0 | 0 | 0 | 1 | |
byte 4 | '/' (0x2F) | 0 | 0 | 1 | 0 | 1 | 1 | 1 | 1 | |
byte 5 | 'b' (0x62) | 0 | 1 | 1 | 0 | 0 | 0 | 1 | 0 | |
Topic Name | ||||||||||
byte 6 | Length MSB (0) | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | |
byte 7 | Length LSB (3) | 0 | 0 | 0 | 0 | 0 | 0 | 1 | 1 | |
byte 8 | 'c' (0x63) | 0 | 1 | 1 | 0 | 0 | 0 | 1 | 1 | |
byte 9 | '/' (0x2F) | 0 | 0 | 1 | 0 | 1 | 1 | 1 | 1 | |
byte 10 | 'd' (0x64) | 0 | 1 | 1 | 0 | 0 | 1 | 0 | 0 |
可变头部的消息ID的出现还是由固定头部的QoS Level(1)决定是否存在。
一般来讲,客户端发布退订,服务器端需要返回退订确认。MQTT没讲是否允许客户端退订所有TOPIC。
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
UNSUBACK
服务器返回的UNSUBSCRIBE消息UNSUBACK相应很简单,没有消息体。
| Description | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 |
---|---|---|---|---|---|---|---|---|---|
Fixed header/固定头部 | |||||||||
byte 1 | Message type (9) | DUP flag | QoS flags | RETAIN | |||||
1 | 0 | 1 | 1 | x | x | x | x | ||
byte 2 | Remaining length (2) | ||||||||
0 | 0 | 0 | 0 | 0 | 0 | 1 | 0 | ||
Variable header/可变头部 | |||||||||
Message Identifier | |||||||||
byte 1 | Message ID MSB (0) | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
byte 2 | Message ID LSB (10) | 0 | 0 | 0 | 0 | 1 | 0 | 1 | 0 |
小结
订阅部分,共有四个消息,分别一一对应。
命令 | 响应 | 备注 | 建议 |
---|---|---|---|
SUBSCRIBE | SUBACK | 协议没有涉及最多运行订阅TOPIC数目,隐藏的隐患 | 建议至多10个 |
UNSUBSCRIBE | UNSUBACK | 是否可以退订所有订阅,不详 | 建议保留至少一个Topic |
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
MQTT 3.1协议在弱网络环境下(比如2G/3G等)表现不够好,因此才有了反思。
弱网环境下表现
手机等终端在弱网络环境下丢包情况会非常明显,连接MQTT Server成功率很低。相比单纯的请求-响应模型的HTTP,其成功率会比MQTT订阅成功高很多。
手机终端在每次TCP断开或断网后,会即刻发起TCP重连,连接成功,会重复以前步骤依次发送连接命令(CONNECT),订阅命令(SUBSCRIBLE),
表面上看,这些过程没有任何问题,但问题就在于从终端成功建立到服务器的连接,到发送订阅命令,在弱网情况下,这个过程将会变得很昂贵:
从TCP建立开始的三次握手到完整的订阅命令发送完毕,考虑到TCP堆栈的每次接收数据方响应ACK,这中间终端和服务器端至少产生了10次数据交互。
在网络变化频繁或者不太稳定的2G/3G网络环境下,这种过程显得有些冗长和不适应,同时会加重已经不堪的弱网络负载的负担。
弱网下,在任何一个阶段的执行过程中,都有可能产生突发性的网络中断的问题:
-
无法成功建立TCP链接,或死在三次握手期间,或数据包丢失在握手之后,或客户端连接超时过小。
-
建立连接后,发送CONNECT命令后,或没接收到TCP ACK确认包,或客户端等待延时太小,导致订阅命令交互失败。
-
发送SUBSCRIBLE命令后,但服务器端没收到,或因为丢包,或网络已断开,导致发送SUBSCRIBLE命令失败。
-
成功发送SUBSCRIBLE命令后,或移动网络断开了(有些运营商针对认为HTTP的请求有超时判断),或等待超时,导致订阅失败。
TCP是无感知的虚拟连接,中间断开两端不会立刻得到通知,否则就用不着心跳保活机制了。
举一个例子,线上的服务器根据日志分析,只接收到连接命令(CONNECT)但没有后续的订阅命令(SUBSCRIBLE)的情况,每天有上百万级别的数量。
总之,针对低速率弱网络环境,MQTT表现不怎么好。
改进点
业务改进点:
1.客户端的连接超时、等待超时设大一点,两秒太短,可设置长一些,比如10秒。
2.服务器端支持在接收到用户发送CONNECT命令后,瞬间发送一些live data/hot data(早已缓存的数据),类似于HTTP请求-相应模型,
一些热数据发送给终端要趁早,越快越好;这个需要客户端、服务器端同时支持。
协议改进点:
1.CONNECT命令可变头部包含"MQisdp"太多余了,学院派风格嘛。
2.允许在连接命令中负载(payload)中携带订阅Topic字符串。
3.允许在连接命令中表示上次连接订阅的Topic发生变化否,携带订阅业务,虽冗余,但实用。
例如:订阅的Topic没有发生变化,TOPICCHANGE:0;
退订,UNSUBSCRIBE:TOPICONE;
SUBSCRIBE:TOPIC_TWO
4.PUBLISH、PUBACK等支持的 Message Identifier 才16位,太短,实际业务无法做到全局唯一。
引入mid和业务id的映射对应关系?
那是状态,需要维护,代价还是蛮高的。
业界流行看法,无状态化的架构才是便于横向、竖向、纵向、四方向的扩展,
最好方式就是修改使之支持字符串形式,否则维护代价高!
5.心跳命令PINGREQ/PINGREQ可以做到一个字节传输,节省一个字节。
6.低速率网络需要做一些兼容和调整
有些建议看似冗余,批量或打包处理总比单个处理更高效一些、更节省资源,弱网络环境要求交互要尽可能的少,数据嘛要的是瞬间抵达,越快越好。
严格的分层和业务解耦,会导致性能问题。好比当前Linux内核的TCP/IP网络堆栈分层很清晰,每一层都各司其职,但和直接略过内核态直接运行在
用户态(User Space)的Packet I/O相比,处理性能不是在一个档次上,比如Netmap 、DPDK等。
MQTT-SN
针对没有TCP/IP等网络堆栈支持的终端环境,MQTT爱莫能助。
在一些类似于传感器电子元件中,资源十分受限,计算能力不足,嵌入TCP/IP网络堆栈不现实,比较好的方式基于IEEE 802.15.4用于低速无线
个人域网(LR-WPAN)的物理层和媒体接入控制层规范之上发送UDP数据包,每一个数据包最大128个字节。
MQTT-SN(MQTT For Sensor Networks)协议就是为了非常受限类似传感器而设的,协议流程架构比较有趣:
TCP不是最适合的移动网络传输协议
先来算一下网络传输的字节数。
以太网帧头至少18个字节,IP头固定20个字节,TCP头20个字节(UDP头部8个字节),再加上电信宽带计费的PPPoE的8个字节:
- TCP数据包头部信息至少占有66个字节
- UDP数据报头部信息至少占有54个字节
UDP可以比TCP节省12个字节。MQTT-SN协议选择使用UDP,可以看出其在节省资源方面的努力。
再看看弱网环境。
- 在网络可达情况下,UDP可以在TCP建立第一次握手期间就已经把数据送达目的地。
- 完成三次握手期间,UDP客户端和UDP服务器在数据层面可以完成一次完整的交互(PING-PONG)。
在网络不好的情况下,UDP的时效性会好于TCP,TCP长连接中间交换过多、使之建立完整交互的过程成功率就很低。
此种情况UDP的低延迟和实时性呈现的结果会表现的很突出。
TCP或HTTP理论上是可靠连接,但是在网络不好的时候,也不是那么可靠。
客户端一般提交HTTP请求之后,没有确认是否提交成功,在弱网环境下会产生丢包,服务器端嘛收不到。
另TCP网络堆栈会存在数据包重发机制 + 应用层重发请求,可能会导致内核处理多次数据包的重发(还有拥塞窗口会收缩,发包速度减慢),
可能会加重弱网络的负载。
和TCP相比,UDP的无连接,代表了它快速,资源消耗小,突出表现就是延迟较小。
至于数据包丢失没有重传,上层的业务层面应用协议/机制可以确保丢失的数据包重发或补发等,并且会更透明,安全的控性权。
而TCP的包重发,上层应用没有控制权限。
连接协议方面:
- TCP面向连接会产生状态管理和维护,成本不小,比如经常看到的客户端reset异常等。一次完整的请求周期必须固定在一台服务器上。
- UDP无连接的特性。每次请求的数据包可以随机分配到不同的机器上进行处理,可以做到完全无状态化横向扩展。
总之,要实时性特诊,或者快速抵达终端的特性,不妨考虑一下UDP。不过呢,很多时候UDP和TCP大家会混合着使用,会互相弥补其不足。
小结
若MQTT协议不能够满足业务需求,或许可考虑选择定制,或简化流程,或使用UDP重新实现,或者使用TCP/HTTP作为补充等。
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
MQTT 3.1.1 6个新特性
QTT 3.1.1规范于2014年10月30号正式发布,与此同时MQTT 3.1.1已成为OASIS(结构化信息标准促进组织)开放物联网消息传递协议标准。
与MQTT 3.1规范相比,MQTT 3.1.1目标在于消除歧义,尽可能的向后兼容,事实上一些大众所需的新特性被包含在这个版本(更多的是物联网
标准推动),因此不仅是一个维护版本,也是一种巨大的进步。除了概念的澄清和陈旧规范重写外,有一些很有趣的变化是值得注意的。
会话表示标志(Session Present Flag)
如果一个终端与服务器之间建立一个持久会话连接(假设这个终端没有使用到一个“clean session”标记清除已有回话标志), 一个新增的
“Session Present”标志(会话表示标志,逻辑值为true或false)会在CONNACK中出现,表明MQTT服务器已经拥有当前客户端上次
连接会话信息,比如订阅的主题,排队信息和其它信息等。
会话表示标志若为true,客户端可减少了一次发送订阅SUBSCRIBLE交互步骤,有助于更有效的数据通信;
会话表示标志若为为false,客户端需要再次发送订阅SUBSCRIBLE消息,不可略过。
新增订阅失败代码反馈
MQTT 3.1.1之前,终端连接之后无法知道其发送的订阅主题是否被MQTT服务器接受与否。
此新特性较适用于细粒度权限MQTT主题管理;若无授权,服务器会把错误代码(0x80)附加在SUBACK中,客户端就可以知道订阅失败。
MQTT匿名客户端
需要支持临时或匿名?
客户端仅仅需要在发送CONNECT时把客户端标识符( client identifier )置空(零长度)即可,MQTT服务器会为此类请求生成一个随
机、唯一客户端标记符。但这要求客户端必须设置Clean Session标记为1,否则服务器端会直接返回包含0x02 (Identifier rejected)
代码的CONNACK,同时关闭连接。
可用于后端程序(不需要维护回话状态)向终端发送消息的客户端,MQTT服务器程序可区别对待。
快速发布无等待
这是一个新增的特别有用的特性,客户端可以在发送CONNECT之后,可无须等待MQTT服务器返回的CONNACK,根据需要即刻发送
PUBLISH、SUBSCRIBLE、DISCONNEECT等消息,可避免客户端资源等待。此特性也适用于突发模式(burst-mode)客户端需求,
只关心数据要尽快的发送出去,而不是去担心是否需要维护一个长连接。
这需要MQTT服务器实现在分发消息之前检查客户端是否有权限发布到这些主题上。
客户端标识符可以变长一些
MQTT 3.1针对客户端标识符( client identifier)限制是23个字节,实际环境下会有所不便,已有遗留系统可能使用UUID作为客户端
的标识符,这样服务器端需要做一些彼此之间的MAP映射。
MQTT 3.1.1中上限为65535个字节,毕竟成为业界标准,需要兼容大量的遗留设备和基础设施。
其它小改变
- CONNECT消息可变头部协议名称MQIsdp被改为MQTT,语义更准确
- 所有字符串明确规定使用UTF-8编码,包括客户端标识符(Client Identifier)
- CONNECT消息可变头部协议版本号,由0x03变成了0x04。 QoS 0类型PUBLISH消息DUP标记必须被设置为0。
- MQTT Over WebSocket 被定义,互联网地址编码分配机构(Internet Assigned Numbers Authority)分配标识符为mqtt。。
术语变化
- MQTT代理 -> MQTT服务器(MQTT Broker is now MQTT Server)
- 消息ID -> 包ID(Message ID is now Packet ID)
- 消息类型 -> 包类型(Message Type is now Packet Type)
- 主题路径 -> 主题名称(Subscribe and Unsubscribe take Topic Paths, rather than Topic names)
- 以前在固定头部,现在在包类型中( Flags in the fixed header are now specific to the packet type
- 0字节保留信息需要清除 (A zero byte retained message MUST NOT be stored as a retained message on the Server )
小结
当前MQTT 3.1.1已经在很多活跃开源项目/商业产品得到支持。
比如Eclipse Paho,Mosquitto,JBoss A-MQ 6.1, Apache ActiveMQ 5.10-SNAPSHOT,Apache Camel 2.13.0,HiveMQ等。
关于Eclipse Paho:
- Eclipse Paho 1.0支持MQTT 3.1.1和MQTT 3.1规范
- Eclipse Paho 0.9仅支持 MQTT 3.1规范
包含MQTT 3.1.1和MQTT 3.1的客户端可以混合使用,彼此可以共存于同一个MQTT服务器下,在基本消息传输层面没有多大修改,
同样的PUBLISH消息可以在MQTT客户端中自由流转,这个需要服务器端编码支持。
已有的MQTT 3.1客户端可以用着急升级,但升级之后可以从新增特性中收益良多。
最后
以上就是无私高跟鞋为你收集整理的MQTT 3.1.1 协议的全部内容,希望文章能够帮你解决MQTT 3.1.1 协议所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复