Micropython学习交流群 学习QQ群:786510434 提供多种固件下载和学习交流。
Micropython-扇贝物联 QQ群:31324057 扇贝物联是一个让你与智能设备沟通更方便的物联网云平台
Micropython学习交流群 学习QQ群:468985481 学习交流ESP8266、ESP32、ESP8285、wifi模块开发交流、物联网。
Micropython老哥俩的IT农场分享QQ群:929132891 为喜欢科创制作的小白们分享一些自制的计算机软硬件免费公益课程,由两位多年从事IT研发的中年大叔发起。
使用默认设置运行 import uping pinger = uping.Ping('example.org') pinger.start() PING example.org (93.184.216.34): 56 data bytes 64 bytes from 93.184.216.34: seq=0 ttl=54 time=106.261 ms 64 bytes from 93.184.216.34: seq=1 ttl=54 time=106.221 ms 64 bytes from 93.184.216.34: seq=2 ttl=54 time=106.421 ms 64 bytes from 93.184.216.34: seq=3 ttl=54 time=107.521 ms 4 packets transmitted, 4 packets received, 0 packet loss round-trip min/avg/max = 106.221/106.606/107.521 ms 参数 Required HOST:(FQDN(正式域名) or IP address) Optional SOURCE:(default: None, ip address) COUNT:(default: 4, integer) INTERVAL:(default: 1000, ms) SIZE:(default: 64, bytes) TIMEOUT:(default: 5000, ms) quiet:(default: False, bool) 类方法 Ping.start() 使用给定的参数启动ping循环。并且总是从第一个序号开始。 如果是静默的,则返回带有ping结果的元组: result(tx=4, rx=4, losses=0, min=106.221, avg=106.606, max=107.521) Where: tx - transmitted(传输), rx - received(接收), losses - percentage of packets that were lost(丢失数据包百分比). Ping.ping() 只发送一个数据包。保持和增加当前序号。 import uping pinger = uping.Ping('example.org') # Some awesome code pong = pinger.ping() print(pong) 返回元组:序列号(int)、往返时间(ms, float)、ttl(int)。 (5, 106.521, 54) Ping.ping_async() 支持异步。通过 Ping.getEND_async() 获取数据为元组:序列号(int)、往返时间(ms, float)、ttl(int)。 Ping.getEND_async() import uping async def unraid_ip(): try: pinger = uping.Ping("www.baidu.com") await pinger.ping_async() print("PING Success...") print(pinger.getEND_async()) except: print('PING network except') finally: pinger.close() Ping.close() 关闭 socket 连接。
uping.py
'''
Author: Yogile
Gitee: https://gitee.com/yogile
Date: 2022-07-16 17:41:38
LastEditors: Yogile
LastEditTime: 2022-07-16 18:04:28
Fork from: https://github.com/coffee-it/uPing
Description: Porting code to ESP32 chips
'''
import utime
import uselect
import uctypes
import usocket
import ustruct
import urandom
import micropython

urandom.seed(utime.ticks_us())


class Ping():
    """
    ICMP echo ("ping") client built on a raw MicroPython socket.

    One instance owns one raw ICMP socket connected to a single host and a
    reusable request packet whose header fields are rewritten before every
    send.

    Arguments:
        HOST     -- host name or IP address to ping (required).
        SOURCE   -- optional local address to bind the socket to.
        COUNT    -- number of requests sent by start().
        INTERVAL -- pause between two requests in start(), in ms.
        SIZE     -- size of the ICMP message in bytes (at least 16).
        TIMEOUT  -- socket timeout in ms.
        quiet    -- when true, per-reply and summary lines are not printed
                    and start() returns the statistics instead.

    Raises AssertionError when SIZE is too small, the host does not resolve
    or no resolved address can be connected. The socket is closed before
    any exception leaves __init__.
    """

    # Length of the IPv4 header that precedes the ICMP message in received
    # data. The parser assumes a header without options.
    _IP_HEADER_LEN = 20

    # Number of bytes covered by PKT_DESC (type .. timestamp).
    _ICMP_DESC_LEN = 16

    def __init__(self, HOST, SOURCE=None, COUNT=4, INTERVAL=1000, SIZE=64, TIMEOUT=5000, quiet=False):
        self.HOST = HOST
        self.COUNT = COUNT
        self.TIMEOUT = TIMEOUT
        self.INTERVAL = INTERVAL
        self.SIZE = SIZE
        self.quiet = quiet
        self.END = None  # result of the last ping_async() call

        # prepare packet
        assert SIZE >= 16, "pkt size too small"
        # The header is patched in place through uctypes before every send,
        # so the buffer has to be mutable.
        self._PKT = bytearray(b'Q' * SIZE)
        self.PKT_DESC = {
            "type": uctypes.UINT8 | 0,
            "code": uctypes.UINT8 | 1,
            "checksum": uctypes.UINT16 | 2,
            "id": uctypes.UINT16 | 4,
            # Unsigned, so sequence numbers above 32767 still compare equal
            # to the value that was sent.
            "seq": uctypes.UINT16 | 6,
            "timestamp": uctypes.UINT64 | 8,
        }  # packet header descriptor
        h = uctypes.struct(uctypes.addressof(self._PKT), self.PKT_DESC, uctypes.BIG_ENDIAN)
        h.type = 8  # ICMP_ECHO_REQUEST
        h.code = 0
        h.checksum = 0
        h.id = urandom.getrandbits(16)
        h.seq = 1
        self.h = h

        # init socket
        self.CLIENT_IP = None
        sock = usocket.socket(usocket.AF_INET, usocket.SOCK_RAW, 1)
        try:
            if SOURCE:
                src_addr = usocket.getaddrinfo(SOURCE, 1)[0][-1]  # ip address
                sock.bind(src_addr)
            # settimeout() alone selects the blocking mode; a preceding
            # setblocking(0) would simply be overridden by it.
            sock.settimeout(TIMEOUT / 1000)
            addresses = usocket.getaddrinfo(HOST, 1)  # list of ip addresses
            assert addresses, "Can not take the IP address of host"
            for addr in addresses:
                try:
                    sock.connect(addr[-1])
                except Exception:
                    continue  # try the next resolved address
                self.CLIENT_IP = addr[-1][0]
                break  # keep the first address that accepted the connect
            assert self.CLIENT_IP, "Connection failed"
        except Exception:
            # Do not leak the raw socket when set-up fails.
            sock.close()
            raise
        self.sock = sock

        # [ COUNTERS ]
        self.seq_num = 1  # Next sequence number
        self.transmitted = 0
        self.received = 0
        self.seqs = []  # sequence numbers currently waiting for a reply

    def start(self):
        """
        Run a ping cycle with the configured settings (COUNT, INTERVAL, ...).

        The sequence number and the transmitted/received counters are reset
        first, so every cycle starts from sequence number 1.

        Returns a namedtuple result(tx, rx, losses, min, avg, max) when the
        instance is quiet, otherwise prints a summary and returns None.
        `losses` is the percentage of transmitted packets without a valid
        reply; min/avg/max are round-trip times in ms, or None when no
        reply was received.

        Exceptions raised by ping() are not caught here.
        """
        rtts = []

        # [ Start over ]
        self.seq_num = 1
        self.transmitted = 0
        self.received = 0
        self.seqs = []

        if not self.quiet:
            print("PING %s (%s): %u data bytes" % (self.HOST, self.CLIENT_IP, len(self._PKT)))

        # NOTE: the pause is counted from the end of one exchange to the
        # start of the next one, not between two consecutive sends.
        for index in range(self.COUNT):
            if index and self.INTERVAL > 0:
                utime.sleep_ms(int(self.INTERVAL))
            rtt = self.ping()[1]
            if rtt is not None:
                rtts.append(rtt)

        if self.transmitted:
            # Scale to percent before rounding; rounding the ratio first
            # would only ever produce 0 or 100.
            losses = round((self.transmitted - self.received) * 100 / self.transmitted)
        else:
            losses = 0
        min_rtt = round(min(rtts), 3) if rtts else None
        max_rtt = round(max(rtts), 3) if rtts else None
        avg_rtt = round(sum(rtts) / len(rtts), 3) if rtts else None

        from ucollections import namedtuple
        _result = namedtuple("result", ("tx", "rx", "losses", "min", "avg", "max"))
        result = _result(self.transmitted, self.received, losses, min_rtt, avg_rtt, max_rtt)

        if self.quiet:
            return result
        print(r'%u packets transmitted, %u packets received, %u packet loss' % (self.transmitted, self.received, losses))
        if avg_rtt is not None:
            print(r'round-trip min/avg/max = %r/%r/%r ms' % (min_rtt, avg_rtt, max_rtt))

    def ping(self):
        """
        Send one echo request manually and wait for its reply.

        Uses the current sequence number and then increments it.

        Returns a tuple (sequence number (int), round-trip time (ms, float),
        ttl (int)). Fields that could not be determined are None.
        """
        return self._exchange()

    async def ping_async(self):
        """
        Coroutine variant of ping().

        The result tuple (sequence number, round-trip time in ms, ttl) is
        stored and can be read with getEND_async(). The body contains no
        await points: the socket calls are the same ones ping() uses.
        """
        self.END = self._exchange()

    def _exchange(self):
        """
        Send one echo request and read packets until its reply is found.

        Returns (seq, rtt_ms, ttl):
          * all None when the packet could not be sent completely or the
            socket reported EPIPE / EBADF;
          * (seq, None, None) when a reply matched but its checksum is
            wrong.
        Any other exception raised by the socket (for example on a receive
        timeout) is re-raised to the caller.
        """
        sock = self.sock
        seq_sent = self.seq_num & 0xFFFF  # the header field is 16 bits wide
        if self.seqs is None:
            self.seqs = []
        if seq_sent not in self.seqs:
            self.seqs.append(seq_sent)
        seq, t_elasped, ttl = None, None, None

        # header
        h = self.h
        h.checksum = 0  # the checksum is computed over a zeroed field
        h.seq = seq_sent
        t_send = utime.ticks_us()
        h.timestamp = t_send
        h.checksum = self.checksum(self._PKT)

        header_len = self._IP_HEADER_LEN
        try:
            # send packet
            sent = sock.send(self._PKT)
            self.seq_num += 1
            if sent != self.SIZE:
                # Nothing usable went out, so no reply can arrive.
                return (seq, t_elasped, ttl)
            self.transmitted += 1

            # recv packet
            while True:
                resp = sock.recv(self.SIZE + header_len)  # ICMP header and payload + IP header
                if len(resp) < header_len + self._ICMP_DESC_LEN:
                    continue  # too short to hold the fields parsed below
                resp_mv = memoryview(resp)
                h2 = uctypes.struct(uctypes.addressof(resp_mv[header_len:]), self.PKT_DESC, uctypes.BIG_ENDIAN)
                if h2.type != 0 or h2.id != h.id or h2.seq not in self.seqs:  # 0: ICMP_ECHO_REPLY
                    continue  # not an answer to one of our requests
                seq = h2.seq
                self.seqs.remove(seq)
                # Checksum over everything after the IP header and the first
                # four ICMP bytes (type, code, checksum).
                if h2.checksum == self.checksum(resp_mv[header_len + 4:]):
                    # ticks_diff() copes with wrap-around of the tick counter.
                    t_elasped = utime.ticks_diff(utime.ticks_us(), t_send) / 1000
                    ttl = resp[8]  # time-to-live byte of the IP header
                    self.received += 1
                    if not self.quiet:
                        print("%u bytes from %s: icmp_seq=%u, ttl=%u, time=%f ms" % (len(resp[12:]), self.CLIENT_IP, seq, ttl, t_elasped))
                else:
                    if not self.quiet:
                        print("Payload checksum doesnt match")
                break
        except Exception as exc:
            import errno
            code = exc.args[0] if exc.args else None
            if code == errno.EPIPE:
                print("Client connection unexpectedly closed")
            elif code == errno.EBADF:
                print("Bad file descriptor.")
            else:
                raise
        finally:
            # Never leave this request registered as pending: a stale entry
            # would let a late reply be mistaken for a later request.
            if seq_sent in self.seqs:
                self.seqs.remove(seq_sent)
        return (seq, t_elasped, ttl)

    def checksum(self, data):
        """
        Return the 16-bit one's-complement checksum of `data`.

        `data` is any indexable buffer of byte values (bytes, bytearray,
        memoryview). It is read as big-endian 16-bit words; an odd trailing
        byte is treated as the high byte of a final word. The buffer is not
        modified.
        """
        length = len(data)
        cs = 0
        for pos in range(0, length - 1, 2):
            cs += (data[pos] << 8) + data[pos + 1]
            # Fold the carry back in right away to keep the sum small.
            while cs >= 0x10000:
                cs = (cs & 0xffff) + (cs >> 16)
        if length & 0x1:  # Odd number of bytes
            cs += data[length - 1] << 8
            while cs >= 0x10000:
                cs = (cs & 0xffff) + (cs >> 16)
        return ~cs & 0xffff

    def getEND_async(self):
        """
        Return the result stored by the last ping_async() call, or None if
        ping_async() has not completed yet.
        """
        return self.END

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        self.close()

    def close(self):
        """ Close the socket. """
        self.sock.close()
来源:https://gitee.com/Yogile/uPing/
Copyright © 2014 ESP56.com All Rights Reserved
执行时间: 0.010303020477295 seconds