Micropython学习交流群 学习QQ群:786510434 提供多种固件下载和学习交流。

Micropython-扇贝物联 QQ群:31324057 扇贝物联是一个让你与智能设备沟通更方便的物联网云平台

Micropython学习交流群 学习QQ群:468985481 学习交流ESP8266、ESP32、ESP8285、wifi模块开发交流、物联网。

Micropython老哥俩的IT农场分享QQ群:929132891 为喜欢科创制作的小白们分享一些自制的计算机软硬件免费公益课程,由两位多年从事IT研发的中年大叔发起。

Micropython ESP频道

MicroPython 获取 URL 响应头(headers)信息


main.py

import mrequests
import network
from machine import Pin
import time,sys
import json

def do_connect(ssid="NBWIFI", password="xxxxxx", timeout=15, led_pin=12):
    """Connect the board to a Wi-Fi access point in station mode.

    While the connection is being established the indicator LED blinks
    (one second on, one second off). Once connected the LED is left on and
    the interface configuration is printed.

    Args:
        ssid: Name of the wireless network to join.
        password: Passphrase of the wireless network.
        timeout: Number of seconds to wait before giving up.
        led_pin: GPIO number of the indicator LED.

    Returns:
        True if the station interface reports a connection, False otherwise.
    """
    wifi_led = Pin(led_pin, Pin.OUT)     # indicator LED
    wlan = network.WLAN(network.STA_IF)  # station (client) interface
    wlan.active(True)                    # power up the interface
    start_time = time.time()             # reference point for the timeout

    if not wlan.isconnected():
        print("当前无线未联网,正在连接中....")
        wlan.connect(ssid, password)
        while not wlan.isconnected():
            # Blink: the original wrote 1 twice, so the LED never toggled.
            wifi_led.value(1)
            time.sleep_ms(1000)
            wifi_led.value(0)
            time.sleep_ms(1000)
            print("正在尝试连接到wifi....")
            print(time.time())

            if time.time() - start_time > timeout:
                print("连接失败!!!无线网连接超过%d秒,请检查无线网名称和密码是否正确.." % timeout)
                break

    connected = wlan.isconnected()
    if connected:
        wifi_led.value(1)                # steady light signals success
        IP_info = wlan.ifconfig()
        print("无线网已经连接,信息如下:")
        print("IP地址:" + IP_info[0])
        print("子网掩码:" + IP_info[1])
        print("网关:" + IP_info[2])
        print("DNS:" + IP_info[3])
        print(IP_info)
        print(type(IP_info))
        ip_address = IP_info[0]
        print("ip_address " + ip_address)

    return connected







class MyResponse(mrequests.Response):
    """Response subclass that stores headers in a dict.

    Keys are the lower-cased header names, values are the header values
    with surrounding whitespace removed. A repeated header name overwrites
    the earlier value.
    """

    def __init__(self, *args, **kw):
        super().__init__(*args, **kw)
        # Replace whatever the base class set up with a plain dict.
        self.headers = {}

    def add_header(self, data):
        """Record one raw header line (bytes) received from the server."""
        # The base class inspects headers that affect how the body is read.
        self._parse_header(data)
        line = data.decode('utf-8').rstrip('\r\n')
        name, value = line.split(':', 1)
        key = name.lower()
        self.headers[key] = value.strip()


def request(*args, **kw):
    """Call ``mrequests.request`` using ``MyResponse`` unless a response class is given."""
    if 'response_class' not in kw:
        kw['response_class'] = MyResponse
    return mrequests.request(*args, **kw)




# Bring up Wi-Fi, fetch the URL and dump the response headers.
do_connect()
url = "http://www.4936.cn/api/r1.php"
r = request("GET", url)

# NOTE(review): mrequests.request follows a 302 that carries a "Location:"
# header, so the status seen here is that of the final response -- confirm
# that 302 is really the status this script is meant to inspect.
try:
    if r.status_code == 302:
        print("Response headers:")
        print("=================\n")
        for name, value in r.headers.items():
            print("{}: {}".format(name, value))
    else:
        print("Request failed. Status: {}".format(r.status_code))
finally:
    # Always release the socket; the original never closed the response.
    r.close()


mrequests.py

"""A HTTP client module for MicroPython with an API similar to requests."""

import sys

try:
    import socket
except ImportError:
    import usocket as socket


MICROPY = sys.implementation.name == "micropython"
MAX_READ_SIZE = 4 * 1024


def encode_basic_auth(user, password):
    """Build the header dict for HTTP Basic authentication.

    Args:
        user: User name as ``str`` or ``bytes``.
        password: Password as ``str`` or ``bytes``.

    Returns:
        A dict with the single key ``b"Authorization"`` whose value is
        ``b"Basic "`` followed by the base64 encoding of ``user:password``.
    """
    # Same fallback pattern as the socket import: MicroPython name first,
    # then the CPython standard library.
    try:
        from ubinascii import b2a_base64
    except ImportError:
        from binascii import b2a_base64

    # bytes %-formatting rejects str arguments on CPython, so normalise
    # both parts to bytes and concatenate instead.
    if isinstance(user, str):
        user = user.encode("utf-8")
    if isinstance(password, str):
        password = password.encode("utf-8")

    auth_encoded = b2a_base64(user + b":" + password).rstrip(b"\n")
    return {b"Authorization": b"Basic " + auth_encoded}


def head(url, **kw):
    """Send a HEAD request to ``url``; keyword arguments go to ``request``."""
    response = request("HEAD", url, **kw)
    return response


def get(url, **kw):
    """Send a GET request to ``url``; keyword arguments go to ``request``."""
    response = request("GET", url, **kw)
    return response


def post(url, **kw):
    """Send a POST request to ``url``; keyword arguments go to ``request``."""
    response = request("POST", url, **kw)
    return response


def put(url, **kw):
    """Send a PUT request to ``url``; keyword arguments go to ``request``."""
    response = request("PUT", url, **kw)
    return response


def patch(url, **kw):
    """Send a PATCH request to ``url``; keyword arguments go to ``request``."""
    response = request("PATCH", url, **kw)
    return response


def delete(url, **kw):
    """Send a DELETE request to ``url``; keyword arguments go to ``request``."""
    response = request("DELETE", url, **kw)
    return response


def parse_url(url):
    """Split ``url`` into ``(scheme, host, port, path)``.

    ``scheme`` is ``None`` when the URL has none, ``host`` is ``None`` when
    no network location could be determined, and ``port`` is ``None`` unless
    the host part carries an explicit ``:port`` suffix. ``path`` holds
    everything from the first ``/`` after the host onwards (``"/"`` when an
    absolute URL has no path).

    Raises:
        ValueError: if the text after the last ``:`` in the host is not an
            integer.
    """
    # str.partition() is avoided on purpose: the esp8266 port lacks it.
    marker = url.find("//")
    if marker < 0:
        scheme, rest = "", url
    else:
        scheme = url[:marker].rstrip(':')
        rest = url[marker + 2:]

    host = None
    slash = rest.find("/")
    if slash > 0:
        # "host/path..." -> split at the first slash
        host, path = rest[:slash], rest[slash:]
    elif slash == 0 or not scheme:
        # Leading slash, or a bare relative reference without a scheme
        path = rest
    else:
        # Scheme present but no path at all: everything is the host
        host, path = rest, "/"

    port = None
    if host:
        colon = host.rfind(":")
        if colon > 0:
            port = int(host[colon + 1:])
            host = host[:colon]

    return scheme or None, host, port, path


class RequestContext:
    """Mutable description of the request target, updated while following redirects.

    Attributes set by the constructor: ``redirect`` (bool flag), ``method``,
    ``scheme``, ``host``, ``_port`` (explicit port or ``None``) and ``path``.
    """

    def __init__(self, url, method=None):
        """Parse ``url``; raise ValueError unless it has both scheme and host."""
        self.redirect = False
        self.method = method if method else "GET"
        parts = parse_url(url)
        self.scheme, self.host, self._port, self.path = parts
        if not (self.scheme and self.host):
            raise ValueError("An absolute URL is required.")

    @property
    def port(self):
        """Explicit port if one was given, else 443 for https and 80 otherwise."""
        if self._port is not None:
            return self._port
        if self.scheme == "https":
            return 443
        return 80

    @property
    def url(self):
        """The target re-assembled as a URL; the port appears only if explicit."""
        if self._port is None:
            netloc = self.host
        else:
            netloc = "%s:%s" % (self.host, self.port)
        return "%s://%s%s" % (self.scheme, netloc, self.path)

    def set_location(self, status, location):
        """Apply a ``Location`` header value received with the given status code.

        Sets ``redirect`` for 301/302/307/308, and for 303 when the current
        method is not GET, then rewrites method, scheme, host, port and path
        to point at the new location.
        """
        follow_always = status in (301, 302, 307, 308)
        if follow_always or (status == 303 and self.method != "GET"):
            self.redirect = True

        if not self.redirect:
            return

        scheme, host, port, path = parse_url(location)

        # Do not follow a redirect from https to any other scheme.
        if scheme and scheme != "https" and self.scheme == "https":
            self.redirect = False
            return

        # Only 307/308 keep the method; HEAD is never rewritten.
        if self.method != "HEAD" and status not in (307, 308):
            self.method = "GET"

        if scheme:
            self.scheme = scheme
        if host:
            self.host = host
            self._port = port

        if path.startswith("/"):
            self.path = path
        else:
            # Relative reference: replace the last segment of the current path.
            base = self.path.rsplit("/", 1)[0]
            self.path = base + "/" + path


class Response:
    """HTTP response bound to an open socket.

    The status line and headers are filled in by ``request``; the body is
    read lazily from the socket through ``read``, ``save``, ``content``,
    ``text`` or ``json``.

    Attributes:
        sock: The underlying socket, or ``None`` once closed.
        sf: File-like object used for reading the response.
        encoding: Encoding used by ``text`` (default ``"utf-8"``).
        chunked: True once a chunked ``Transfer-Encoding`` header was seen.
        status_code: Integer status code, set by ``request``.
        reason: Reason phrase from the status line, set by ``request``.
        headers: List of raw header lines if ``save_headers`` is true,
            otherwise ``None``.
    """

    def __init__(self, sock, sockfile, save_headers=False):
        self.sock = sock
        self.sf = sockfile
        self.encoding = "utf-8"
        self._cached = None
        self._chunk_size = 0    # bytes left in the current chunk
        self._content_size = 0  # value of the Content-Length header
        self.chunked = False
        self.status_code = None
        self.reason = ""
        self.headers = [] if save_headers else None

    def read(self, size=MAX_READ_SIZE):
        """Read up to ``size`` bytes of the body.

        A falsy ``size`` (``None`` or ``0``) reads the whole remaining body:
        ``Content-Length`` bytes for a plain response, every remaining chunk
        for a chunked one. Returns ``b""`` at the end of a chunked body.

        Raises:
            ValueError: if a chunk is not followed by CRLF.
        """
        if not size:
            return self._read_all()
        if self.chunked:
            return self._read_chunked(size)
        return self.sf.read(size)

    def _read_all(self):
        # Read the complete remaining body in one call.
        if not self.chunked:
            return self.sf.read(self._content_size)

        parts = []
        while True:
            data = self._read_chunked(MAX_READ_SIZE)
            if not data:
                break
            parts.append(data)
        return b"".join(parts)

    def _read_chunked(self, size):
        # Read at most ``size`` bytes from the current chunk, starting a new
        # chunk first when the previous one is exhausted.
        if self._chunk_size == 0:
            line = self.sf.readline()
            # ignore chunk extensions
            line = line.split(b";", 1)[0]
            self._chunk_size = int(line, 16)

            if self._chunk_size == 0:
                # End of message. The original read from an undefined name
                # ``sf`` here and raised NameError.
                sep = self.sf.read(2)
                if sep != b"\r\n":
                    raise ValueError("Expected final chunk separator, read %r instead." % sep)

                return b""

        data = self.sf.read(min(size, self._chunk_size))
        self._chunk_size -= len(data)

        if self._chunk_size == 0:
            sep = self.sf.read(2)
            if sep != b"\r\n":
                raise ValueError("Expected chunk separator, read %r instead." % sep)

        return data

    def save(self, fn, chunk_size=1024):
        """Write the body to the file ``fn`` and close the response.

        Plain responses are copied up to ``Content-Length`` bytes; chunked
        responses are copied until the terminating chunk. The response is
        closed even if reading or writing fails.
        """
        try:
            with open(fn, "wb") as fp:
                if self.chunked:
                    # No Content-Length to count against: read until the
                    # chunk stream reports its end.
                    while True:
                        chunk = self.read(chunk_size)
                        if not chunk:
                            break
                        fp.write(chunk)
                else:
                    remain = self._content_size
                    while remain > 0:
                        chunk = self.read(min(chunk_size, remain))
                        if not chunk:
                            break
                        fp.write(chunk)
                        remain -= len(chunk)
        finally:
            self.close()

    def _parse_header(self, data):
        # Pick up the two headers that determine how the body must be read.
        if data[:18].lower() == b"transfer-encoding:" and b"chunked" in data[18:].lower():
            self.chunked = True
        elif data[:15].lower() == b"content-length:":
            self._content_size = int(data.split(b":", 1)[1])

    # overwrite this method, if you want to process/store headers differently
    def add_header(self, data):
        """Process one raw header line (bytes, including the line ending)."""
        self._parse_header(data)

        if self.headers is not None:
            self.headers.append(data)

    def _release(self):
        # Close the socket file (only a separate object off MicroPython) and
        # the socket; safe to call more than once.
        if not MICROPY:
            if self.sf is not None:
                self.sf.close()
            self.sf = None
        if self.sock:
            self.sock.close()
            self.sock = None

    def close(self):
        """Close the connection and drop the cached body."""
        self._release()
        self._cached = None

    @property
    def content(self):
        """The whole body as bytes; read once, cached, then the socket is closed."""
        if self._cached is None:
            try:
                self._cached = self.read(size=None)
            finally:
                # Not close(): that would also discard the cache just filled.
                self._release()
        return self._cached

    @property
    def text(self):
        """The body decoded with ``self.encoding``."""
        return str(self.content, self.encoding)

    def json(self):
        """The body parsed as JSON."""
        try:
            import ujson as json_mod
        except ImportError:
            import json as json_mod

        return json_mod.loads(self.content)


def request(
    method,
    url,
    data=None,
    json=None,
    headers=None,
    auth=None,
    encoding=None,
    response_class=Response,
    save_headers=False,
    max_redirects=1,
    timeout=None,
):
    """Send an HTTP(S) request and return the response with headers parsed.

    Args:
        method: HTTP method name, e.g. ``"GET"``.
        url: Absolute ``http`` or ``https`` URL.
        data: Request body as ``str`` or ``bytes``; not sent for GET/HEAD.
        json: Object to serialise as the JSON request body (excludes ``data``).
        headers: Mapping of extra request headers (``str`` or ``bytes``
            names and values). The mapping is not modified.
        auth: ``(user, password)`` sequence for Basic authentication, or a
            callable returning a mapping of headers to add.
        encoding: Encoding for a ``str`` body (default ``"utf-8"``); also
            advertised as charset for JSON bodies.
        response_class: Class instantiated as
            ``response_class(sock, sockfile, save_headers=...)``.
        save_headers: Passed through to ``response_class``.
        max_redirects: How many redirects may be followed.
        timeout: Socket timeout passed to ``settimeout``.

    Returns:
        The ``response_class`` instance; its body has not been read yet.

    Raises:
        ValueError: for a non-absolute URL, an unsupported scheme, a
            malformed status line or too many redirects.
        OSError: on network errors.
    """
    # Work on a private copy: the original mutated the shared default dict
    # (and the caller's dict), leaking auth headers into later calls.
    headers = dict(headers) if headers else {}

    if auth:
        # A callable cannot be fed to dict.update() directly; presumably it
        # is meant to be called to produce the headers -- TODO confirm.
        headers.update(auth() if callable(auth) else encode_basic_auth(auth[0], auth[1]))

    if json is not None:
        assert data is None
        try:
            import ujson
        except ImportError:
            import json as ujson

        data = ujson.dumps(json)

    # Encode once, up front, so Content-Length counts bytes, not characters.
    if isinstance(data, str):
        data = data.encode(encoding or "utf-8")

    has_host = b"Host" in headers or "Host" in headers
    ctx = RequestContext(url, method)

    while True:
        if ctx.scheme not in ("http", "https"):
            raise ValueError("Protocol scheme %s not supported." % ctx.scheme)

        ctx.redirect = False

        ai = socket.getaddrinfo(ctx.host, ctx.port, 0, socket.SOCK_STREAM)
        ai = ai[0]

        sock = socket.socket(ai[0], ai[1], ai[2])
        sock.settimeout(timeout)
        try:
            sock.connect(ai[-1])
            if ctx.scheme == "https":
                try:
                    import ssl
                except ImportError:
                    import ussl as ssl

                create_ctx = getattr(ssl, 'create_default_context', None)
                if create_ctx:
                    sock = create_ctx().wrap_socket(sock, server_hostname=ctx.host)
                else:
                    sock = ssl.wrap_socket(sock, server_hostname=ctx.host)

            sf = sock if MICROPY else sock.makefile("rwb")
            sf.write(b"%s %s HTTP/1.1\r\n" % (ctx.method.encode("ascii"), ctx.path.encode("ascii")))

            if not has_host:
                sf.write(b"Host: %s\r\n" % ctx.host.encode())

            for k, val in headers.items():
                sf.write(k if isinstance(k, bytes) else k.encode('ascii'))
                sf.write(b": ")
                sf.write(val if isinstance(val, bytes) else val.encode('ascii'))
                sf.write(b"\r\n")

            # Re-evaluated per iteration: a redirect may change the method.
            send_body = bool(data) and ctx.method not in ("GET", "HEAD")

            if send_body:
                if json is not None:
                    sf.write(b"Content-Type: application/json")
                    if encoding:
                        sf.write(b"; charset=%s" % encoding.encode())
                    sf.write(b"\r\n")

                sf.write(b"Content-Length: %d\r\n" % len(data))

            sf.write(b"Connection: close\r\n\r\n")

            if send_body:
                sf.write(data)

            if not MICROPY:
                sf.flush()

            resp = response_class(sock, sf, save_headers=save_headers)

            # Read the status line byte by byte, bounded by MAX_READ_SIZE
            # and stopping early if the peer closes the connection.
            line = b""
            while not line.endswith(b"\r\n") and len(line) <= MAX_READ_SIZE:
                char = sf.read(1)
                if not char:
                    break
                line += char

            parts = line.split(None, 2)
            if len(parts) < 2:
                raise ValueError("Malformed HTTP status line: %r" % line)
            resp.status_code = int(parts[1])

            if len(parts) > 2:
                resp.reason = parts[2].rstrip()

            while True:
                l = sf.readline()
                if not l or l == b"\r\n":
                    break

                # Header names are case-insensitive.
                if l[:9].lower() == b"location:":
                    ctx.set_location(resp.status_code, l[9:].strip().decode("ascii"))

                resp.add_header(l)
        except Exception:
            # Release the socket on any failure, not only on OSError.
            sock.close()
            raise

        if ctx.redirect:
            print("Redirect to: %s" % ctx.url)
            if not MICROPY:
                sf.close()
            sock.close()
            max_redirects -= 1

            if max_redirects < 0:
                raise ValueError("Maximum redirection count exceeded.")

        else:
            break

    return resp



推荐分享
图文皆来源于网络,内容仅做公益性分享,版权归原作者所有,如有侵权请告知删除!
 

Copyright © 2014 ESP56.com All Rights Reserved

执行时间: 0.009951114654541 seconds