概述
zigbee+树莓派网关,控制终端亮灭,终端上传温度数据到onenet云平台
要求:
设计中要求有系统功能、网关硬件结构设计、网关软件功能构成。
画出网关功能整体处理流程框架。
数据北向通信规约解析模块设计说明(与云平台通信功能模块设计说明)
数据南向通信规约解析模声设计说明(与ZigBee网络通信功能模块设计说明)
数据持久化存储设计说明。
配置文件格式规格说明。
各功能模块以类的方式进行设计,并写明每一功能的基本描述、设计思路、接口。
串口包
import serial # 导入串口包
import time # 导入时间包
import threading
import random
ser = serial.Serial("COM12", 115200, timeout=5) # 开启com3口,波特率115200,超时5
ser.flushInput() # 清空缓冲区
def uart_read():
while True:
count = ser.inWaiting() # 获取串口缓冲区数据
if count != 0:
res = ser.read(ser.in_waiting).decode("utf-8") # 读出串口数据,数据采用gbk编码
print('串口向上位机位机发送数据为:' + res) # 打印一下子
time.sleep(0.1) # 延时1秒,免得CPU出问题
return res # 返回读取的数
def uart_write(cmd):
print('串口向下位机发送命令为:'+cmd)
if cmd == '23':
ser.write(b'23') # 开
elif cmd == '24':
ser.write(b'24') # 关
elif cmd == '13':
ser.write(b'13')
elif cmd == '14':
ser.write(b'14')
数据库包
import sqlite3
def sql_create_db():
con = sqlite3.connect('value.db')
c = con.cursor()
# cursor()光标
sql_str = '''CREATE TABLE VALUE2(
time2 varchar(255),
value2 decimal(3,2)
);
'''
c.execute(sql_str)
# 插入语句
con.commit()
# 提交
con.close()
def sql_insert1(time1,value1):
con = sqlite3.connect('value.db')
c = con.cursor()
sql_str = '''INSERT INTO VALUE1 VALUES(''' + str(time1) + ''',''' + str(value1) + ''');'''
print(sql_str)
c.execute(sql_str)
con.commit()
con.close()
def sql_insert2(time2,value2):
con = sqlite3.connect('value.db')
c = con.cursor()
sql_str = '''INSERT INTO VALUE2 VALUES(''' + str(time2) + ''',''' + str(value2) + ''');'''
print(sql_str)
c.execute(sql_str)
con.commit()
con.close()
def sql_select1():
con = sqlite3.connect('value.db')
c = con.cursor()
sql_select = "SELECT * FROM VALUE1;"
print('查询数据:')
c.execute(sql_select)
for row in c:
print(row)
con.close()
def sql_select2():
con = sqlite3.connect('value.db')
c = con.cursor()
sql_select = "SELECT * FROM VALUE2;"
print('查询数据:')
c.execute(sql_select)
for row in c:
print(row)
con.close()
主函数(要同时运行mqtt_main)
from __future__ import print_function
import time
import paho.mqtt.client as mqtt
#修改成自己的即可
DEV_ID = "773948849" #设备ID
PRO_ID = "455579" #产品ID
AUTH_INFO = "TVqBms9=ltL2ZbiOPpq3YEVwYGw=" #APIKEY
TYPE_JSON = 0x01
TYPE_FLOAT = 0x17
while True:
client = mqtt.Client(client_id=DEV_ID, protocol=mqtt.MQTTv311)
client.username_pw_set(username=PRO_ID, password=AUTH_INFO)
client.connect('183.230.40.39', port=6002, keepalive=120) # 端口、ip地址、生存期
client.loop_forever()
time.sleep(1)
连接mqtt函数包:mqtt_main()
from __future__ import print_function
import _thread
import threading
import time
import serial # 导入串口包
import paho.mqtt.client as mqtt
import struct
import json
import uart
import link_db
# 修改成自己的即可
DEV_ID = "773948849" # 设备ID
PRO_ID = "455579" # 产品ID
AUTH_INFO = "TVqBms9=ltL2ZbiOPpq3YEVwYGw=" # APIKEY
TYPE_JSON = 0x01
TYPE_FLOAT = 0x17
# 定义上传数据的json格式 该格式是oneNET规定好的 按格式修改其中变量即可
# 判断不同设备通过串口发送的数据
value1 = uart.uart_read()
body = {}
semaphore = threading.Semaphore(0)
def value_1():
while True:
time.sleep(2)
localtime = time.time()
global value1
global body
value1 = uart.uart_read()
value = value1
if value[0:2] == 'A0': # 判断数据发送的设备
value_sendA0 = float(value[2:11])
body = {
"datastreams": [
{
"id": "temperatureA0", # 对应OneNet的数据流名称
"datapoints": [
{
"value": value_sendA0 # 数据值
}
]
}
]
}
#print("value_sendA0%.2f" % value_sendA0)
elif value[0:7] == 'tmpe1: ':
value_sendB0 = float(value[7:11])
body = {
"datastreams": [
{
"id": "temperature01", # 对应OneNet的数据流名称
"datapoints": [
{
"value": value_sendB0 # 数据值
}
]
}
]
}
link_db.sql_insert1(localtime, value_sendB0)
print("接受的数据%.1f" % value_sendB0)
elif value[0:7] == 'tmpe2: ':
value_sendC0 = float(value[7:11])
body = {
"datastreams": [
{
"id": "temperature", # 对应OneNet的数据流名称
"datapoints": [
{
"value": value_sendC0 # 数据值
}
]
}
]
}
link_db.sql_insert2(localtime, value_sendC0)
print("接受的数据%.1f"%value_sendC0)
else:
value_send = float(value)
body = {
"datastreams": [
{
"id": "temperature", # 对应OneNet的数据流名称
"datapoints": [
{
"value": value_send # 数据值
}
]
}
]
}
print("接受的数据(%.1f)" % value_send)
semaphore.release()
def build_payload(type, payload):
datatype = type
packet = bytearray()
packet.extend(struct.pack("!B", datatype))
if isinstance(payload, str):
udata = payload.encode('utf-8')
length = len(udata)
packet.extend(struct.pack("!H" + str(length) + "s", length, udata))
return packet
# 当客户端收到来自服务器的CONNACK响应时的回调。也就是申请连接,服务器返回结果是否成功等
def on_connect(client, userdata, flags, rc):
print("连接结果:" + mqtt.connack_string(rc))
# 上传数据
global body
json_body = json.dumps(body)
print(json_body)
packet = build_payload(TYPE_JSON, json_body)
client.publish("$dp", packet, qos=1) # qos代表服务质量
# 从服务器接收发布消息时的回调。
def on_message(client, userdata, msg):
print("*******接受消息************")
print("下发命令:" + str(msg.payload, 'utf-8') + "°C")
cmd = str(msg.payload, 'utf-8')
uart.uart_write(cmd)
# 当消息已经被发送给中间人,on_publish()回调将会被触发
def on_publish(client, userdata, mid):
print("回调次数" + str(mid))
def mqtt_up_main():
semaphore.acquire()
client = mqtt.Client(client_id=DEV_ID, protocol=mqtt.MQTTv311)
client.on_connect = on_connect
client.on_publish = on_publish
client.on_message = on_message
client.username_pw_set(username=PRO_ID, password=AUTH_INFO)
client.connect('183.230.40.39', port=6002, keepalive=120) # 端口、ip地址、生存期
client.loop_forever()
time.sleep(1)
if __name__ == '__main__':
while True:
t1 = threading.Thread(target=value_1, args=())
t2 = threading.Thread(target=mqtt_up_main, args=())
t1.start()
t2.start()
t1.join()
t2.join()
time.sleep(3)
文件命名格式
zigbee文件+python源代码https://download.csdn.net/download/ChinatheR/59323165
最后
以上就是甜甜机器猫为你收集整理的zigbee+树莓派网关,控制终端亮灭,终端上传温度数据到onenet云平台的全部内容,希望文章能够帮你解决zigbee+树莓派网关,控制终端亮灭,终端上传温度数据到onenet云平台所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复