我是靠谱客的博主 冷艳星星,最近开发中收集的这篇文章主要介绍EC600 QuecPython接入第三方MQTT服务器, 以阿里云物联网平台为例,可替换为自己创建的MQTT服务器EC600 MQTT脚本运行现象注意事项,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

EC600 QuecPython 自带Aliyun腾讯云物联网平台的接入库,但重口难调,产品项目中更多的是接入自己搭建的第三方服务器。

MQTT协议接入不同的服务器,最大的区别在于connect报文。

connect报文的载荷中,包含了设备的登录账号,而每个平台都有自己的设备管理方式,也就造成编码方式的不一致。不过好在MQTT也规定了 clientId、userName、passWord 这几个关键载荷,用户只要保证这几个部分与平台匹配即可连接。


使用 EC600 QuecPython 接入第三方MQTT服务器,需要使用 umqtt 模块。

这里为方便测试,使用阿里云账号,不过不用 aliyun 的MQTT接入库,直接用 umqtt 模块。

  • EC600 umqtt 使用帮助:QuecPython - 在线API文档
  • 本篇demo使用的测试设备账号、以及密码的合成方式:网络调试助手接入阿里云MQTT物联网平台,逐字节讲解各字段合成方式
  • 阿里云物联网平台的设备添加:阿里云物联网平台注册、添加产品、设备

上述的设备账号会一直保留,方便测试,各位可放心使用。


EC600 MQTT脚本


接入第三方MQTT服务器,需要根据自己的设备及服务器,自行计算并替换:clientId、userName、passWord、brokerUrl 以及 topic。(阿里云的 clientI、userName、passWord 计算方式见上述链接)

Demo代码:

from machine import Pin
import log
import checkNet
from umqtt import MQTTClient
# 使用提前计算好的密码,可以登录连接。hamc库暂时使用有问题,待官方的固件库更新
# 使用umqtt库接入阿里云物联网平台,模拟接入第三方MQTT服务器进行测试
clientId = 'co_0001|securemode=3,signmethod=hmacsha1|'
userName ='co_0001&a1wFylTxYeD'
passWord = 'e782b5e55b37655c27812a60c307b0a7575d8f6d'
brokerUrl = 'a1wFylTxYeD.iot-as-mqtt.cn-shanghai.aliyuncs.com'
topic_post = b'/sys/a1wFylTxYeD/co_0001/thing/event/property/post'
topic_set = b'/sys/a1wFylTxYeD/co_0001/thing/service/property/set'
PROJECT_NAME = "MQTT_example"
PROJECT_VERSION = "1.0.0"
# 检查网络状态,创建checkNet对象
checknet = checkNet.CheckNetwork(PROJECT_NAME, PROJECT_VERSION)
# 设置日志输出级别,仅输出等级高于INFO的日志结果
log.basicConfig(level=log.INFO)
log = log.getLogger("MQTT")
state = 0
# V1.2 Demo板上的led - D6
blink = False
led = Pin(Pin.GPIO24, Pin.OUT, Pin.PULL_DISABLE, 0)
# 设置MQTT接收消息回调
def sub_cb(topic, msg):
global state
log.info("Subscribe Recv: Topic={},Msg={}".format(topic.decode(), msg.decode()))
state = 1
# 收到下发的消息时,LED亮灭状态会发生变化
global blink
blink = bool(1-blink)
led.write(blink)
if __name__ == '__main__':
# 上电运行后调试输出 项目名称、项目版本号、固件版本号、开机原因、SIM卡状态
checknet.poweron_print_once()
print('Hardware PowerON!')
# 开机后LED亮起,指示状态。
led.write(1)
# 阻塞等待网络就绪,超时等待30s
stagecode, subcode = checknet.wait_network_connected(30)
if stagecode == 3 and subcode == 1:
# 网络准备就绪,开始执行用户代码
log.info('Network connection successful! Then connect to Aliyun MQTT')
# 指示LED灭掉,提示网络连接正常。
led.write(0)
print('username:', userName)
print('password:', passWord)
# 创建一个mqtt实例
c = MQTTClient(
client_id=clientId,
server=brokerUrl,
port=1883,
user=userName,
password=passWord,
keepalive=60)
# 设置消息回调
c.set_callback(sub_cb)
# 建立连接
try:
c.connect()
except Exception as e:
print('!!!,e=%s' % e)
# 正常连接后,输出消息
log.info('Aliyun MQTT connected , Then subscribe topic')
# 订阅主题
c.subscribe(topic_post)
log.info('subscribe topic: %s' % topic_post)
c.subscribe(topic_set)
log.info('subscribe topic: %s' % topic_set)
# 发布消息
c.publish(topic_post, 'test publish')
log.info('Publish topic: %s, Msg: %s' % (topic_post, 'test publish'))
log.info('listen')
while True:
c.wait_msg()
# 阻塞函数,监听消息
else:
log.info('Network connection failed! stagecode = {}, subcode = {}'.format(stagecode, subcode))

运行现象


SIM插入、4G网络正常,正常运行的现象。

log会显示正常连接、订阅topic 和 publish。最后在 publish 一条消息后,一直处于监听状态。
在这里插入图片描述

  • 连接后能在管理页面看到设备在线。
    在这里插入图片描述
  • 测试下发控制消息
    在这里插入图片描述
  • 观察 V1.2 Demo板上的led - D6,收到下发的消息时的同时,LED亮灭状态也会发生变化。(led亮灭是脚本中自己加入的代码)
    在这里插入图片描述

注意事项

  • hmac 暂时有问题,需要等待固件更新,才可以让设备自行合成 passWord。
  • 接入第三方MQTT服务器,需要自行计算clientId、userName、passWord、brokerUrl 以及 topic,并替换。
  • umqtt的异常提醒很坑,如设备账号填错,会报错 bytes index out of range。而非重新连接。
  • 代码中如有 while,会导致与上位机的通信阻塞。重新下载脚本时,需要断开连接、复位、再重新连接。(如带有while 的脚本文件名为 main.py,则会上电自运行,只能通过重刷固件解决)

最后

以上就是冷艳星星为你收集整理的EC600 QuecPython接入第三方MQTT服务器, 以阿里云物联网平台为例,可替换为自己创建的MQTT服务器EC600 MQTT脚本运行现象注意事项的全部内容,希望文章能够帮你解决EC600 QuecPython接入第三方MQTT服务器, 以阿里云物联网平台为例,可替换为自己创建的MQTT服务器EC600 MQTT脚本运行现象注意事项所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(70)

评论列表共有 0 条评论

立即
投稿
返回
顶部