概述
一、硬件设备
1.树莓派CM4 Sensing(包含485接口)
2.带485接口的继电器
继电器的mdobus指令
3.水质仪
水质仪线路需要改造一下
下图中1是水质仪自带的线,2是给水质仪提供电源的线(是usb口,可以插谁没派上)
绿色的转接口可以不用,但是要自己接线
四根跳线从左到右分别接的是:电源+,485a口,485b口,地
4.灯管任意
水质仪和继电器说明文档
链接:https://pan.baidu.com/s/1jfdBehoNU8kc6Xgh9SiOLg
提取码:vs5j
二、阿里云配置
1.存储水质数据和灯光状态
进入阿里云物联网平台创建一个产品,注意所属品类要选自定义品类
点击查看刚才添加的产品,接着进入功能定义模块,点击编辑草稿
输入以下数据,也可以直接导入我的模板
链接:https://pan.baidu.com/s/1PcDY_eHNmCQGl3FU-yeiUg
提取码:w9g9
点击发布上线
接着进入设备,选择添加设备,选择刚添加的产品
查看刚才添加的设备,进入后点击 DeviceSecret 右边的查看,记住下面三个参数
2.灯光控制
进入增值服务
进入免费的应用与开发
先项目管理里面新建一个项目,再到应用开发的业务逻辑下新建一个空白的业务服务
选择定时触发,每天的早上8点整
第一个节点设置
第二个节点设置
三、代码
1.plant_factory.py
# coding=utf-8
import aliLink
import schedule
import time,json
import datetime
import os
import mqttd
import water_quality
import light
# 开机自启动需要 等一会不然会报错(可能是开机没联网)
time.sleep(120)
# 三元素(iot后台获取)
ProductKey = 你设备的ProductKey
DeviceName = 你设备的DeviceName
DeviceSecret = 你设备的DeviceSecret
# topic (iot后台获取)
POST = '/sys/你设备的ProductKey/你设备的DeviceName/thing/event/property/post' # 上报消息到云
SET = ' /sys/你设备的ProductKey/你设备的DeviceName/thing/service/property/set' # 订阅云端指令
# 消息回调(云端下发消息的回调函数)
def on_message(client, userdata, msg):
print(msg.payload)
Msg = json.loads(msg.payload)
switch = Msg['params']['LEDSwitch']
if (switch == 1):
light.open()
# print("light.open")
else:
light.stop()
# print("light.stop")
# 连接回调(与阿里云建立链接后的回调函数)
def on_connect(client, userdata, flags, rc):
mqtt.subscribe(SET) # 订阅服务器下发消息topic
# 链接信息
Server,ClientId,userNmae,Password = aliLink.linkiot(DeviceName,ProductKey,DeviceSecret)
# mqtt链接
mqtt = mqttd.MQTT(Server,ClientId,userNmae,Password)
mqtt.begin(on_message,on_connect)
# 检查网络,断网重启wifi
def restartWifi():
status = os.system("ping -w 3 www.baidu.com")
if status == 1:
# 关闭wifi
os.system('sudo ip link set wlan0 down')
# 开启wifi
os.system('sudo ip link set wlan0 up')
else:
print('已经连上网了')
return status
def read():
try:
# 读水质仪
room_temperature,room_humidity,water_temperature,TDS,PH,EC = water_quality.read()
# room_temperature, room_humidity, water_temperature, TDS, PH, EC = 18.0, 66.0, 20.8, 367, 7.9, 667
# 读灯的状态
LEDSwitch = light.read()
# LEDSwitch = 1
update_msg = {
'LEDSwitch': LEDSwitch,
'room_temperature': room_temperature,
'room_humidity': room_humidity,
'water_temperature': water_temperature,
'TDS': TDS,
'PH': PH,
'EC': EC
}
print(datetime.datetime.now())
JsonUpdataMsn = aliLink.Alink(update_msg)
print(JsonUpdataMsn)
mqtt.push(POST, JsonUpdataMsn) # 定时向阿里云IOT推送我们构建好的Alink协议数据
except Exception as re:
print("plant_factory_err: ", re)
restartWifi()
# 读取环境数据并推送到阿里云ito
schedule.every(30).minutes.do(read)
# schedule.every(10).seconds.do(read)
# 检查网络
schedule.every(15).minutes.do(restartWifi)
# 每天07:55 和 19:55 检查一次网络
schedule.every().day.at("07:55").do(restartWifi)
schedule.every().day.at("17:55").do(restartWifi)
while True:
schedule.run_pending()
time.sleep(1)
2.mqttd.py
# coding=utf-8
# pip install paho-mqtt
import paho.mqtt.client
# =====初始化======
class MQTT():
def __init__(self,host,CcientID,username=None,password=None,port=1883,timeOut=60):
self.Host = host
self.Port = port
self.timeOut = timeOut
self.username =username
self.password = password
self.CcientID = CcientID
self.mqttc = paho.mqtt.client.Client(self.CcientID) #配置ID
if self.username is not None: #判断用户名密码是否为空
self.mqttc.username_pw_set(self.username, self.password) #不为空则配置账号密码
self.mqttc.reconnect_delay_set()
self.mqttc.connect(self.Host, self.Port, self.timeOut) #初始化服务器 IP 端口 超时时间
# 初始化
def begin(self,message,connect):
self.mqttc.on_connect = connect
self.mqttc.on_message = message
self.mqttc.loop_start() # 后台新进程循环监听
# =====发送消息==========
def push(self,tag,date,_Qos = 2):
info = self.mqttc.publish(tag,date,_Qos)
# print("push", info.is_published())
#print('OK',date)
# =======订阅tips=====
def subscribe(self,_tag):
self.mqttc.subscribe(_tag) #监听标签
# 判断是否在线
def is_connected(self):
flag = self.mqttc.is_connected()
return flag
# 重连
def reconnect(self):
self.mqttc.reconnect()
3.water_quality.py
# coding=utf-8
import serial
##############################################################
# 485接口水质监测仪数据读取
#
##############################################################
#连接水质仪器485端口
s=serial.Serial('/dev/ttyAMA4',4800,timeout=1)
# 数据可读化
def readability(date):
room_temperature = float(int(date[6:10], 16)/10)
room_humidity = float(int(date[10:14], 16)/10)
water_temperature = float(int(date[14:18], 16)/10)
TDS = int(date[18:22], 16)
PH = float(int(date[22:26], 16)/10)
EC = int(date[26:30], 16)
# update_msg = {
# 'room_temperature': room_temperature,
# 'room_humidity': room_humidity,
# 'water_temperature': water_temperature,
# 'TDS': TDS,
# 'PH': PH,
# 'EC': EC
# }
return room_temperature,room_humidity,water_temperature,TDS,PH,EC
def read():
# 这是发送的命令,
d = bytes.fromhex('01 03 00 00 00 06 C5 C8')
print("I'm writing water_quality:")
# 读水质仪状态偶尔会出问题,重复读10此水质仪器的数据
for i in range(10):
s.write(d)
res = s.read(17)
str_return_data = str(res.hex())
# 数据正常,就返回正确的数据
if len(str_return_data) == 34 and str_return_data[0:4] == "0103":
print("I'm reading water_quality:", str_return_data)
return readability(str_return_data)
print("water_quality err:")
return ""
4.light.py
# coding=utf-8
import serial
import time
#连接继电器485端口
s=serial.Serial('/dev/ttyAMA2',9600,timeout=1)
def open():
for i in range(10):
# 这是发送的命令,控制第 4 路开。
d = bytes.fromhex('FE 05 00 03 FF 00 68 35')
s.write(d)
time.sleep(1)
if light_state() == 1:
print("I'm opening")
break
def stop():
for i in range(10):
# 这是发送的命令,控制第 4 路关。
d = bytes.fromhex('FE 05 00 03 00 00 29 C5')
s.write(d)
time.sleep(1)
if light_state() == 0:
print("I'm stopping")
break
# 查看继电器第4路开关状态 开返回1 关换回0
def light_state():
d = bytes.fromhex('FE 01 00 00 00 04 29 C6')
print("I'm writing light:")
s.write(d)
res = s.read(6)
str_return_data = str(res.hex())
# 01表示正常
if len(str_return_data) != 12 or str_return_data[2:4] != '01':
return ''
print("I'm reading light:", str_return_data)
# 6和7位表示电路状态的16位返回值,转换成 2 进制数据,0表示关,1 表示开。例:对 4 路继电器若返回字段为 08,转换成 2 进制后为 1000,则第 1、2、3 路关,第 4 路开。
state_number = str_return_data[6:8]
# 16进制转2进制
b = bin(int(state_number, 16))[2:]
return int(b[0])
def read():
# 读继电器状态偶尔会出问题,所以读10次
for i in range(10):
LEDSwitch = light_state()
if LEDSwitch != "":
return LEDSwitch
print("reading light err")
return ""
最后
以上就是单身香烟为你收集整理的树莓派CM4 Sensing(包含485接口)+python+继电器+水质仪+阿里云物联网平台ito实现实时检测水质并上传数据到阿里云ito和远程控制灯光一、硬件设备二、阿里云配置三、代码的全部内容,希望文章能够帮你解决树莓派CM4 Sensing(包含485接口)+python+继电器+水质仪+阿里云物联网平台ito实现实时检测水质并上传数据到阿里云ito和远程控制灯光一、硬件设备二、阿里云配置三、代码所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复