概述
树莓派采集温湿度、震动传感器检测的声音、光照强度,并发送到云平台OneNET进行曲线绘制,保存传感器数据到树莓派本地
1、项目简介
2、实现逻辑
#通过i2c协议读取光照传感器bh1750检测的光照数据
#通过单总线协议获取温度传感器dht11检测的温湿度数据
#设置输入引脚,检测震动传感器由于震动导致的引脚跳变
#设置好onenet线上环境及ui页面,代码中设置好密钥等信息,按照固定格式将传感器数据发送到onenet
#设置保存间隔,打开本地的txt文件,将光照,温湿度等传感器数据带着保存那刻的时间戳存入本地路径
#各线程进行初始化配置及开始运行,做相应的线程卡死处理及最终硬件连接
3、应用场景
#远程室内环境检测
#开关门及开关灯检测(震动*光照)
#火灾及漏水检测
#绿植土壤干燥度检测
4、核心代码梳理
# 硬件连接 传感器 信号引脚 信号引脚 电源正 电源负
# BH1750光照采集相关程序
__DEV_ADDR=0x23 #BH1750地址
#BH1750控制字 具体含义和手册对照
__CMD_PWR_OFF=0x00 #关机
__CMD_PWR_ON=0x01 #开机
__CMD_RESET=0x07 #重置
__CMD_CHRES=0x10 #持续高分辨率检测
__CMD_CHRES2=0x11 #持续高分辨率模式2检测
__CMD_CLHRES=0x13 #持续低分辨率检测
__CMD_THRES=0x20 #一次高分辨率
__CMD_THRES2=0x21 #一次高分辨率模式2
__CMD_TLRES=0x23 #一次分辨率
__CMD_SEN100H=0x42 #灵敏度100%,高位
__CMD_SEN100L=0X65 #灵敏度100%,低位
__CMD_SEN50H=0x44 #50%
__CMD_SEN50L=0x6A #50%
__CMD_SEN200H=0x41 #200%
__CMD_SEN200L=0x73 #200%
bus = smbus.SMBus(1) #使用树莓派iic总线1
bus.write_byte(__DEV_ADDR, __CMD_PWR_ON) #向bh1750下发各种指令
bus.write_byte(__DEV_ADDR, __CMD_RESET)
bus.write_byte(__DEV_ADDR, __CMD_SEN100H)
bus.write_byte(__DEV_ADDR, __CMD_SEN100L)
bus.write_byte(__DEV_ADDR, __CMD_PWR_OFF)
def getIlluminance(): # 获取光照强度
bus.write_byte(__DEV_ADDR, __CMD_PWR_ON)
bus.write_byte(__DEV_ADDR, __CMD_THRES2)
time.sleep(0.2) # 稍微稳定一下
res = bus.read_word_data(__DEV_ADDR,0) # 读取光照强度数据
# read_word_data
res=((res>>8)&0xff)|(res<<8)&0xff00 # 数据处理
res=round(res/(2*1.2),2) # bh1750原始数据换算成物理光照强度 公式看手册
# result="光照强度: "+str(res)+"lx"
return res
def threadSOUND(): # 线程启动和保护
GPIO.setmode(GPIO.BOARD)
time.sleep(1)
# 设置声音校测引脚为输入
GPIO.setup(xx, GPIO.IN)
threadCOLECT = threading.Thread(target=colectSOUND)
threadCOLECT.start()
while True:
if not threadCOLECT.is_alive(): # 如果声音采集线程卡死 隔60s再次开启
threadCOLECT = threading.Thread(target=colectSOUND)
threadCOLECT.start()
time.sleep(60)
def colectSOUND(): # 硬件采集声音
global sound_status
while True:
if(sound_status == 0): # 没有声音/上一次检测结果发送完毕才进行下一次检测
while GPIO.input(xx) == GPIO.HIGH: # 如果是高电平就一直死循环
continue
sound_status = 1 # 物理低电平代表有声音
# time.sleep(1)
# print("colectSOUND")
# 发送和保存传感器数据相关程序 #
def threadDATAPRO(): # 线程数据处理
threaddatapro = threading.Thread(target=dataCheck)
threaddatapro.start()
while True:
if not threaddatapro.is_alive(): # 如果发送和保存线程卡死 隔60s再次开启
threaddatapro = threading.Thread(target=dataCheck)
threaddatapro.start()
time.sleep(60)
def dataCheck(): # 检测
global sound_status
global tem
global hum
global lux
sound_status = 0
while True:
hum_temp, tem_temp = Adafruit_DHT.read_retry(xx, xx) # 参数里面 xx是指DHT11 xx 指gpioxx 在树莓派上是pinxx
if ((hum_temp < 100) and (tem_temp < 100)):
tem_temp = '{:0.1f}'.format(tem_temp) # 保留小数点后1位
hum_temp = '{:0.1f}'.format(hum_temp)
tem = float(tem_temp) # 把字符串转换为float类型
hum = float(hum_temp) # 把字符串转换为float类型
lux = getIlluminance()
time.sleep(1)
#print("SendSave")
def dataSave(): # 保存
global sound_status
global tem
global hum
global lux
# 数据保存
CurTime = datetime.datetime.now() # 获取当前时间
file_txt = open('/home/pi/xx.txt', 'a', encoding='utf-8') # 'a’是接续写入,在不删除之前文本内容的前提下,写入数据
file_txt.write('ntime:' + str(CurTime) + 'ttem:' + str(tem) + 'thum:' + str(hum) + 'tsound:' + str(
sound_status) + 'tlux:' + str(lux) + 'n') # 写入操作
file_txt.close() # 关闭文件
def dataSend(): # 发送
global sound_status
global tem
global hum
global lux
# 数据发送
resp = http_put()
sound_status = 0 # 开始下一周期声音检测
print("OneNET result:n %s" % resp)
# onenet云平台通信相关程序 #
def http_put():
global sound_status
global tem
global hum
global lux
ssl._create_default_https_context = ssl._create_unverified_context
# 当使用urllib.urlopen一个 https 的时候会验证一次 SSL证书 通过导入ssl模块把证书验证改成不用验证
url = 'http://api.heclouds.com/devices/xxxxxxxx/datapoints'
print("the tem is: %d" %tem)
print("the hum is: %d" %hum)
print("the sound is: %d" %sound_status)
print("the lux is: %d" %lux)
values = {"datastreams":[{"id":"TEM","datapoints":[{"value":tem}]},{"id":"HUM","datapoints":[{"value":hum}]},{"id":"SOUND","datapoints":[{"value":sound_status}]},{"id":"LUX","datapoints":[{"value":lux}]}]}
jdata = json.dumps(values).encode("utf-8")
#print(jdata)
request = urllib.request.Request(url, jdata)
request.add_header('api-key', APIKEY)
request.get_method = lambda: 'POST'
request = urllib.request.urlopen(request)
return request.read()
if __name__ == '__main__':
threadCollectSound = threading.Thread(target=threadSOUND) # 创建线程
threadCollectSound.setDaemon(True) # 守护线程
threadCollectSound.start() # 启动线程
threadDataPro = threading.Thread(target=threadDATAPRO)
threadDataPro.setDaemon(True) # 守护线程
threadDataPro.start()
schedule.every(1).seconds.do(dataSave) # 每几秒保存一次数据 注意:发送和保存的时间尽量不要一样,否则容易冲突
schedule.every(xx).seconds.do(dataSend) # 每几秒发送一次
while True:
schedule.run_pending() #定时器,在while True死循环中,schedule.run_pending()是保持schedule一直运行,
# 去查询上面那一堆的任务,在任务中,就可以设置不同的时间去运行
time.sleep(1)
5、部分参考资料
#dht11参考
https://www.cnblogs.com/hilary0614/p/dht11.html
#bh1750参考
https://www.cnblogs.com/sirius-swu/p/6682746.html
注意事项
#出现 urllib2.URLError: urlopen error [Errno -3] Temporary failure in name resolution解决方法参考:
http://www.mamicode.com/info-detail-2549054.html(设置静态ip和dns)
#出现 http.client remotedisconnected:remote end closed connection without respone原因:太频繁和onenet通信,被onenet认为是爬虫,被关闭远端连接,重新运行即可
#文件保存在同路径 dataSave.txt 里面
#注意上电刚开始可能定时不准确,过几分钟一般会好些
完整可运行项目地址
或 点击下方”大饼匠人“卡片,关注并回复"1"免费下载开发资料
最后
以上就是温婉大白为你收集整理的技能梳理1@树莓派3B+bh1750+震动+dht11+onenet+本地保存的全部内容,希望文章能够帮你解决技能梳理1@树莓派3B+bh1750+震动+dht11+onenet+本地保存所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复