我是靠谱客的博主 典雅芒果,最近开发中收集的这篇文章主要介绍树莓派60/100 - Pico连接物联网NB-IoT,给http网页发送POST请求,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

前面试验通过了SIM7020C的http get 方法,将它封装为一个独立的类,今天试验通过了HTTP POST方法,sim7020.py的源代码。

import machine
import utime
import ubinascii


def ends_with(s, suffix):
    l = len(suffix)
    if len(s) >= l:
        return s[-l::] == suffix
    else:
        return False


def get_client_id(resp):
    id = -1
    if 'rnOKrn' in resp:
        lines = resp.split('rn')
        lines = list(filter(None, lines)) #移除空串
        for line in lines:
            if line.startswith('+CHTTPCREATE:'):
                print('found: ', line)
                paras = line.replace('+CHTTPCREATE:', '').split(',')
                id = int(paras[0])
                break

    return id


class SIM7020:
    HTTP_GET = 0
    HTTP_POST = 1
    
    def __init__(self, uart, power_pin=14):
        self.uart = uart
        self.power_enable = machine.Pin(power_pin, machine.Pin.OUT)


    def read_uart(self, timeout = 2000, fast_return = False):
        ticks_start = utime.ticks_ms()
        resp = b''
        while utime.ticks_diff(utime.ticks_ms(), ticks_start) < timeout:
            if self.uart.any():
                line = self.uart.readline()
                resp += line
                #print("■■■■", utime.ticks_diff(utime.ticks_ms(), ticks_start), line)
                if ends_with(resp, b'rnERRORrn'):
                    break
                if fast_return and ends_with(resp, b'rnOKrn'):
                    break
                ticks_start = utime.ticks_ms()
                
                   
        return resp

    def exec_cmd(self, cmd, timeout=2000):
        #print(">>>AT命令>>>", cmd)
        self.uart.write((cmd + 'rn').encode())
        resp = self.read_uart(timeout).decode()
        #print("<<<<响应<<<<", resp)
        return resp


    def exec_cmd_fast(self, cmd):
        self.uart.write((cmd + 'rn').encode())
        resp = ""
        try:
            resp = self.read_uart(1000, True).decode()
            
        except:
            resp = "ERROR: UnicodeError"
            
        return resp


    def reboot(self, force = False):
        if force:
            self.power_enable.off()
            utime.sleep(0.1)
        
        self.power_enable.on()
        utime.sleep(2)
        for _ in range(5):
            resp = self.exec_cmd_fast('AT')
            if 'rnOKrn' in resp:
                print("发现SIM7020C模块,重启成功")
                break
            else:
                print('给SIM7020C发送AT指令,没有得到OK响应,重试……')
                utime.sleep(1)
        

    def get_signal_strength(self):
        signal_strength = 0
        resp = self.exec_cmd_fast("AT+CSQ")
        if 'rnOKrn' in resp:
            lines = resp.split('rn')
            lines = list(filter(None, lines)) #移除空串
            for line in lines:
                if line.startswith('+CSQ:'):
                    paras = line.replace('+CSQ:', '').split(',')
                    signal_strength = int(paras[0])
                    break

        return signal_strength 

    ### 自检,测试几条AT指令的返回结果
    def self_test(self):
        self.exec_cmd_fast("ATE1")
        resp = self.exec_cmd_fast("AT+CGMM;+CPIN?;+CGREG?;+COPS?")
        return resp 

    def get_IP(self):
        # +CGCONTRDP: 1,5,"cmnbiot","100.1.2.3.255.255.255.0"
        resp = self.exec_cmd_fast("AT+CGCONTRDP") 
        paras = resp.split(',')
        if len(paras) >= 4: 
            ip_and_mask = paras[3]
            ip = ip_and_mask.split('.')
            if len(ip) >= 4:
                return '.'.join(ip[0:4])
        return '0.0.0.0'


    def is_ready(self):
        ip = self.get_IP()
        if ip != "0.0.0.0":
            print("IP地址:", ip)
            return True
        return False


    def http_get(self, url):
        index_slash = url.find('/', 8)  # 跳过http://或https:// 找下一个斜杠的位置
        host = url[:index_slash]
        path = url[index_slash:]

        # CHTTPCREATE会返回一个ID,拿着这个ID,进行访问网页的手续操作
        resp = self.exec_cmd_fast('AT+CHTTPCREATE="' + host + '"')
        id = get_client_id(resp)
        if id < 0:
            print(resp)
            return ""
        print("■■■■client id: ", id)
        
        self.exec_cmd_fast('AT+CHTTPCON=' + str(id))
        
        resp = self.exec_cmd('AT+CHTTPSEND=' + str(id) + ',' + str(self.HTTP_GET) + ',"' + path + '"')
        print("HTTP SEND的反馈:", resp)
        
        
        self.exec_cmd_fast('AT+CHTTPDISCON=' + str(id))    
        self.exec_cmd_fast('AT+CHTTPDESTROY=' + str(id))
        return resp


    # +CHTTPSEND: (0-4),(0-3),"path","http header","http content type","http content"
    def http_post(self, url, content, content_type = 'application/x-www-form-urlencoded'):
        index_slash = url.find('/', 8)  # 跳过http://或https:// 找下一个斜杠的位置
        host = url[:index_slash]
        print('host:' , host)
        path = url[index_slash:]
        print('path:' , path)

        # CHTTPCREATE会返回一个ID,拿着这个ID,进行访问网页的手续操作
        resp = self.exec_cmd_fast('AT+CHTTPCREATE="' + host + '"')
        id = get_client_id(resp)
        if id < 0:
            print(resp)
            return ""
        print("■■■■client id: ", id)

        self.exec_cmd_fast('AT+CHTTPCON=' + str(id))
        
        http_header = b'Accept: */*rnConnection: Keep-AlivernUser-Agent: SIMCOM_MODULErn'
        hex_http_header = ubinascii.hexlify(http_header).decode()
        hex_content = ubinascii.hexlify(content).decode()
        
        cmd = 'AT+CHTTPSEND=' + str(id) + ',' + str(self.HTTP_POST) + ',"' + path + '",' 
                              + hex_http_header + ',"'+ content_type + '",' + hex_content
        print('CMD:    ', cmd)
        resp = self.exec_cmd(cmd, 5000)
        
        print("HTTP SEND的反馈:", resp)
        
        self.exec_cmd_fast('AT+CHTTPDISCON=' + str(id))    
        self.exec_cmd_fast('AT+CHTTPDESTROY=' + str(id))
        return resp

简单说明一下几个函数:
1)self_test(),AT命令是可以一次发送多条的,中间用分号分隔即可
2)is_readey() 判断模块是否准备就绪,这里以它是否获得本地IP地址为准
3)http_get(),client_id不是硬编码为0,而是判断CHTTPCREATE的返回结果,这个client_id只能是0到4之间的整数,代码还没有加上try…except…,出现异常时,可能造成client_id不能关闭和释放,这个时候只能重启SIM7020C模块
4)http_post(),调用网页的POST方法,关键的一条AT命令是AT+CHTTPSEND: (0-4),(0-3),“path”,“http header”,“http content type”,“http content”,第一个参数是client_id,第二个参数是1,表示POST方法。header和content需要用十六进制字符进行编码

主程序是:

import sim7020
import utime
import ubinascii

uart = machine.UART(0, 115200, tx=machine.Pin(0), rx=machine.Pin(1), txbuf=1024, rxbuf=1024)
print(uart)

sim = sim7020.SIM7020(uart)
sim.reboot(force=True)
#sim.reboot()

def get_nmic(resp):
    lines = resp.split('rn')
    nmic = list(filter(None, lines))[-1]
    if nmic.startswith('+CHTTPNMIC:'):
        nmic = nmic.replace('+CHTTPNMIC:', '')
        return nmic.split(',')[-1].strip()
    else:
        return ''


def hex2str(hex_str):
    str_bin = ubinascii.unhexlify(hex_str)
    return str_bin.decode()


while not sim.is_ready():
    utime.sleep(1)

content = b'Time=2021-12-09 12:34:56&Ip=12.34.56.78&Type=%E7%83%9F%E7%81%AB%E4%BA%8B%E4%BB%B6&Authorization=Basic token_example_by_shenlongbin='
resp = sim.http_post('http://12.3.4.5:6789/shen_long_bin_test_page', content)
print(resp)
nmic = get_nmic(resp)
content = hex2str(nmic)
print(content)

在调用http_post()时需要注意几点:
1)准备一个支持POST的服务器网页
2)content type要填对,我测试的页面用的是:application/x-www-form-urlencoded
3)content是给网页传参数,开头不要写’?’,直接用key1=value1&key2=value2的格式
4)调用成功后 CHTTPNMIC里得到返回结果,我的测试程序是一个json字符串

http_get()和http_post()当前访问https协议时会报错,原因未知。

推荐阅读:
树莓派Pico开发系列文章

最后

以上就是典雅芒果为你收集整理的树莓派60/100 - Pico连接物联网NB-IoT,给http网页发送POST请求的全部内容,希望文章能够帮你解决树莓派60/100 - Pico连接物联网NB-IoT,给http网页发送POST请求所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(53)

评论列表共有 0 条评论

立即
投稿
返回
顶部