概述
前面试验通过了SIM7020C的http get 方法,将它封装为一个独立的类,今天试验通过了HTTP POST方法,sim7020.py的源代码。
import machine
import utime
import ubinascii
def ends_with(s, suffix):
l = len(suffix)
if len(s) >= l:
return s[-l::] == suffix
else:
return False
def get_client_id(resp):
id = -1
if 'rnOKrn' in resp:
lines = resp.split('rn')
lines = list(filter(None, lines)) #移除空串
for line in lines:
if line.startswith('+CHTTPCREATE:'):
print('found: ', line)
paras = line.replace('+CHTTPCREATE:', '').split(',')
id = int(paras[0])
break
return id
class SIM7020:
HTTP_GET = 0
HTTP_POST = 1
def __init__(self, uart, power_pin=14):
self.uart = uart
self.power_enable = machine.Pin(power_pin, machine.Pin.OUT)
def read_uart(self, timeout = 2000, fast_return = False):
ticks_start = utime.ticks_ms()
resp = b''
while utime.ticks_diff(utime.ticks_ms(), ticks_start) < timeout:
if self.uart.any():
line = self.uart.readline()
resp += line
#print("■■■■", utime.ticks_diff(utime.ticks_ms(), ticks_start), line)
if ends_with(resp, b'rnERRORrn'):
break
if fast_return and ends_with(resp, b'rnOKrn'):
break
ticks_start = utime.ticks_ms()
return resp
def exec_cmd(self, cmd, timeout=2000):
#print(">>>AT命令>>>", cmd)
self.uart.write((cmd + 'rn').encode())
resp = self.read_uart(timeout).decode()
#print("<<<<响应<<<<", resp)
return resp
def exec_cmd_fast(self, cmd):
self.uart.write((cmd + 'rn').encode())
resp = ""
try:
resp = self.read_uart(1000, True).decode()
except:
resp = "ERROR: UnicodeError"
return resp
def reboot(self, force = False):
if force:
self.power_enable.off()
utime.sleep(0.1)
self.power_enable.on()
utime.sleep(2)
for _ in range(5):
resp = self.exec_cmd_fast('AT')
if 'rnOKrn' in resp:
print("发现SIM7020C模块,重启成功")
break
else:
print('给SIM7020C发送AT指令,没有得到OK响应,重试……')
utime.sleep(1)
def get_signal_strength(self):
signal_strength = 0
resp = self.exec_cmd_fast("AT+CSQ")
if 'rnOKrn' in resp:
lines = resp.split('rn')
lines = list(filter(None, lines)) #移除空串
for line in lines:
if line.startswith('+CSQ:'):
paras = line.replace('+CSQ:', '').split(',')
signal_strength = int(paras[0])
break
return signal_strength
### 自检,测试几条AT指令的返回结果
def self_test(self):
self.exec_cmd_fast("ATE1")
resp = self.exec_cmd_fast("AT+CGMM;+CPIN?;+CGREG?;+COPS?")
return resp
def get_IP(self):
# +CGCONTRDP: 1,5,"cmnbiot","100.1.2.3.255.255.255.0"
resp = self.exec_cmd_fast("AT+CGCONTRDP")
paras = resp.split(',')
if len(paras) >= 4:
ip_and_mask = paras[3]
ip = ip_and_mask.split('.')
if len(ip) >= 4:
return '.'.join(ip[0:4])
return '0.0.0.0'
def is_ready(self):
ip = self.get_IP()
if ip != "0.0.0.0":
print("IP地址:", ip)
return True
return False
def http_get(self, url):
index_slash = url.find('/', 8) # 跳过http://或https:// 找下一个斜杠的位置
host = url[:index_slash]
path = url[index_slash:]
# CHTTPCREATE会返回一个ID,拿着这个ID,进行访问网页的手续操作
resp = self.exec_cmd_fast('AT+CHTTPCREATE="' + host + '"')
id = get_client_id(resp)
if id < 0:
print(resp)
return ""
print("■■■■client id: ", id)
self.exec_cmd_fast('AT+CHTTPCON=' + str(id))
resp = self.exec_cmd('AT+CHTTPSEND=' + str(id) + ',' + str(self.HTTP_GET) + ',"' + path + '"')
print("HTTP SEND的反馈:", resp)
self.exec_cmd_fast('AT+CHTTPDISCON=' + str(id))
self.exec_cmd_fast('AT+CHTTPDESTROY=' + str(id))
return resp
# +CHTTPSEND: (0-4),(0-3),"path","http header","http content type","http content"
def http_post(self, url, content, content_type = 'application/x-www-form-urlencoded'):
index_slash = url.find('/', 8) # 跳过http://或https:// 找下一个斜杠的位置
host = url[:index_slash]
print('host:' , host)
path = url[index_slash:]
print('path:' , path)
# CHTTPCREATE会返回一个ID,拿着这个ID,进行访问网页的手续操作
resp = self.exec_cmd_fast('AT+CHTTPCREATE="' + host + '"')
id = get_client_id(resp)
if id < 0:
print(resp)
return ""
print("■■■■client id: ", id)
self.exec_cmd_fast('AT+CHTTPCON=' + str(id))
http_header = b'Accept: */*rnConnection: Keep-AlivernUser-Agent: SIMCOM_MODULErn'
hex_http_header = ubinascii.hexlify(http_header).decode()
hex_content = ubinascii.hexlify(content).decode()
cmd = 'AT+CHTTPSEND=' + str(id) + ',' + str(self.HTTP_POST) + ',"' + path + '",'
+ hex_http_header + ',"'+ content_type + '",' + hex_content
print('CMD: ', cmd)
resp = self.exec_cmd(cmd, 5000)
print("HTTP SEND的反馈:", resp)
self.exec_cmd_fast('AT+CHTTPDISCON=' + str(id))
self.exec_cmd_fast('AT+CHTTPDESTROY=' + str(id))
return resp
简单说明一下几个函数:
1)self_test(),AT命令是可以一次发送多条的,中间用分号分隔即可
2)is_readey() 判断模块是否准备就绪,这里以它是否获得本地IP地址为准
3)http_get(),client_id不是硬编码为0,而是判断CHTTPCREATE的返回结果,这个client_id只能是0到4之间的整数,代码还没有加上try…except…,出现异常时,可能造成client_id不能关闭和释放,这个时候只能重启SIM7020C模块
4)http_post(),调用网页的POST方法,关键的一条AT命令是AT+CHTTPSEND: (0-4),(0-3),“path”,“http header”,“http content type”,“http content”,第一个参数是client_id,第二个参数是1,表示POST方法。header和content需要用十六进制字符进行编码
主程序是:
import sim7020
import utime
import ubinascii
uart = machine.UART(0, 115200, tx=machine.Pin(0), rx=machine.Pin(1), txbuf=1024, rxbuf=1024)
print(uart)
sim = sim7020.SIM7020(uart)
sim.reboot(force=True)
#sim.reboot()
def get_nmic(resp):
lines = resp.split('rn')
nmic = list(filter(None, lines))[-1]
if nmic.startswith('+CHTTPNMIC:'):
nmic = nmic.replace('+CHTTPNMIC:', '')
return nmic.split(',')[-1].strip()
else:
return ''
def hex2str(hex_str):
str_bin = ubinascii.unhexlify(hex_str)
return str_bin.decode()
while not sim.is_ready():
utime.sleep(1)
content = b'Time=2021-12-09 12:34:56&Ip=12.34.56.78&Type=%E7%83%9F%E7%81%AB%E4%BA%8B%E4%BB%B6&Authorization=Basic token_example_by_shenlongbin='
resp = sim.http_post('http://12.3.4.5:6789/shen_long_bin_test_page', content)
print(resp)
nmic = get_nmic(resp)
content = hex2str(nmic)
print(content)
在调用http_post()时需要注意几点:
1)准备一个支持POST的服务器网页
2)content type要填对,我测试的页面用的是:application/x-www-form-urlencoded
3)content是给网页传参数,开头不要写’?’,直接用key1=value1&key2=value2的格式
4)调用成功后 CHTTPNMIC里得到返回结果,我的测试程序是一个json字符串
http_get()和http_post()当前访问https协议时会报错,原因未知。
推荐阅读:
树莓派Pico开发系列文章
最后
以上就是典雅芒果为你收集整理的树莓派60/100 - Pico连接物联网NB-IoT,给http网页发送POST请求的全部内容,希望文章能够帮你解决树莓派60/100 - Pico连接物联网NB-IoT,给http网页发送POST请求所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复