Micropython学习交流群 学习QQ群:786510434 提供多种固件下载和学习交流。

Micropython-扇贝物联 QQ群:31324057 扇贝物联是一个让你与智能设备沟通更方便的物联网云平台

Micropython学习交流群 学习QQ群:468985481 学习交流ESP8266、ESP32、ESP8285、wifi模块开发交流、物联网。

Micropython老哥俩的IT农场分享QQ群:929132891 为喜欢科创制作的小白们分享一些自制的计算机软硬件免费公益课程,由两位多年从事IT研发的中年大叔发起。

Micropython ESP频道

MicroPython:分享一个 HttpClient 库,支持分段下载大文件


main.py

from uhttp import requests
import socket
import threading
import network
import time


def do_connect(ssid="NBWIFI", password="xxxxx", timeout=15):
    """Bring up the station Wi-Fi interface and join an access point.

    Polls once per second until the interface reports a connection or
    ``timeout`` seconds have elapsed since the function was entered.  On
    success the interface configuration is printed and the module-level
    global ``serverip`` is set to the assigned IP address.

    Args:
        ssid: Name of the wireless network to join.
        password: Password of the wireless network.
        timeout: Maximum number of seconds to wait for the connection.

    Returns:
        True if the interface is connected when the function returns,
        False otherwise.
    """
    global serverip

    wlan = network.WLAN(network.STA_IF)  # station (client) interface
    wlan.active(True)                    # power the interface up
    start_time = time.time()             # reference point for the timeout

    if not wlan.isconnected():
        print("当前无线未联网,正在连接中....")
        wlan.connect(ssid, password)
        while not wlan.isconnected():
            time.sleep_ms(1000)
            print("正在尝试连接到wifi....")
            print(time.time())

            # Give up instead of blocking forever on wrong credentials.
            if time.time() - start_time > timeout:
                print("连接失败!!!无线网连接超过{}秒,请检查无线网名称和密码是否正确..".format(timeout))
                break

    if not wlan.isconnected():
        return False

    # ifconfig() yields (ip, netmask, gateway, dns); query it once and reuse.
    IP_info = wlan.ifconfig()
    print("无线网已经连接,信息如下:")
    print("IP地址:"+IP_info[0])
    serverip = IP_info[0]
    print("子网掩码:"+IP_info[1])
    print("网关:"+IP_info[2])
    print("DNS:"+IP_info[3])
    ip_address = IP_info[0]
    print("ip_address "+ip_address)
    return True




do_connect()


import os, gc
from libs.httpclient import HttpClient
http_client = HttpClient(headers={})

# Fetch a page and print its body as text.
print(http_client.get("http://www.xxxxx.cn/api/r1.php").text)
# Download a file; the response body is written directly to "123.bmp".
http_client.get('http://www.esp56.com/api/einkweather/style1.php', saveToFile="123.bmp")
try:
    # Open in binary mode (the payload is a bitmap) and let the context
    # manager release the handle so it is not leaked.
    with open("123.bmp", "rb"):
        print("下载完成")
        # continue with the file.
except OSError:  # open failed
    print("下载失败")
    # handle the file open case



httpclient.py

import usocket, os, gc
class Response:
    """Body of an HTTP response, read lazily from an open socket.

    The socket handed to the constructor must already be positioned at the
    start of the response body.  The body is either streamed to a file at
    construction time (``saveToFile``) or read on first access through
    ``content``, ``text`` or ``json()``.  The socket is closed as soon as
    the body has been consumed.

    ``HttpClient.request`` assigns ``status_code`` and ``reason`` after
    constructing the instance; they default to ``None`` and ``''`` here.
    """

    CHUNK_SIZE = 512  # bytes copied per read when streaming to a file

    def __init__(self, socket, saveToFile=None):
        """Wrap ``socket``; if ``saveToFile`` is given, stream the body to it.

        Args:
            socket: Object exposing ``read([n])`` and ``close()``.
            saveToFile: Optional path.  When set, the whole body is written
                to this file immediately and the socket is closed.
        """
        self._socket = socket
        self._saveToFile = saveToFile
        self._encoding = 'utf-8'
        self._content = None  # cached body, filled on first ``content`` read
        self.status_code = None
        self.reason = ''
        if saveToFile is not None:
            try:
                # Binary mode: the socket yields bytes and the payload may
                # not be text at all (e.g. an image).
                with open(saveToFile, 'wb') as outfile:
                    data = self._socket.read(self.CHUNK_SIZE)
                    while data:
                        outfile.write(data)
                        data = self._socket.read(self.CHUNK_SIZE)
            finally:
                # Release the socket even when opening or writing fails.
                self.close()

    def close(self):
        """Close the underlying socket; safe to call more than once."""
        if self._socket:
            self._socket.close()
            self._socket = None

    def _ensure_not_saved(self):
        """Raise SystemError if the body was already streamed to a file."""
        if self._saveToFile is not None:
            raise SystemError('You cannot get the content from the response as you decided to save it in {}'.format(self._saveToFile))

    @property
    def content(self):
        """Raw response body as bytes.

        The first access reads the rest of the socket and closes it; the
        result is cached so later accesses return the same bytes.

        Raises:
            SystemError: If the response was saved to a file instead.
        """
        self._ensure_not_saved()
        if self._content is None:
            try:
                self._content = self._socket.read()
            finally:
                self.close()
        return self._content

    @property
    def text(self):
        """Response body decoded as UTF-8 text."""
        return str(self.content, self._encoding)

    def json(self):
        """Parse the response body as JSON and return the resulting object.

        Parses straight from the socket when the body has not been read
        yet, otherwise from the cached ``content``.

        Raises:
            SystemError: If the response was saved to a file instead.
        """
        self._ensure_not_saved()
        try:
            import ujson as json_mod
        except ImportError:
            import json as json_mod
        if self._content is not None:
            return json_mod.loads(self._content)
        try:
            return json_mod.load(self._socket)
        finally:
            self.close()


class HttpClient:
    """Small HTTP/1.0 client for MicroPython built on ``usocket``.

    Supports ``http`` and ``https`` URLs, request bodies given as bytes/str,
    JSON, an iterable of chunks (sent with chunked transfer encoding), a
    file on disk or a custom writer callback, and follows 301/302/303/307/308
    redirects.  Responses using chunked transfer encoding are rejected.
    """

    _FILE_CHUNK_SIZE = 512  # bytes read per iteration when uploading a file

    def __init__(self, headers=None):
        """Create a client.

        Args:
            headers: Optional dict of headers sent with every request.
        """
        # A fresh dict per instance avoids sharing one mutable default.
        self._headers = {} if headers is None else headers

    @staticmethod
    def is_chunked_data(data):
        """Return True if ``data`` is an iterator-like object without a length.

        Such bodies cannot be given a ``Content-Length`` up front and are
        therefore sent with chunked transfer encoding.
        """
        # NOTE(review): ``__next__`` is checked in addition to ``__iter__``
        # because some MicroPython builds reportedly do not expose
        # ``__iter__`` on generators -- confirm on the target port.
        iterable = hasattr(data, "__next__") or hasattr(data, "__iter__")
        return bool(iterable and not hasattr(data, "__len__"))

    @staticmethod
    def _write_headers(sock, headers):
        """Write every ``name: value`` pair of ``headers`` to ``sock``."""
        # Iterate over keys to avoid allocating a tuple per item.
        for k in headers:
            sock.write('{}: {}\r\n'.format(k, headers[k]).encode())

    @staticmethod
    def _read_head(sock):
        """Read the status line and headers; return (status, reason, redirect).

        ``redirect`` is the ``Location`` value for a supported redirect
        status, otherwise None.

        Raises:
            ValueError: On a malformed status line or a chunked response.
            NotImplementedError: On a ``Location`` header combined with a
                non-2xx status that is not a supported redirect.
        """
        line = sock.readline()
        parts = line.split(None, 2)
        if len(parts) < 2:
            raise ValueError('Invalid HTTP status line: {}'.format(line))
        status = int(parts[1])
        reason = parts[2].rstrip() if len(parts) > 2 else ''
        redirect = None
        while True:
            line = sock.readline()
            if not line or line == b'\r\n':
                break
            # Header names are case-insensitive, so compare in lower case.
            lowered = line.lower()
            if lowered.startswith(b'transfer-encoding:'):
                if b'chunked' in lowered:
                    raise ValueError('Unsupported ' + line.decode())
            elif lowered.startswith(b'location:') and not 200 <= status <= 299:
                if status in (301, 302, 303, 307, 308):
                    redirect = line.split(b':', 1)[1].strip().decode()
                else:
                    raise NotImplementedError("Redirect {} not yet supported".format(status))
        return status, reason, redirect

    def request(self, method, url, data=None, json=None, file=None, custom=None, saveToFile=None, headers=None, stream=None):
        """Send one HTTP request and return a ``Response``.

        Args:
            method: HTTP verb, e.g. ``'GET'``.
            url: Absolute ``http://`` or ``https://`` URL.
            data: Body as bytes/str, or an iterable of byte chunks.
            json: Object serialised to JSON and sent as the body; mutually
                exclusive with ``data``.
            file: Path of a file whose contents are sent as the body.
            custom: Callable invoked with the socket after the headers have
                been sent, to write a body itself.
            saveToFile: Path the response body is streamed to.
            headers: Extra headers for this request only.
            stream: Accepted for API compatibility; currently unused.

        Returns:
            A ``Response`` with ``status_code`` and ``reason`` set.

        Raises:
            ValueError: Unsupported protocol, failed address lookup,
                malformed status line or chunked response.
            NotImplementedError: Unsupported redirect status.
            OSError: Network failure.
        """
        if headers is None:
            headers = {}

        try:
            proto, dummy, host, path = url.split('/', 3)
        except ValueError:
            proto, dummy, host = url.split('/', 2)
            path = ''
        if proto == 'http:':
            port = 80
        elif proto == 'https:':
            import ussl
            port = 443
        else:
            raise ValueError('Unsupported protocol: ' + proto)

        netloc = host  # host[:port] as written, needed for relative redirects
        if ':' in host:
            host, port = host.split(':', 1)
            port = int(port)

        # Build the body before touching the network; ``data``/``json`` are
        # left untouched so a redirect can replay the original arguments.
        body = data
        if json is not None:
            assert data is None
            try:
                import ujson as json_mod
            except ImportError:
                import json as json_mod
            body = json_mod.dumps(json)
        chunked = bool(body) and self.is_chunked_data(body)
        if isinstance(body, str):
            # Content-Length counts bytes, not characters.
            body = body.encode()

        ai = usocket.getaddrinfo(host, port, 0, usocket.SOCK_STREAM)
        if len(ai) < 1:
            raise ValueError('You are not connected to the internet...')
        ai = ai[0]

        s = usocket.socket(ai[0], ai[1], ai[2])
        try:
            s.connect(ai[-1])
            if proto == 'https:':
                gc.collect()
                s = ussl.wrap_socket(s, server_hostname=host)
            s.write('{} /{} HTTP/1.0\r\n'.format(method, path).encode())
            if 'Host' not in headers and 'Host' not in self._headers:
                s.write('Host: {}\r\n'.format(host).encode())
            self._write_headers(s, self._headers)
            self._write_headers(s, headers)

            # add user agent
            s.write(b'User-Agent: MicroPython Client\r\n')
            if json is not None:
                s.write(b'Content-Type: application/json\r\n')

            # All framing headers must precede the blank line that ends the
            # header section.
            if body:
                if chunked:
                    s.write(b"Transfer-Encoding: chunked\r\n")
                else:
                    s.write(b"Content-Length: %d\r\n" % len(body))
            elif file:
                s.write(b'Content-Length: %d\r\n' % os.stat(file)[6])
            s.write(b"\r\n")

            if body:
                if chunked:
                    for chunk in body:
                        s.write(b"%x\r\n" % len(chunk))
                        s.write(chunk)
                        s.write(b"\r\n")
                    s.write(b"0\r\n\r\n")
                else:
                    s.write(body)
            elif file:
                # Stream raw bytes so exactly Content-Length bytes are sent.
                with open(file, 'rb') as file_object:
                    chunk = file_object.read(self._FILE_CHUNK_SIZE)
                    while chunk:
                        s.write(chunk)
                        chunk = file_object.read(self._FILE_CHUNK_SIZE)
            elif custom:
                custom(s)

            status, reason, redirect = self._read_head(s)
        except Exception:
            # Never leak the socket, whatever went wrong.
            s.close()
            raise

        if redirect:
            s.close()
            print(redirect)
            if redirect.startswith('/'):
                # Relative Location: resolve against the current origin.
                redirect = proto + '//' + netloc + redirect
            if status in (301, 302, 303):
                # These redirects are followed with a body-less GET.
                return self.request('GET', redirect, saveToFile=saveToFile, headers=headers, stream=stream)
            return self.request(method, redirect, data=data, json=json, file=file, custom=custom, saveToFile=saveToFile, headers=headers, stream=stream)

        resp = Response(s, saveToFile)
        resp.status_code = status
        resp.reason = reason
        return resp

    def head(self, url, **kw):
        """Send a HEAD request; keyword arguments are passed to ``request``."""
        return self.request('HEAD', url, **kw)

    def get(self, url, **kw):
        """Send a GET request; keyword arguments are passed to ``request``."""
        return self.request('GET', url, **kw)

    def post(self, url, **kw):
        """Send a POST request; keyword arguments are passed to ``request``."""
        return self.request('POST', url, **kw)

    def put(self, url, **kw):
        """Send a PUT request; keyword arguments are passed to ``request``."""
        return self.request('PUT', url, **kw)

    def patch(self, url, **kw):
        """Send a PATCH request; keyword arguments are passed to ``request``."""
        return self.request('PATCH', url, **kw)

    def delete(self, url, **kw):
        """Send a DELETE request; keyword arguments are passed to ``request``."""
        return self.request('DELETE', url, **kw)



推荐分享
图文皆来源于网络,内容仅做公益性分享,版权归原作者所有,如有侵权请告知删除!
 

Copyright © 2014 ESP56.com All Rights Reserved

执行时间: 0.0094521045684814 seconds