From 08125828fefe9391cb1689dca5435bd7bb3beb54 Mon Sep 17 00:00:00 2001 From: Peter Hinch Date: Fri, 15 Nov 2019 17:26:26 +0000 Subject: [PATCH] Add demo of fast I/O on new uasyncio. --- uasyncio_iostream/tests/iotest4.py | 2 +- uasyncio_iostream/uasyncio.py | 526 +++++++++++++++++++++++++++++ 2 files changed, 527 insertions(+), 1 deletion(-) create mode 100644 uasyncio_iostream/uasyncio.py diff --git a/uasyncio_iostream/tests/iotest4.py b/uasyncio_iostream/tests/iotest4.py index 6b5c77f..b34723b 100644 --- a/uasyncio_iostream/tests/iotest4.py +++ b/uasyncio_iostream/tests/iotest4.py @@ -74,7 +74,7 @@ class MyIO(io.IOBase): # Emulate unbuffered hardware which writes one character: uasyncio waits # until hardware is ready for the next. Hardware ready is emulated by write # timer callback. - def write(self, buf, off, sz): + def write(self, buf, off=0, sz=0): self.wch = buf[off] # Hardware starts to write a char return 1 # 1 byte written. uasyncio waits on ioctl write ready diff --git a/uasyncio_iostream/uasyncio.py b/uasyncio_iostream/uasyncio.py new file mode 100644 index 0000000..c9accae --- /dev/null +++ b/uasyncio_iostream/uasyncio.py @@ -0,0 +1,526 @@ +""" +MicroPython uasyncio module +MIT license; Copyright (c) 2019 Damien P. George +""" + +from time import ticks_ms as ticks, ticks_diff, ticks_add +import sys, select + +################################################################################ +# Queue class + +class Queue: + def __init__(self): + self.next = None + self.last = None + + def push_sorted(self, v, data): + v.data = data + + if ticks_diff(data, ticks()) <= 0: + cur = self.last + if cur and ticks_diff(data, cur.data) >= 0: + # Optimisation: can start looking from self.last to insert this item + while cur.next and ticks_diff(data, cur.next.data) >= 0: + cur = cur.next + v.next = cur.next + cur.next = v + self.last = cur + return + + cur = self + while cur.next and (not isinstance(cur.next.data, int) or ticks_diff(data, cur.next.data) >= 0): + cur = cur.next + v.next = cur.next + cur.next = v + if cur is not self: + self.last = cur + + def push_head(self, v): + self.push_sorted(v, ticks()) + + def push_priority(self, v): + v.data = ticks() + v.next = self.next + self.next = v + + def push_error(self, v, err): + # Push directly to head (but should probably still consider fairness) + v.data = err + v.next = self.next + self.next = v + + def pop_head(self): + v = self.next + self.next = v.next + if self.last is v: + self.last = v.next + return v + + def remove(self, v): + cur = self + while cur.next: + if cur.next is v: + cur.next = v.next + break + cur = cur.next + if self.last is v: + self.last = v.next + +################################################################################ +# Fundamental classes + +class CancelledError(BaseException): + pass + +class TimeoutError(Exception): + pass + +# Task class representing a coroutine, can be waited on and cancelled +class Task: + def __init__(self, coro): + self.coro = coro # Coroutine of this Task + self.next = None # For linked list + self.data = None # General data for linked list + def __iter__(self): + if not hasattr(self, 'waiting'): + # Lazily allocated head of linked list of Tasks waiting on completion of this task + self.waiting = Queue() + return self + def send(self, v): + if not self.coro: + # Task finished, raise return value to caller so it can continue + raise self.data + else: + # Put calling task on waiting queue + self.waiting.push_head(cur_task) + # Set calling task's data to this task that it waits on, to double-link it + cur_task.data = self + def cancel(self): + if self is cur_task: + raise RuntimeError('cannot cancel self') + # If Task waits on another task then forward the cancel to the one it's waiting on + while isinstance(self.data, Task): + self = self.data + # Reschedule Task as a cancelled task + if hasattr(self.data, 'waiting'): + self.data.waiting.remove(self) + else: + _queue.remove(self) + _queue.push_error(self, CancelledError) + return True + +# Create and schedule a new task from a coroutine +def create_task(coro): + t = Task(coro) + _queue.push_head(t) + return t + +# "Yield" once, then raise StopIteration +class SingletonGenerator: + def __init__(self): + self.state = 0 + self.exc = StopIteration() + def __iter__(self): + return self + def __next__(self): + if self.state: + self.state = 0 + return None + else: + self.exc.__traceback__ = None + raise self.exc + +# Pause task execution for the given time (integer in milliseconds, uPy extension) +# Use a SingletonGenerator to do it without allocating on the heap +def sleep_ms(t, sgen=SingletonGenerator()): + _queue.push_sorted(cur_task, ticks_add(ticks(), t)) + sgen.state = 1 + return sgen + +# Pause task execution for the given time (in seconds) +def sleep(t): + return sleep_ms(int(t * 1000)) + +################################################################################ +# Helper functions + +def _promote_to_task(aw): + return aw if isinstance(aw, Task) else create_task(aw) + +def run(coro): + return run_until_complete(create_task(coro)) + +async def wait_for(aw, timeout): + aw = _promote_to_task(aw) + if timeout is None: + return await aw + def cancel(aw, timeout): + await sleep(timeout) + aw.cancel() + cancel_task = create_task(cancel(aw, timeout)) + try: + ret = await aw + except CancelledError: + # Ignore CancelledError from aw, it's probably due to timeout + pass + finally: + _queue.remove(cancel_task) + if cancel_task.coro is None: + # Cancel task ran to completion, ie there was a timeout + raise TimeoutError + return ret + +async def gather(*aws, return_exceptions=False): + ts = [_promote_to_task(aw) for aw in aws] + for i in range(len(ts)): + try: + # TODO handle cancel of gather itself + #if ts[i].coro: + # iter(ts[i]).waiting.push_head(cur_task) + # try: + # yield + # except CancelledError as er: + # # cancel all waiting tasks + # raise er + ts[i] = await ts[i] + except Exception as er: + if return_exceptions: + ts[i] = er + else: + raise er + return ts + +################################################################################ +# Lock (optional component) + +# Lock class for primitive mutex capability +class Lock: + def __init__(self): + self.state = 0 # 0=unlocked; 1=unlocked but waiting task pending resume; 2=locked + self.waiting = Queue() # Queue of Tasks waiting to acquire this Lock + def locked(self): + return self.state == 2 + def release(self): + if self.state != 2: + raise RuntimeError + if self.waiting.next: + # Task(s) waiting on lock, schedule first Task + _queue.push_head(self.waiting.pop_head()) + self.state = 1 + else: + # No Task waiting so unlock + self.state = 0 + async def acquire(self): + if self.state != 0 or self.waiting.next: + # Lock unavailable, put the calling Task on the waiting queue + self.waiting.push_head(cur_task) + # Set calling task's data to double-link it + cur_task.data = self + try: + yield + except CancelledError: + if self.state == 1: + # Cancelled while pending on resume, schedule next waiting Task + self.state = 2 + self.release() + raise + # Lock available, set it as locked + self.state = 2 + return True + async def __aenter__(self): + return await self.acquire() + async def __aexit__(self, exc_type, exc, tb): + return self.release() + +################################################################################ +# Event (optional component) + +# Event class for primitive events that can be waited on, set, and cleared +class Event: + def __init__(self): + self.state = 0 # 0=unset; 1=set + self.waiting = Queue() # Queue of Tasks waiting on completion of this event + def set(self): + # Event becomes set, schedule any tasks waiting on it + while self.waiting.next: + _queue.push_head(self.waiting.pop_head()) + self.state = 1 + def clear(self): + self.state = 0 + async def wait(self): + if self.state == 0: + # Event not set, put the calling task on the event's waiting queue + self.waiting.push_head(cur_task) + # Set calling task's data to this event that it waits on, to double-link it + cur_task.data = self + yield + return True + +################################################################################ +# General streams + +# Queue and poller for stream IO +class IOQueue: + def __init__(self): + self.poller = select.poll() + self.map = {} + self.fast = set() + def _queue(self, s, idx): + if id(s) not in self.map: + entry = [None, None, s] + entry[idx] = cur_task + self.map[id(s)] = entry + self.poller.register(s, select.POLLIN if idx == 0 else select.POLLOUT) + else: + sm = self.map[id(s)] + assert sm[idx] is None + assert sm[1 - idx] is not None + sm[idx] = cur_task + self.poller.modify(s, select.POLLIN | select.POLLOUT) + def _dequeue(self, s): + del self.map[id(s)] + self.poller.unregister(s) + def queue_read(self, s): + self._queue(s, 0) + def queue_write(self, s): + self._queue(s, 1) + def priority(self, sid, v): + self.fast.add(sid) if v else self.fast.discard(sid) + def remove(self, task): + while True: + del_s = None + for k in self.map: # Iterate without allocating on the heap + q0, q1, s = self.map[k] + if q0 is task or q1 is task: + del_s = s + break + if del_s is not None: + self._dequeue(s) + else: + break + def wait_io_event(self, dt): + for s, ev in self.poller.ipoll(dt): + sid = id(s) + sm = self.map[sid] + err = ev & ~(select.POLLIN | select.POLLOUT) + fast = sid in self.fast + #print('poll', s, sm, ev, err) + if ev & select.POLLIN or (err and sm[0] is not None): + if fast: + _queue.push_priority(sm[0]) + else: + _queue.push_head(sm[0]) + sm[0] = None + if ev & select.POLLOUT or (err and sm[1] is not None): + if fast: + _queue.push_priority(sm[0]) + else: + _queue.push_head(sm[0]) + sm[1] = None + if sm[0] is None and sm[1] is None: + self._dequeue(s) + elif sm[0] is None: + self.poller.modify(s, select.POLLOUT) + else: + self.poller.modify(s, select.POLLIN) + +class Stream: + def __init__(self, s, e={}): + self.s = s + self.e = e + self.out_buf = b'' + def get_extra_info(self, v): + return self.e[v] + async def __aenter__(self): + return self + async def __aexit__(self, exc_type, exc, tb): + await self.close() + def close(self): + pass + async def wait_closed(self): + # TODO yield? + self.s.close() + def priority(self, v=True): + _io_queue.priority(id(self.s), v) + async def read(self, n): + yield _io_queue.queue_read(self.s) + return self.s.read(n) + async def readline(self): + l = b'' + while True: + yield _io_queue.queue_read(self.s) + l2 = self.s.readline() # may do multiple reads but won't block + l += l2 + if not l2 or l[-1] == 10: # \n (check l in case l2 is str) + return l + def write(self, buf): + self.out_buf += buf + async def drain(self): + mv = memoryview(self.out_buf) + off = 0 + while off < len(mv): + yield _io_queue.queue_write(self.s) + ret = self.s.write(mv[off:]) + if ret is not None: + off += ret + self.out_buf = b'' + +################################################################################ +# Socket streams + +# Create a TCP stream connection to a remove host +async def open_connection(host, port): + try: + import usocket as socket + except ImportError: + import socket + ai = socket.getaddrinfo(host, port)[0] # TODO this is blocking! + s = socket.socket() + s.setblocking(False) + ss = Stream(s) + try: + s.connect(ai[-1]) + except OSError as er: + if er.args[0] != 115: # EINPROGRESS + raise er + yield _io_queue.queue_write(s) + return ss, ss + +# Class representing a TCP stream server, can be closed and used in "async with" +class Server: + async def __aenter__(self): + return self + async def __aexit__(self, exc_type, exc, tb): + self.close() + await self.wait_closed() + def close(self): + self.task.cancel() + async def wait_closed(self): + await self.task + async def _serve(self, cb, host, port, backlog): + try: + import usocket as socket + except ImportError: + import socket + ai = socket.getaddrinfo(host, port)[0] # TODO this is blocking! + s = socket.socket() + s.setblocking(False) + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + s.bind(ai[-1]) + s.listen(backlog) + self.task = cur_task + # Accept incoming connections + while True: + try: + yield _io_queue.queue_read(s) + except CancelledError: + # Shutdown server + s.close() + return + s2, addr = s.accept() + s2.setblocking(False) + s2s = Stream(s2, {'peername': addr}) + create_task(cb(s2s, s2s)) + +# Helper function to start a TCP stream server, running as a new task +# TODO could use an accept-callback on socket read activity instead of creating a task +async def start_server(cb, host, port, backlog=5): + s = Server() + create_task(s._serve(cb, host, port, backlog)) + return s + +################################################################################ +# Main run loop + +# Queue of Task instances +_queue = Queue() + +# Task queue and poller for stream IO +_io_queue = IOQueue() + +# Keep scheduling tasks until there are none left to schedule +def run_until_complete(main_task=None): + global cur_task + excs_all = (CancelledError, Exception) # To prevent heap allocation in loop + excs_stop = (CancelledError, StopIteration) # To prevent heap allocation in loop + while True: + # Wait until the head of _queue is ready to run + dt = 1 + while dt > 0: + dt = -1 + if _queue.next: + # A task waiting on _queue + if isinstance(_queue.next.data, int): + # "data" is time to schedule task at + dt = max(0, ticks_diff(_queue.next.data, ticks())) + else: + # "data" is an exception to throw into the task + dt = 0 + elif not _io_queue.map: + # No tasks can be woken so finished running + return + #print('(poll {})'.format(dt), len(_io_queue.map)) + _io_queue.wait_io_event(dt) + + # Get next task to run and continue it + t = _queue.pop_head() + cur_task = t + try: + # Continue running the coroutine, it's responsible for rescheduling itself + if isinstance(t.data, int): + t.coro.send(None) + else: + t.coro.throw(t.data) + except excs_all as er: + # This task is done, schedule any tasks waiting on it + if t is main_task: + if isinstance(er, StopIteration): + return er.value + raise er + t.data = er # save return value of coro to pass up to caller + waiting = False + if hasattr(t, 'waiting'): + while t.waiting.next: + _queue.push_head(t.waiting.pop_head()) + waiting = True + t.waiting = None # Free waiting queue head + _io_queue.remove(t) # Remove task from the IO queue (if it's on it) + t.coro = None # Indicate task is done + # Print out exception for detached tasks + if not waiting and not isinstance(er, excs_stop): + print('task raised exception:', t.coro) + sys.print_exception(er) + +################################################################################ +# Legacy uasyncio compatibility + +async def stream_awrite(self, buf, off=0, sz=-1): + if off != 0 or sz != -1: + buf = memoryview(buf) + if sz == -1: + sz = len(buf) + buf = buf[off:off + sz] + self.write(buf) + await self.drain() + +Stream.aclose = Stream.wait_closed +Stream.awrite = stream_awrite +Stream.awritestr = stream_awrite # TODO explicitly convert to bytes? + +StreamReader = Stream +StreamWriter = Stream + +class Loop: + def create_task(self, coro): + return create_task(coro) + def run_forever(self): + run_until_complete() + # TODO should keep running until .stop() is called, even if there're no tasks left + def run_until_complete(self, aw): + return run_until_complete(_promote_to_task(aw)) + def close(self): + pass + +def get_event_loop(runq_len=0, waitq_len=0): + return Loop()