diff --git a/uasyncio.core/uasyncio/core.py b/uasyncio.core/uasyncio/core.py index 274883a7..77fdb7a2 100644 --- a/uasyncio.core/uasyncio/core.py +++ b/uasyncio.core/uasyncio/core.py @@ -1,5 +1,6 @@ import utime as time import utimeq +import ucollections type_gen = type((lambda: (yield))()) @@ -25,8 +26,9 @@ class TimeoutError(CancelledError): class EventLoop: - def __init__(self, len=42): - self.q = utimeq.utimeq(len) + def __init__(self, runq_len=16, waitq_len=16): + self.runq = ucollections.deque((), runq_len, True) + self.waitq = utimeq.utimeq(waitq_len) # Current task being run. Task is a top-level coroutine scheduled # in the event loop (sub-coroutines executed transparently by # yield from/await, event loop "doesn't see" them). @@ -41,18 +43,24 @@ class EventLoop: # CPython asyncio incompatibility: we don't return Task object def call_soon(self, callback, *args): - self.call_at_(self.time(), callback, args) + if __debug__ and DEBUG: + log.debug("Scheduling in runq: %s", (callback, args)) + self.runq.append(callback) + if not isinstance(callback, type_gen): + self.runq.append(args) def call_later(self, delay, callback, *args): self.call_at_(time.ticks_add(self.time(), int(delay * 1000)), callback, args) def call_later_ms(self, delay, callback, *args): + if not delay: + return self.call_soon(callback, *args) self.call_at_(time.ticks_add(self.time(), delay), callback, args) def call_at_(self, time, callback, args=()): if __debug__ and DEBUG: - log.debug("Scheduling %s", (time, callback, args)) - self.q.push(time, callback, args) + log.debug("Scheduling in waitq: %s", (time, callback, args)) + self.waitq.push(time, callback, args) def wait(self, delay): # Default wait implementation, to be overriden in subclasses @@ -64,45 +72,45 @@ class EventLoop: def run_forever(self): cur_task = [0, 0, 0] while True: - if self.q: - # wait() may finish prematurely due to I/O completion, - # and schedule new, earlier than before tasks to run. - while 1: - t = self.q.peektime() - tnow = self.time() - delay = time.ticks_diff(t, tnow) - if delay < 0: - delay = 0 - # Always call wait(), to give a chance to I/O scheduling - self.wait(delay) - if delay == 0: - break - - self.q.pop(cur_task) - t = cur_task[0] - cb = cur_task[1] - args = cur_task[2] + # Expire entries in waitq and move them to runq + tnow = self.time() + while self.waitq: + t = self.waitq.peektime() + delay = time.ticks_diff(t, tnow) + if delay > 0: + break + self.waitq.pop(cur_task) if __debug__ and DEBUG: - log.debug("Next coroutine to run: %s", (t, cb, args)) + log.debug("Moving from waitq to runq: %s", cur_task[1]) + self.call_soon(cur_task[1], *cur_task[2]) + + # Process runq + l = len(self.runq) + if __debug__ and DEBUG: + log.debug("Entries in runq: %d", l) + while l: + cb = self.runq.popleft() + l -= 1 + args = () + if not isinstance(cb, type_gen): + args = self.runq.popleft() + l -= 1 + if __debug__ and DEBUG: + log.info("Next callback to run: %s", (cb, args)) + cb(*args) + continue + + if __debug__ and DEBUG: + log.info("Next coroutine to run: %s", (cb, args)) self.cur_task = cb -# __main__.mem_info() - else: - self.wait(-1) - # Assuming IO completion scheduled some tasks - continue - if callable(cb): - cb(*args) - else: delay = 0 try: - if __debug__ and DEBUG: - log.debug("Coroutine %s send args: %s", cb, args) - if args == (): + if args is (): ret = next(cb) else: ret = cb.send(*args) if __debug__ and DEBUG: - log.debug("Coroutine %s yield result: %s", cb, ret) + log.info("Coroutine %s yield result: %s", cb, ret) if isinstance(ret, SysCall1): arg = ret.arg if isinstance(ret, SleepMs): @@ -147,7 +155,22 @@ class EventLoop: # Currently all syscalls don't return anything, so we don't # need to feed anything to the next invocation of coroutine. # If that changes, need to pass that value below. - self.call_later_ms(delay, cb) + if delay: + self.call_later_ms(delay, cb) + else: + self.call_soon(cb) + + # Wait until next waitq task or I/O availability + delay = 0 + if not self.runq: + delay = -1 + if self.waitq: + tnow = self.time() + t = self.waitq.peektime() + delay = time.ticks_diff(t, tnow) + if delay < 0: + delay = 0 + self.wait(delay) def run_until_complete(self, coro): def _run_and_stop(): @@ -195,10 +218,10 @@ class IOWriteDone(SysCall1): _event_loop = None _event_loop_class = EventLoop -def get_event_loop(len=42): +def get_event_loop(runq_len=16, waitq_len=16): global _event_loop if _event_loop is None: - _event_loop = _event_loop_class(len) + _event_loop = _event_loop_class(runq_len, waitq_len) return _event_loop def sleep(secs):