概述
环境:
测试地点: 北京
测试主机: 公网测试主机
语言: python
目标:
计划用 python 实现 mqtt 接入
前期准备:
已经可以通过 web 控制台, 创建 topic
文档有 java/.net/c++ 的样例, 但是没有 python 的样例程序, 自己测试
问题:
在控制台无法看到 任何 消费者/生产者 连接
测试输出:
消费者输出
Connected with result code 0
生产者输出
Connection returned 0
publish success, msg = 2016-11-12 14:58:43.454239
测试:
下面是我写的测试连接程序
订阅
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import paho.mqtt.client as mqtt
user = "user0001"
pwd = "user0001password"
mqtt_svr = "mqtt-test.cn-qingdao.aliyuncs.com"
port = 1883 # endpoint端口
topic = "testtopic" # 订阅的主题内容
def on_connect(client, userdata, flags, rc):
print("Connected with result code " + str(rc))
client.subscribe(topic, qos=0)
def on_message(client, userdata, msg):
print("topic:" + msg.topic + " Message:" + str(msg.payload))
client = mqtt.Client(
client_id="CID_test0001",
clean_session=True,
userdata=None,
protocol='MQTTv31'
)
client.username_pw_set(user, pwd)
client.on_connect = on_connect
client.on_message = on_message
client.connect(mqtt_svr, port, 60)
client.loop_forever()
发布
import time
import paho.mqtt.client as mqtt
import datetime
def on_publish(msg, rc): # 成功发布消息的操作
if rc == 0:
print("publish success, msg = " + msg)
def on_connect(client, userdata, flags, rc): # 连接后的操作 0为成功
print("Connection returned " + str(rc))
client = mqtt.Client(
client_id="PID_test0001",
clean_session=True,
userdata=None,
protocol='MQTTv311'
)
user = "user0001"
pwd = "user0001password"
mqtt_svr = "mqtt-test.cn-qingdao.aliyuncs.com"
port = 1883
topic = "testtopic"
client.username_pw_set(user, pwd)
client.connect(endpoint, port, 60)
client.on_connect = on_connect
client.loop_start()
time.sleep(2)
count = 0
while count < 1:
count = count + 1
msg = str(datetime.datetime.now())
rc, mid = client.publish(topic, payload=msg, qos=0)
on_publish(msg, rc)
time.sleep(1)
最后
以上就是自由小蚂蚁为你收集整理的python实现mqtt_python 如何 实现 mqtt 接入阿里-问答-阿里云开发者社区-阿里云的全部内容,希望文章能够帮你解决python实现mqtt_python 如何 实现 mqtt 接入阿里-问答-阿里云开发者社区-阿里云所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复