micropython-lib/uasyncio/uasyncio.py

283 wiersze
8.2 KiB
Python

import __main__
import time
import heapq
import errno
import logging
# Module-wide logger; every debug/warning message in this file goes through it.
log = logging.getLogger("asyncio")
# Type of generator objects, taken from a throwaway generator.  The event loop
# compares yielded values against it to recognise a yielded coroutine.
type_gen = type((lambda: (yield))())
class EventLoop:
    """Cooperative scheduler for plain callbacks and generator coroutines.

    Pending work is kept in ``self.q``, a heap of
    ``(due_time, sequence, callback_or_coroutine, args)`` tuples, so the entry
    with the smallest due time is popped first.  ``self.cnt`` supplies the
    sequence number.

    The IO hooks used by ``run_forever`` (``add_reader``, ``add_writer``,
    ``remove_reader``, ``remove_writer``) are not defined here; a coroutine
    that yields an IO request needs a subclass that provides them.
    """

    def __init__(self):
        self.q = []
        self.cnt = 0

    def time(self):
        """Return the current time as reported by ``time.time()``."""
        return time.time()

    def call_soon(self, callback, *args):
        """Schedule *callback* with due time 0, i.e. as already overdue."""
        self.call_at(0, callback, *args)

    def call_later(self, delay, callback, *args):
        """Schedule *callback* to run *delay* after the current loop time."""
        self.call_at(self.time() + delay, callback, *args)

    def call_at(self, time, callback, *args):
        """Push *callback* (a callable or a generator) onto the queue.

        *time* is the due time the heap is ordered by.
        """
        # Including self.cnt is a workaround per heapq docs
        log.debug("Scheduling %s", (time, self.cnt, callback, args))
        heapq.heappush(self.q, (time, self.cnt, callback, args))
        self.cnt += 1

    def wait(self, delay):
        """Block for *delay*; the value is handed straight to ``time.sleep``.

        ``run_forever`` passes -1 when the queue is empty.
        """
        # Default wait implementation, to be overriden in subclasses
        # with IO scheduling
        log.debug("Sleeping for: %s", delay)
        time.sleep(delay)

    def run_forever(self):
        """Run queued entries until a coroutine yields ``StopLoop``.

        Returns the first argument of that ``StopLoop``.  Callables are
        invoked once.  Generators are resumed with ``send`` and rescheduled
        according to the value they yield.

        Raises:
            AssertionError: a coroutine yielded a value that is neither a
                ``SysCall``, a generator, nor ``None``.
        """
        while True:
            if self.q:
                t, cnt, cb, args = heapq.heappop(self.q)
                log.debug("Next coroutine to run: %s", (t, cnt, cb, args))
                delay = t - self.time()
                if delay > 0:
                    self.wait(delay)
            else:
                self.wait(-1)
                # Assuming IO completion scheduled some tasks
                continue

            if callable(cb):
                cb(*args)
                continue

            delay = 0
            try:
                # A generator that has not started yet only accepts None.
                if args == ():
                    args = (None,)
                log.debug("Coroutine %s send args: %s", cb, args)
                ret = cb.send(*args)
                log.debug("Coroutine %s yield result: %s", cb, ret)
                if isinstance(ret, SysCall):
                    arg = ret.args[0]
                    if isinstance(ret, Sleep):
                        delay = arg
                    elif isinstance(ret, IORead):
                        # Parked: the IO callback reschedules the coroutine.
                        self.add_reader(arg.fileno(), lambda cb, f: self.call_soon(cb, f), cb, arg)
                        continue
                    elif isinstance(ret, IOWrite):
                        self.add_writer(arg.fileno(), lambda cb, f: self.call_soon(cb, f), cb, arg)
                        continue
                    elif isinstance(ret, IOReadDone):
                        self.remove_reader(arg.fileno())
                    elif isinstance(ret, IOWriteDone):
                        self.remove_writer(arg.fileno())
                    elif isinstance(ret, StopLoop):
                        return arg
                elif isinstance(ret, type_gen):
                    # A yielded generator is scheduled as a new coroutine.
                    self.call_soon(ret)
                elif ret is None:
                    # Just reschedule
                    pass
                else:
                    # Raised explicitly rather than via ``assert False`` so the
                    # check still fires when assertions are compiled out.
                    raise AssertionError(
                        "Unsupported coroutine yield value: %r (of type %r)" % (ret, type(ret))
                    )
            except StopIteration:
                log.debug("Coroutine finished: %s", cb)
                continue
            self.call_later(delay, cb, *args)

    def run_until_complete(self, coro):
        """Run the loop until *coro* finishes."""
        def _run_and_stop():
            yield from coro
            yield StopLoop(0)
        self.call_soon(_run_and_stop())
        self.run_forever()

    def close(self):
        """Do nothing; there are no resources to release here."""
        pass
import select
class EpollEventLoop(EventLoop):
    """Event loop whose ``wait`` step polls an epoll object for IO readiness.

    NOTE: relies on a poller whose ``register`` accepts a third user-data
    argument and whose ``poll`` yields ``(user_data, events)`` pairs.
    """

    def __init__(self):
        EventLoop.__init__(self)
        self.poller = select.epoll(1)

    def add_reader(self, fd, cb, *args):
        """Register *fd* for input readiness; ``cb(*args)`` runs when it fires."""
        log.debug("add_reader%s", (fd, cb, args))
        handler = (cb, args)
        self.poller.register(fd, select.EPOLLIN, handler)

    def remove_reader(self, fd):
        """Unregister *fd* from the poller."""
        log.debug("remove_reader(%s)", fd)
        self.poller.unregister(fd)

    def add_writer(self, fd, cb, *args):
        """Register *fd* for output readiness; ``cb(*args)`` runs when it fires."""
        log.debug("add_writer%s", (fd, cb, args))
        handler = (cb, args)
        self.poller.register(fd, select.EPOLLOUT, handler)

    def remove_writer(self, fd):
        """Unregister *fd* from the poller."""
        log.debug("remove_writer(%s)", fd)
        self.poller.unregister(fd)

    def wait(self, delay):
        """Poll for IO events and invoke the callback stored with each one.

        *delay* is in seconds and is converted to whole milliseconds; the
        value -1 is forwarded to ``poll`` unchanged.
        """
        log.debug("epoll.wait(%d)", delay)
        timeout = -1 if delay == -1 else int(delay * 1000)
        ready = self.poller.poll(timeout)
        log.debug("epoll result: %s", ready)
        for handler, _events in ready:
            log.debug("Calling IO callback: %s%s", handler[0], handler[1])
            handler[0](*handler[1])
class SysCall:
    """Base class for requests a coroutine yields to the event loop.

    The positional arguments given at construction are kept in ``self.args``.
    """

    def __init__(self, *args):
        self.args = args

    def handle(self):
        """Not implemented on the base class."""
        raise NotImplementedError()
class Sleep(SysCall):
    """Request to be rescheduled after a delay; ``args[0]`` is that delay."""
class StopLoop(SysCall):
    """Request that makes ``run_forever`` return; ``args[0]`` is its result."""
class IORead(SysCall):
    """Request to wait for read readiness.

    ``args[0]`` is an object with ``fileno()``; the loop registers it through
    ``add_reader`` and does not reschedule the coroutine itself.
    """
class IOWrite(SysCall):
    """Request to wait for write readiness.

    ``args[0]`` is an object with ``fileno()``; the loop registers it through
    ``add_writer`` and does not reschedule the coroutine itself.
    """
class IOReadDone(SysCall):
    """Request that the loop call ``remove_reader`` for ``args[0].fileno()``."""
class IOWriteDone(SysCall):
    """Request that the loop call ``remove_writer`` for ``args[0].fileno()``."""
# Loop instance shared by every get_event_loop() caller; created on first use.
_event_loop = None


def get_event_loop():
    """Return the module's event loop, creating it on the first call.

    Every call returns the same ``EpollEventLoop`` instance, so work scheduled
    through one call is seen by whoever later runs the loop.  Building a fresh
    loop per call would strand anything scheduled on an earlier one.
    """
    global _event_loop
    if _event_loop is None:
        _event_loop = EpollEventLoop()
    return _event_loop
def coroutine(f):
    """Decorator that returns *f* unchanged."""
    return f
def ensure_future(coro):
    """Return *coro* unchanged.

    The same function is also published under the legacy name ``async``.
    """
    # We don't have Task bloat, so op is null
    return coro


# ``async`` is a reserved word from Python 3.7 on, so ``def async(...)`` no
# longer parses there.  Binding the legacy name through the module namespace
# keeps ``getattr(module, "async")`` working without the keyword appearing in
# the source.
globals()["async"] = ensure_future
def sleep(secs):
    """Coroutine that yields a single ``Sleep`` request built from *secs*."""
    request = Sleep(secs)
    yield request
import usocket as _socket
class StreamReader:
    """Coroutine-based reader around a stream object stored in ``self.s``."""

    def __init__(self, s):
        self.s = s

    def read(self, n=-1):
        """Coroutine: wait for read readiness, then return ``self.s.read(n)``.

        A falsy result additionally yields ``IOReadDone`` before returning.
        """
        yield IORead(self.s)
        while True:
            res = self.s.read(n)
            if res is not None:
                break
            # NOTE(review): retries immediately without yielding to the loop;
            # presumably None means "no data yet" on a non-blocking stream.
            log.warning("Empty read")
        if not res:
            yield IOReadDone(self.s)
        return res

    def readline(self):
        """Coroutine: wait for read readiness, then return ``self.s.readline()``.

        A falsy result additionally yields ``IOReadDone`` before returning.
        """
        log.debug("StreamReader.readline()")
        s = yield IORead(self.s)
        log.debug("StreamReader.readline(): after IORead: %s", s)
        while True:
            res = self.s.readline()
            if res is not None:
                break
            # NOTE(review): same immediate retry as in read().
            log.warning("Empty read")
        if not res:
            yield IOReadDone(self.s)
        log.debug("StreamReader.readline(): res: %s", res)
        return res
class StreamWriter:
    """Coroutine-based writer around a stream object stored in ``self.s``."""

    def __init__(self, s):
        self.s = s

    def awrite(self, buf):
        """Coroutine: write all of *buf*, yielding ``IOWrite`` between partial writes."""
        # This method is called awrite (async write) to not proliferate
        # incompatibility with original asyncio. Unlike original asyncio
        # whose .write() method is both not a coroutine and guaranteed
        # to return immediately (which means it has to buffer all the
        # data), this method is a coroutine.
        remaining = len(buf)
        log.debug("StreamWriter.awrite(): spooling %d bytes", remaining)
        while True:
            written = self.s.write(buf)
            # If we spooled everything, return immediately
            if written == remaining:
                log.debug("StreamWriter.awrite(): completed spooling %d bytes", written)
                return
            # A None result is counted as zero bytes written.
            if written is None:
                written = 0
            log.debug("StreamWriter.awrite(): spooled partial %d bytes", written)
            assert written < remaining
            buf = buf[written:]
            remaining -= written
            yield IOWrite(self.s)
            log.debug("StreamWriter.awrite(): can write more")

    def close(self):
        """Coroutine: yield ``IOWriteDone`` for the stream, then close it."""
        finished = IOWriteDone(self.s)
        yield finished
        self.s.close()
def open_connection(host, port):
    """Coroutine: start a non-blocking connect and return a reader/writer pair.

    Yields ``IOWrite`` on the new socket and, once resumed, wraps whatever
    value the loop sent back in a ``StreamReader`` and a ``StreamWriter``.

    Raises:
        OSError: address resolution failed, or ``connect`` failed with an
            error other than ``EINPROGRESS`` (the socket is closed first).
    """
    log.debug("open_connection(%s, %s)", host, port)
    # Resolve before creating the socket so a lookup failure cannot leak it.
    ai = _socket.getaddrinfo(host, port)
    addr = ai[0][4]
    s = _socket.socket()
    s.setblocking(False)
    try:
        s.connect(addr)
    except OSError as e:
        # EINPROGRESS is the expected result of a non-blocking connect.
        if e.args[0] != errno.EINPROGRESS:
            s.close()
            raise
    log.debug("open_connection: After connect")
    s = yield IOWrite(s)
    log.debug("open_connection: After iowait: %s", s)
    return StreamReader(s), StreamWriter(s)
def start_server(client_coro, host, port, backlog=10):
    """Coroutine: listen on (*host*, *port*) and start a handler per client.

    For each accepted connection it yields
    ``client_coro(StreamReader(sock), StreamWriter(sock))``.  The accept loop
    never ends on its own.

    Args:
        client_coro: called with a reader and a writer for each connection.
        host: address passed to ``getaddrinfo``.
        port: port passed to ``getaddrinfo``.
        backlog: value passed to ``listen``; defaults to the previous
            hard-coded 10.

    Raises:
        OSError: address resolution or socket setup failed (the listening
            socket is closed first in the latter case).
    """
    log.debug("start_server(%s, %s)", host, port)
    # Resolve before creating the socket so a lookup failure cannot leak it.
    ai = _socket.getaddrinfo(host, port)
    addr = ai[0][4]
    s = _socket.socket()
    try:
        s.setblocking(False)
        s.setsockopt(_socket.SOL_SOCKET, _socket.SO_REUSEADDR, 1)
        s.bind(addr)
        s.listen(backlog)
    except OSError:
        s.close()
        raise
    while True:
        log.debug("start_server: Before accept")
        yield IORead(s)
        log.debug("start_server: After iowait")
        s2, client_addr = s.accept()
        s2.setblocking(False)
        log.debug("start_server: After accept: %s", s2)
        yield client_coro(StreamReader(s2), StreamWriter(s2))