我是靠谱客的博主 聪慧白猫,最近开发中收集的这篇文章主要介绍python连接mqtt服务器--极简示例测试代码,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

python使用paho-mqtt库,  连接mqtt服务器进行发布与订阅消息的极简示例

0 安装库

sudo pip install paho-mqtt

1 连接服务器示例

Paho库采用回调函数的方式来返回连接状态

代码中还设置了遗嘱消息,这条消息会存储在服务器,一旦客户端非正常断开(不使用disconnect断开,最常见的是代码出错卡死),即会发布该消息。

import paho.mqtt.client as mqtt
HOST = "192.168.103.174" #服务器ip地址
PORT = 1883
#服务器端口
USER = 'pc'
#登陆用户名
PASSWORD = '123'
#用户名对应的密码
def on_connect(client, userdata, flags, rc):
rc_status = [ "连接成功", "协议版本错误", "无效的客户端标识", "服务器无法使用", "用户密码错误", "无授权" ]
print("connect:" , rc_status[rc])
client = mqtt.Client()
client.on_connect = on_connect
#注册返回连接状态的回调函数
client.username_pw_set(USER, PASSWORD)
#如果服务器要求需要账户密码
client.will_set("test/die", "我死了", 0)
#设置遗嘱消息
client.connect(HOST, PORT, keepalive=600) # 连接服务器
#client.disconnect() #断开连接,不会触发遗嘱消息

2 发布消息的API

        较为简单,调用这条API即可发布消息

TOPIC_PUB = "test/py_test" #发布主题
MESSAGE = "这是测试消息" #载荷
client.publish(TOPIC_PUB, MESSAGE, qos=0) #发布消息

3 接收数据

         接收数据是通过登记回调函数,当收到数据时会调用该函数。

def on_message(client, userdata, msg):
print("主题:", msg.topic)
print("消息:", str(msg.payload,'utf-8') ,'n' )
client.on_message = on_message
#定义回调函数
client.subscribe('test/#', qos=0)
#订阅主题test/#
client.loop_start()
#非阻塞,启动接收线程
#client.loop_forever()
#阻塞式,会卡死在这等待接收

测试代码

1 编写发送程序

        以下代码的功能是,每隔3秒发送一句测试消息

import time
import paho.mqtt.client as mqtt
HOST = "192.168.103.174" #服务器ip地址
PORT = 1883
#服务器端口
USER = 'pc'
#登陆用户名
PASSWORD = '123'
#用户名对应的密码
def on_connect(client, userdata, flags, rc):
rc_status = [ "连接成功", "协议版本错误", "无效的客户端标识", "服务器无法使用", "用户密码错误", "无授权" ]
print("connect:" , rc_status[rc])
client = mqtt.Client()
client.on_connect = on_connect
#注册返回连接状态的回调函数
client.username_pw_set(USER, PASSWORD)
#如果服务器要求需要账户密码
client.will_set("test/die", "我死了", 0)
#设置遗嘱消息
client.connect(HOST, PORT, keepalive=600) # 连接服务器
#client.disconnect() #断开连接,不会触发遗嘱消息
TOPIC_PUB = "test/py_test" #发布主题
MESSAGE = "这是测试消息" #载荷
while 1:
client.publish(TOPIC_PUB, MESSAGE, qos=0) #发布消息
time.sleep(3)

使用软件mqttx连接至mqtt服务器查看消息

                

2 编写接收测试程序

        以下代码的功能是订阅test/#主题,并将收到的消息print出来。

        我是非阻塞党,不能停在那条阻塞接收里,必须停在由自己控制的while内

import paho.mqtt.client as mqtt
HOST = "192.168.103.174" #服务器ip地址
PORT = 1883
#服务器端口
USER = 'pc'
#登陆用户名
PASSWORD = '123'
#用户名对应的密码
def on_connect(client, userdata, flags, rc):
rc_status = [ "连接成功", "协议版本错误", "无效的客户端标识", "服务器无法使用", "用户密码错误", "无授权" ]
print("connect:" , rc_status[rc])
client = mqtt.Client()
client.on_connect = on_connect
#注册返回连接状态的回调函数
client.username_pw_set(USER, PASSWORD)
#如果服务器要求需要账户密码
client.will_set("test/die", "我死了", 0)
#设置遗嘱消息
client.connect(HOST, PORT, keepalive=600) # 连接服务器
#client.disconnect() #断开连接,不会触发遗嘱消息
def on_message(client, userdata, msg):
print("主题:", msg.topic)
print("消息:", str(msg.payload,'utf-8') ,'n' )
client.on_message = on_message
#定义回调函数
client.subscribe('test/#', qos=0)
#订阅主题test/#
client.loop_start()
#非阻塞,启动接收线程
#client.loop_forever()
#阻塞式,会卡死在这等待接收
while 1:
pass

        

最后

以上就是聪慧白猫为你收集整理的python连接mqtt服务器--极简示例测试代码的全部内容,希望文章能够帮你解决python连接mqtt服务器--极简示例测试代码所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(57)

评论列表共有 0 条评论

立即
投稿
返回
顶部