概述
MQTT
简介:MQTT由IBM公司开发,是一个即时通讯协议,也是一个物联网传输协议,主要用于轻量级的订阅/发布式的消息传输。其设计目的主要是为低带宽和不稳定网络环境下的物联网设备提供服务。
MQTT中的概念
- 订阅(Subscribtion):
订阅包含主题筛选器(Topic Filter)和最大服务质量(QoS)。订阅会与一个会话(Session)关联。一个会话可以包含多个订阅。每一个会话中的每个订阅都有一个不同的主题筛选器。
- 会话(Session):
每个客户端与服务器建立连接后就是一个会话,客户端和服务器之间有状态交互。会话存在于一个网络之间,也可能在客户端和服务器之间跨越多个连续的网络连接。 - 主题名(Topic Name):
连接到一个应用程序消息的标签,该标签与服务器的订阅相匹配。服务器会将消息发送给订阅所匹配标签的每个客户端。
需要注意的是,MQTT中消息主题按照层级命名,使用 ‘/’ 进行分割
此外,主题中可以使用通配符进行多个主题或多层级的订阅,有两种常见的通配符:
1. 单层通配符 +:单层通配符只能匹配一层的主题,例如:China/Beijing/+,可以匹配的只有Beijing这个主题下面一层的主题,例如Xicheng, DongCheng, Xuanwu等等。
2. 多层通配符 #:顾名思义,多层通配符就是可以匹配多个层级的主题,例如:China/#,可以匹配到的主题可能有:China/Beijing/Dongcheng, China/Shanghai/PuDong,等等。 - 主题筛选器(Topic Filter):
一个对主题名通配符筛选器,在订阅表达式中使用,表示订阅所匹配到的多个主题。 - 负载(Payload):
消息订阅者所具体接收的内容。
MQTT中的角色
- Publisher和Subscriber为客户端,Broker为服务器端,消息主题为消息类型,Broker根据Topic过滤消息,并将消息向客户端推送。
- MQTT中用QoS表示服务质量,MQTT协议中有三种服务质量(QoS):
- QoS =0,至多一次,可能会出现丢包的情况,使用在对实时性要求不高的情况,例如,将此服务质量与通信环境传感器数据一起使用。 对于是否丢失个别读取或是否稍后立即发布新的读取并不重要。
- QoS =1,至少一次,保证包会到达目的地,但是可能出现重包。
- QoS =2, 刚好一次,保证包会到达目的地,且不会出现重包的现象。
客户端
- Publisher和Subscriber都属于客户端。
- 发布应用消息给其它相关的客户端。
- 订阅以请求接受相关的应用消息。
- 取消订阅以移除接受应用消息的请求。
- 从服务端断开连接。
服务器端
- 服务器端即所谓的MQTT Broker服务器。
- 接受来自客户端的网络连接。
- 接受客户端发布的应用消息。
- 处理客户端的订阅和取消订阅请求。
- 转发应用消息给符合条件的已订阅客户端。
- MQTT提供的公共服务器端(Broker)有:
- test.mosquitto.org
- broker.hivemq.com
- iot.eclipse.org
配置私有的MQTT服务器
通常情况,出于安全考虑,一般使用私有的MQTT服务器端,MQTT的本地服务由Mosquitto支持。设置MQTT私有服务器端的方法如下(环境为Ubuntu16.04):
# Install Mosquitto and Mosquitto-clients(optional)
sudo apt-get install mosquitto
# 默认情况下,ubuntu会自动启动Mosquitto服务,所以无需显式启动服务,此时可以查看mosquitto状态:
sudo systemctl satus mosquitto
如果你只是想运行一个本地的MQTT服务,现在已经OK了。在mosquitto服务启动之后,你可以使用服务器的域名或者IP地址访问,MQTT服务器默认端口为1883。问题很明显,虽然我们设置了本地的私有MQTT服务器端,但是任何人都可以通过IP访问这台服务器,所以我们需要为mosquitto设置用户名和密码,只有拥有用户名和密码的客户端才可连接到服务器。
Mosquitto客户端提供了为mosquitto设置密码的命令 mosquitto_passwd,这个命令其实就是将我们设置的用户名和密码copy进/etc/mosquitto/passwd这个文件:
sudo mosquitto_passwd -c /etc/mosquitto/passwd <username>
# 执行上面命令的时候会提示输入两次密码
在生成了密码文件之后,我们需要告诉mosquitto服务,以后如果有客户端想创建连接请验证用户名和密码,具体操作如下:
sudo bash -c 'sudo echo -e "allow_anonymous falsenpassword_file /etc/mosquitto/passwd" > /etc/mosquitto/conf.d/default.conf'
上面的命令创建default.conf并输入引号里面的命令,可以看到我们禁止了anonymous连接,并且指定了密码所在的文件。
然后,重启mosquitto服务,让设置生效
sudo systemctl restart mosquitto
重新测试一下
从上面的测试结果看出,现在我们的mosquitto服务器已经有了username和password的feature了。
MQTT Python API(paho-mqtt)
pip install paho-mqtt
注: paho-mqtt这个库提供的函数主要是客户端的函数
另外,在paho-mqtt库中,有一种重要的函数–回调函数。简单说一下回调函数,通常情况下,我们写应用程序代码时经常引入一些API,我们主动调用这些API里的函数,称为直调。反过来,如果让API调用我们定义好的函数,这就称为回调。在phao-client这个库中,on_connect, on_message, on_subscribe, on_publish等等这些均为回调函数, 这些回调函数由中间函数调用。简单看一下paho-client中的callback 是如何实现的(以subscribe为例):
# 假设我们自定义的on_subscribe回调函数如下
# 先不要管为什么要在函数中指定这些参数,后面会用到
def on_subscribe(client, userdata, mid, granted_qos):
print('Subscribed message: ', str(mid))
# 然后我们使用如下语句设置回调
client.on_subscribe = on_subscribe
# 下面解释上面这行代码,查看paho-mqtt源码
@property
def on_subscribe(self):
"""If implemented, called when the broker responds to a subscribe
request."""
return self._on_subscribe
@on_subscribe.setter
def on_subscribe(self, func):
""" Define the suscribe callback implementation.
Expected signature is:
subscribe_callback(client, userdata, mid, granted_qos)
client: the client instance for this callback
userdata: the private user data as set in Client() or userdata_set()
mid: matches the mid variable returned from the corresponding
subscribe() call.
granted_qos: list of integers that give the QoS level the broker has
granted for each of the different subscription requests.
"""
with self._callback_mutex:
self._on_subscribe = func
从上面的代码可以看出, subscribe为Client类的一个property,我们使用的是subscribe属性的setter方法,设置类成员变量_on_subscribe的值。
接下来,我们发出subscribe请求,下面paho-client处理subscribe请求的函数,函数的前半部分基本是对客户端传入参数topic的检查,忽略。从最后三行代码可以看出,客户端发送了topic_qos_list这条消息给了MQTT服务器端。
def subscribe(self, topic, qos=0):
topic_qos_list = None
if isinstance(topic, tuple):
topic, qos = topic
if isinstance(topic, basestring):
...
elif isinstance(topic, list):
...
if topic_qos_list is None:
raise ValueError("No topic specified, or incorrect topic type.")
if any(self._filter_wildcard_len_check(topic) != MQTT_ERR_SUCCESS for topic, _ in topic_qos_list):
raise ValueError('Invalid subscription filter.')
if self._sock is None:
return (MQTT_ERR_NO_CONN, None)
return self._send_subscribe(False, topic_qos_list)
在paho-mqtt中有一个函数_handle_suback来处理服务器返回给客户端subscribe请求的响应消息。具体消息接收的过程有好几个步骤,大体经过的函数有:loop –> loop_read –> _packet_read –> _packet_handle –> _handle_suback
def _handle_suback(self):
self._easy_log(MQTT_LOG_DEBUG, "Received SUBACK")
pack_format = "!H" + str(len(self._in_packet['packet']) - 2) + 's'
(mid, packet) = struct.unpack(pack_format, self._in_packet['packet'])
pack_format = "!" + "B" * len(packet)
granted_qos = struct.unpack(pack_format, packet)
with self._callback_mutex:
if self.on_subscribe:
with self._in_callback: # Don't call loop_write after _send_publish()
self.on_subscribe(self, self._userdata, mid, granted_qos)
return MQTT_ERR_SUCCESS
好了,终于看到了在哪里调用了回调函数,现在明白了为什么要在创建on_subscribe的时候指定那些参数了吧。因为这些参数可能对回调函数本身没什么用,BUT,中间函数(也就是这里的_handle_suback)认为它们有用,并且在调用回调函数的时候传入了这些参数,所以我们定义的时候需要有这些参数。
下面简单介绍paho-client的一些基本操作,只是简单列举一些函数,具体更多的可以查看官方Documentation
# import mqtt客户端
import paho.mqtt.client as mqtt
# 创建客户端, client_id为必须参数,其余为可选参数
client = mqtt.Client(client_id=””, clean_session=True, userdata=None, protocol=MQTTv311, transport=”tcp”)
'''
# 当客户端与服务器端连接成功后,服务器端会给客户端返回一个Ack消息,这个Ack会调用回调方法on_connect()来显示连接状态,用户可以自定义回调方法的内容
params:
rc: return code,表示服务器端返回的连接状态, 可能的值有:
0: 连接成功
1: 连接拒绝 --– 协议版本错误
2. 连接拒绝 --- 客户端身份验证错误
3. 连接拒绝 --- 服务器不存在
4. 连接拒绝 --- 用户名/密码错误
5. 连接拒绝 --- 未授权错误
6-255. 连接拒绝 --- 当前不可用
'''
def on_connect(client, userdata, flags, rc):
if rc==0:
client.connected_flag = True
print("connected OK Returned code=",rc)
else:
client.connected_flag = False
print("Bad connection Returned code=",rc)
# 设定自定义的on_connect回调函数
client.on_connect = on_connect
'''
on_message()回调函数
当订阅者收到Broker发布的消息之后,on_message()被调用
params:
message:
:type MQTTMessage
:attrs topic, payload, qos, retain
'''
def on_message(client, userdata, message):
print("message received " ,str(message.payload.decode("utf-8")))
print("message topic=",message.topic)
print("message qos=",message.qos)
print("message retain flag=",message.retain)
client.on_message = on_message
'''
连接服务器端, host为broker的IP或者domain name
params:
host: 服务器端的IP地址或者Domain name
keepalive: 客户端和服务器端交互的最长时间,当客户端和Broker之间没有交互的时候,客户端ping服务器端的频率,单位为秒
bind_address: 在多网卡情况下,将客户端和某一局部网卡的IP地址绑定
'''
cient.connect(host, port=1883, keepalive=60, bind_address="")
'''
Loop Start
loop_start()函数调用一次loop()函数
loop()函数的作用为:读取、写入接收缓存区的或者发送缓冲区中的数据,并调用对应的回调函数。此外,loop函数还可以在连接断开的时候,重新建立与服务器端的连接.
'''
client.loop_start()
# 此外,可以通过connect_flag来标记连接状态,主要用于等待连接成功
while client.connected_flag is False:
time.sleep()
'''
Publish Message
只有topic和payload为必须参数,其余可选
当客户端调用publish()方法时,会返回MQTTMessageInfo对象,该对象包含的属性和方法有:
attr:
rc(return code):
MQTT_ERR_SUCCESS, MQTT_ERR_NO_CONN, MQTT_ERR_QUEUE_SIZE
mid(message id)
is_published
function:
wait_for_publish()
当消息被发送给Broker之后,on_publish()回调方法会被调用
'''
client.publish(topic='$topic', payload='$payload', qos=0, retain=False)
'''
Subscribe Message
此函数的参数有三种类型:
1. Simple string and integer
example: subscribe('my/topic', 0)
2. String and integer tuple
example: subscribe(('my/topic', 0))
3. List of string and integer tuples
exmaple: subscribe([('my/topic1', 0), ('my/topic', 2)])
return: (result, mid)
:type tuple
当Broker收到订阅者的订阅请求之后,on_subscribe()回调函数会被调用
'''
client.subscribe(topic, qos=0)
# 结束loop
client.loop_stop()
最后
以上就是留胡子飞鸟为你收集整理的MQTTMQTT的全部内容,希望文章能够帮你解决MQTTMQTT所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复