Micropython学习交流群 学习QQ群:786510434 提供多种固件下载和学习交流。

Micropython-扇贝物联 QQ群:31324057 扇贝物联是一个让你与智能设备沟通更方便的物联网云平台

Micropython学习交流群 学习QQ群:468985481 学习交流ESP8266、ESP32、ESP8285、wifi模块开发交流、物联网。

Micropython老哥俩的IT农场分享QQ群:929132891 为喜欢科创制作的小白们分享一些自制的计算机软硬件免费公益课程,由两位多年从事IT研发的中年大叔发起。

Micropython ESP频道

micropython esp32 定时器调度管理器


MicroPython 定时器调度管理器


main.py

from dispatcher import Dispatcher

if __name__ == '__main__':
    def task1():
        print('task1')

    def task2(content):
        print(content)

    tasks = Dispatcher()
    tasks.add_work(task1, 3000)
    tasks.add_work(task2, 5000, 'task2')

dispatcher.py

"""
Copyright © 2022 Walkline Wang (https://walkline.wang)
Gitee: https://gitee.com/walkline/micropython-timer-dispatcher
"""
__version__ = 'v1.1'
__version_info__ = (1, 1)

from machine import Timer


class Worker(object):
	'''定时器任务'''
	def __init__(self, work:function, period:int, *params):
		self.__counter = 0
		self.__work = work
		self.__period = period
		self.__params = params

	@property
	def counter(self):
		return self.__counter
	
	@counter.setter
	def counter(self, count):
		self.__counter = count

	@property
	def period(self):
		return self.__period

	def do_work(self):
		self.__work(*self.__params)


class Dispatcher(object):
	'''定时器任务调度器'''
	__DEFAULT_PERIOD = 20

	def __init__(self, adjusting_rate=1, timer_id=0):
		'''
		初始化任务调度器

		参数:
		- adjusting_rate:时间间隔调整倍率,默认值 1。
		- timer_id:定时器 ID,默认值 0。
		'''
		if not isinstance(adjusting_rate, int) or adjusting_rate < 1:
			adjusting_rate = 1

		if not isinstance(timer_id, int):
			timer_id = 0

		self.__workers = {}
		self.__timer = Timer(timer_id)
		self.__adjusting_rate = adjusting_rate
		self.__paused = False

		self.__timer.init(
			mode=Timer.PERIODIC,
			period=Dispatcher.__DEFAULT_PERIOD,
			callback=self.__worker_callback
		)

	def deinit(self):
		self.__timer.deinit()
		self.__workers = {}

	def __worker_callback(self, _):
		if self.__paused: return

		for worker in self.__workers.values():
			if self.__paused: break

			worker.counter += 1

			if (worker.counter * Dispatcher.__DEFAULT_PERIOD * self.__adjusting_rate) % worker.period == 0:
				worker.counter = 0
				worker.do_work()

	def add_work(self, work:function, period:int, *params) -> bool:
		'''
		添加/更新一个调度任务

		参数:
		- work:任务函数
		- period:任务执行间隔,单位 毫秒
		- params:任务函数参数列表
		'''
		result = False

		if callable(work):
			self.__workers[id(work)] = Worker(work, period, *params)
			result = True
		else:
			print('work must be a function')

		return result

	def has_work(self, work:function) -> bool:
		'''
		判断任务是否在队列中

		参数:
		- work:任务函数
		'''
		result = False

		if callable(work):
			result = True if self.__workers.get(id(work)) is not None else False

		return result

	def del_work(self, work:function=None):
		'''
		删除指定或最后添加的任务

		参数:
		- work:任务函数,默认值 None
		'''
		if work is None:
			if len(self.__workers) > 0:
				self.pause()
				self.__workers.popitem()
				self.pause()
		elif callable(work):
			if self.has_work(work):
				self.pause()
				self.__workers.pop(id(work))
				self.pause()

	def del_works(self):
		'''删除所有任务'''
		self.pause()
		self.__workers.clear()
		self.pause()

	def pause(self):
		'''暂停/开启 所有任务'''
		self.__paused = not self.__paused
	
	def is_paused(self):
		'''判断所有任务是否正在运行'''
		return self.__paused

来源:https://gitee.com/walkline/micropython-timer-dispatcher/tree/master


推荐分享
图文皆来源于网络,内容仅做公益性分享,版权归原作者所有,如有侵权请告知删除!
 

Copyright © 2014 ESP56.com All Rights Reserved

执行时间: 0.0091381072998047 seconds