uasyncio.core: Switch to separate run and wait queues.

Instead of using single priority queue for all tasks, split into using
"run queue", which represents tasks not waiting until specific time,
which should be run on every (well, next) loop iteration, and wait queue,
still a priority queue. Run queue is a simple FIFO, implemented by
ucollections.deque, recently introduced in pfalcon/micropython. Thus,
there's minimal storage overhead and intrinsic scheduling fairness.
Generally, run queue should hold both a callback/coro and its arguments,
but as we don't feed any send args into coros still, it's optimized to
hold just 1 items for coros, while 2 for callbacks.

Introducing run queue will also allow to get rid of tie-breaking counter
in utimeq implementation, which was introduced to enforce fair scheduling.
It's no longer needed, as all tasks which should be run at given time
are batch-removed from wait queue and batch-inserted into run queue. So,
they may be executed not in the order scheduled (due to non-stable order
of heap), but the whole batch will be executed "atomically", and any new
schedulings from will be processed no earlier than next loop iteration.
pull/261/merge
Paul Sokolovsky 2018-02-07 00:06:10 +02:00 zatwierdzone przez Damien George
rodzic ab3198edd7
commit 4c63ecf5a6
1 zmienionych plików z 62 dodań i 39 usunięć

Wyświetl plik

@ -1,5 +1,6 @@
import utime as time
import utimeq
import ucollections
type_gen = type((lambda: (yield))())
@ -25,8 +26,9 @@ class TimeoutError(CancelledError):
class EventLoop:
def __init__(self, len=42):
self.q = utimeq.utimeq(len)
def __init__(self, runq_len=16, waitq_len=16):
self.runq = ucollections.deque((), runq_len, True)
self.waitq = utimeq.utimeq(waitq_len)
# Current task being run. Task is a top-level coroutine scheduled
# in the event loop (sub-coroutines executed transparently by
# yield from/await, event loop "doesn't see" them).
@ -41,18 +43,24 @@ class EventLoop:
# CPython asyncio incompatibility: we don't return Task object
def call_soon(self, callback, *args):
self.call_at_(self.time(), callback, args)
if __debug__ and DEBUG:
log.debug("Scheduling in runq: %s", (callback, args))
self.runq.append(callback)
if not isinstance(callback, type_gen):
self.runq.append(args)
def call_later(self, delay, callback, *args):
self.call_at_(time.ticks_add(self.time(), int(delay * 1000)), callback, args)
def call_later_ms(self, delay, callback, *args):
if not delay:
return self.call_soon(callback, *args)
self.call_at_(time.ticks_add(self.time(), delay), callback, args)
def call_at_(self, time, callback, args=()):
if __debug__ and DEBUG:
log.debug("Scheduling %s", (time, callback, args))
self.q.push(time, callback, args)
log.debug("Scheduling in waitq: %s", (time, callback, args))
self.waitq.push(time, callback, args)
def wait(self, delay):
# Default wait implementation, to be overriden in subclasses
@ -64,45 +72,45 @@ class EventLoop:
def run_forever(self):
cur_task = [0, 0, 0]
while True:
if self.q:
# wait() may finish prematurely due to I/O completion,
# and schedule new, earlier than before tasks to run.
while 1:
t = self.q.peektime()
tnow = self.time()
delay = time.ticks_diff(t, tnow)
if delay < 0:
delay = 0
# Always call wait(), to give a chance to I/O scheduling
self.wait(delay)
if delay == 0:
break
self.q.pop(cur_task)
t = cur_task[0]
cb = cur_task[1]
args = cur_task[2]
# Expire entries in waitq and move them to runq
tnow = self.time()
while self.waitq:
t = self.waitq.peektime()
delay = time.ticks_diff(t, tnow)
if delay > 0:
break
self.waitq.pop(cur_task)
if __debug__ and DEBUG:
log.debug("Next coroutine to run: %s", (t, cb, args))
log.debug("Moving from waitq to runq: %s", cur_task[1])
self.call_soon(cur_task[1], *cur_task[2])
# Process runq
l = len(self.runq)
if __debug__ and DEBUG:
log.debug("Entries in runq: %d", l)
while l:
cb = self.runq.popleft()
l -= 1
args = ()
if not isinstance(cb, type_gen):
args = self.runq.popleft()
l -= 1
if __debug__ and DEBUG:
log.info("Next callback to run: %s", (cb, args))
cb(*args)
continue
if __debug__ and DEBUG:
log.info("Next coroutine to run: %s", (cb, args))
self.cur_task = cb
# __main__.mem_info()
else:
self.wait(-1)
# Assuming IO completion scheduled some tasks
continue
if callable(cb):
cb(*args)
else:
delay = 0
try:
if __debug__ and DEBUG:
log.debug("Coroutine %s send args: %s", cb, args)
if args == ():
if args is ():
ret = next(cb)
else:
ret = cb.send(*args)
if __debug__ and DEBUG:
log.debug("Coroutine %s yield result: %s", cb, ret)
log.info("Coroutine %s yield result: %s", cb, ret)
if isinstance(ret, SysCall1):
arg = ret.arg
if isinstance(ret, SleepMs):
@ -147,7 +155,22 @@ class EventLoop:
# Currently all syscalls don't return anything, so we don't
# need to feed anything to the next invocation of coroutine.
# If that changes, need to pass that value below.
self.call_later_ms(delay, cb)
if delay:
self.call_later_ms(delay, cb)
else:
self.call_soon(cb)
# Wait until next waitq task or I/O availability
delay = 0
if not self.runq:
delay = -1
if self.waitq:
tnow = self.time()
t = self.waitq.peektime()
delay = time.ticks_diff(t, tnow)
if delay < 0:
delay = 0
self.wait(delay)
def run_until_complete(self, coro):
def _run_and_stop():
@ -195,10 +218,10 @@ class IOWriteDone(SysCall1):
_event_loop = None
_event_loop_class = EventLoop
def get_event_loop(len=42):
def get_event_loop(runq_len=16, waitq_len=16):
global _event_loop
if _event_loop is None:
_event_loop = _event_loop_class(len)
_event_loop = _event_loop_class(runq_len, waitq_len)
return _event_loop
def sleep(secs):