我是靠谱客的博主 冷艳星星,这篇文章主要介绍EC600 QuecPython接入第三方MQTT服务器, 以阿里云物联网平台为例,可替换为自己创建的MQTT服务器EC600 MQTT脚本运行现象注意事项,现在分享给大家,希望可以做个参考。

EC600 QuecPython 自带Aliyun腾讯云物联网平台的接入库,但重口难调,产品项目中更多的是接入自己搭建的第三方服务器。

MQTT协议接入不同的服务器,最大的区别在于connect报文。

connect报文的载荷中,包含了设备的登录账号,而每个平台都有自己的设备管理方式,也就造成编码方式的不一致。不过好在MQTT也规定了 clientId、userName、passWord 这几个关键载荷,用户只要保证这几个部分与平台匹配即可连接。


使用 EC600 QuecPython 接入第三方MQTT服务器,需要使用 umqtt 模块。

这里为方便测试,使用阿里云账号,不过不用 aliyun 的MQTT接入库,直接用 umqtt 模块。

  • EC600 umqtt 使用帮助:QuecPython - 在线API文档
  • 本篇demo使用的测试设备账号、以及密码的合成方式:网络调试助手接入阿里云MQTT物联网平台,逐字节讲解各字段合成方式
  • 阿里云物联网平台的设备添加:阿里云物联网平台注册、添加产品、设备

上述的设备账号会一直保留,方便测试,各位可放心使用。


EC600 MQTT脚本


接入第三方MQTT服务器,需要根据自己的设备及服务器,自行计算并替换:clientId、userName、passWord、brokerUrl 以及 topic。(阿里云的 clientI、userName、passWord 计算方式见上述链接)

Demo代码:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
from machine import Pin import log import checkNet from umqtt import MQTTClient # 使用提前计算好的密码,可以登录连接。hamc库暂时使用有问题,待官方的固件库更新 # 使用umqtt库接入阿里云物联网平台,模拟接入第三方MQTT服务器进行测试 clientId = 'co_0001|securemode=3,signmethod=hmacsha1|' userName ='co_0001&a1wFylTxYeD' passWord = 'e782b5e55b37655c27812a60c307b0a7575d8f6d' brokerUrl = 'a1wFylTxYeD.iot-as-mqtt.cn-shanghai.aliyuncs.com' topic_post = b'/sys/a1wFylTxYeD/co_0001/thing/event/property/post' topic_set = b'/sys/a1wFylTxYeD/co_0001/thing/service/property/set' PROJECT_NAME = "MQTT_example" PROJECT_VERSION = "1.0.0" # 检查网络状态,创建checkNet对象 checknet = checkNet.CheckNetwork(PROJECT_NAME, PROJECT_VERSION) # 设置日志输出级别,仅输出等级高于INFO的日志结果 log.basicConfig(level=log.INFO) log = log.getLogger("MQTT") state = 0 # V1.2 Demo板上的led - D6 blink = False led = Pin(Pin.GPIO24, Pin.OUT, Pin.PULL_DISABLE, 0) # 设置MQTT接收消息回调 def sub_cb(topic, msg): global state log.info("Subscribe Recv: Topic={},Msg={}".format(topic.decode(), msg.decode())) state = 1 # 收到下发的消息时,LED亮灭状态会发生变化 global blink blink = bool(1-blink) led.write(blink) if __name__ == '__main__': # 上电运行后调试输出 项目名称、项目版本号、固件版本号、开机原因、SIM卡状态 checknet.poweron_print_once() print('Hardware PowerON!') # 开机后LED亮起,指示状态。 led.write(1) # 阻塞等待网络就绪,超时等待30s stagecode, subcode = checknet.wait_network_connected(30) if stagecode == 3 and subcode == 1: # 网络准备就绪,开始执行用户代码 log.info('Network connection successful! Then connect to Aliyun MQTT') # 指示LED灭掉,提示网络连接正常。 led.write(0) print('username:', userName) print('password:', passWord) # 创建一个mqtt实例 c = MQTTClient( client_id=clientId, server=brokerUrl, port=1883, user=userName, password=passWord, keepalive=60) # 设置消息回调 c.set_callback(sub_cb) # 建立连接 try: c.connect() except Exception as e: print('!!!,e=%s' % e) # 正常连接后,输出消息 log.info('Aliyun MQTT connected , Then subscribe topic') # 订阅主题 c.subscribe(topic_post) log.info('subscribe topic: %s' % topic_post) c.subscribe(topic_set) log.info('subscribe topic: %s' % topic_set) # 发布消息 c.publish(topic_post, 'test publish') log.info('Publish topic: %s, Msg: %s' % (topic_post, 'test publish')) log.info('listen') while True: c.wait_msg() # 阻塞函数,监听消息 else: log.info('Network connection failed! stagecode = {}, subcode = {}'.format(stagecode, subcode))

运行现象


SIM插入、4G网络正常,正常运行的现象。

log会显示正常连接、订阅topic 和 publish。最后在 publish 一条消息后,一直处于监听状态。
在这里插入图片描述

  • 连接后能在管理页面看到设备在线。
    在这里插入图片描述
  • 测试下发控制消息
    在这里插入图片描述
  • 观察 V1.2 Demo板上的led - D6,收到下发的消息时的同时,LED亮灭状态也会发生变化。(led亮灭是脚本中自己加入的代码)
    在这里插入图片描述

注意事项

  • hmac 暂时有问题,需要等待固件更新,才可以让设备自行合成 passWord。
  • 接入第三方MQTT服务器,需要自行计算clientId、userName、passWord、brokerUrl 以及 topic,并替换。
  • umqtt的异常提醒很坑,如设备账号填错,会报错 bytes index out of range。而非重新连接。
  • 代码中如有 while,会导致与上位机的通信阻塞。重新下载脚本时,需要断开连接、复位、再重新连接。(如带有while 的脚本文件名为 main.py,则会上电自运行,只能通过重刷固件解决)

最后

以上就是冷艳星星最近收集整理的关于EC600 QuecPython接入第三方MQTT服务器, 以阿里云物联网平台为例,可替换为自己创建的MQTT服务器EC600 MQTT脚本运行现象注意事项的全部内容,更多相关EC600内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(73)

评论列表共有 0 条评论

立即
投稿
返回
顶部