From b2130c84ce2f5f08aa94657343b01f5ab54dca7b Mon Sep 17 00:00:00 2001 From: Paul Sokolovsky Date: Fri, 24 Oct 2014 00:24:13 +0300 Subject: [PATCH] uasyncio: Split into "core" and "extended" modules. --- uasyncio/uasyncio/__init__.py | 133 +------------------------------ uasyncio/uasyncio/core.py | 143 ++++++++++++++++++++++++++++++++++ 2 files changed, 144 insertions(+), 132 deletions(-) create mode 100644 uasyncio/uasyncio/core.py diff --git a/uasyncio/uasyncio/__init__.py b/uasyncio/uasyncio/__init__.py index f34123e6..ce08094d 100644 --- a/uasyncio/uasyncio/__init__.py +++ b/uasyncio/uasyncio/__init__.py @@ -1,106 +1,12 @@ -import __main__ import time import heapq import errno import logging +from uasyncio.core import * log = logging.getLogger("asyncio") -type_gen = type((lambda: (yield))()) - -class EventLoop: - - def __init__(self): - self.q = [] - self.cnt = 0 - - def time(self): - return time.time() - - def call_soon(self, callback, *args): - self.call_at(0, callback, *args) - - def call_later(self, delay, callback, *args): - self.call_at(self.time() + delay, callback, *args) - - def call_at(self, time, callback, *args): - # Including self.cnt is a workaround per heapq docs - log.debug("Scheduling %s", (time, self.cnt, callback, args)) - heapq.heappush(self.q, (time, self.cnt, callback, args)) -# print(self.q) - self.cnt += 1 - - def wait(self, delay): - # Default wait implementation, to be overriden in subclasses - # with IO scheduling - log.debug("Sleeping for: %s", delay) - time.sleep(delay) - - def run_forever(self): - while True: - if self.q: - t, cnt, cb, args = heapq.heappop(self.q) - log.debug("Next coroutine to run: %s", (t, cnt, cb, args)) -# __main__.mem_info() - tnow = self.time() - delay = t - tnow - if delay > 0: - self.wait(delay) - else: - self.wait(-1) - # Assuming IO completion scheduled some tasks - continue - if callable(cb): - cb(*args) - else: - delay = 0 - try: - if args == (): - args = (None,) - log.debug("Coroutine %s send args: %s", cb, args) - ret = cb.send(*args) - log.debug("Coroutine %s yield result: %s", cb, ret) - if isinstance(ret, SysCall): - arg = ret.args[0] - if isinstance(ret, Sleep): - delay = arg - elif isinstance(ret, IORead): -# self.add_reader(ret.obj.fileno(), lambda self, c, f: self.call_soon(c, f), self, cb, ret.obj) -# self.add_reader(ret.obj.fileno(), lambda c, f: self.call_soon(c, f), cb, ret.obj) - self.add_reader(arg.fileno(), lambda cb, f: self.call_soon(cb, f), cb, arg) - continue - elif isinstance(ret, IOWrite): - self.add_writer(arg.fileno(), lambda cb, f: self.call_soon(cb, f), cb, arg) - continue - elif isinstance(ret, IOReadDone): - self.remove_reader(arg.fileno()) - elif isinstance(ret, IOWriteDone): - self.remove_writer(arg.fileno()) - elif isinstance(ret, StopLoop): - return arg - elif isinstance(ret, type_gen): - self.call_soon(ret) - elif ret is None: - # Just reschedule - pass - else: - assert False, "Unsupported coroutine yield value: %r (of type %r)" % (ret, type(ret)) - except StopIteration as e: - log.debug("Coroutine finished: %s", cb) - continue - self.call_later(delay, cb, *args) - - def run_until_complete(self, coro): - def _run_and_stop(): - yield from coro - yield StopLoop(0) - self.call_soon(_run_and_stop()) - self.run_forever() - - def close(self): - pass - import select @@ -138,46 +44,9 @@ class EpollEventLoop(EventLoop): cb[0](*cb[1]) -class SysCall: - - def __init__(self, *args): - self.args = args - - def handle(self): - raise NotImplementedError - -class Sleep(SysCall): - pass - -class StopLoop(SysCall): - pass - -class IORead(SysCall): - pass - -class IOWrite(SysCall): - pass - -class IOReadDone(SysCall): - pass - -class IOWriteDone(SysCall): - pass - - def get_event_loop(): return EpollEventLoop() -def coroutine(f): - return f - -def async(coro): - # We don't have Task bloat, so op is null - return coro - -def sleep(secs): - yield Sleep(secs) - import usocket as _socket diff --git a/uasyncio/uasyncio/core.py b/uasyncio/uasyncio/core.py new file mode 100644 index 00000000..a2bbfadf --- /dev/null +++ b/uasyncio/uasyncio/core.py @@ -0,0 +1,143 @@ +import __main__ +import time +import heapq +import errno +import logging + + +log = logging.getLogger("asyncio") + +type_gen = type((lambda: (yield))()) + +class EventLoop: + + def __init__(self): + self.q = [] + self.cnt = 0 + + def time(self): + return time.time() + + def call_soon(self, callback, *args): + self.call_at(0, callback, *args) + + def call_later(self, delay, callback, *args): + self.call_at(self.time() + delay, callback, *args) + + def call_at(self, time, callback, *args): + # Including self.cnt is a workaround per heapq docs + log.debug("Scheduling %s", (time, self.cnt, callback, args)) + heapq.heappush(self.q, (time, self.cnt, callback, args)) +# print(self.q) + self.cnt += 1 + + def wait(self, delay): + # Default wait implementation, to be overriden in subclasses + # with IO scheduling + log.debug("Sleeping for: %s", delay) + time.sleep(delay) + + def run_forever(self): + while True: + if self.q: + t, cnt, cb, args = heapq.heappop(self.q) + log.debug("Next coroutine to run: %s", (t, cnt, cb, args)) +# __main__.mem_info() + tnow = self.time() + delay = t - tnow + if delay > 0: + self.wait(delay) + else: + self.wait(-1) + # Assuming IO completion scheduled some tasks + continue + if callable(cb): + cb(*args) + else: + delay = 0 + try: + if args == (): + args = (None,) + log.debug("Coroutine %s send args: %s", cb, args) + ret = cb.send(*args) + log.debug("Coroutine %s yield result: %s", cb, ret) + if isinstance(ret, SysCall): + arg = ret.args[0] + if isinstance(ret, Sleep): + delay = arg + elif isinstance(ret, IORead): +# self.add_reader(ret.obj.fileno(), lambda self, c, f: self.call_soon(c, f), self, cb, ret.obj) +# self.add_reader(ret.obj.fileno(), lambda c, f: self.call_soon(c, f), cb, ret.obj) + self.add_reader(arg.fileno(), lambda cb, f: self.call_soon(cb, f), cb, arg) + continue + elif isinstance(ret, IOWrite): + self.add_writer(arg.fileno(), lambda cb, f: self.call_soon(cb, f), cb, arg) + continue + elif isinstance(ret, IOReadDone): + self.remove_reader(arg.fileno()) + elif isinstance(ret, IOWriteDone): + self.remove_writer(arg.fileno()) + elif isinstance(ret, StopLoop): + return arg + elif isinstance(ret, type_gen): + self.call_soon(ret) + elif ret is None: + # Just reschedule + pass + else: + assert False, "Unsupported coroutine yield value: %r (of type %r)" % (ret, type(ret)) + except StopIteration as e: + log.debug("Coroutine finished: %s", cb) + continue + self.call_later(delay, cb, *args) + + def run_until_complete(self, coro): + def _run_and_stop(): + yield from coro + yield StopLoop(0) + self.call_soon(_run_and_stop()) + self.run_forever() + + def close(self): + pass + + +class SysCall: + + def __init__(self, *args): + self.args = args + + def handle(self): + raise NotImplementedError + +class Sleep(SysCall): + pass + +class StopLoop(SysCall): + pass + +class IORead(SysCall): + pass + +class IOWrite(SysCall): + pass + +class IOReadDone(SysCall): + pass + +class IOWriteDone(SysCall): + pass + + +def get_event_loop(): + return EventLoop() + +def coroutine(f): + return f + +def async(coro): + # We don't have Task bloat, so op is null + return coro + +def sleep(secs): + yield Sleep(secs)