diff --git a/asyncio_slow/asyncio_slow.py b/asyncio_slow/asyncio_slow.py new file mode 100644 index 00000000..ef21708f --- /dev/null +++ b/asyncio_slow/asyncio_slow.py @@ -0,0 +1,144 @@ +import time +import logging + + +log = logging.getLogger("asyncio") + + +# Workaround for not being able to subclass builtin types +DoneException = AssertionError + +class InvalidStateError: + pass + +# Object not matching any other object +_sentinel = [] + + +class EventLoop: + + def __init__(self): + self.q = [] + + def call_soon(self, c, *args): + self.q.append(c) + + def run_forever(self): + while self.q: + c = self.q.pop(0) + c() + # I mean, forever + while True: + time.sleep(1) + + def run_until_complete(self, coro): + def _cb(val): + raise DoneException + + t = async(coro) + t.add_done_callback(_cb) + self.call_soon(t) + try: + self.run_forever() + except DoneException: + pass + + def close(self): + pass + + +_def_event_loop = EventLoop() + + +class Future: + + def __init__(self, loop=_def_event_loop): + self.loop = loop + self.res = _sentinel + self.cbs = [] + + def result(self): + if self.res is _sentinel: + raise InvalidStateError + return self.res + + def add_done_callback(self, fn): + if self.res is _sentinel: + self.cbs.append(fn) + else: + self.loop.call_soon(fn, self) + + def set_result(self, val): + self.res = val + for f in self.cbs: + f(self) + + +class Task(Future): + + def __init__(self, coro, loop=_def_event_loop): + super().__init__() + self.loop = loop + self.c = coro + # upstream asyncio forces task to be scheduled on instantiation + self.loop.call_soon(self) + + def __call__(self): + try: + next(self.c) + self.loop.call_soon(self) + except StopIteration as e: + log.debug("Coro finished: %s", self.c) + self.set_result(None) + + +def get_event_loop(): + return _def_event_loop + + +# Decorator +def coroutine(f): + return f + + +def async(coro): + if isinstance(coro, Future): + return coro + return Task(coro) + + +class Wait(Future): + + def __init__(self, n): + Future.__init__(self) + self.n = n + + def _done(self): + self.n -= 1 + log.debug("Wait: remaining tasks: %d", self.n) + if not self.n: + self.set_result(None) + + def __call__(self): + pass + + +def wait(coro_list, loop=_def_event_loop): + + w = Wait(len(coro_list)) + + for c in coro_list: + t = async(c) + t.add_done_callback(lambda val: w._done()) + loop.call_soon(t) + + return w + + +def sleep(secs): + t = time.time() + log.debug("Started sleep at: %s, targetting: %s", t, t + secs) + while time.time() < t + secs: + time.sleep(0.01) + yield + log.debug("Finished sleeping %ss", secs)