diff --git a/extmod/uasyncio/__init__.py b/extmod/uasyncio/__init__.py new file mode 100644 index 0000000000..6bff13883a --- /dev/null +++ b/extmod/uasyncio/__init__.py @@ -0,0 +1,26 @@ +# MicroPython uasyncio module +# MIT license; Copyright (c) 2019 Damien P. George + +from .core import * + +__version__ = (3, 0, 0) + +_attrs = { + "wait_for": "funcs", + "gather": "funcs", + "Event": "event", + "Lock": "lock", + "open_connection": "stream", + "start_server": "stream", +} + +# Lazy loader, effectively does: +# global attr +# from .mod import attr +def __getattr__(attr): + mod = _attrs.get(attr, None) + if mod is None: + raise AttributeError(attr) + value = getattr(__import__(mod, None, None, True, 1), attr) + globals()[attr] = value + return value diff --git a/extmod/uasyncio/core.py b/extmod/uasyncio/core.py new file mode 100644 index 0000000000..049e2537f8 --- /dev/null +++ b/extmod/uasyncio/core.py @@ -0,0 +1,231 @@ +# MicroPython uasyncio module +# MIT license; Copyright (c) 2019 Damien P. George + +from time import ticks_ms as ticks, ticks_diff, ticks_add +import sys, select + +# Import TaskQueue and Task +from .task import TaskQueue, Task + + +################################################################################ +# Exceptions + + +class CancelledError(BaseException): + pass + + +class TimeoutError(Exception): + pass + + +################################################################################ +# Sleep functions + +# "Yield" once, then raise StopIteration +class SingletonGenerator: + def __init__(self): + self.state = None + self.exc = StopIteration() + + def __iter__(self): + return self + + def __next__(self): + if self.state is not None: + _task_queue.push_sorted(cur_task, self.state) + self.state = None + return None + else: + self.exc.__traceback__ = None + raise self.exc + + +# Pause task execution for the given time (integer in milliseconds, uPy extension) +# Use a SingletonGenerator to do it without allocating on the heap +def sleep_ms(t, sgen=SingletonGenerator()): + assert sgen.state is None + sgen.state = ticks_add(ticks(), t) + return sgen + + +# Pause task execution for the given time (in seconds) +def sleep(t): + return sleep_ms(int(t * 1000)) + + +################################################################################ +# Queue and poller for stream IO + + +class IOQueue: + def __init__(self): + self.poller = select.poll() + self.map = {} # maps id(stream) to [task_waiting_read, task_waiting_write, stream] + + def _enqueue(self, s, idx): + if id(s) not in self.map: + entry = [None, None, s] + entry[idx] = cur_task + self.map[id(s)] = entry + self.poller.register(s, select.POLLIN if idx == 0 else select.POLLOUT) + else: + sm = self.map[id(s)] + assert sm[idx] is None + assert sm[1 - idx] is not None + sm[idx] = cur_task + self.poller.modify(s, select.POLLIN | select.POLLOUT) + # Link task to this IOQueue so it can be removed if needed + cur_task.data = self + + def _dequeue(self, s): + del self.map[id(s)] + self.poller.unregister(s) + + def queue_read(self, s): + self._enqueue(s, 0) + + def queue_write(self, s): + self._enqueue(s, 1) + + def remove(self, task): + while True: + del_s = None + for k in self.map: # Iterate without allocating on the heap + q0, q1, s = self.map[k] + if q0 is task or q1 is task: + del_s = s + break + if del_s is not None: + self._dequeue(s) + else: + break + + def wait_io_event(self, dt): + for s, ev in self.poller.ipoll(dt): + sm = self.map[id(s)] + # print('poll', s, sm, ev) + if ev & ~select.POLLOUT and sm[0] is not None: + # POLLIN or error + _task_queue.push_head(sm[0]) + sm[0] = None + if ev & ~select.POLLIN and sm[1] is not None: + # POLLOUT or error + _task_queue.push_head(sm[1]) + sm[1] = None + if sm[0] is None and sm[1] is None: + self._dequeue(s) + elif sm[0] is None: + self.poller.modify(s, select.POLLOUT) + else: + self.poller.modify(s, select.POLLIN) + + +################################################################################ +# Main run loop + +# TaskQueue of Task instances +_task_queue = TaskQueue() + +# Task queue and poller for stream IO +_io_queue = IOQueue() + + +# Ensure the awaitable is a task +def _promote_to_task(aw): + return aw if isinstance(aw, Task) else create_task(aw) + + +# Create and schedule a new task from a coroutine +def create_task(coro): + if not hasattr(coro, "send"): + raise TypeError("coroutine expected") + t = Task(coro, globals()) + _task_queue.push_head(t) + return t + + +# Keep scheduling tasks until there are none left to schedule +def run_until_complete(main_task=None): + global cur_task + excs_all = (CancelledError, Exception) # To prevent heap allocation in loop + excs_stop = (CancelledError, StopIteration) # To prevent heap allocation in loop + while True: + # Wait until the head of _task_queue is ready to run + dt = 1 + while dt > 0: + dt = -1 + t = _task_queue.peek() + if t: + # A task waiting on _task_queue; "ph_key" is time to schedule task at + dt = max(0, ticks_diff(t.ph_key, ticks())) + elif not _io_queue.map: + # No tasks can be woken so finished running + return + # print('(poll {})'.format(dt), len(_io_queue.map)) + _io_queue.wait_io_event(dt) + + # Get next task to run and continue it + t = _task_queue.pop_head() + cur_task = t + try: + # Continue running the coroutine, it's responsible for rescheduling itself + exc = t.data + if not exc: + t.coro.send(None) + else: + t.data = None + t.coro.throw(exc) + except excs_all as er: + # Check the task is not on any event queue + assert t.data is None + # This task is done, check if it's the main task and then loop should stop + if t is main_task: + if isinstance(er, StopIteration): + return er.value + raise er + # Save return value of coro to pass up to caller + t.data = er + # Schedule any other tasks waiting on the completion of this task + waiting = False + if hasattr(t, "waiting"): + while t.waiting.peek(): + _task_queue.push_head(t.waiting.pop_head()) + waiting = True + t.waiting = None # Free waiting queue head + # Print out exception for detached tasks + if not waiting and not isinstance(er, excs_stop): + print("task raised exception:", t.coro) + sys.print_exception(er) + # Indicate task is done + t.coro = None + + +# Create a new task from a coroutine and run it until it finishes +def run(coro): + return run_until_complete(create_task(coro)) + + +################################################################################ +# Event loop wrapper + + +class Loop: + def create_task(self, coro): + return create_task(coro) + + def run_forever(self): + run_until_complete() + # TODO should keep running until .stop() is called, even if there're no tasks left + + def run_until_complete(self, aw): + return run_until_complete(_promote_to_task(aw)) + + def close(self): + pass + + +# The runq_len and waitq_len arguments are for legacy uasyncio compatibility +def get_event_loop(runq_len=0, waitq_len=0): + return Loop() diff --git a/extmod/uasyncio/event.py b/extmod/uasyncio/event.py new file mode 100644 index 0000000000..31cb00e055 --- /dev/null +++ b/extmod/uasyncio/event.py @@ -0,0 +1,31 @@ +# MicroPython uasyncio module +# MIT license; Copyright (c) 2019-2020 Damien P. George + +from . import core + +# Event class for primitive events that can be waited on, set, and cleared +class Event: + def __init__(self): + self.state = False # False=unset; True=set + self.waiting = core.TaskQueue() # Queue of Tasks waiting on completion of this event + + def is_set(self): + return self.state + + def set(self): + # Event becomes set, schedule any tasks waiting on it + while self.waiting.peek(): + core._task_queue.push_head(self.waiting.pop_head()) + self.state = True + + def clear(self): + self.state = False + + async def wait(self): + if not self.state: + # Event not set, put the calling task on the event's waiting queue + self.waiting.push_head(core.cur_task) + # Set calling task's data to the event's queue so it can be removed if needed + core.cur_task.data = self.waiting + yield + return True diff --git a/extmod/uasyncio/funcs.py b/extmod/uasyncio/funcs.py new file mode 100644 index 0000000000..7a4bddf256 --- /dev/null +++ b/extmod/uasyncio/funcs.py @@ -0,0 +1,50 @@ +# MicroPython uasyncio module +# MIT license; Copyright (c) 2019-2020 Damien P. George + +from . import core + + +async def wait_for(aw, timeout): + aw = core._promote_to_task(aw) + if timeout is None: + return await aw + + def cancel(aw, timeout): + await core.sleep(timeout) + aw.cancel() + + cancel_task = core.create_task(cancel(aw, timeout)) + try: + ret = await aw + except core.CancelledError: + # Ignore CancelledError from aw, it's probably due to timeout + pass + finally: + # Cancel the "cancel" task if it's still active (optimisation instead of cancel_task.cancel()) + if cancel_task.coro is not None: + core._task_queue.remove(cancel_task) + if cancel_task.coro is None: + # Cancel task ran to completion, ie there was a timeout + raise core.TimeoutError + return ret + + +async def gather(*aws, return_exceptions=False): + ts = [core._promote_to_task(aw) for aw in aws] + for i in range(len(ts)): + try: + # TODO handle cancel of gather itself + # if ts[i].coro: + # iter(ts[i]).waiting.push_head(cur_task) + # try: + # yield + # except CancelledError as er: + # # cancel all waiting tasks + # raise er + ts[i] = await ts[i] + except Exception as er: + if return_exceptions: + ts[i] = er + else: + raise er + return ts diff --git a/extmod/uasyncio/lock.py b/extmod/uasyncio/lock.py new file mode 100644 index 0000000000..18a55cb483 --- /dev/null +++ b/extmod/uasyncio/lock.py @@ -0,0 +1,53 @@ +# MicroPython uasyncio module +# MIT license; Copyright (c) 2019-2020 Damien P. George + +from . import core + +# Lock class for primitive mutex capability +class Lock: + def __init__(self): + # The state can take the following values: + # - 0: unlocked + # - 1: locked + # - : unlocked but this task has been scheduled to acquire the lock next + self.state = 0 + # Queue of Tasks waiting to acquire this Lock + self.waiting = core.TaskQueue() + + def locked(self): + return self.state == 1 + + def release(self): + if self.state != 1: + raise RuntimeError + if self.waiting.peek(): + # Task(s) waiting on lock, schedule next Task + self.state = self.waiting.pop_head() + core._task_queue.push_head(self.state) + else: + # No Task waiting so unlock + self.state = 0 + + async def acquire(self): + if self.state != 0: + # Lock unavailable, put the calling Task on the waiting queue + self.waiting.push_head(core.cur_task) + # Set calling task's data to the lock's queue so it can be removed if needed + core.cur_task.data = self.waiting + try: + yield + except core.CancelledError as er: + if self.state == core.cur_task: + # Cancelled while pending on resume, schedule next waiting Task + self.state = 1 + self.release() + raise er + # Lock available, set it as locked + self.state = 1 + return True + + async def __aenter__(self): + return await self.acquire() + + async def __aexit__(self, exc_type, exc, tb): + return self.release() diff --git a/extmod/uasyncio/stream.py b/extmod/uasyncio/stream.py new file mode 100644 index 0000000000..7803ac4bfa --- /dev/null +++ b/extmod/uasyncio/stream.py @@ -0,0 +1,141 @@ +# MicroPython uasyncio module +# MIT license; Copyright (c) 2019-2020 Damien P. George + +from . import core + + +class Stream: + def __init__(self, s, e={}): + self.s = s + self.e = e + self.out_buf = b"" + + def get_extra_info(self, v): + return self.e[v] + + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc, tb): + await self.close() + + def close(self): + pass + + async def wait_closed(self): + # TODO yield? + self.s.close() + + async def read(self, n): + yield core._io_queue.queue_read(self.s) + return self.s.read(n) + + async def readline(self): + l = b"" + while True: + yield core._io_queue.queue_read(self.s) + l2 = self.s.readline() # may do multiple reads but won't block + l += l2 + if not l2 or l[-1] == 10: # \n (check l in case l2 is str) + return l + + def write(self, buf): + self.out_buf += buf + + async def drain(self): + mv = memoryview(self.out_buf) + off = 0 + while off < len(mv): + yield core._io_queue.queue_write(self.s) + ret = self.s.write(mv[off:]) + if ret is not None: + off += ret + self.out_buf = b"" + + +# Create a TCP stream connection to a remote host +async def open_connection(host, port): + from uerrno import EINPROGRESS + import usocket as socket + + ai = socket.getaddrinfo(host, port)[0] # TODO this is blocking! + s = socket.socket() + s.setblocking(False) + ss = Stream(s) + try: + s.connect(ai[-1]) + except OSError as er: + if er.args[0] != EINPROGRESS: + raise er + yield core._io_queue.queue_write(s) + return ss, ss + + +# Class representing a TCP stream server, can be closed and used in "async with" +class Server: + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc, tb): + self.close() + await self.wait_closed() + + def close(self): + self.task.cancel() + + async def wait_closed(self): + await self.task + + async def _serve(self, cb, host, port, backlog): + import usocket as socket + + ai = socket.getaddrinfo(host, port)[0] # TODO this is blocking! + s = socket.socket() + s.setblocking(False) + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + s.bind(ai[-1]) + s.listen(backlog) + self.task = core.cur_task + # Accept incoming connections + while True: + try: + yield core._io_queue.queue_read(s) + except core.CancelledError: + # Shutdown server + s.close() + return + try: + s2, addr = s.accept() + except: + # Ignore a failed accept + continue + s2.setblocking(False) + s2s = Stream(s2, {"peername": addr}) + core.create_task(cb(s2s, s2s)) + + +# Helper function to start a TCP stream server, running as a new task +# TODO could use an accept-callback on socket read activity instead of creating a task +async def start_server(cb, host, port, backlog=5): + s = Server() + core.create_task(s._serve(cb, host, port, backlog)) + return s + + +################################################################################ +# Legacy uasyncio compatibility + + +async def stream_awrite(self, buf, off=0, sz=-1): + if off != 0 or sz != -1: + buf = memoryview(buf) + if sz == -1: + sz = len(buf) + buf = buf[off : off + sz] + self.write(buf) + await self.drain() + + +Stream.aclose = Stream.wait_closed +Stream.awrite = stream_awrite +Stream.awritestr = stream_awrite # TODO explicitly convert to bytes? diff --git a/extmod/uasyncio/task.py b/extmod/uasyncio/task.py new file mode 100644 index 0000000000..1d5bcc5cf7 --- /dev/null +++ b/extmod/uasyncio/task.py @@ -0,0 +1,168 @@ +# MicroPython uasyncio module +# MIT license; Copyright (c) 2019-2020 Damien P. George + +# This file contains the core TaskQueue based on a pairing heap, and the core Task class. +# They can optionally be replaced by C implementations. + +from . import core + + +# pairing-heap meld of 2 heaps; O(1) +def ph_meld(h1, h2): + if h1 is None: + return h2 + if h2 is None: + return h1 + lt = core.ticks_diff(h1.ph_key, h2.ph_key) < 0 + if lt: + if h1.ph_child is None: + h1.ph_child = h2 + else: + h1.ph_child_last.ph_next = h2 + h1.ph_child_last = h2 + h2.ph_next = None + h2.ph_rightmost_parent = h1 + return h1 + else: + h1.ph_next = h2.ph_child + h2.ph_child = h1 + if h1.ph_next is None: + h2.ph_child_last = h1 + h1.ph_rightmost_parent = h2 + return h2 + + +# pairing-heap pairing operation; amortised O(log N) +def ph_pairing(child): + heap = None + while child is not None: + n1 = child + child = child.ph_next + n1.ph_next = None + if child is not None: + n2 = child + child = child.ph_next + n2.ph_next = None + n1 = ph_meld(n1, n2) + heap = ph_meld(heap, n1) + return heap + + +# pairing-heap delete of a node; stable, amortised O(log N) +def ph_delete(heap, node): + if node is heap: + child = heap.ph_child + node.ph_child = None + return ph_pairing(child) + # Find parent of node + parent = node + while parent.ph_next is not None: + parent = parent.ph_next + parent = parent.ph_rightmost_parent + # Replace node with pairing of its children + if node is parent.ph_child and node.ph_child is None: + parent.ph_child = node.ph_next + node.ph_next = None + return heap + elif node is parent.ph_child: + child = node.ph_child + next = node.ph_next + node.ph_child = None + node.ph_next = None + node = ph_pairing(child) + parent.ph_child = node + else: + n = parent.ph_child + while node is not n.ph_next: + n = n.ph_next + child = node.ph_child + next = node.ph_next + node.ph_child = None + node.ph_next = None + node = ph_pairing(child) + if node is None: + node = n + else: + n.ph_next = node + node.ph_next = next + if next is None: + node.ph_rightmost_parent = parent + parent.ph_child_last = node + return heap + + +# TaskQueue class based on the above pairing-heap functions. +class TaskQueue: + def __init__(self): + self.heap = None + + def peek(self): + return self.heap + + def push_sorted(self, v, key): + v.data = None + v.ph_key = key + v.ph_child = None + v.ph_next = None + self.heap = ph_meld(v, self.heap) + + def push_head(self, v): + self.push_sorted(v, core.ticks()) + + def pop_head(self): + v = self.heap + self.heap = ph_pairing(self.heap.ph_child) + return v + + def remove(self, v): + self.heap = ph_delete(self.heap, v) + + +# Task class representing a coroutine, can be waited on and cancelled. +class Task: + def __init__(self, coro, globals=None): + self.coro = coro # Coroutine of this Task + self.data = None # General data for queue it is waiting on + self.ph_key = 0 # Pairing heap + self.ph_child = None # Paring heap + self.ph_child_last = None # Paring heap + self.ph_next = None # Paring heap + self.ph_rightmost_parent = None # Paring heap + + def __iter__(self): + if not hasattr(self, "waiting"): + # Lazily allocated head of linked list of Tasks waiting on completion of this task. + self.waiting = TaskQueue() + return self + + def __next__(self): + if not self.coro: + # Task finished, raise return value to caller so it can continue. + raise self.data + else: + # Put calling task on waiting queue. + self.waiting.push_head(core.cur_task) + # Set calling task's data to this task that it waits on, to double-link it. + core.cur_task.data = self + + def cancel(self): + # Check if task is already finished. + if self.coro is None: + return False + # Can't cancel self (not supported yet). + if self is core.cur_task: + raise RuntimeError("cannot cancel self") + # If Task waits on another task then forward the cancel to the one it's waiting on. + while isinstance(self.data, Task): + self = self.data + # Reschedule Task as a cancelled task. + if hasattr(self.data, "remove"): + # Not on the main running queue, remove the task from the queue it's on. + self.data.remove(self) + core._task_queue.push_head(self) + elif core.ticks_diff(self.ph_key, core.ticks()) > 0: + # On the main running queue but scheduled in the future, so bring it forward to now. + core._task_queue.remove(self) + core._task_queue.push_head(self) + self.data = core.CancelledError + return True