Micropython学习交流群 学习QQ群:786510434 提供多种固件下载和学习交流。
Micropython-扇贝物联 QQ群:31324057 扇贝物联是一个让你与智能设备沟通更方便的物联网云平台
Micropython学习交流群 学习QQ群:468985481 学习交流ESP8266、ESP32、ESP8285、wifi模块开发交流、物联网。
Micropython老哥俩的IT农场分享QQ群:929132891 为喜欢科创制作的小白们分享一些自制的计算机软硬件免费公益课程,由两位多年从事IT研发的中年大叔发起。
main.py
"""Demo script: join a Wi-Fi network, then fetch a page and download a file over HTTP."""
from uhttp import requests
import socket
import threading
import network
import time

# NOTE(review): uhttp.requests, socket and threading are not referenced anywhere
# in this script - confirm whether these imports can be dropped.

# Seconds to keep polling for a Wi-Fi link before giving up.
WIFI_CONNECT_TIMEOUT_S = 15


def do_connect():
    """Bring up the station interface and try to join the configured Wi-Fi network.

    Polls the link once per second and gives up after WIFI_CONNECT_TIMEOUT_S
    seconds. On success the interface configuration is printed and the IP
    address is stored in the module-level name ``serverip``.

    Returns:
        True if the interface reports a connection when the function ends,
        False otherwise.
    """
    global serverip

    # Station (client) mode: create the STA interface and activate it.
    wlan = network.WLAN(network.STA_IF)
    wlan.active(True)
    start_time = time.time()

    if not wlan.isconnected():
        print("当前无线未联网,正在连接中....")
        wlan.connect("NBWIFI", "xxxxx")  # SSID and password
        while not wlan.isconnected():
            # Poll once per second and report progress on the console.
            time.sleep_ms(1000)
            print("正在尝试连接到wifi....")
            print(time.time())
            if time.time() - start_time > WIFI_CONNECT_TIMEOUT_S:
                print("连接失败!!!无线网连接超过15秒,请检查无线网名称和密码是否正确..")
                break

    if wlan.isconnected():
        IP_info = wlan.ifconfig()
        print("无线网已经连接,信息如下:")
        print("IP地址:" + IP_info[0])
        serverip = IP_info[0]
        print("子网掩码:" + IP_info[1])
        print("网关:" + IP_info[2])
        print("DNS:" + IP_info[3])

    # Printed whether or not the connection succeeded.
    ip_address = wlan.ifconfig()[0]
    print("ip_address " + ip_address)
    return wlan.isconnected()


do_connect()

import os
import gc
from libs.httpclient import HttpClient

http_client = HttpClient(headers={})

# Fetch a page and print its body.
print(http_client.get("http://www.xxxxx.cn/api/r1.php").text)

# Download a file straight to flash.
http_client.get('http://www.esp56.com/api/einkweather/style1.php', saveToFile="123.bmp")

# Confirm the download by opening the file; close the handle right away so it
# is not leaked.
try:
    f = open("123.bmp", "rb")
except OSError:
    # The file could not be opened, so the download did not produce it.
    print("下载失败")
else:
    f.close()
    print("下载完成")
httpclient.py(保存为 libs/httpclient.py,与 main.py 中的 from libs.httpclient import HttpClient 对应)
"""Small blocking HTTP/1.0 client for MicroPython with optional download-to-file."""
import gc
import os

try:
    import usocket
except ImportError:
    # Firmware (or interpreters) that only ship the un-prefixed module name.
    import socket as usocket

# Bytes moved per read when streaming a body to or from a file.
_CHUNK_SIZE = 512
# Redirect statuses that are followed with a body-less GET.
_REDIRECT_AS_GET = (301, 302, 303)
# Redirect statuses that are followed with the original method and body.
_REDIRECT_KEEP_METHOD = (307, 308)


def _json_module():
    """Import the JSON module lazily, preferring the MicroPython name."""
    try:
        import ujson as mod
    except ImportError:
        import json as mod
    return mod


def _ssl_module():
    """Import the TLS module lazily, preferring the MicroPython name."""
    try:
        import ussl as mod
    except ImportError:
        import ssl as mod
    return mod


def _to_bytes(value):
    """Return *value* as bytes, encoding its string form when necessary."""
    if isinstance(value, (bytes, bytearray)):
        return value
    return str(value).encode()


def _write_headers(sock, headers):
    """Write every ``name: value`` pair of *headers* to *sock*."""
    # Iterate over keys to avoid allocating an items() tuple per header.
    for name in headers:
        sock.write(_to_bytes(name) + b': ' + _to_bytes(headers[name]) + b'\r\n')


class Response:
    """Response body wrapper around the still-open socket.

    When ``saveToFile`` is given, the body is streamed to that path inside
    the constructor and the socket is closed; ``content``/``text`` are then
    unavailable. ``status_code`` and ``reason`` are assigned by
    ``HttpClient.request`` after construction.
    """

    def __init__(self, socket, saveToFile=None):
        self._socket = socket
        self._saveToFile = saveToFile
        self._encoding = 'utf-8'

        if saveToFile is not None:
            try:
                # The socket yields bytes, so the target must be opened in
                # binary mode.
                with open(saveToFile, 'wb') as outfile:
                    data = self._socket.read(_CHUNK_SIZE)
                    while data:
                        outfile.write(data)
                        data = self._socket.read(_CHUNK_SIZE)
            finally:
                # Release the socket even if the write failed part-way.
                self.close()

    def close(self):
        """Close the underlying socket; safe to call more than once."""
        if self._socket:
            self._socket.close()
            self._socket = None

    @property
    def content(self):
        """Read the remaining body as bytes, then close the socket.

        Raises:
            SystemError: if the body was already saved to a file.
        """
        if self._saveToFile is not None:
            raise SystemError('You cannot get the content from the response as you decided to save it in {}'.format(self._saveToFile))

        try:
            return self._socket.read()
        finally:
            self.close()

    @property
    def text(self):
        """Return ``content`` decoded with the response encoding (utf-8)."""
        return str(self.content, self._encoding)

    def json(self):
        """Parse the remaining body as JSON, then close the socket."""
        try:
            return _json_module().load(self._socket)
        finally:
            self.close()


class HttpClient:
    """HTTP client that sends a fixed set of default headers with each request."""

    def __init__(self, headers=None):
        # A fresh dict per instance; a shared mutable default would leak
        # headers between clients.
        self._headers = {} if headers is None else headers

    @staticmethod
    def is_chunked_data(data):
        """Return a truthy value if *data* is iterable but has no length.

        Such bodies (e.g. generators) are sent with chunked transfer encoding.
        """
        return getattr(data, "__iter__", None) and not getattr(data, "__len__", None)

    def request(self, method, url, data=None, json=None, file=None, custom=None,
                saveToFile=None, headers=None, stream=None):
        """Send one HTTP/1.0 request and return a ``Response``.

        Args:
            method: HTTP verb, e.g. ``'GET'``.
            url: absolute ``http://`` or ``https://`` URL; may carry ``:port``.
            data: request body (bytes/str), or a length-less iterable of
                chunks that is sent with chunked transfer encoding.
            json: object serialised as the JSON body; ``data`` must be None.
            file: path of a file whose bytes are sent as the body.
            custom: callable given the socket after the standard headers have
                been written; it must finish the header section itself.
            saveToFile: path the response body is streamed to.
            headers: extra headers for this request only.
            stream: accepted for compatibility; not used.

        Returns:
            A ``Response`` with ``status_code`` and ``reason`` set. Redirects
            with status 301/302/303 are followed with GET, 307/308 with the
            original method and body.

        Raises:
            ValueError: unsupported protocol, no address found, malformed
                status line, or a chunked response.
            NotImplementedError: a ``Location`` header on an unsupported status.
            OSError: network failure.
        """
        if headers is None:
            headers = {}
        chunked = data and self.is_chunked_data(data)
        redirect = None  # redirection URL; None means no redirection

        try:
            proto, dummy, host, path = url.split('/', 3)
        except ValueError:
            proto, dummy, host = url.split('/', 2)
            path = ''

        if proto == 'http:':
            port = 80
        elif proto == 'https:':
            port = 443
        else:
            raise ValueError('Unsupported protocol: ' + proto)

        if ':' in host:
            host, port = host.split(':', 1)
            port = int(port)

        ai = usocket.getaddrinfo(host, port, 0, usocket.SOCK_STREAM)
        if len(ai) < 1:
            raise ValueError('You are not connected to the internet...')
        ai = ai[0]

        s = usocket.socket(ai[0], ai[1], ai[2])
        try:
            s.connect(ai[-1])
            if proto == 'https:':
                # Free as much heap as possible before the TLS handshake.
                gc.collect()
                s = _ssl_module().wrap_socket(s, server_hostname=host)

            s.write(('%s /%s HTTP/1.0\r\n' % (method, path)).encode())
            if 'Host' not in headers:
                s.write(b'Host: ' + _to_bytes(host) + b'\r\n')
            _write_headers(s, self._headers)
            _write_headers(s, headers)
            s.write(b'User-Agent: MicroPython Client\r\n')

            payload = data
            if json is not None:
                assert data is None
                # Encode so Content-Length counts bytes, not characters.
                payload = _json_module().dumps(json).encode()
                s.write(b'Content-Type: application/json\r\n')
            elif isinstance(payload, str):
                payload = payload.encode()

            if payload:
                if chunked:
                    s.write(b"Transfer-Encoding: chunked\r\n\r\n")
                    for chunk in payload:
                        if isinstance(chunk, str):
                            chunk = chunk.encode()
                        s.write(b"%x\r\n" % len(chunk))
                        s.write(chunk)
                        s.write(b"\r\n")
                    s.write(b"0\r\n\r\n")
                else:
                    s.write(b"Content-Length: %d\r\n\r\n" % len(payload))
                    s.write(payload)
            elif file:
                s.write(b'Content-Length: %d\r\n' % os.stat(file)[6])
                s.write(b'\r\n')
                # Send the file byte-for-byte so the body matches the
                # advertised Content-Length.
                with open(file, 'rb') as file_object:
                    chunk = file_object.read(_CHUNK_SIZE)
                    while chunk:
                        s.write(chunk)
                        chunk = file_object.read(_CHUNK_SIZE)
            elif custom:
                custom(s)
            else:
                s.write(b'\r\n')

            status_line = s.readline()
            parts = status_line.split(None, 2)
            if len(parts) < 2:
                raise ValueError('Invalid HTTP response: %r' % status_line)
            status = int(parts[1])
            reason = ''
            if len(parts) > 2:
                reason = parts[2].rstrip()

            while True:
                line = s.readline()
                if not line or line == b'\r\n':
                    break
                # Header names are case-insensitive.
                lowered = line.lower()
                if lowered.startswith(b'transfer-encoding:'):
                    if b'chunked' in lowered:
                        raise ValueError('Unsupported ' + line.decode().rstrip())
                elif lowered.startswith(b'location:') and not 200 <= status <= 299:
                    if status in _REDIRECT_AS_GET or status in _REDIRECT_KEEP_METHOD:
                        redirect = line[len(b'location:'):].strip().decode()
                    else:
                        raise NotImplementedError("Redirect {} not yet supported".format(status))
        except Exception:
            # Never leak the socket, whatever went wrong.
            s.close()
            raise

        if redirect:
            s.close()
            print(redirect)
            if status in _REDIRECT_AS_GET:
                # Followed as a plain GET: the request body is not resent.
                return self.request('GET', redirect, saveToFile=saveToFile,
                                    headers=headers, stream=stream)
            return self.request(method, redirect, data=data, json=json,
                                file=file, custom=custom,
                                saveToFile=saveToFile, headers=headers,
                                stream=stream)

        resp = Response(s, saveToFile)
        resp.status_code = status
        resp.reason = reason
        return resp

    def head(self, url, **kw):
        """Send a HEAD request; keyword arguments are passed to ``request``."""
        return self.request('HEAD', url, **kw)

    def get(self, url, **kw):
        """Send a GET request; keyword arguments are passed to ``request``."""
        return self.request('GET', url, **kw)

    def post(self, url, **kw):
        """Send a POST request; keyword arguments are passed to ``request``."""
        return self.request('POST', url, **kw)

    def put(self, url, **kw):
        """Send a PUT request; keyword arguments are passed to ``request``."""
        return self.request('PUT', url, **kw)

    def patch(self, url, **kw):
        """Send a PATCH request; keyword arguments are passed to ``request``."""
        return self.request('PATCH', url, **kw)

    def delete(self, url, **kw):
        """Send a DELETE request; keyword arguments are passed to ``request``."""
        return self.request('DELETE', url, **kw)
Copyright © 2014 ESP56.com All Rights Reserved
执行时间: 0.0094521045684814 seconds