micropython-micro-gui/gui/primitives/delay_ms.py

79 wiersze
2.5 KiB
Python
Czysty Zwykły widok Historia

2021-06-09 16:11:48 +00:00
# delay_ms.py Now uses ThreadSafeFlag and has extra .wait() API
# Usage:
# from primitives.delay_ms import Delay_ms
# Copyright (c) 2018-2021 Peter Hinch
# Released under the MIT License (MIT) - see LICENSE file
import uasyncio as asyncio
from utime import ticks_add, ticks_diff, ticks_ms
from . import launch
class Delay_ms:
class DummyTimer: # Stand-in for the timer class. Can be cancelled.
def cancel(self):
pass
_fake = DummyTimer()
def __init__(self, func=None, args=(), duration=1000):
self._func = func
self._args = args
self._durn = duration # Default duration
self._retn = None # Return value of launched callable
self._tend = None # Stop time (absolute ms).
self._busy = False
self._trig = asyncio.ThreadSafeFlag()
self._tout = asyncio.Event() # Timeout event
self.wait = self._tout.wait # Allow: await wait_ms.wait()
2022-08-13 09:44:08 +00:00
self.clear = self._tout.clear
2021-06-09 16:11:48 +00:00
self._ttask = self._fake # Timer task
2022-02-06 12:05:38 +00:00
self._mtask = asyncio.create_task(self._run()) #Main task
2021-06-09 16:11:48 +00:00
async def _run(self):
while True:
await self._trig.wait() # Await a trigger
self._ttask.cancel() # Cancel and replace
await asyncio.sleep_ms(0)
dt = max(ticks_diff(self._tend, ticks_ms()), 0) # Beware already elapsed.
self._ttask = asyncio.create_task(self._timer(dt))
async def _timer(self, dt):
await asyncio.sleep_ms(dt)
self._tout.set() # Only gets here if not cancelled.
self._busy = False
if self._func is not None:
self._retn = launch(self._func, self._args)
# API
# trigger may be called from hard ISR.
def trigger(self, duration=0): # Update absolute end time, 0-> ctor default
2022-02-06 12:05:38 +00:00
if self._mtask is None:
raise RuntimeError("Delay_ms.deinit() has run.")
2021-06-09 16:11:48 +00:00
self._tend = ticks_add(ticks_ms(), duration if duration > 0 else self._durn)
self._retn = None # Default in case cancelled.
self._busy = True
self._trig.set()
def stop(self):
self._ttask.cancel()
self._ttask = self._fake
self._busy = False
def __call__(self): # Current running status
return self._busy
running = __call__
def rvalue(self):
return self._retn
def callback(self, func=None, args=()):
self._func = func
self._args = args
2022-02-06 12:05:38 +00:00
def deinit(self):
self.stop()
self._mtask.cancel()
self._mtask = None