我是靠谱客的博主 冷静百褶裙,这篇文章主要介绍Python模拟智能开关设备MQTT接入阿里云物联网平台 - PyCharm paho.mqtt概要Python脚本使用说明Demo源码(IDE推荐用 PyCharm)运行现象(IDE使用 PyCharm)异常处理,现在分享给大家,希望可以做个参考。

概要

Python 使用 paho.mqtt 库,利用阿里云物联网平台的设备证书:productKey、deviceName、deviceSecret,自动合成 userName、passWord。以MQTT通信协议接入阿里云物联网平台,并模拟智能开关设备上报开关消息。

非常适合作为MQTT物联网设备的客户端模拟。在此Demo基础上可非常方便进行二次开发。

MQTT.fx做客户端固然方便,但如果想对流程或任务进行定制、让其模拟物联网设备的功能、或者多开自动化脚本,应该没什么比Python更方便了吧。


Python脚本使用说明

  • MQTT接入阿里云物联网平台Demo,使用一机一密的方式。
  • 我的代码运行环境为PyCharm,运行时,需安装 paho.mqtt。
  • 在 PyCharm 的 File - Settings - Projectxxx - Python Interpreter 中,搜索并安装 paho.mqtt。
  • Demo中需要根据个人设备进行改动的仅5项:productKeydeviceNamedeviceSecretregionIdmodelName
  • 代码运行后,会使用设备证书的信息,自动连接阿里云物联网平台。并以5s为间隔,自动上报开关的状态消息。
  • 在阿里云在线调试界面,下发消息,可在客户端收到对应的json报文。

阿里云设备注册 的过程,请参照链接:阿里云MQTT物联网设备注册


Demo源码(IDE推荐用 PyCharm)

  • Demo中需要根据个人设备进行改动的仅5项:productKeydeviceNamedeviceSecretregionIdmodelName。一定要保证这5项与个人注册的设备相匹配。
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
import hmac from hashlib import sha1 import time from paho.mqtt.client import MQTT_LOG_INFO, MQTT_LOG_NOTICE, MQTT_LOG_WARNING, MQTT_LOG_ERR, MQTT_LOG_DEBUG from paho.mqtt import client as mqtt import json import random import threading ''' # 原文链接 - 我的博客,更多内容可查看我的主页。 # MQTT接入阿里云物联网平台Demo,使用一机一密的方式 # 运行时,需安装 paho.mqtt # 在PyCharm 的 File - Settings - Projectxxx - Python Interpreter 中,搜索并安装 paho.mqtt # 需要根据个人设备进行改动的仅5项:productKey、deviceName、deviceSecret、regionId、modelName ''' # 设备证书(ProductKey、DeviceName和DeviceSecret),三元组 productKey = 'a1wFylTxYeD' deviceName = 'co_0001' deviceSecret = '7ab0c4b3532b5783df5fdc58a2895d7a' # ClientId Username和 Password 签名模式下的设置方法,参考文档 https://help.aliyun.com/document_detail/73742.html?spm=a2c4g.11186623.6.614.c92e3d45d80aqG # MQTT - 合成connect报文中使用的 ClientID、Username、Password mqttClientId = deviceName + '|securemode=3,signmethod=hmacsha1|' mqttUsername = deviceName + '&' + productKey content = 'clientId' + deviceName + 'deviceName' + deviceName + 'productKey' + productKey mqttPassword = hmac.new(deviceSecret.encode(), content.encode(), sha1).hexdigest() # 接入的服务器地址 regionId = 'cn-shanghai' # MQTT 接入点域名 brokerUrl = productKey + '.iot-as-mqtt.' + regionId + '.aliyuncs.com' # Topic,post,客户端向服务器上报消息 topic_post = '/sys/' + productKey + '/' + deviceName + '/thing/event/property/post' # Topic,set,服务器向客户端下发消息 topic_set = '/sys/' + productKey + '/' + deviceName + '/thing/service/property/set' # 物模型名称的前缀(去除后缀的数字) modelName = 'PowerSwitch_' # 下发的设置报文示例:{"method":"thing.service.property.set","id":"1227667605","params":{"PowerSwitch_1":1},"version":"1.0.0"} # json合成上报开关状态的报文 def json_switch_set(num, status): switch_info = {} switch_data = json.loads(json.dumps(switch_info)) switch_data['method'] = '/thing/event/property/post' switch_data['id'] = random.randint(100000000,999999999) # 随机数即可,用于让服务器区分开报文 switch_status = {modelName + num : status} switch_data['params'] = switch_status return json.dumps(switch_data, ensure_ascii=False) # 开关的状态,0/1 onoff = 0 # 建立mqtt连接对象 client = mqtt.Client(mqttClientId, protocol=mqtt.MQTTv311, clean_session=True) def on_log(client, userdata, level, buf): if level == MQTT_LOG_INFO: head = 'INFO' elif level == MQTT_LOG_NOTICE: head = 'NOTICE' elif level == MQTT_LOG_WARNING: head = 'WARN' elif level == MQTT_LOG_ERR: head = 'ERR' elif level == MQTT_LOG_DEBUG: head = 'DEBUG' else: head = level print('%s: %s' % (head, buf)) # MQTT成功连接到服务器的回调处理函数 def on_connect(client, userdata, flags, rc): print('Connected with result code ' + str(rc)) # 与MQTT服务器连接成功,之后订阅主题 client.subscribe(topic_post, qos=0) client.subscribe(topic_set, qos=0) # 向服务器发布测试消息 client.publish(topic_post, payload='test msg', qos=0) # MQTT接收到服务器消息的回调处理函数 def on_message(client, userdata, msg): print('recv:', msg.topic + ' ' + str(msg.payload)) def on_disconnect(client, userdata, rc): if rc != 0: print('Unexpected disconnection %s' % rc) def mqtt_connect_aliyun_iot_platform(): client.on_log = on_log client.on_connect = on_connect client.on_message = on_message client.on_disconnect = on_disconnect client.username_pw_set(mqttUsername, mqttPassword) print('clientId:', mqttClientId) print('userName:', mqttUsername) print('password:', mqttPassword) print('brokerUrl:', brokerUrl) # ssl设置,并且port=8883 # client.tls_set(ca_certs=None, certfile=None, keyfile=None, cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_TLS, ciphers=None) try: client.connect(brokerUrl, 1883, 60) except: print('阿里云物联网平台MQTT服务器连接错误,请检查设备证书三元组、及接入点的域名!') client.loop_forever() def publish_loop(): while 1: time.sleep(5) global onoff onoff = 1-onoff switchPost = json_switch_set('1', onoff) client.publish(topic_post, payload=switchPost, qos=0) if __name__ == '__main__': # 建立线程t1:mqtt连接阿里云物联网平台 # 建立线程t2:定时向阿里云发布消息:5s为间隔,变化开关状态 t1 = threading.Thread(target=mqtt_connect_aliyun_iot_platform, ) t2 = threading.Thread(target=publish_loop, ) t1.start() t2.start()

运行现象(IDE使用 PyCharm)

  • 代码运行后,会在调试窗口输出提示内容。
    在这里插入图片描述

  • 已经连接阿里云物联网平台,并定时上发开关状态的报文。
    在这里插入图片描述

  • 阿里云物联网平台,设备后台,可以看到设备上线。
    在这里插入图片描述

  • 查看客户端上发的开关状态:物模型 - 打开实时刷新
    在这里插入图片描述
    在这里插入图片描述

  • 点击查看数据,可以图标形式查看历史数据。
    在这里插入图片描述

  • 利用在线调试,模拟向客户端下发控制指令。
    在这里插入图片描述

  • 客户端收到服务器的控制指令。
    在这里插入图片描述


异常处理

  • 代码已验证无误,如有错误,只能是设备证书填写有误。
  • Demo中需要根据个人设备进行改动的仅5项:productKeydeviceNamedeviceSecretregionIdmodelName。一定要保证这5项与个人注册的设备相匹配。
  • 脚本运行时,会调试输出 clientId、userName、passWordbrokerUrl 等关键信息。可自行对照。
    在这里插入图片描述
  • productKeyregionId 填写有误,会导致 brokerUrl 合成错误,进而域名解析失败,MQTT不能正常连接阿里云服务器,引发异常。
    在这里插入图片描述

最后

以上就是冷静百褶裙最近收集整理的关于Python模拟智能开关设备MQTT接入阿里云物联网平台 - PyCharm paho.mqtt概要Python脚本使用说明Demo源码(IDE推荐用 PyCharm)运行现象(IDE使用 PyCharm)异常处理的全部内容,更多相关Python模拟智能开关设备MQTT接入阿里云物联网平台内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(83)

评论列表共有 0 条评论

立即
投稿
返回
顶部