asyncio_slow: Start new upstream API-compatible asyncio implementation.

The trait of this implementation is that it doesn't use priority queue and
time scheduling, and instead does its all operations using polling, starting
with such basic one as sleep. On the other hand, this tries to implement
all (well, much) of upstream asyncio API and warts.

asyncio_slow: Rename from asyncio_micro.

It may turn out that this won't be "micro" at all. The main trait of this
implementation is that it stay 100% API compatible with upstream (in
those APIs which are implemented of course). It will also keep inefficient
implementation of event loop scheduling, to discourage its use. Here we go.
pull/11/head
Paul Sokolovsky 2014-04-26 07:59:18 +03:00
rodzic f0ce7978ce
commit c78c27c1dd
1 zmienionych plików z 144 dodań i 0 usunięć

Wyświetl plik

@ -0,0 +1,144 @@
import time
import logging
log = logging.getLogger("asyncio")
# Workaround for not being able to subclass builtin types
DoneException = AssertionError
class InvalidStateError:
pass
# Object not matching any other object
_sentinel = []
class EventLoop:
def __init__(self):
self.q = []
def call_soon(self, c, *args):
self.q.append(c)
def run_forever(self):
while self.q:
c = self.q.pop(0)
c()
# I mean, forever
while True:
time.sleep(1)
def run_until_complete(self, coro):
def _cb(val):
raise DoneException
t = async(coro)
t.add_done_callback(_cb)
self.call_soon(t)
try:
self.run_forever()
except DoneException:
pass
def close(self):
pass
_def_event_loop = EventLoop()
class Future:
def __init__(self, loop=_def_event_loop):
self.loop = loop
self.res = _sentinel
self.cbs = []
def result(self):
if self.res is _sentinel:
raise InvalidStateError
return self.res
def add_done_callback(self, fn):
if self.res is _sentinel:
self.cbs.append(fn)
else:
self.loop.call_soon(fn, self)
def set_result(self, val):
self.res = val
for f in self.cbs:
f(self)
class Task(Future):
def __init__(self, coro, loop=_def_event_loop):
super().__init__()
self.loop = loop
self.c = coro
# upstream asyncio forces task to be scheduled on instantiation
self.loop.call_soon(self)
def __call__(self):
try:
next(self.c)
self.loop.call_soon(self)
except StopIteration as e:
log.debug("Coro finished: %s", self.c)
self.set_result(None)
def get_event_loop():
return _def_event_loop
# Decorator
def coroutine(f):
return f
def async(coro):
if isinstance(coro, Future):
return coro
return Task(coro)
class Wait(Future):
def __init__(self, n):
Future.__init__(self)
self.n = n
def _done(self):
self.n -= 1
log.debug("Wait: remaining tasks: %d", self.n)
if not self.n:
self.set_result(None)
def __call__(self):
pass
def wait(coro_list, loop=_def_event_loop):
w = Wait(len(coro_list))
for c in coro_list:
t = async(c)
t.add_done_callback(lambda val: w._done())
loop.call_soon(t)
return w
def sleep(secs):
t = time.time()
log.debug("Started sleep at: %s, targetting: %s", t, t + secs)
while time.time() < t + secs:
time.sleep(0.01)
yield
log.debug("Finished sleeping %ss", secs)