Micropython学习交流群 学习QQ群:786510434 提供多种固件下载和学习交流。
Micropython-扇贝物联 QQ群:31324057 扇贝物联是一个让你与智能设备沟通更方便的物联网云平台
Micropython学习交流群 学习QQ群:468985481 学习交流ESP8266、ESP32、ESP8285、wifi模块开发交流、物联网。
Micropython老哥俩的IT农场分享QQ群:929132891 为喜欢科创制作的小白们分享一些自制的计算机软硬件免费公益课程,由两位多年从事IT研发的中年大叔发起。
MicroPython 定时器调度管理器
main.py
from dispatcher import Dispatcher if __name__ == '__main__': def task1(): print('task1') def task2(content): print(content) tasks = Dispatcher() tasks.add_work(task1, 3000) tasks.add_work(task2, 5000, 'task2')
dispatcher.py
""" Copyright © 2022 Walkline Wang (https://walkline.wang) Gitee: https://gitee.com/walkline/micropython-timer-dispatcher """ __version__ = 'v1.1' __version_info__ = (1, 1) from machine import Timer class Worker(object): '''定时器任务''' def __init__(self, work:function, period:int, *params): self.__counter = 0 self.__work = work self.__period = period self.__params = params @property def counter(self): return self.__counter @counter.setter def counter(self, count): self.__counter = count @property def period(self): return self.__period def do_work(self): self.__work(*self.__params) class Dispatcher(object): '''定时器任务调度器''' __DEFAULT_PERIOD = 20 def __init__(self, adjusting_rate=1, timer_id=0): ''' 初始化任务调度器 参数: - adjusting_rate:时间间隔调整倍率,默认值 1。 - timer_id:定时器 ID,默认值 0。 ''' if not isinstance(adjusting_rate, int) or adjusting_rate < 1: adjusting_rate = 1 if not isinstance(timer_id, int): timer_id = 0 self.__workers = {} self.__timer = Timer(timer_id) self.__adjusting_rate = adjusting_rate self.__paused = False self.__timer.init( mode=Timer.PERIODIC, period=Dispatcher.__DEFAULT_PERIOD, callback=self.__worker_callback ) def deinit(self): self.__timer.deinit() self.__workers = {} def __worker_callback(self, _): if self.__paused: return for worker in self.__workers.values(): if self.__paused: break worker.counter += 1 if (worker.counter * Dispatcher.__DEFAULT_PERIOD * self.__adjusting_rate) % worker.period == 0: worker.counter = 0 worker.do_work() def add_work(self, work:function, period:int, *params) -> bool: ''' 添加/更新一个调度任务 参数: - work:任务函数 - period:任务执行间隔,单位 毫秒 - params:任务函数参数列表 ''' result = False if callable(work): self.__workers[id(work)] = Worker(work, period, *params) result = True else: print('work must be a function') return result def has_work(self, work:function) -> bool: ''' 判断任务是否在队列中 参数: - work:任务函数 ''' result = False if callable(work): result = True if self.__workers.get(id(work)) is not None else False return result def del_work(self, work:function=None): ''' 删除指定或最后添加的任务 参数: - work:任务函数,默认值 None ''' if work is None: if len(self.__workers) > 0: self.pause() self.__workers.popitem() self.pause() elif callable(work): if self.has_work(work): self.pause() self.__workers.pop(id(work)) self.pause() def del_works(self): '''删除所有任务''' self.pause() self.__workers.clear() self.pause() def pause(self): '''暂停/开启 所有任务''' self.__paused = not self.__paused def is_paused(self): '''判断所有任务是否正在运行''' return self.__paused
来源:https://gitee.com/walkline/micropython-timer-dispatcher/tree/master
Copyright © 2014 ESP56.com All Rights Reserved
执行时间: 0.0099320411682129 seconds