本站改版新增arduino频道
使用默认设置运行
import uping
pinger = uping.Ping('example.org')
pinger.start()
PING example.org (93.184.216.34): 56 data bytes
64 bytes from 93.184.216.34: seq=0 ttl=54 time=106.261 ms
64 bytes from 93.184.216.34: seq=1 ttl=54 time=106.221 ms
64 bytes from 93.184.216.34: seq=2 ttl=54 time=106.421 ms
64 bytes from 93.184.216.34: seq=3 ttl=54 time=107.521 ms
4 packets transmitted, 4 packets received, 0 packet loss
round-trip min/avg/max = 106.221/106.606/107.521 ms
参数
Required
HOST:(FQDN(正式域名) or IP address)
Optional
SOURCE:(default: None, ip address)
COUNT:(default: 4, integer)
INTERVAL:(default: 1000, ms)
SIZE:(default: 64, bytes)
TIMEOUT:(default: 5000, ms)
quiet:(default: False, bool)
类方法
Ping.start()
使用给定的参数启动ping循环。并且总是从第一个序号开始。
如果是静默的,则返回带有ping结果的元组:
result(tx=4, rx=4, losses=0, min=106.221, avg=106.606, max=107.521)
Where:
tx - transmitted(传输),
rx - received(接收),
losses - percentage of packets that be lost(丢失数据包百分比).
Ping.ping()
只发送一个数据包。保持和增加当前序号。
import uping
pinger = uping.Ping('example.org')
# Some awesome code
pong = pinger.ping()
print(pong)
返回元组:序列号(int)、往返时间(ms, float)、ttl(int)。
(5, 106.521, 54)
Ping.ping_async()
支持异步。通过 Ping.getEND_async() 获取数据为元祖:序列号(int)、往返时间(ms, float)、ttl(int)。
Ping.getEND_async()
import uping
async def unraid_ip():
try:
pinger = uping.Ping("www.baidu.com")
await pinger.ping_async()
print("PING Success...")
print(pinger.getEND_async())
except:
print('PING network except')
finally:
pinger.close()
Ping.close()
关闭 socket 连接。
uping.py
'''
Author: Yogile
Gitee: https://gitee.com/yogile
Date: 2022-07-16 17:41:38
LastEditors: Yogile
LastEditTime: 2022-07-16 18:04:28
Fork from: https://github.com/coffee-it/uPing
Description: Porting code to ESP32 chips
'''
import utime
import uselect
import uctypes
import usocket
import ustruct
import urandom
import micropython
urandom.seed(utime.ticks_us())
class Ping():
"""
Create the ping calculating object exemplar
"""
def __init__(self, HOST, SOURCE=None, COUNT=4, INTERVAL=1000, SIZE=64, TIMEOUT=5000, quiet=False):
self.HOST = HOST
self.COUNT = COUNT
self.TIMEOUT = TIMEOUT
self.INTERVAL = INTERVAL
self.SIZE = SIZE
self.quiet = quiet
self.END = None
# prepare packet
assert SIZE >= 16, "pkt size too small"
self._PKT = b'Q'*SIZE
self.PKT_DESC = {
"type": uctypes.UINT8 | 0,
"code": uctypes.UINT8 | 1,
"checksum": uctypes.UINT16 | 2,
"id": uctypes.UINT16 | 4,
"seq": uctypes.INT16 | 6,
"timestamp": uctypes.UINT64 | 8,
} # packet header descriptor
h = uctypes.struct(uctypes.addressof(self._PKT), self.PKT_DESC, uctypes.BIG_ENDIAN)
h.type = 8 # ICMP_ECHO_REQUEST
h.code = 0
h.checksum = 0
h.id = urandom.getrandbits(16)
h.seq = 1
self.h = h
# init socket
sock = usocket.socket(usocket.AF_INET, usocket.SOCK_RAW, 1)
if SOURCE:
src_addr = usocket.getaddrinfo(SOURCE, 1)[0][-1] # ip address
sock.bind(src_addr)
sock.setblocking(0)
sock.settimeout(TIMEOUT/1000)
addresses = usocket.getaddrinfo(HOST, 1) # [0][-1] # list of ip addresses
assert addresses, "Can not take the IP address of host"
self.CLIENT_IP = None
for addr in addresses:
try:
sock.connect(addr[-1])
self.CLIENT_IP = addr[-1][0]
except:
continue
assert self.CLIENT_IP, "Connection failed"
self.sock = sock
# [ COUNTERS ]
self.seq_num = 1 # Next sequence number
self.transmitted = 0
self.received = 0
self.seqs = None
def start(self):
"""
Starting a ping cycle with the specified settings (like a interval, count e.t.c.)
"""
t = self.INTERVAL
finish = False
# [ Rtt metrics ]
pongs = []
min_rtt, max_rtt = None, None
# [ Start over ]
self.seq_num = 1
self.transmitted = 0
self.received = 0
not self.quiet and print("PING %s (%s): %u data bytes" % (self.HOST, self.CLIENT_IP, len(self._PKT)))
if self.seqs:
self.seqs.extend(list(range(self.seq_num, self.COUNT + 1))) # [seq_num, seq_num + 1, ...., seq_num + n])
else:
self.seqs = list(range(self.seq_num, self.COUNT + 1)) # [1,2,...,count]
# Здесь нужно подвязаться на реальное время
while self.seq_num <= self.seq_num + self.COUNT:
if t >= self.INTERVAL:
pong = self.ping()
t = 0
rtt = pong[1]
if rtt:
pongs.append(rtt)
if not min_rtt or rtt <= min_rtt: min_rtt = round(rtt, 3)
if not max_rtt or rtt >= max_rtt: max_rtt = round(rtt, 3)
if len(self.seqs) == 0:
finish = True
if finish:
break
utime.sleep_ms(1)
t += 1
losses = round((self.transmitted - self.received) / self.transmitted) * 100
avg_rtt = round(sum(pongs) / len(pongs), 3) if pongs else None
from ucollections import namedtuple
_result = namedtuple("result", ("tx", "rx", "losses", "min", "avg", "max"))
result = _result(self.transmitted, self.received, losses, min_rtt, avg_rtt, max_rtt)
if not self.quiet:
print(r'%u packets transmitted, %u packets received, %u packet loss' % (self.transmitted, self.received, losses))
if avg_rtt: print(r'round-trip min/avg/max = %r/%r/%r ms' % (min_rtt, avg_rtt, max_rtt))
else:
return result
def ping(self):
"""
Send ping manually.
Returns sequense number(int), round-trip time (ms, float), ttl
"""
sock = self.sock
if not self.seqs:
self.seqs = []
self.seqs.append(self.seq_num)
seq, t_elasped, ttl = None, None, None
# header
h = self.h
h.checksum = 0
h.seq = self.seq_num
h.timestamp = utime.ticks_us()
h.checksum = self.checksum(self._PKT)
try:
# send packet
if sock.send(self._PKT) == self.SIZE:
self.transmitted += 1
else:
self.seqs.remove(self.seq_num)
self.seq_num += 1
# recv packet
while 1:
resp = sock.recv(self.SIZE + 20) # ICMP header and payload + IP header
resp_mv = memoryview(resp)
h2 = uctypes.struct(uctypes.addressof(resp_mv[20:]), self.PKT_DESC, uctypes.BIG_ENDIAN)
seq = h2.seq
if h2.type==0 and h2.id==h.id and (seq in self.seqs): # 0: ICMP_ECHO_REPLY
t_elasped = (utime.ticks_us()-h2.timestamp) / 1000
self.seqs.remove(seq)
if h2.checksum == self.checksum(resp[24:]): # except IP header and a part of ICMP header (type, code, checksum)
ttl = ustruct.unpack('!B', resp_mv[8:9])[0] # time-to-live
self.received += 1
not self.quiet and print("%u bytes from %s: icmp_seq=%u, ttl=%u, time=%f ms" % (len(resp[12:]), self.CLIENT_IP, seq, ttl, t_elasped))
break
else:
not self.quiet and print("Payload checksum doesnt match")
t_elasped = None
break
except Exception as identifier:
import errno
if identifier.args[0] == errno.EPIPE:
print("Client connection unexpectedly closed")
pass
elif identifier.args[0] == errno.EBADF:
print("Bad file descriptor.")
pass
else:
raise identifier
return (seq, t_elasped, ttl)
async def ping_async(self):
"""
Send ping manually by async.
Returns sequense number(int), round-trip time (ms, float), ttl
"""
sock = self.sock
if not self.seqs:
self.seqs = []
self.seqs.append(self.seq_num)
seq, t_elasped, ttl = None, None, None
# header
h = self.h
h.checksum = 0
h.seq = self.seq_num
h.timestamp = utime.ticks_us()
h.checksum = self.checksum(self._PKT)
try:
# send packet
if sock.send(self._PKT) == self.SIZE:
self.transmitted += 1
else:
self.seqs.remove(self.seq_num)
self.seq_num += 1
# recv packet
while 1:
resp = sock.recv(self.SIZE + 20) # ICMP header and payload + IP header
resp_mv = memoryview(resp)
h2 = uctypes.struct(uctypes.addressof(resp_mv[20:]), self.PKT_DESC, uctypes.BIG_ENDIAN)
seq = h2.seq
if h2.type==0 and h2.id==h.id and (seq in self.seqs): # 0: ICMP_ECHO_REPLY
t_elasped = (utime.ticks_us()-h2.timestamp) / 1000
self.seqs.remove(seq)
if h2.checksum == self.checksum(resp[24:]): # except IP header and a part of ICMP header (type, code, checksum)
ttl = ustruct.unpack('!B', resp_mv[8:9])[0] # time-to-live
self.received += 1
not self.quiet and print("%u bytes from %s: icmp_seq=%u, ttl=%u, time=%f ms" % (len(resp[12:]), self.CLIENT_IP, seq, ttl, t_elasped))
break
else:
not self.quiet and print("Payload checksum doesnt match")
t_elasped = None
break
except Exception as identifier:
import errno
if identifier.args[0] == errno.EPIPE:
print("Client connection unexpectedly closed")
pass
elif identifier.args[0] == errno.EBADF:
print("Bad file descriptor.")
pass
else:
raise identifier
self.END = (seq, t_elasped, ttl)
def checksum(self, data):
if len(data) & 0x1: # Odd number of bytes
data += b'\0'
cs = 0
for pos in range(0, len(data), 2):
b1 = data[pos]
b2 = data[pos + 1]
cs += (b1 << 8) + b2
while cs >= 0x10000:
cs = (cs & 0xffff) + (cs >> 16)
cs = ~cs & 0xffff
return cs
def getEND_async(self):
"""
get ping_async() result
"""
return self.END
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
self.close()
def close(self):
self.sock.close()来源:https://gitee.com/Yogile/uPing/
Copyright © 2014 ESP56.com All Rights Reserved
晋ICP备14006235号-22 晋公网安备14108102001165号
执行时间: 0.0098991394042969 seconds