kopia lustrzana https://github.com/micropython/micropython-lib
316 wiersze
9.3 KiB
Python
316 wiersze
9.3 KiB
Python
![]() |
import utime as time
|
||
![]() |
import utimeq
|
||
![]() |
import ucollections
|
||
![]() |
|
||
|
|
||
![]() |
type_gen = type((lambda: (yield))())
|
||
|
|
||
![]() |
DEBUG = 0
|
||
![]() |
log = None
|
||
![]() |
|
||
![]() |
def set_debug(val):
|
||
|
global DEBUG, log
|
||
|
DEBUG = val
|
||
|
if val:
|
||
|
import logging
|
||
![]() |
log = logging.getLogger("uasyncio.core")
|
||
![]() |
|
||
|
|
||
![]() |
class CancelledError(Exception):
|
||
|
pass
|
||
|
|
||
|
|
||
|
class TimeoutError(CancelledError):
|
||
![]() |
pass
|
||
|
|
||
|
|
||
![]() |
class EventLoop:
|
||
|
|
||
![]() |
def __init__(self, runq_len=16, waitq_len=16):
|
||
|
self.runq = ucollections.deque((), runq_len, True)
|
||
|
self.waitq = utimeq.utimeq(waitq_len)
|
||
![]() |
# Current task being run. Task is a top-level coroutine scheduled
|
||
|
# in the event loop (sub-coroutines executed transparently by
|
||
|
# yield from/await, event loop "doesn't see" them).
|
||
|
self.cur_task = None
|
||
![]() |
|
||
|
def time(self):
|
||
![]() |
return time.ticks_ms()
|
||
![]() |
|
||
![]() |
def create_task(self, coro):
|
||
|
# CPython 3.4.2
|
||
![]() |
self.call_later_ms(0, coro)
|
||
![]() |
# CPython asyncio incompatibility: we don't return Task object
|
||
|
|
||
![]() |
def call_soon(self, callback, *args):
|
||
![]() |
if __debug__ and DEBUG:
|
||
|
log.debug("Scheduling in runq: %s", (callback, args))
|
||
|
self.runq.append(callback)
|
||
|
if not isinstance(callback, type_gen):
|
||
|
self.runq.append(args)
|
||
![]() |
|
||
|
def call_later(self, delay, callback, *args):
|
||
![]() |
self.call_at_(time.ticks_add(self.time(), int(delay * 1000)), callback, args)
|
||
![]() |
|
||
![]() |
def call_later_ms(self, delay, callback, *args):
|
||
![]() |
if not delay:
|
||
|
return self.call_soon(callback, *args)
|
||
![]() |
self.call_at_(time.ticks_add(self.time(), delay), callback, args)
|
||
![]() |
|
||
![]() |
def call_at_(self, time, callback, args=()):
|
||
|
if __debug__ and DEBUG:
|
||
![]() |
log.debug("Scheduling in waitq: %s", (time, callback, args))
|
||
|
self.waitq.push(time, callback, args)
|
||
![]() |
|
||
![]() |
def wait(self, delay):
|
||
|
# Default wait implementation, to be overriden in subclasses
|
||
|
# with IO scheduling
|
||
![]() |
if __debug__ and DEBUG:
|
||
![]() |
log.debug("Sleeping for: %s", delay)
|
||
![]() |
time.sleep_ms(delay)
|
||
![]() |
|
||
|
def run_forever(self):
|
||
![]() |
cur_task = [0, 0, 0]
|
||
![]() |
while True:
|
||
![]() |
# Expire entries in waitq and move them to runq
|
||
|
tnow = self.time()
|
||
|
while self.waitq:
|
||
|
t = self.waitq.peektime()
|
||
|
delay = time.ticks_diff(t, tnow)
|
||
|
if delay > 0:
|
||
|
break
|
||
|
self.waitq.pop(cur_task)
|
||
|
if __debug__ and DEBUG:
|
||
|
log.debug("Moving from waitq to runq: %s", cur_task[1])
|
||
|
self.call_soon(cur_task[1], *cur_task[2])
|
||
|
|
||
|
# Process runq
|
||
|
l = len(self.runq)
|
||
|
if __debug__ and DEBUG:
|
||
|
log.debug("Entries in runq: %d", l)
|
||
|
while l:
|
||
|
cb = self.runq.popleft()
|
||
|
l -= 1
|
||
|
args = ()
|
||
|
if not isinstance(cb, type_gen):
|
||
|
args = self.runq.popleft()
|
||
|
l -= 1
|
||
|
if __debug__ and DEBUG:
|
||
|
log.info("Next callback to run: %s", (cb, args))
|
||
|
cb(*args)
|
||
|
continue
|
||
|
|
||
![]() |
if __debug__ and DEBUG:
|
||
![]() |
log.info("Next coroutine to run: %s", (cb, args))
|
||
![]() |
self.cur_task = cb
|
||
![]() |
delay = 0
|
||
|
try:
|
||
![]() |
if args is ():
|
||
![]() |
ret = next(cb)
|
||
|
else:
|
||
|
ret = cb.send(*args)
|
||
![]() |
if __debug__ and DEBUG:
|
||
![]() |
log.info("Coroutine %s yield result: %s", cb, ret)
|
||
![]() |
if isinstance(ret, SysCall1):
|
||
|
arg = ret.arg
|
||
![]() |
if isinstance(ret, SleepMs):
|
||
|
delay = arg
|
||
![]() |
elif isinstance(ret, IORead):
|
||
![]() |
cb.pend_throw(False)
|
||
![]() |
self.add_reader(arg, cb)
|
||
![]() |
continue
|
||
|
elif isinstance(ret, IOWrite):
|
||
![]() |
cb.pend_throw(False)
|
||
![]() |
self.add_writer(arg, cb)
|
||
![]() |
continue
|
||
|
elif isinstance(ret, IOReadDone):
|
||
![]() |
self.remove_reader(arg)
|
||
![]() |
elif isinstance(ret, IOWriteDone):
|
||
![]() |
self.remove_writer(arg)
|
||
![]() |
elif isinstance(ret, StopLoop):
|
||
|
return arg
|
||
![]() |
else:
|
||
|
assert False, "Unknown syscall yielded: %r (of type %r)" % (ret, type(ret))
|
||
![]() |
elif isinstance(ret, type_gen):
|
||
|
self.call_soon(ret)
|
||
![]() |
elif isinstance(ret, int):
|
||
|
# Delay
|
||
|
delay = ret
|
||
![]() |
elif ret is None:
|
||
|
# Just reschedule
|
||
|
pass
|
||
![]() |
elif ret is False:
|
||
|
# Don't reschedule
|
||
|
continue
|
||
![]() |
else:
|
||
|
assert False, "Unsupported coroutine yield value: %r (of type %r)" % (ret, type(ret))
|
||
|
except StopIteration as e:
|
||
![]() |
if __debug__ and DEBUG:
|
||
![]() |
log.debug("Coroutine finished: %s", cb)
|
||
![]() |
continue
|
||
![]() |
except CancelledError as e:
|
||
|
if __debug__ and DEBUG:
|
||
|
log.debug("Coroutine cancelled: %s", cb)
|
||
|
continue
|
||
![]() |
# Currently all syscalls don't return anything, so we don't
|
||
|
# need to feed anything to the next invocation of coroutine.
|
||
|
# If that changes, need to pass that value below.
|
||
![]() |
if delay:
|
||
|
self.call_later_ms(delay, cb)
|
||
|
else:
|
||
|
self.call_soon(cb)
|
||
|
|
||
|
# Wait until next waitq task or I/O availability
|
||
|
delay = 0
|
||
|
if not self.runq:
|
||
|
delay = -1
|
||
|
if self.waitq:
|
||
|
tnow = self.time()
|
||
|
t = self.waitq.peektime()
|
||
|
delay = time.ticks_diff(t, tnow)
|
||
|
if delay < 0:
|
||
|
delay = 0
|
||
|
self.wait(delay)
|
||
![]() |
|
||
|
def run_until_complete(self, coro):
|
||
|
def _run_and_stop():
|
||
|
yield from coro
|
||
|
yield StopLoop(0)
|
||
|
self.call_soon(_run_and_stop())
|
||
|
self.run_forever()
|
||
|
|
||
![]() |
def stop(self):
|
||
|
self.call_soon((lambda: (yield StopLoop(0)))())
|
||
|
|
||
![]() |
def close(self):
|
||
|
pass
|
||
|
|
||
|
|
||
|
class SysCall:
|
||
|
|
||
|
def __init__(self, *args):
|
||
|
self.args = args
|
||
|
|
||
|
def handle(self):
|
||
|
raise NotImplementedError
|
||
|
|
||
![]() |
# Optimized syscall with 1 arg
|
||
|
class SysCall1(SysCall):
|
||
|
|
||
|
def __init__(self, arg):
|
||
|
self.arg = arg
|
||
|
|
||
|
class StopLoop(SysCall1):
|
||
![]() |
pass
|
||
|
|
||
![]() |
class IORead(SysCall1):
|
||
![]() |
pass
|
||
|
|
||
![]() |
class IOWrite(SysCall1):
|
||
![]() |
pass
|
||
|
|
||
![]() |
class IOReadDone(SysCall1):
|
||
![]() |
pass
|
||
|
|
||
![]() |
class IOWriteDone(SysCall1):
|
||
![]() |
pass
|
||
|
|
||
|
|
||
![]() |
_event_loop = None
|
||
|
_event_loop_class = EventLoop
|
||
![]() |
def get_event_loop(runq_len=16, waitq_len=16):
|
||
![]() |
global _event_loop
|
||
|
if _event_loop is None:
|
||
![]() |
_event_loop = _event_loop_class(runq_len, waitq_len)
|
||
![]() |
return _event_loop
|
||
|
|
||
|
def sleep(secs):
|
||
![]() |
yield int(secs * 1000)
|
||
|
|
||
![]() |
# Implementation of sleep_ms awaitable with zero heap memory usage
|
||
|
class SleepMs(SysCall1):
|
||
|
|
||
|
def __init__(self):
|
||
|
self.v = None
|
||
|
self.arg = None
|
||
|
|
||
|
def __call__(self, arg):
|
||
|
self.v = arg
|
||
|
#print("__call__")
|
||
|
return self
|
||
|
|
||
|
def __iter__(self):
|
||
|
#print("__iter__")
|
||
|
return self
|
||
|
|
||
|
def __next__(self):
|
||
|
if self.v is not None:
|
||
|
#print("__next__ syscall enter")
|
||
|
self.arg = self.v
|
||
|
self.v = None
|
||
|
return self
|
||
|
#print("__next__ syscall exit")
|
||
|
_stop_iter.__traceback__ = None
|
||
|
raise _stop_iter
|
||
|
|
||
|
_stop_iter = StopIteration()
|
||
|
sleep_ms = SleepMs()
|
||
|
|
||
![]() |
|
||
![]() |
def cancel(coro):
|
||
|
prev = coro.pend_throw(CancelledError())
|
||
|
if prev is False:
|
||
|
_event_loop.call_soon(coro)
|
||
|
|
||
|
|
||
![]() |
class TimeoutObj:
|
||
|
def __init__(self, coro):
|
||
|
self.coro = coro
|
||
|
|
||
|
|
||
|
def wait_for_ms(coro, timeout):
|
||
|
|
||
|
def waiter(coro, timeout_obj):
|
||
|
res = yield from coro
|
||
|
if __debug__ and DEBUG:
|
||
|
log.debug("waiter: cancelling %s", timeout_obj)
|
||
|
timeout_obj.coro = None
|
||
|
return res
|
||
|
|
||
|
def timeout_func(timeout_obj):
|
||
|
if timeout_obj.coro:
|
||
|
if __debug__ and DEBUG:
|
||
|
log.debug("timeout_func: cancelling %s", timeout_obj.coro)
|
||
![]() |
prev = timeout_obj.coro.pend_throw(TimeoutError())
|
||
|
#print("prev pend", prev)
|
||
|
if prev is False:
|
||
|
_event_loop.call_soon(timeout_obj.coro)
|
||
![]() |
|
||
|
timeout_obj = TimeoutObj(_event_loop.cur_task)
|
||
|
_event_loop.call_later_ms(timeout, timeout_func, timeout_obj)
|
||
|
return (yield from waiter(coro, timeout_obj))
|
||
|
|
||
|
|
||
|
def wait_for(coro, timeout):
|
||
|
return wait_for_ms(coro, int(timeout * 1000))
|
||
|
|
||
|
|
||
![]() |
def coroutine(f):
|
||
|
return f
|
||
|
|
||
![]() |
#
|
||
|
# The functions below are deprecated in uasyncio, and provided only
|
||
|
# for compatibility with CPython asyncio
|
||
|
#
|
||
|
|
||
![]() |
def ensure_future(coro, loop=_event_loop):
|
||
![]() |
_event_loop.call_soon(coro)
|
||
|
# CPython asyncio incompatibility: we don't return Task object
|
||
![]() |
return coro
|
||
|
|
||
![]() |
|
||
|
# CPython asyncio incompatibility: Task is a function, not a class (for efficiency)
|
||
|
def Task(coro, loop=_event_loop):
|
||
|
# Same as async()
|
||
|
_event_loop.call_soon(coro)
|