kopia lustrzana https://github.com/micropython/micropython-lib
micropython/uasyncio: Remove uasyncio-v2.
Superceded by uasyncio-v3 in the main repo. Signed-off-by: Jim Mussared <jim.mussared@gmail.com>pull/376/head
rodzic
fa13cbbc8b
commit
bc2b6b0b7f
|
@ -1,17 +0,0 @@
|
||||||
import uasyncio.core as asyncio
|
|
||||||
import time
|
|
||||||
import logging
|
|
||||||
|
|
||||||
logging.basicConfig(level=logging.DEBUG)
|
|
||||||
# asyncio.set_debug(True)
|
|
||||||
|
|
||||||
|
|
||||||
def cb():
|
|
||||||
print("callback")
|
|
||||||
time.sleep(0.5)
|
|
||||||
loop.call_soon(cb)
|
|
||||||
|
|
||||||
|
|
||||||
loop = asyncio.get_event_loop()
|
|
||||||
loop.call_soon(cb)
|
|
||||||
loop.run_forever()
|
|
|
@ -1,6 +0,0 @@
|
||||||
srctype = micropython-lib
|
|
||||||
type = package
|
|
||||||
version = 2.0
|
|
||||||
author = Paul Sokolovsky
|
|
||||||
desc = Lightweight asyncio-like library for MicroPython, built around native Python coroutines. (Core event loop).
|
|
||||||
long_desc = Lightweight asyncio-like library for MicroPython, built around native Python coroutines. (Core event loop).
|
|
|
@ -1,24 +0,0 @@
|
||||||
import sys
|
|
||||||
|
|
||||||
# Remove current dir from sys.path, otherwise setuptools will peek up our
|
|
||||||
# module instead of system's.
|
|
||||||
sys.path.pop(0)
|
|
||||||
from setuptools import setup
|
|
||||||
|
|
||||||
sys.path.append("..")
|
|
||||||
import sdist_upip
|
|
||||||
|
|
||||||
setup(
|
|
||||||
name="micropython-uasyncio.core",
|
|
||||||
version="2.0",
|
|
||||||
description="Lightweight asyncio-like library for MicroPython, built around native Python coroutines. (Core event loop).",
|
|
||||||
long_description="Lightweight asyncio-like library for MicroPython, built around native Python coroutines. (Core event loop).",
|
|
||||||
url="https://github.com/micropython/micropython-lib",
|
|
||||||
author="Paul Sokolovsky",
|
|
||||||
author_email="micro-python@googlegroups.com",
|
|
||||||
maintainer="micropython-lib Developers",
|
|
||||||
maintainer_email="micro-python@googlegroups.com",
|
|
||||||
license="MIT",
|
|
||||||
cmdclass={"sdist": sdist_upip.sdist},
|
|
||||||
packages=["uasyncio"],
|
|
||||||
)
|
|
|
@ -1,80 +0,0 @@
|
||||||
import time
|
|
||||||
|
|
||||||
try:
|
|
||||||
import uasyncio.core as asyncio
|
|
||||||
|
|
||||||
is_uasyncio = True
|
|
||||||
except ImportError:
|
|
||||||
import asyncio
|
|
||||||
|
|
||||||
is_uasyncio = False
|
|
||||||
import logging
|
|
||||||
|
|
||||||
# logging.basicConfig(level=logging.DEBUG)
|
|
||||||
# asyncio.set_debug(True)
|
|
||||||
|
|
||||||
|
|
||||||
output = []
|
|
||||||
cancelled = False
|
|
||||||
|
|
||||||
|
|
||||||
def print1(msg):
|
|
||||||
print(msg)
|
|
||||||
output.append(msg)
|
|
||||||
|
|
||||||
|
|
||||||
def looper1(iters):
|
|
||||||
global cancelled
|
|
||||||
try:
|
|
||||||
for i in range(iters):
|
|
||||||
print1("ping1")
|
|
||||||
# sleep() isn't properly cancellable
|
|
||||||
# yield from asyncio.sleep(1.0)
|
|
||||||
t = time.time()
|
|
||||||
while time.time() - t < 1:
|
|
||||||
yield from asyncio.sleep(0)
|
|
||||||
return 10
|
|
||||||
except asyncio.CancelledError:
|
|
||||||
print1("cancelled")
|
|
||||||
cancelled = True
|
|
||||||
|
|
||||||
|
|
||||||
def looper2(iters):
|
|
||||||
for i in range(iters):
|
|
||||||
print1("ping2")
|
|
||||||
# sleep() isn't properly cancellable
|
|
||||||
# yield from asyncio.sleep(1.0)
|
|
||||||
t = time.time()
|
|
||||||
while time.time() - t < 1:
|
|
||||||
yield from asyncio.sleep(0)
|
|
||||||
return 10
|
|
||||||
|
|
||||||
|
|
||||||
def run_to():
|
|
||||||
coro = looper1(10)
|
|
||||||
task = loop.create_task(coro)
|
|
||||||
yield from asyncio.sleep(3)
|
|
||||||
if is_uasyncio:
|
|
||||||
asyncio.cancel(coro)
|
|
||||||
else:
|
|
||||||
task.cancel()
|
|
||||||
# Need another eventloop iteration for cancellation to be actually
|
|
||||||
# processed and to see side effects of the cancellation.
|
|
||||||
yield from asyncio.sleep(0)
|
|
||||||
assert cancelled
|
|
||||||
|
|
||||||
coro = looper2(10)
|
|
||||||
task = loop.create_task(coro)
|
|
||||||
yield from asyncio.sleep(2)
|
|
||||||
if is_uasyncio:
|
|
||||||
asyncio.cancel(coro)
|
|
||||||
else:
|
|
||||||
task.cancel()
|
|
||||||
yield from asyncio.sleep(0)
|
|
||||||
|
|
||||||
# Once saw 3 ping3's output on CPython 3.5.2
|
|
||||||
assert output == ["ping1", "ping1", "ping1", "cancelled", "ping2", "ping2"]
|
|
||||||
|
|
||||||
|
|
||||||
loop = asyncio.get_event_loop()
|
|
||||||
loop.run_until_complete(run_to())
|
|
|
@ -1,16 +0,0 @@
|
||||||
try:
|
|
||||||
import uasyncio.core as asyncio
|
|
||||||
except:
|
|
||||||
import asyncio
|
|
||||||
|
|
||||||
|
|
||||||
def cb(a, b):
|
|
||||||
assert a == "test"
|
|
||||||
assert b == "test2"
|
|
||||||
loop.stop()
|
|
||||||
|
|
||||||
|
|
||||||
loop = asyncio.get_event_loop()
|
|
||||||
loop.call_soon(cb, "test", "test2")
|
|
||||||
loop.run_forever()
|
|
||||||
print("OK")
|
|
|
@ -1,39 +0,0 @@
|
||||||
# Test that uasyncio scheduling is fair, i.e. gives all
|
|
||||||
# coroutines equal chance to run (this specifically checks
|
|
||||||
# round-robin scheduling).
|
|
||||||
import uasyncio.core as asyncio
|
|
||||||
|
|
||||||
|
|
||||||
COROS = 10
|
|
||||||
ITERS = 20
|
|
||||||
|
|
||||||
|
|
||||||
result = []
|
|
||||||
test_finished = False
|
|
||||||
|
|
||||||
|
|
||||||
async def coro(n):
|
|
||||||
for i in range(ITERS):
|
|
||||||
result.append(n)
|
|
||||||
yield
|
|
||||||
|
|
||||||
|
|
||||||
async def done():
|
|
||||||
global test_finished
|
|
||||||
while True:
|
|
||||||
if len(result) == COROS * ITERS:
|
|
||||||
# print(result)
|
|
||||||
assert result == list(range(COROS)) * ITERS
|
|
||||||
test_finished = True
|
|
||||||
return
|
|
||||||
yield
|
|
||||||
|
|
||||||
|
|
||||||
loop = asyncio.get_event_loop()
|
|
||||||
|
|
||||||
for n in range(COROS):
|
|
||||||
loop.create_task(coro(n))
|
|
||||||
|
|
||||||
loop.run_until_complete(done())
|
|
||||||
|
|
||||||
assert test_finished
|
|
|
@ -1,56 +0,0 @@
|
||||||
# Test that coros scheduled to run at some time don't run prematurely
|
|
||||||
# in case of I/O completion before that.
|
|
||||||
import uasyncio.core as uasyncio
|
|
||||||
import logging
|
|
||||||
|
|
||||||
logging.basicConfig(level=logging.DEBUG)
|
|
||||||
# uasyncio.set_debug(True)
|
|
||||||
|
|
||||||
|
|
||||||
class MockEventLoop(uasyncio.EventLoop):
|
|
||||||
def __init__(self):
|
|
||||||
super().__init__()
|
|
||||||
self.t = 0
|
|
||||||
self.msgs = []
|
|
||||||
|
|
||||||
def time(self):
|
|
||||||
return self.t
|
|
||||||
|
|
||||||
def pass_time(self, delta):
|
|
||||||
self.t += delta
|
|
||||||
|
|
||||||
def wait(self, delay):
|
|
||||||
# print("%d: wait(%d)" % (self.t, delay))
|
|
||||||
self.pass_time(100)
|
|
||||||
|
|
||||||
if self.t == 100:
|
|
||||||
|
|
||||||
def cb_1st():
|
|
||||||
self.msgs.append("I should be run first, time: %s" % self.time())
|
|
||||||
|
|
||||||
self.call_soon(cb_1st)
|
|
||||||
|
|
||||||
if self.t == 1000:
|
|
||||||
raise StopIteration
|
|
||||||
|
|
||||||
|
|
||||||
loop = MockEventLoop()
|
|
||||||
|
|
||||||
|
|
||||||
def cb_2nd():
|
|
||||||
loop.msgs.append("I should be run second, time: %s" % loop.time())
|
|
||||||
|
|
||||||
|
|
||||||
loop.call_later_ms(500, cb_2nd)
|
|
||||||
|
|
||||||
try:
|
|
||||||
loop.run_forever()
|
|
||||||
except StopIteration:
|
|
||||||
pass
|
|
||||||
|
|
||||||
print(loop.msgs)
|
|
||||||
# .wait() is now called on each loop iteration, and for our mock case, it means that
|
|
||||||
# at the time of running, self.time() will be skewed by 100 virtual time units.
|
|
||||||
assert loop.msgs == ["I should be run first, time: 100", "I should be run second, time: 500"], str(
|
|
||||||
loop.msgs
|
|
||||||
)
|
|
|
@ -1,47 +0,0 @@
|
||||||
try:
|
|
||||||
import uasyncio.core as asyncio
|
|
||||||
except ImportError:
|
|
||||||
import asyncio
|
|
||||||
import logging
|
|
||||||
|
|
||||||
# logging.basicConfig(level=logging.DEBUG)
|
|
||||||
# asyncio.set_debug(True)
|
|
||||||
|
|
||||||
|
|
||||||
def looper(iters):
|
|
||||||
for i in range(iters):
|
|
||||||
print("ping")
|
|
||||||
yield from asyncio.sleep(1.0)
|
|
||||||
return 10
|
|
||||||
|
|
||||||
|
|
||||||
def run_to():
|
|
||||||
try:
|
|
||||||
ret = yield from asyncio.wait_for(looper(2), 1)
|
|
||||||
print("result:", ret)
|
|
||||||
assert False
|
|
||||||
except asyncio.TimeoutError:
|
|
||||||
print("Coro timed out")
|
|
||||||
|
|
||||||
print("=================")
|
|
||||||
|
|
||||||
try:
|
|
||||||
ret = yield from asyncio.wait_for(looper(2), 2)
|
|
||||||
print("result:", ret)
|
|
||||||
assert False
|
|
||||||
except asyncio.TimeoutError:
|
|
||||||
print("Coro timed out")
|
|
||||||
|
|
||||||
print("=================")
|
|
||||||
|
|
||||||
try:
|
|
||||||
ret = yield from asyncio.wait_for(looper(2), 3)
|
|
||||||
print("result:", ret)
|
|
||||||
except asyncio.TimeoutError:
|
|
||||||
print("Coro timed out")
|
|
||||||
assert False
|
|
||||||
|
|
||||||
|
|
||||||
loop = asyncio.get_event_loop()
|
|
||||||
loop.run_until_complete(run_to())
|
|
||||||
loop.run_until_complete(asyncio.sleep(1))
|
|
|
@ -1,332 +0,0 @@
|
||||||
import utime as time
|
|
||||||
import utimeq
|
|
||||||
import ucollections
|
|
||||||
|
|
||||||
|
|
||||||
type_gen = type((lambda: (yield))())
|
|
||||||
|
|
||||||
DEBUG = 0
|
|
||||||
log = None
|
|
||||||
|
|
||||||
|
|
||||||
def set_debug(val):
|
|
||||||
global DEBUG, log
|
|
||||||
DEBUG = val
|
|
||||||
if val:
|
|
||||||
import logging
|
|
||||||
|
|
||||||
log = logging.getLogger("uasyncio.core")
|
|
||||||
|
|
||||||
|
|
||||||
class CancelledError(Exception):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class TimeoutError(CancelledError):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class EventLoop:
|
|
||||||
def __init__(self, runq_len=16, waitq_len=16):
|
|
||||||
self.runq = ucollections.deque((), runq_len, True)
|
|
||||||
self.waitq = utimeq.utimeq(waitq_len)
|
|
||||||
# Current task being run. Task is a top-level coroutine scheduled
|
|
||||||
# in the event loop (sub-coroutines executed transparently by
|
|
||||||
# yield from/await, event loop "doesn't see" them).
|
|
||||||
self.cur_task = None
|
|
||||||
|
|
||||||
def time(self):
|
|
||||||
return time.ticks_ms()
|
|
||||||
|
|
||||||
def create_task(self, coro):
|
|
||||||
# CPython 3.4.2
|
|
||||||
self.call_later_ms(0, coro)
|
|
||||||
# CPython asyncio incompatibility: we don't return Task object
|
|
||||||
|
|
||||||
def call_soon(self, callback, *args):
|
|
||||||
if __debug__ and DEBUG:
|
|
||||||
log.debug("Scheduling in runq: %s", (callback, args))
|
|
||||||
self.runq.append(callback)
|
|
||||||
if not isinstance(callback, type_gen):
|
|
||||||
self.runq.append(args)
|
|
||||||
|
|
||||||
def call_later(self, delay, callback, *args):
|
|
||||||
self.call_at_(time.ticks_add(self.time(), int(delay * 1000)), callback, args)
|
|
||||||
|
|
||||||
def call_later_ms(self, delay, callback, *args):
|
|
||||||
if not delay:
|
|
||||||
return self.call_soon(callback, *args)
|
|
||||||
self.call_at_(time.ticks_add(self.time(), delay), callback, args)
|
|
||||||
|
|
||||||
def call_at_(self, time, callback, args=()):
|
|
||||||
if __debug__ and DEBUG:
|
|
||||||
log.debug("Scheduling in waitq: %s", (time, callback, args))
|
|
||||||
self.waitq.push(time, callback, args)
|
|
||||||
|
|
||||||
def wait(self, delay):
|
|
||||||
# Default wait implementation, to be overriden in subclasses
|
|
||||||
# with IO scheduling
|
|
||||||
if __debug__ and DEBUG:
|
|
||||||
log.debug("Sleeping for: %s", delay)
|
|
||||||
time.sleep_ms(delay)
|
|
||||||
|
|
||||||
def run_forever(self):
|
|
||||||
cur_task = [0, 0, 0]
|
|
||||||
while True:
|
|
||||||
# Expire entries in waitq and move them to runq
|
|
||||||
tnow = self.time()
|
|
||||||
while self.waitq:
|
|
||||||
t = self.waitq.peektime()
|
|
||||||
delay = time.ticks_diff(t, tnow)
|
|
||||||
if delay > 0:
|
|
||||||
break
|
|
||||||
self.waitq.pop(cur_task)
|
|
||||||
if __debug__ and DEBUG:
|
|
||||||
log.debug("Moving from waitq to runq: %s", cur_task[1])
|
|
||||||
self.call_soon(cur_task[1], *cur_task[2])
|
|
||||||
|
|
||||||
# Process runq
|
|
||||||
l = len(self.runq)
|
|
||||||
if __debug__ and DEBUG:
|
|
||||||
log.debug("Entries in runq: %d", l)
|
|
||||||
while l:
|
|
||||||
cb = self.runq.popleft()
|
|
||||||
l -= 1
|
|
||||||
args = ()
|
|
||||||
if not isinstance(cb, type_gen):
|
|
||||||
args = self.runq.popleft()
|
|
||||||
l -= 1
|
|
||||||
if __debug__ and DEBUG:
|
|
||||||
log.info("Next callback to run: %s", (cb, args))
|
|
||||||
cb(*args)
|
|
||||||
continue
|
|
||||||
|
|
||||||
if __debug__ and DEBUG:
|
|
||||||
log.info("Next coroutine to run: %s", (cb, args))
|
|
||||||
self.cur_task = cb
|
|
||||||
delay = 0
|
|
||||||
try:
|
|
||||||
if args is ():
|
|
||||||
ret = next(cb)
|
|
||||||
else:
|
|
||||||
ret = cb.send(*args)
|
|
||||||
if __debug__ and DEBUG:
|
|
||||||
log.info("Coroutine %s yield result: %s", cb, ret)
|
|
||||||
if isinstance(ret, SysCall1):
|
|
||||||
arg = ret.arg
|
|
||||||
if isinstance(ret, SleepMs):
|
|
||||||
delay = arg
|
|
||||||
elif isinstance(ret, IORead):
|
|
||||||
cb.pend_throw(False)
|
|
||||||
self.add_reader(arg, cb)
|
|
||||||
continue
|
|
||||||
elif isinstance(ret, IOWrite):
|
|
||||||
cb.pend_throw(False)
|
|
||||||
self.add_writer(arg, cb)
|
|
||||||
continue
|
|
||||||
elif isinstance(ret, IOReadDone):
|
|
||||||
self.remove_reader(arg)
|
|
||||||
elif isinstance(ret, IOWriteDone):
|
|
||||||
self.remove_writer(arg)
|
|
||||||
elif isinstance(ret, StopLoop):
|
|
||||||
return arg
|
|
||||||
else:
|
|
||||||
assert False, "Unknown syscall yielded: %r (of type %r)" % (
|
|
||||||
ret,
|
|
||||||
type(ret),
|
|
||||||
)
|
|
||||||
elif isinstance(ret, type_gen):
|
|
||||||
self.call_soon(ret)
|
|
||||||
elif isinstance(ret, int):
|
|
||||||
# Delay
|
|
||||||
delay = ret
|
|
||||||
elif ret is None:
|
|
||||||
# Just reschedule
|
|
||||||
pass
|
|
||||||
elif ret is False:
|
|
||||||
# Don't reschedule
|
|
||||||
continue
|
|
||||||
else:
|
|
||||||
assert False, "Unsupported coroutine yield value: %r (of type %r)" % (
|
|
||||||
ret,
|
|
||||||
type(ret),
|
|
||||||
)
|
|
||||||
except StopIteration as e:
|
|
||||||
if __debug__ and DEBUG:
|
|
||||||
log.debug("Coroutine finished: %s", cb)
|
|
||||||
continue
|
|
||||||
except CancelledError as e:
|
|
||||||
if __debug__ and DEBUG:
|
|
||||||
log.debug("Coroutine cancelled: %s", cb)
|
|
||||||
continue
|
|
||||||
# Currently all syscalls don't return anything, so we don't
|
|
||||||
# need to feed anything to the next invocation of coroutine.
|
|
||||||
# If that changes, need to pass that value below.
|
|
||||||
if delay:
|
|
||||||
self.call_later_ms(delay, cb)
|
|
||||||
else:
|
|
||||||
self.call_soon(cb)
|
|
||||||
|
|
||||||
# Wait until next waitq task or I/O availability
|
|
||||||
delay = 0
|
|
||||||
if not self.runq:
|
|
||||||
delay = -1
|
|
||||||
if self.waitq:
|
|
||||||
tnow = self.time()
|
|
||||||
t = self.waitq.peektime()
|
|
||||||
delay = time.ticks_diff(t, tnow)
|
|
||||||
if delay < 0:
|
|
||||||
delay = 0
|
|
||||||
self.wait(delay)
|
|
||||||
|
|
||||||
def run_until_complete(self, coro):
|
|
||||||
def _run_and_stop():
|
|
||||||
yield from coro
|
|
||||||
yield StopLoop(0)
|
|
||||||
|
|
||||||
self.call_soon(_run_and_stop())
|
|
||||||
self.run_forever()
|
|
||||||
|
|
||||||
def stop(self):
|
|
||||||
self.call_soon((lambda: (yield StopLoop(0)))())
|
|
||||||
|
|
||||||
def close(self):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class SysCall:
|
|
||||||
def __init__(self, *args):
|
|
||||||
self.args = args
|
|
||||||
|
|
||||||
def handle(self):
|
|
||||||
raise NotImplementedError
|
|
||||||
|
|
||||||
|
|
||||||
# Optimized syscall with 1 arg
|
|
||||||
class SysCall1(SysCall):
|
|
||||||
def __init__(self, arg):
|
|
||||||
self.arg = arg
|
|
||||||
|
|
||||||
|
|
||||||
class StopLoop(SysCall1):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class IORead(SysCall1):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class IOWrite(SysCall1):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class IOReadDone(SysCall1):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class IOWriteDone(SysCall1):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
_event_loop = None
|
|
||||||
_event_loop_class = EventLoop
|
|
||||||
|
|
||||||
|
|
||||||
def get_event_loop(runq_len=16, waitq_len=16):
|
|
||||||
global _event_loop
|
|
||||||
if _event_loop is None:
|
|
||||||
_event_loop = _event_loop_class(runq_len, waitq_len)
|
|
||||||
return _event_loop
|
|
||||||
|
|
||||||
|
|
||||||
def sleep(secs):
|
|
||||||
yield int(secs * 1000)
|
|
||||||
|
|
||||||
|
|
||||||
# Implementation of sleep_ms awaitable with zero heap memory usage
|
|
||||||
class SleepMs(SysCall1):
|
|
||||||
def __init__(self):
|
|
||||||
self.v = None
|
|
||||||
self.arg = None
|
|
||||||
|
|
||||||
def __call__(self, arg):
|
|
||||||
self.v = arg
|
|
||||||
# print("__call__")
|
|
||||||
return self
|
|
||||||
|
|
||||||
def __iter__(self):
|
|
||||||
# print("__iter__")
|
|
||||||
return self
|
|
||||||
|
|
||||||
def __next__(self):
|
|
||||||
if self.v is not None:
|
|
||||||
# print("__next__ syscall enter")
|
|
||||||
self.arg = self.v
|
|
||||||
self.v = None
|
|
||||||
return self
|
|
||||||
# print("__next__ syscall exit")
|
|
||||||
_stop_iter.__traceback__ = None
|
|
||||||
raise _stop_iter
|
|
||||||
|
|
||||||
|
|
||||||
_stop_iter = StopIteration()
|
|
||||||
sleep_ms = SleepMs()
|
|
||||||
|
|
||||||
|
|
||||||
def cancel(coro):
|
|
||||||
prev = coro.pend_throw(CancelledError())
|
|
||||||
if prev is False:
|
|
||||||
_event_loop.call_soon(coro)
|
|
||||||
|
|
||||||
|
|
||||||
class TimeoutObj:
|
|
||||||
def __init__(self, coro):
|
|
||||||
self.coro = coro
|
|
||||||
|
|
||||||
|
|
||||||
def wait_for_ms(coro, timeout):
|
|
||||||
def waiter(coro, timeout_obj):
|
|
||||||
res = yield from coro
|
|
||||||
if __debug__ and DEBUG:
|
|
||||||
log.debug("waiter: cancelling %s", timeout_obj)
|
|
||||||
timeout_obj.coro = None
|
|
||||||
return res
|
|
||||||
|
|
||||||
def timeout_func(timeout_obj):
|
|
||||||
if timeout_obj.coro:
|
|
||||||
if __debug__ and DEBUG:
|
|
||||||
log.debug("timeout_func: cancelling %s", timeout_obj.coro)
|
|
||||||
prev = timeout_obj.coro.pend_throw(TimeoutError())
|
|
||||||
# print("prev pend", prev)
|
|
||||||
if prev is False:
|
|
||||||
_event_loop.call_soon(timeout_obj.coro)
|
|
||||||
|
|
||||||
timeout_obj = TimeoutObj(_event_loop.cur_task)
|
|
||||||
_event_loop.call_later_ms(timeout, timeout_func, timeout_obj)
|
|
||||||
return (yield from waiter(coro, timeout_obj))
|
|
||||||
|
|
||||||
|
|
||||||
def wait_for(coro, timeout):
|
|
||||||
return wait_for_ms(coro, int(timeout * 1000))
|
|
||||||
|
|
||||||
|
|
||||||
def coroutine(f):
|
|
||||||
return f
|
|
||||||
|
|
||||||
|
|
||||||
#
|
|
||||||
# The functions below are deprecated in uasyncio, and provided only
|
|
||||||
# for compatibility with CPython asyncio
|
|
||||||
#
|
|
||||||
|
|
||||||
|
|
||||||
def ensure_future(coro, loop=_event_loop):
|
|
||||||
_event_loop.call_soon(coro)
|
|
||||||
# CPython asyncio incompatibility: we don't return Task object
|
|
||||||
return coro
|
|
||||||
|
|
||||||
|
|
||||||
# CPython asyncio incompatibility: Task is a function, not a class (for efficiency)
|
|
||||||
def Task(coro, loop=_event_loop):
|
|
||||||
# Same as async()
|
|
||||||
_event_loop.call_soon(coro)
|
|
|
@ -1,5 +0,0 @@
|
||||||
srctype = micropython-lib
|
|
||||||
type = package
|
|
||||||
version = 0.1.2
|
|
||||||
long_desc = Port of asyncio.queues to uasyncio.
|
|
||||||
depends = uasyncio.core, collections.deque
|
|
|
@ -1,25 +0,0 @@
|
||||||
import sys
|
|
||||||
|
|
||||||
# Remove current dir from sys.path, otherwise setuptools will peek up our
|
|
||||||
# module instead of system's.
|
|
||||||
sys.path.pop(0)
|
|
||||||
from setuptools import setup
|
|
||||||
|
|
||||||
sys.path.append("..")
|
|
||||||
import sdist_upip
|
|
||||||
|
|
||||||
setup(
|
|
||||||
name="micropython-uasyncio.queues",
|
|
||||||
version="0.1.2",
|
|
||||||
description="uasyncio.queues module for MicroPython",
|
|
||||||
long_description="Port of asyncio.queues to uasyncio.",
|
|
||||||
url="https://github.com/micropython/micropython-lib",
|
|
||||||
author="micropython-lib Developers",
|
|
||||||
author_email="micro-python@googlegroups.com",
|
|
||||||
maintainer="micropython-lib Developers",
|
|
||||||
maintainer_email="micro-python@googlegroups.com",
|
|
||||||
license="MIT",
|
|
||||||
cmdclass={"sdist": sdist_upip.sdist},
|
|
||||||
packages=["uasyncio"],
|
|
||||||
install_requires=["micropython-uasyncio.core", "micropython-collections.deque"],
|
|
||||||
)
|
|
|
@ -1,57 +0,0 @@
|
||||||
from unittest import TestCase, run_class
|
|
||||||
import sys
|
|
||||||
|
|
||||||
sys.path.insert(0, "../uasyncio")
|
|
||||||
import queues
|
|
||||||
|
|
||||||
|
|
||||||
class QueueTestCase(TestCase):
|
|
||||||
def _val(self, gen):
|
|
||||||
"""Returns val from generator."""
|
|
||||||
while True:
|
|
||||||
try:
|
|
||||||
gen.send(None)
|
|
||||||
except StopIteration as e:
|
|
||||||
return e.value
|
|
||||||
|
|
||||||
def test_get_put(self):
|
|
||||||
q = queues.Queue(maxsize=1)
|
|
||||||
self._val(q.put(42))
|
|
||||||
self.assertEqual(self._val(q.get()), 42)
|
|
||||||
|
|
||||||
def test_get_put_nowait(self):
|
|
||||||
q = queues.Queue(maxsize=1)
|
|
||||||
q.put_nowait(12)
|
|
||||||
try:
|
|
||||||
q.put_nowait(42)
|
|
||||||
self.assertTrue(False)
|
|
||||||
except Exception as e:
|
|
||||||
self.assertEqual(type(e), queues.QueueFull)
|
|
||||||
self.assertEqual(q.get_nowait(), 12)
|
|
||||||
try:
|
|
||||||
q.get_nowait()
|
|
||||||
self.assertTrue(False)
|
|
||||||
except Exception as e:
|
|
||||||
self.assertEqual(type(e), queues.QueueEmpty)
|
|
||||||
|
|
||||||
def test_qsize(self):
|
|
||||||
q = queues.Queue()
|
|
||||||
for n in range(10):
|
|
||||||
q.put_nowait(10)
|
|
||||||
self.assertEqual(q.qsize(), 10)
|
|
||||||
|
|
||||||
def test_empty(self):
|
|
||||||
q = queues.Queue()
|
|
||||||
self.assertTrue(q.empty())
|
|
||||||
q.put_nowait(10)
|
|
||||||
self.assertFalse(q.empty())
|
|
||||||
|
|
||||||
def test_full(self):
|
|
||||||
q = queues.Queue(maxsize=1)
|
|
||||||
self.assertFalse(q.full())
|
|
||||||
q.put_nowait(10)
|
|
||||||
self.assertTrue(q.full())
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
run_class(QueueTestCase)
|
|
|
@ -1,95 +0,0 @@
|
||||||
from collections.deque import deque
|
|
||||||
from uasyncio.core import sleep
|
|
||||||
|
|
||||||
|
|
||||||
class QueueEmpty(Exception):
|
|
||||||
"""Exception raised by get_nowait()."""
|
|
||||||
|
|
||||||
|
|
||||||
class QueueFull(Exception):
|
|
||||||
"""Exception raised by put_nowait()."""
|
|
||||||
|
|
||||||
|
|
||||||
class Queue:
|
|
||||||
"""A queue, useful for coordinating producer and consumer coroutines.
|
|
||||||
|
|
||||||
If maxsize is less than or equal to zero, the queue size is infinite. If it
|
|
||||||
is an integer greater than 0, then "yield from put()" will block when the
|
|
||||||
queue reaches maxsize, until an item is removed by get().
|
|
||||||
|
|
||||||
Unlike the standard library Queue, you can reliably know this Queue's size
|
|
||||||
with qsize(), since your single-threaded uasyncio application won't be
|
|
||||||
interrupted between calling qsize() and doing an operation on the Queue.
|
|
||||||
"""
|
|
||||||
|
|
||||||
_attempt_delay = 0.1
|
|
||||||
|
|
||||||
def __init__(self, maxsize=0):
|
|
||||||
self.maxsize = maxsize
|
|
||||||
self._queue = deque()
|
|
||||||
|
|
||||||
def _get(self):
|
|
||||||
return self._queue.popleft()
|
|
||||||
|
|
||||||
def get(self):
|
|
||||||
"""Returns generator, which can be used for getting (and removing)
|
|
||||||
an item from a queue.
|
|
||||||
|
|
||||||
Usage::
|
|
||||||
|
|
||||||
item = yield from queue.get()
|
|
||||||
"""
|
|
||||||
while not self._queue:
|
|
||||||
yield from sleep(self._attempt_delay)
|
|
||||||
return self._get()
|
|
||||||
|
|
||||||
def get_nowait(self):
|
|
||||||
"""Remove and return an item from the queue.
|
|
||||||
|
|
||||||
Return an item if one is immediately available, else raise QueueEmpty.
|
|
||||||
"""
|
|
||||||
if not self._queue:
|
|
||||||
raise QueueEmpty()
|
|
||||||
return self._get()
|
|
||||||
|
|
||||||
def _put(self, val):
|
|
||||||
self._queue.append(val)
|
|
||||||
|
|
||||||
def put(self, val):
|
|
||||||
"""Returns generator which can be used for putting item in a queue.
|
|
||||||
|
|
||||||
Usage::
|
|
||||||
|
|
||||||
yield from queue.put(item)
|
|
||||||
"""
|
|
||||||
while self.qsize() >= self.maxsize and self.maxsize:
|
|
||||||
yield from sleep(self._attempt_delay)
|
|
||||||
self._put(val)
|
|
||||||
|
|
||||||
def put_nowait(self, val):
|
|
||||||
"""Put an item into the queue without blocking.
|
|
||||||
|
|
||||||
If no free slot is immediately available, raise QueueFull.
|
|
||||||
"""
|
|
||||||
if self.qsize() >= self.maxsize and self.maxsize:
|
|
||||||
raise QueueFull()
|
|
||||||
self._put(val)
|
|
||||||
|
|
||||||
def qsize(self):
|
|
||||||
"""Number of items in the queue."""
|
|
||||||
return len(self._queue)
|
|
||||||
|
|
||||||
def empty(self):
|
|
||||||
"""Return True if the queue is empty, False otherwise."""
|
|
||||||
return not self._queue
|
|
||||||
|
|
||||||
def full(self):
|
|
||||||
"""Return True if there are maxsize items in the queue.
|
|
||||||
|
|
||||||
Note: if the Queue was initialized with maxsize=0 (the default),
|
|
||||||
then full() is never True.
|
|
||||||
"""
|
|
||||||
if self.maxsize <= 0:
|
|
||||||
return False
|
|
||||||
else:
|
|
||||||
return self.qsize() >= self.maxsize
|
|
|
@ -1,27 +0,0 @@
|
||||||
try:
|
|
||||||
import uasyncio.core as asyncio
|
|
||||||
from uasyncio.synchro import Lock
|
|
||||||
except ImportError:
|
|
||||||
import asyncio
|
|
||||||
from asyncio import Lock
|
|
||||||
|
|
||||||
|
|
||||||
def task(i, lock):
|
|
||||||
print(lock)
|
|
||||||
while 1:
|
|
||||||
yield from lock.acquire()
|
|
||||||
print("Acquired lock in task", i)
|
|
||||||
yield from asyncio.sleep(0.5)
|
|
||||||
# yield lock.release()
|
|
||||||
lock.release()
|
|
||||||
|
|
||||||
|
|
||||||
loop = asyncio.get_event_loop()
|
|
||||||
|
|
||||||
lock = Lock()
|
|
||||||
|
|
||||||
loop.create_task(task(1, lock))
|
|
||||||
loop.create_task(task(2, lock))
|
|
||||||
loop.create_task(task(3, lock))
|
|
||||||
|
|
||||||
loop.run_forever()
|
|
|
@ -1,5 +0,0 @@
|
||||||
srctype = micropython-lib
|
|
||||||
type = package
|
|
||||||
version = 0.1.1
|
|
||||||
desc = Synchronization primitives for uasyncio.
|
|
||||||
depends = uasyncio.core
|
|
|
@ -1,25 +0,0 @@
|
||||||
import sys
|
|
||||||
|
|
||||||
# Remove current dir from sys.path, otherwise setuptools will peek up our
|
|
||||||
# module instead of system's.
|
|
||||||
sys.path.pop(0)
|
|
||||||
from setuptools import setup
|
|
||||||
|
|
||||||
sys.path.append("..")
|
|
||||||
import sdist_upip
|
|
||||||
|
|
||||||
setup(
|
|
||||||
name="micropython-uasyncio.synchro",
|
|
||||||
version="0.1.1",
|
|
||||||
description="Synchronization primitives for uasyncio.",
|
|
||||||
long_description="This is a module reimplemented specifically for MicroPython standard library,\nwith efficient and lean design in mind. Note that this module is likely work\nin progress and likely supports just a subset of CPython's corresponding\nmodule. Please help with the development if you are interested in this\nmodule.",
|
|
||||||
url="https://github.com/micropython/micropython-lib",
|
|
||||||
author="micropython-lib Developers",
|
|
||||||
author_email="micro-python@googlegroups.com",
|
|
||||||
maintainer="micropython-lib Developers",
|
|
||||||
maintainer_email="micro-python@googlegroups.com",
|
|
||||||
license="MIT",
|
|
||||||
cmdclass={"sdist": sdist_upip.sdist},
|
|
||||||
packages=["uasyncio"],
|
|
||||||
install_requires=["micropython-uasyncio.core"],
|
|
||||||
)
|
|
|
@ -1,28 +0,0 @@
|
||||||
from uasyncio import core
|
|
||||||
|
|
||||||
|
|
||||||
class Lock:
|
|
||||||
def __init__(self):
|
|
||||||
self.locked = False
|
|
||||||
self.wlist = []
|
|
||||||
|
|
||||||
def release(self):
|
|
||||||
assert self.locked
|
|
||||||
self.locked = False
|
|
||||||
if self.wlist:
|
|
||||||
# print(self.wlist)
|
|
||||||
coro = self.wlist.pop(0)
|
|
||||||
core.get_event_loop().call_soon(coro)
|
|
||||||
|
|
||||||
def acquire(self):
|
|
||||||
# As release() is not coro, assume we just released and going to acquire again
|
|
||||||
# so, yield first to let someone else to acquire it first
|
|
||||||
yield
|
|
||||||
# print("acquire:", self.locked)
|
|
||||||
while 1:
|
|
||||||
if not self.locked:
|
|
||||||
self.locked = True
|
|
||||||
return True
|
|
||||||
# print("putting", core.get_event_loop().cur_task, "on waiting list")
|
|
||||||
self.wlist.append(core.get_event_loop().cur_task)
|
|
||||||
yield False
|
|
|
@ -1,28 +0,0 @@
|
||||||
# This example is intended to run with dnsmasq running on localhost
|
|
||||||
# (Ubuntu comes configured like that by default). Dnsmasq, receiving
|
|
||||||
# some junk, is still kind to reply something back, which we employ
|
|
||||||
# here.
|
|
||||||
import uasyncio
|
|
||||||
import uasyncio.udp
|
|
||||||
import usocket
|
|
||||||
|
|
||||||
|
|
||||||
def udp_req(addr):
|
|
||||||
s = uasyncio.udp.socket()
|
|
||||||
print(s)
|
|
||||||
yield from uasyncio.udp.sendto(s, b"!eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee", addr)
|
|
||||||
try:
|
|
||||||
resp = yield from uasyncio.wait_for(uasyncio.udp.recv(s, 1024), 1)
|
|
||||||
print(resp)
|
|
||||||
except uasyncio.TimeoutError:
|
|
||||||
print("timed out")
|
|
||||||
|
|
||||||
|
|
||||||
import logging
|
|
||||||
|
|
||||||
logging.basicConfig(level=logging.INFO)
|
|
||||||
|
|
||||||
addr = usocket.getaddrinfo("127.0.0.1", 53)[0][-1]
|
|
||||||
loop = uasyncio.get_event_loop()
|
|
||||||
loop.run_until_complete(udp_req(addr))
|
|
||||||
loop.close()
|
|
|
@ -1,6 +0,0 @@
|
||||||
srctype = micropython-lib
|
|
||||||
type = package
|
|
||||||
version = 0.1.1
|
|
||||||
author = Paul Sokolovsky
|
|
||||||
desc = UDP support for MicroPython's uasyncio
|
|
||||||
depends = uasyncio
|
|
|
@ -1,25 +0,0 @@
|
||||||
import sys
|
|
||||||
|
|
||||||
# Remove current dir from sys.path, otherwise setuptools will peek up our
|
|
||||||
# module instead of system's.
|
|
||||||
sys.path.pop(0)
|
|
||||||
from setuptools import setup
|
|
||||||
|
|
||||||
sys.path.append("..")
|
|
||||||
import sdist_upip
|
|
||||||
|
|
||||||
setup(
|
|
||||||
name="micropython-uasyncio.udp",
|
|
||||||
version="0.1.1",
|
|
||||||
description="UDP support for MicroPython's uasyncio",
|
|
||||||
long_description="This is a module reimplemented specifically for MicroPython standard library,\nwith efficient and lean design in mind. Note that this module is likely work\nin progress and likely supports just a subset of CPython's corresponding\nmodule. Please help with the development if you are interested in this\nmodule.",
|
|
||||||
url="https://github.com/micropython/micropython-lib",
|
|
||||||
author="Paul Sokolovsky",
|
|
||||||
author_email="micro-python@googlegroups.com",
|
|
||||||
maintainer="micropython-lib Developers",
|
|
||||||
maintainer_email="micro-python@googlegroups.com",
|
|
||||||
license="MIT",
|
|
||||||
cmdclass={"sdist": sdist_upip.sdist},
|
|
||||||
packages=["uasyncio"],
|
|
||||||
install_requires=["micropython-uasyncio"],
|
|
||||||
)
|
|
|
@ -1,64 +0,0 @@
|
||||||
import usocket
|
|
||||||
from uasyncio import core
|
|
||||||
|
|
||||||
|
|
||||||
DEBUG = 0
|
|
||||||
log = None
|
|
||||||
|
|
||||||
|
|
||||||
def set_debug(val):
|
|
||||||
global DEBUG, log
|
|
||||||
DEBUG = val
|
|
||||||
if val:
|
|
||||||
import logging
|
|
||||||
|
|
||||||
log = logging.getLogger("uasyncio.udp")
|
|
||||||
|
|
||||||
|
|
||||||
def socket(af=usocket.AF_INET):
|
|
||||||
s = usocket.socket(af, usocket.SOCK_DGRAM)
|
|
||||||
s.setblocking(False)
|
|
||||||
return s
|
|
||||||
|
|
||||||
|
|
||||||
def recv(s, n):
|
|
||||||
try:
|
|
||||||
yield core.IORead(s)
|
|
||||||
return s.recv(n)
|
|
||||||
except:
|
|
||||||
# print("recv: exc, cleaning up")
|
|
||||||
# print(uasyncio.core._event_loop.objmap, uasyncio.core._event_loop.poller)
|
|
||||||
# uasyncio.core._event_loop.poller.dump()
|
|
||||||
yield core.IOReadDone(s)
|
|
||||||
# print(uasyncio.core._event_loop.objmap)
|
|
||||||
# uasyncio.core._event_loop.poller.dump()
|
|
||||||
raise
|
|
||||||
|
|
||||||
|
|
||||||
def recvfrom(s, n):
|
|
||||||
try:
|
|
||||||
yield core.IORead(s)
|
|
||||||
return s.recvfrom(n)
|
|
||||||
except:
|
|
||||||
# print("recv: exc, cleaning up")
|
|
||||||
# print(uasyncio.core._event_loop.objmap, uasyncio.core._event_loop.poller)
|
|
||||||
# uasyncio.core._event_loop.poller.dump()
|
|
||||||
yield core.IOReadDone(s)
|
|
||||||
# print(uasyncio.core._event_loop.objmap)
|
|
||||||
# uasyncio.core._event_loop.poller.dump()
|
|
||||||
raise
|
|
||||||
|
|
||||||
|
|
||||||
def sendto(s, buf, addr=None):
|
|
||||||
while 1:
|
|
||||||
res = s.sendto(buf, addr)
|
|
||||||
# print("send res:", res)
|
|
||||||
if res == len(buf):
|
|
||||||
return
|
|
||||||
print("sendto: IOWrite")
|
|
||||||
yield core.IOWrite(s)
|
|
||||||
|
|
||||||
|
|
||||||
def close(s):
|
|
||||||
yield core.IOReadDone(s)
|
|
||||||
s.close()
|
|
|
@ -1,28 +0,0 @@
|
||||||
import uasyncio
|
|
||||||
from uasyncio.websocket.server import WSReader, WSWriter
|
|
||||||
|
|
||||||
|
|
||||||
def echo(reader, writer):
|
|
||||||
# Consume GET line
|
|
||||||
yield from reader.readline()
|
|
||||||
|
|
||||||
reader = yield from WSReader(reader, writer)
|
|
||||||
writer = WSWriter(reader, writer)
|
|
||||||
|
|
||||||
while 1:
|
|
||||||
l = yield from reader.read(256)
|
|
||||||
print(l)
|
|
||||||
if l == b"\r":
|
|
||||||
await writer.awrite(b"\r\n")
|
|
||||||
else:
|
|
||||||
await writer.awrite(l)
|
|
||||||
|
|
||||||
|
|
||||||
import logging
|
|
||||||
|
|
||||||
# logging.basicConfig(level=logging.INFO)
|
|
||||||
logging.basicConfig(level=logging.DEBUG)
|
|
||||||
loop = uasyncio.get_event_loop()
|
|
||||||
loop.create_task(uasyncio.start_server(echo, "127.0.0.1", 8081))
|
|
||||||
loop.run_forever()
|
|
||||||
loop.close()
|
|
|
@ -1,5 +0,0 @@
|
||||||
srctype = micropython-lib
|
|
||||||
type = package
|
|
||||||
version = 0.1
|
|
||||||
author = Paul Sokolovsky
|
|
||||||
depends = uasyncio
|
|
|
@ -1,25 +0,0 @@
|
||||||
import sys
|
|
||||||
|
|
||||||
# Remove current dir from sys.path, otherwise setuptools will peek up our
|
|
||||||
# module instead of system's.
|
|
||||||
sys.path.pop(0)
|
|
||||||
from setuptools import setup
|
|
||||||
|
|
||||||
sys.path.append("..")
|
|
||||||
import sdist_upip
|
|
||||||
|
|
||||||
setup(
|
|
||||||
name="micropython-uasyncio.websocket.server",
|
|
||||||
version="0.1",
|
|
||||||
description="uasyncio.websocket.server module for MicroPython",
|
|
||||||
long_description="This is a module reimplemented specifically for MicroPython standard library,\nwith efficient and lean design in mind. Note that this module is likely work\nin progress and likely supports just a subset of CPython's corresponding\nmodule. Please help with the development if you are interested in this\nmodule.",
|
|
||||||
url="https://github.com/micropython/micropython-lib",
|
|
||||||
author="Paul Sokolovsky",
|
|
||||||
author_email="micro-python@googlegroups.com",
|
|
||||||
maintainer="micropython-lib Developers",
|
|
||||||
maintainer_email="micro-python@googlegroups.com",
|
|
||||||
license="MIT",
|
|
||||||
cmdclass={"sdist": sdist_upip.sdist},
|
|
||||||
packages=["uasyncio.websocket"],
|
|
||||||
install_requires=["micropython-uasyncio"],
|
|
||||||
)
|
|
|
@ -1,64 +0,0 @@
|
||||||
import uasyncio
|
|
||||||
import uhashlib, ubinascii
|
|
||||||
import websocket
|
|
||||||
|
|
||||||
|
|
||||||
def make_respkey(webkey):
|
|
||||||
d = uhashlib.sha1(webkey)
|
|
||||||
d.update(b"258EAFA5-E914-47DA-95CA-C5AB0DC85B11")
|
|
||||||
respkey = d.digest()
|
|
||||||
respkey = ubinascii.b2a_base64(respkey) # [:-1]
|
|
||||||
# Return with trailing "\n".
|
|
||||||
return respkey
|
|
||||||
|
|
||||||
|
|
||||||
class WSWriter:
|
|
||||||
def __init__(self, reader, writer):
|
|
||||||
# Reader is passed for symmetry with WSReader() and ignored.
|
|
||||||
self.s = writer
|
|
||||||
|
|
||||||
async def awrite(self, data):
|
|
||||||
assert len(data) < 126
|
|
||||||
await self.s.awrite(b"\x81")
|
|
||||||
await self.s.awrite(bytes([len(data)]))
|
|
||||||
await self.s.awrite(data)
|
|
||||||
|
|
||||||
|
|
||||||
def WSReader(reader, writer):
|
|
||||||
|
|
||||||
webkey = None
|
|
||||||
while 1:
|
|
||||||
l = yield from reader.readline()
|
|
||||||
print(l)
|
|
||||||
if not l:
|
|
||||||
raise ValueError()
|
|
||||||
if l == b"\r\n":
|
|
||||||
break
|
|
||||||
if l.startswith(b"Sec-WebSocket-Key"):
|
|
||||||
webkey = l.split(b":", 1)[1]
|
|
||||||
webkey = webkey.strip()
|
|
||||||
|
|
||||||
if not webkey:
|
|
||||||
raise ValueError("Not a websocker request")
|
|
||||||
|
|
||||||
respkey = make_respkey(webkey)
|
|
||||||
|
|
||||||
await writer.awrite(
|
|
||||||
b"""\
|
|
||||||
HTTP/1.1 101 Switching Protocols\r
|
|
||||||
Upgrade: websocket\r
|
|
||||||
Connection: Upgrade\r
|
|
||||||
Sec-WebSocket-Accept: """
|
|
||||||
)
|
|
||||||
await writer.awrite(respkey)
|
|
||||||
# This will lead to "<key>\n\r\n" being written. Not exactly
|
|
||||||
# "\r\n\r\n", but browsers seem to eat it.
|
|
||||||
await writer.awrite("\r\n")
|
|
||||||
# await writer.awrite("\r\n\r\n")
|
|
||||||
|
|
||||||
print("Finished webrepl handshake")
|
|
||||||
|
|
||||||
ws = websocket.websocket(reader.ios)
|
|
||||||
rws = uasyncio.StreamReader(reader.ios, ws)
|
|
||||||
|
|
||||||
return rws
|
|
|
@ -1,43 +0,0 @@
|
||||||
uasyncio
|
|
||||||
========
|
|
||||||
|
|
||||||
uasyncio is MicroPython's asynchronous sheduling library, roughly
|
|
||||||
modeled after CPython's asyncio.
|
|
||||||
|
|
||||||
uasyncio doesn't use naive always-iterating scheduling algorithm,
|
|
||||||
but performs a real time-based scheduling, which allows it (and
|
|
||||||
thus the whole system) to sleep when there is nothing to do (actual
|
|
||||||
implementation of that depends on I/O scheduling algorithm which
|
|
||||||
actually performs the wait operation).
|
|
||||||
|
|
||||||
Major conceptual differences to asyncio:
|
|
||||||
|
|
||||||
* Avoids defining a notion of Future, and especially wrapping coroutines
|
|
||||||
in Futures, like CPython asyncio does. uasyncio works directly with
|
|
||||||
coroutines (and callbacks).
|
|
||||||
* Methods provided are more consistently coroutines.
|
|
||||||
* uasyncio uses wrap-around millisecond timebase (as native to all
|
|
||||||
MicroPython ports.)
|
|
||||||
* Instead of single large package, number of subpackages are provided
|
|
||||||
(each installable separately).
|
|
||||||
|
|
||||||
Specific differences:
|
|
||||||
|
|
||||||
* For millisecond scheduling, ``loop.call_later_ms()`` and
|
|
||||||
``uasyncio.sleep_ms()`` are provided.
|
|
||||||
* As there's no monotonic time, ``loop.call_at()`` is not provided.
|
|
||||||
Instead, there's ``loop.call_at_()`` which is considered an internal
|
|
||||||
function and has slightly different signature.
|
|
||||||
* ``call_*`` funcions don't return Handle and callbacks scheduled by
|
|
||||||
them aren't cancellable. If they need to be cancellable, they should
|
|
||||||
accept an object as an argument, and a "cancel" flag should be set
|
|
||||||
in the object, for a callback to test.
|
|
||||||
* ``Future`` object is not available.
|
|
||||||
* ``ensure_future()`` and ``Task()`` perform just scheduling operations
|
|
||||||
and return a native coroutine, not Future/Task objects.
|
|
||||||
* Some other functions are not (yet) implemented.
|
|
||||||
* StreamWriter method(s) are coroutines. While in CPython asyncio,
|
|
||||||
StreamWriter.write() is a normal function (which potentially buffers
|
|
||||||
unlimited amount of data), uasyncio offers coroutine StreamWriter.awrite()
|
|
||||||
instead. Also, both StreamReader and StreamWriter have .aclose()
|
|
||||||
coroutine method.
|
|
|
@ -1,45 +0,0 @@
|
||||||
Testing and Validating
|
|
||||||
----------------------
|
|
||||||
|
|
||||||
To test uasyncio correctness and performance, HTTP server samples can be
|
|
||||||
used. The simplest test is with test_http_server.py and Apache Benchmark
|
|
||||||
(ab). In one window, run:
|
|
||||||
|
|
||||||
micropython -O test_http_server.py
|
|
||||||
|
|
||||||
(-O is needed to short-circuit debug logging calls.)
|
|
||||||
|
|
||||||
In another:
|
|
||||||
|
|
||||||
ab -n10000 -c10 http://localhost:8081/
|
|
||||||
|
|
||||||
ab tests that all responses have the same length, but doesn't check
|
|
||||||
content. test_http_server.py also serves very short, static reply.
|
|
||||||
|
|
||||||
|
|
||||||
For more heavy testing, test_http_server_heavy.py is provided. It serves
|
|
||||||
large response split among several async writes. It is also dynamic -
|
|
||||||
includes incrementing counter, so each response will be different. The
|
|
||||||
response size generates is more 4Mb, because under Linux, socket writes
|
|
||||||
can buffer up to 4Mb of content (this appear to be controlled by
|
|
||||||
/proc/sys/net/ipv4/tcp_wmem and not /proc/sys/net/core/wmem_default).
|
|
||||||
test_http_server_heavy.py also includes (trivial) handling of
|
|
||||||
client-induced errors like EPIPE and ECONNRESET. To validate content
|
|
||||||
served, a post-hook script for "boom" tool
|
|
||||||
(https://github.com/tarekziade/boom) is provided.
|
|
||||||
|
|
||||||
Before start, you may want to bump .listen() value in uasyncio/__init__.py
|
|
||||||
from default 10 to at least 30.
|
|
||||||
|
|
||||||
Start:
|
|
||||||
|
|
||||||
micropython -X heapsize=300000000 -O test_http_server_heavy.py
|
|
||||||
|
|
||||||
(Yes, that's 300Mb of heap - we'll be serving 4+Mb of content with 30
|
|
||||||
concurrent connections).
|
|
||||||
|
|
||||||
And:
|
|
||||||
|
|
||||||
PYTHONPATH=. boom -n1000 -c30 http://localhost:8081 --post-hook=boom_uasyncio.validate
|
|
||||||
|
|
||||||
There should be no Python exceptions in the output.
|
|
|
@ -1,39 +0,0 @@
|
||||||
#
|
|
||||||
# This is validation script for "boom" tool https://github.com/tarekziade/boom
|
|
||||||
# To use it:
|
|
||||||
#
|
|
||||||
# boom -n1000 --post-hook=boom_uasyncio.validate <rest of boom args>
|
|
||||||
#
|
|
||||||
# Note that if you'll use other -n value, you should update NUM_REQS below
|
|
||||||
# to match.
|
|
||||||
#
|
|
||||||
|
|
||||||
NUM_REQS = 1000
|
|
||||||
seen = []
|
|
||||||
cnt = 0
|
|
||||||
|
|
||||||
|
|
||||||
def validate(resp):
|
|
||||||
global cnt
|
|
||||||
t = resp.text
|
|
||||||
l = t.split("\r\n", 1)[0]
|
|
||||||
no = int(l.split()[1])
|
|
||||||
seen.append(no)
|
|
||||||
c = t.count(l + "\r\n")
|
|
||||||
assert c == 400101, str(c)
|
|
||||||
assert t.endswith("=== END ===")
|
|
||||||
|
|
||||||
cnt += 1
|
|
||||||
if cnt == NUM_REQS:
|
|
||||||
seen.sort()
|
|
||||||
print
|
|
||||||
print seen
|
|
||||||
print
|
|
||||||
el = None
|
|
||||||
for i in seen:
|
|
||||||
if el is None:
|
|
||||||
el = i
|
|
||||||
else:
|
|
||||||
el += 1
|
|
||||||
assert i == el
|
|
||||||
return resp
|
|
|
@ -1,17 +0,0 @@
|
||||||
#!/bin/sh
|
|
||||||
#
|
|
||||||
# This in one-shot scripts to test "light load" uasyncio HTTP server using
|
|
||||||
# Apache Bench (ab).
|
|
||||||
#
|
|
||||||
|
|
||||||
#python3.4.2 test_http_server_light.py &
|
|
||||||
#micropython -O test_http_server_light.py &
|
|
||||||
|
|
||||||
#python3.4.2 test_http_server_medium.py &
|
|
||||||
micropython -O -X heapsize=200wK test_http_server_medium.py &
|
|
||||||
|
|
||||||
sleep 1
|
|
||||||
|
|
||||||
ab -n10000 -c100 http://127.0.0.1:8081/
|
|
||||||
|
|
||||||
kill %1
|
|
|
@ -1,28 +0,0 @@
|
||||||
#!/bin/sh
|
|
||||||
#
|
|
||||||
# This in one-shot scripts to test "heavy load" uasyncio HTTP server using
|
|
||||||
# Boom tool https://github.com/tarekziade/boom .
|
|
||||||
#
|
|
||||||
# Note that this script doesn't test performance, but rather test functional
|
|
||||||
# correctness of uasyncio server implementation, while serving large amounts
|
|
||||||
# of data (guaranteedly more than a socket buffer). Thus, this script should
|
|
||||||
# not be used for benchmarking.
|
|
||||||
#
|
|
||||||
|
|
||||||
if [ ! -d .venv-boom ]; then
|
|
||||||
virtualenv .venv-boom
|
|
||||||
. .venv-boom/bin/activate
|
|
||||||
# PyPI currently has 0.8 which is too old
|
|
||||||
#pip install boom
|
|
||||||
pip install git+https://github.com/tarekziade/boom
|
|
||||||
else
|
|
||||||
. .venv-boom/bin/activate
|
|
||||||
fi
|
|
||||||
|
|
||||||
|
|
||||||
micropython -X heapsize=300000000 -O test_http_server_heavy.py &
|
|
||||||
sleep 1
|
|
||||||
|
|
||||||
PYTHONPATH=. boom -n1000 -c30 http://localhost:8081 --post-hook=boom_uasyncio.validate
|
|
||||||
|
|
||||||
kill %1
|
|
|
@ -1,42 +0,0 @@
|
||||||
import uasyncio as asyncio
|
|
||||||
import signal
|
|
||||||
import errno
|
|
||||||
|
|
||||||
|
|
||||||
cnt = 0
|
|
||||||
|
|
||||||
|
|
||||||
@asyncio.coroutine
|
|
||||||
def serve(reader, writer):
|
|
||||||
global cnt
|
|
||||||
# s = "Hello.\r\n"
|
|
||||||
s = "Hello. %07d\r\n" % cnt
|
|
||||||
cnt += 1
|
|
||||||
yield from reader.read()
|
|
||||||
yield from writer.awrite("HTTP/1.0 200 OK\r\n\r\n")
|
|
||||||
try:
|
|
||||||
yield from writer.awrite(s)
|
|
||||||
yield from writer.awrite(s * 100)
|
|
||||||
yield from writer.awrite(s * 400000)
|
|
||||||
yield from writer.awrite("=== END ===")
|
|
||||||
except OSError as e:
|
|
||||||
if e.args[0] == errno.EPIPE:
|
|
||||||
print("EPIPE")
|
|
||||||
elif e.args[0] == errno.ECONNRESET:
|
|
||||||
print("ECONNRESET")
|
|
||||||
else:
|
|
||||||
raise
|
|
||||||
finally:
|
|
||||||
yield from writer.aclose()
|
|
||||||
|
|
||||||
|
|
||||||
import logging
|
|
||||||
|
|
||||||
logging.basicConfig(level=logging.INFO)
|
|
||||||
# logging.basicConfig(level=logging.DEBUG)
|
|
||||||
signal.signal(signal.SIGPIPE, signal.SIG_IGN)
|
|
||||||
loop = asyncio.get_event_loop()
|
|
||||||
# mem_info()
|
|
||||||
loop.call_soon(asyncio.start_server(serve, "0.0.0.0", 8081, backlog=100))
|
|
||||||
loop.run_forever()
|
|
||||||
loop.close()
|
|
|
@ -1,22 +0,0 @@
|
||||||
import uasyncio as asyncio
|
|
||||||
|
|
||||||
|
|
||||||
@asyncio.coroutine
|
|
||||||
def serve(reader, writer):
|
|
||||||
# print(reader, writer)
|
|
||||||
# print("================")
|
|
||||||
yield from reader.read(512)
|
|
||||||
yield from writer.awrite("HTTP/1.0 200 OK\r\n\r\nHello.\r\n")
|
|
||||||
yield from writer.aclose()
|
|
||||||
# print("Finished processing request")
|
|
||||||
|
|
||||||
|
|
||||||
import logging
|
|
||||||
|
|
||||||
# logging.basicConfig(level=logging.INFO)
|
|
||||||
logging.basicConfig(level=logging.DEBUG)
|
|
||||||
loop = asyncio.get_event_loop()
|
|
||||||
# mem_info()
|
|
||||||
loop.create_task(asyncio.start_server(serve, "127.0.0.1", 8081, backlog=100))
|
|
||||||
loop.run_forever()
|
|
||||||
loop.close()
|
|
|
@ -1,24 +0,0 @@
|
||||||
import uasyncio as asyncio
|
|
||||||
|
|
||||||
resp = "HTTP/1.0 200 OK\r\n\r\n" + "Hello.\r\n" * 1500
|
|
||||||
|
|
||||||
|
|
||||||
@asyncio.coroutine
|
|
||||||
def serve(reader, writer):
|
|
||||||
# print(reader, writer)
|
|
||||||
# print("================")
|
|
||||||
yield from reader.read(512)
|
|
||||||
yield from writer.awrite(resp)
|
|
||||||
yield from writer.aclose()
|
|
||||||
# print("Finished processing request")
|
|
||||||
|
|
||||||
|
|
||||||
import logging
|
|
||||||
|
|
||||||
# logging.basicConfig(level=logging.INFO)
|
|
||||||
logging.basicConfig(level=logging.DEBUG)
|
|
||||||
loop = asyncio.get_event_loop(80)
|
|
||||||
# mem_info()
|
|
||||||
loop.create_task(asyncio.start_server(serve, "127.0.0.1", 8081, backlog=100))
|
|
||||||
loop.run_forever()
|
|
||||||
loop.close()
|
|
|
@ -1,27 +0,0 @@
|
||||||
import uasyncio as asyncio
|
|
||||||
|
|
||||||
|
|
||||||
@asyncio.coroutine
|
|
||||||
def print_http_headers(url):
|
|
||||||
reader, writer = yield from asyncio.open_connection(url, 80)
|
|
||||||
print(reader, writer)
|
|
||||||
print("================")
|
|
||||||
query = "GET / HTTP/1.0\r\n\r\n"
|
|
||||||
yield from writer.awrite(query.encode("latin-1"))
|
|
||||||
while True:
|
|
||||||
line = yield from reader.readline()
|
|
||||||
if not line:
|
|
||||||
break
|
|
||||||
if line:
|
|
||||||
print(line.rstrip())
|
|
||||||
|
|
||||||
|
|
||||||
import logging
|
|
||||||
|
|
||||||
logging.basicConfig(level=logging.INFO)
|
|
||||||
url = "google.com"
|
|
||||||
loop = asyncio.get_event_loop()
|
|
||||||
# task = asyncio.async(print_http_headers(url))
|
|
||||||
# loop.run_until_complete(task)
|
|
||||||
loop.run_until_complete(print_http_headers(url))
|
|
||||||
loop.close()
|
|
|
@ -1,22 +0,0 @@
|
||||||
import uasyncio as asyncio
|
|
||||||
|
|
||||||
|
|
||||||
@asyncio.coroutine
|
|
||||||
def serve(reader, writer):
|
|
||||||
print(reader, writer)
|
|
||||||
print("================")
|
|
||||||
print((yield from reader.read()))
|
|
||||||
yield from writer.awrite("HTTP/1.0 200 OK\r\n\r\nHello.\r\n")
|
|
||||||
print("After response write")
|
|
||||||
yield from writer.aclose()
|
|
||||||
print("Finished processing request")
|
|
||||||
|
|
||||||
|
|
||||||
import logging
|
|
||||||
|
|
||||||
# logging.basicConfig(level=logging.INFO)
|
|
||||||
logging.basicConfig(level=logging.DEBUG)
|
|
||||||
loop = asyncio.get_event_loop()
|
|
||||||
loop.call_soon(asyncio.start_server(serve, "127.0.0.1", 8081))
|
|
||||||
loop.run_forever()
|
|
||||||
loop.close()
|
|
|
@ -1,7 +0,0 @@
|
||||||
srctype = micropython-lib
|
|
||||||
type = package
|
|
||||||
version = 2.0
|
|
||||||
author = Paul Sokolovsky
|
|
||||||
desc = Lightweight asyncio-like library for MicroPython, built around native Python coroutines.
|
|
||||||
long_desc = README.rst
|
|
||||||
depends = uasyncio.core
|
|
|
@ -1,25 +0,0 @@
|
||||||
import sys
|
|
||||||
|
|
||||||
# Remove current dir from sys.path, otherwise setuptools will peek up our
|
|
||||||
# module instead of system's.
|
|
||||||
sys.path.pop(0)
|
|
||||||
from setuptools import setup
|
|
||||||
|
|
||||||
sys.path.append("..")
|
|
||||||
import sdist_upip
|
|
||||||
|
|
||||||
setup(
|
|
||||||
name="micropython-uasyncio",
|
|
||||||
version="2.0",
|
|
||||||
description="Lightweight asyncio-like library for MicroPython, built around native Python coroutines.",
|
|
||||||
long_description=open("README.rst").read(),
|
|
||||||
url="https://github.com/micropython/micropython-lib",
|
|
||||||
author="Paul Sokolovsky",
|
|
||||||
author_email="micro-python@googlegroups.com",
|
|
||||||
maintainer="micropython-lib Developers",
|
|
||||||
maintainer_email="micro-python@googlegroups.com",
|
|
||||||
license="MIT",
|
|
||||||
cmdclass={"sdist": sdist_upip.sdist},
|
|
||||||
packages=["uasyncio"],
|
|
||||||
install_requires=["micropython-uasyncio.core"],
|
|
||||||
)
|
|
|
@ -1,32 +0,0 @@
|
||||||
from uasyncio import get_event_loop, open_connection, start_server, sleep_ms
|
|
||||||
from unittest import main, TestCase
|
|
||||||
|
|
||||||
|
|
||||||
class EchoTestCase(TestCase):
|
|
||||||
def test_client_server(self):
|
|
||||||
"""Simple client-server echo test"""
|
|
||||||
sockaddr = ("127.0.0.1", 8080)
|
|
||||||
l = get_event_loop()
|
|
||||||
|
|
||||||
async def echo_server(reader, writer):
|
|
||||||
data = await reader.readline()
|
|
||||||
await writer.awrite(data)
|
|
||||||
await writer.aclose()
|
|
||||||
|
|
||||||
async def echo_client(line, result):
|
|
||||||
await sleep_ms(10) # Allow server to get up
|
|
||||||
reader, writer = await open_connection(*sockaddr)
|
|
||||||
await writer.awrite(line)
|
|
||||||
data = await reader.readline()
|
|
||||||
await writer.aclose()
|
|
||||||
result.append(data) # capture response
|
|
||||||
|
|
||||||
result = []
|
|
||||||
l.create_task(start_server(echo_server, *sockaddr))
|
|
||||||
l.run_until_complete(echo_client(b"Hello\r\n", result))
|
|
||||||
|
|
||||||
self.assertEqual(result[0], b"Hello\r\n")
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
main()
|
|
|
@ -1,37 +0,0 @@
|
||||||
try:
|
|
||||||
import uasyncio as asyncio
|
|
||||||
except:
|
|
||||||
import asyncio
|
|
||||||
|
|
||||||
try:
|
|
||||||
import utime as time
|
|
||||||
except:
|
|
||||||
import time
|
|
||||||
|
|
||||||
done = False
|
|
||||||
|
|
||||||
|
|
||||||
async def receiver():
|
|
||||||
global done
|
|
||||||
with open("test_io_starve.py", "rb") as f:
|
|
||||||
sreader = asyncio.StreamReader(f)
|
|
||||||
while True:
|
|
||||||
await asyncio.sleep(0.1)
|
|
||||||
res = await sreader.readline()
|
|
||||||
# Didn't get there with the original problem this test shows
|
|
||||||
done = True
|
|
||||||
|
|
||||||
|
|
||||||
async def foo():
|
|
||||||
start = time.time()
|
|
||||||
while time.time() - start < 1:
|
|
||||||
await asyncio.sleep(0)
|
|
||||||
loop.stop()
|
|
||||||
|
|
||||||
|
|
||||||
loop = asyncio.get_event_loop()
|
|
||||||
loop.create_task(foo())
|
|
||||||
loop.create_task(receiver())
|
|
||||||
loop.run_forever()
|
|
||||||
assert done
|
|
||||||
print("OK")
|
|
|
@ -1,39 +0,0 @@
|
||||||
from uasyncio import StreamReader
|
|
||||||
|
|
||||||
|
|
||||||
class MockSock:
|
|
||||||
def __init__(self, data_list):
|
|
||||||
self.data = data_list
|
|
||||||
|
|
||||||
def read(self, sz):
|
|
||||||
try:
|
|
||||||
return self.data.pop(0)
|
|
||||||
except IndexError:
|
|
||||||
return b""
|
|
||||||
|
|
||||||
|
|
||||||
mock = MockSock(
|
|
||||||
[
|
|
||||||
b"123",
|
|
||||||
b"234",
|
|
||||||
b"5",
|
|
||||||
b"a",
|
|
||||||
b"b",
|
|
||||||
b"c",
|
|
||||||
b"d",
|
|
||||||
b"e",
|
|
||||||
]
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def func():
|
|
||||||
sr = StreamReader(mock)
|
|
||||||
assert await sr.readexactly(3) == b"123"
|
|
||||||
assert await sr.readexactly(4) == b"2345"
|
|
||||||
assert await sr.readexactly(5) == b"abcde"
|
|
||||||
# This isn't how it should be, but the current behavior
|
|
||||||
assert await sr.readexactly(10) == b""
|
|
||||||
|
|
||||||
|
|
||||||
for i in func():
|
|
||||||
pass
|
|
|
@ -1,35 +0,0 @@
|
||||||
from uasyncio import StreamReader
|
|
||||||
|
|
||||||
|
|
||||||
class MockSock:
|
|
||||||
def __init__(self, data_list):
|
|
||||||
self.data = data_list
|
|
||||||
|
|
||||||
def readline(self):
|
|
||||||
try:
|
|
||||||
return self.data.pop(0)
|
|
||||||
except IndexError:
|
|
||||||
return b""
|
|
||||||
|
|
||||||
|
|
||||||
mock = MockSock(
|
|
||||||
[
|
|
||||||
b"line1\n",
|
|
||||||
b"parts ",
|
|
||||||
b"of ",
|
|
||||||
b"line2\n",
|
|
||||||
b"unterminated",
|
|
||||||
]
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def func():
|
|
||||||
sr = StreamReader(mock)
|
|
||||||
assert await sr.readline() == b"line1\n"
|
|
||||||
assert await sr.readline() == b"parts of line2\n"
|
|
||||||
assert await sr.readline() == b"unterminated"
|
|
||||||
assert await sr.readline() == b""
|
|
||||||
|
|
||||||
|
|
||||||
for i in func():
|
|
||||||
pass
|
|
|
@ -1,259 +0,0 @@
|
||||||
import uerrno
|
|
||||||
import uselect as select
|
|
||||||
import usocket as _socket
|
|
||||||
from uasyncio.core import *
|
|
||||||
|
|
||||||
|
|
||||||
DEBUG = 0
|
|
||||||
log = None
|
|
||||||
|
|
||||||
|
|
||||||
def set_debug(val):
|
|
||||||
global DEBUG, log
|
|
||||||
DEBUG = val
|
|
||||||
if val:
|
|
||||||
import logging
|
|
||||||
|
|
||||||
log = logging.getLogger("uasyncio")
|
|
||||||
|
|
||||||
|
|
||||||
class PollEventLoop(EventLoop):
|
|
||||||
def __init__(self, runq_len=16, waitq_len=16):
|
|
||||||
EventLoop.__init__(self, runq_len, waitq_len)
|
|
||||||
self.poller = select.poll()
|
|
||||||
self.objmap = {}
|
|
||||||
|
|
||||||
def add_reader(self, sock, cb, *args):
|
|
||||||
if DEBUG and __debug__:
|
|
||||||
log.debug("add_reader%s", (sock, cb, args))
|
|
||||||
if args:
|
|
||||||
self.poller.register(sock, select.POLLIN)
|
|
||||||
self.objmap[id(sock)] = (cb, args)
|
|
||||||
else:
|
|
||||||
self.poller.register(sock, select.POLLIN)
|
|
||||||
self.objmap[id(sock)] = cb
|
|
||||||
|
|
||||||
def remove_reader(self, sock):
|
|
||||||
if DEBUG and __debug__:
|
|
||||||
log.debug("remove_reader(%s)", sock)
|
|
||||||
self.poller.unregister(sock)
|
|
||||||
del self.objmap[id(sock)]
|
|
||||||
|
|
||||||
def add_writer(self, sock, cb, *args):
|
|
||||||
if DEBUG and __debug__:
|
|
||||||
log.debug("add_writer%s", (sock, cb, args))
|
|
||||||
if args:
|
|
||||||
self.poller.register(sock, select.POLLOUT)
|
|
||||||
self.objmap[id(sock)] = (cb, args)
|
|
||||||
else:
|
|
||||||
self.poller.register(sock, select.POLLOUT)
|
|
||||||
self.objmap[id(sock)] = cb
|
|
||||||
|
|
||||||
def remove_writer(self, sock):
|
|
||||||
if DEBUG and __debug__:
|
|
||||||
log.debug("remove_writer(%s)", sock)
|
|
||||||
try:
|
|
||||||
self.poller.unregister(sock)
|
|
||||||
self.objmap.pop(id(sock), None)
|
|
||||||
except OSError as e:
|
|
||||||
# StreamWriter.awrite() first tries to write to a socket,
|
|
||||||
# and if that succeeds, yield IOWrite may never be called
|
|
||||||
# for that socket, and it will never be added to poller. So,
|
|
||||||
# ignore such error.
|
|
||||||
if e.args[0] != uerrno.ENOENT:
|
|
||||||
raise
|
|
||||||
|
|
||||||
def wait(self, delay):
|
|
||||||
if DEBUG and __debug__:
|
|
||||||
log.debug("poll.wait(%d)", delay)
|
|
||||||
# We need one-shot behavior (second arg of 1 to .poll())
|
|
||||||
res = self.poller.ipoll(delay, 1)
|
|
||||||
# log.debug("poll result: %s", res)
|
|
||||||
# Remove "if res" workaround after
|
|
||||||
# https://github.com/micropython/micropython/issues/2716 fixed.
|
|
||||||
if res:
|
|
||||||
for sock, ev in res:
|
|
||||||
cb = self.objmap[id(sock)]
|
|
||||||
if ev & (select.POLLHUP | select.POLLERR):
|
|
||||||
# These events are returned even if not requested, and
|
|
||||||
# are sticky, i.e. will be returned again and again.
|
|
||||||
# If the caller doesn't do proper error handling and
|
|
||||||
# unregister this sock, we'll busy-loop on it, so we
|
|
||||||
# as well can unregister it now "just in case".
|
|
||||||
self.remove_reader(sock)
|
|
||||||
if DEBUG and __debug__:
|
|
||||||
log.debug("Calling IO callback: %r", cb)
|
|
||||||
if isinstance(cb, tuple):
|
|
||||||
cb[0](*cb[1])
|
|
||||||
else:
|
|
||||||
cb.pend_throw(None)
|
|
||||||
self.call_soon(cb)
|
|
||||||
|
|
||||||
|
|
||||||
class StreamReader:
|
|
||||||
def __init__(self, polls, ios=None):
|
|
||||||
if ios is None:
|
|
||||||
ios = polls
|
|
||||||
self.polls = polls
|
|
||||||
self.ios = ios
|
|
||||||
|
|
||||||
def read(self, n=-1):
|
|
||||||
while True:
|
|
||||||
yield IORead(self.polls)
|
|
||||||
res = self.ios.read(n)
|
|
||||||
if res is not None:
|
|
||||||
break
|
|
||||||
# This should not happen for real sockets, but can easily
|
|
||||||
# happen for stream wrappers (ssl, websockets, etc.)
|
|
||||||
# log.warn("Empty read")
|
|
||||||
if not res:
|
|
||||||
yield IOReadDone(self.polls)
|
|
||||||
return res
|
|
||||||
|
|
||||||
def readexactly(self, n):
|
|
||||||
buf = b""
|
|
||||||
while n:
|
|
||||||
yield IORead(self.polls)
|
|
||||||
res = self.ios.read(n)
|
|
||||||
assert res is not None
|
|
||||||
if not res:
|
|
||||||
yield IOReadDone(self.polls)
|
|
||||||
break
|
|
||||||
buf += res
|
|
||||||
n -= len(res)
|
|
||||||
return buf
|
|
||||||
|
|
||||||
def readline(self):
|
|
||||||
if DEBUG and __debug__:
|
|
||||||
log.debug("StreamReader.readline()")
|
|
||||||
buf = b""
|
|
||||||
while True:
|
|
||||||
yield IORead(self.polls)
|
|
||||||
res = self.ios.readline()
|
|
||||||
assert res is not None
|
|
||||||
if not res:
|
|
||||||
yield IOReadDone(self.polls)
|
|
||||||
break
|
|
||||||
buf += res
|
|
||||||
if buf[-1] == 0x0A:
|
|
||||||
break
|
|
||||||
if DEBUG and __debug__:
|
|
||||||
log.debug("StreamReader.readline(): %s", buf)
|
|
||||||
return buf
|
|
||||||
|
|
||||||
def aclose(self):
|
|
||||||
yield IOReadDone(self.polls)
|
|
||||||
self.ios.close()
|
|
||||||
|
|
||||||
def __repr__(self):
|
|
||||||
return "<StreamReader %r %r>" % (self.polls, self.ios)
|
|
||||||
|
|
||||||
|
|
||||||
class StreamWriter:
|
|
||||||
def __init__(self, s, extra):
|
|
||||||
self.s = s
|
|
||||||
self.extra = extra
|
|
||||||
|
|
||||||
def awrite(self, buf, off=0, sz=-1):
|
|
||||||
# This method is called awrite (async write) to not proliferate
|
|
||||||
# incompatibility with original asyncio. Unlike original asyncio
|
|
||||||
# whose .write() method is both not a coroutine and guaranteed
|
|
||||||
# to return immediately (which means it has to buffer all the
|
|
||||||
# data), this method is a coroutine.
|
|
||||||
if sz == -1:
|
|
||||||
sz = len(buf) - off
|
|
||||||
if DEBUG and __debug__:
|
|
||||||
log.debug("StreamWriter.awrite(): spooling %d bytes", sz)
|
|
||||||
while True:
|
|
||||||
res = self.s.write(buf, off, sz)
|
|
||||||
# If we spooled everything, return immediately
|
|
||||||
if res == sz:
|
|
||||||
if DEBUG and __debug__:
|
|
||||||
log.debug("StreamWriter.awrite(): completed spooling %d bytes", res)
|
|
||||||
return
|
|
||||||
if res is None:
|
|
||||||
res = 0
|
|
||||||
if DEBUG and __debug__:
|
|
||||||
log.debug("StreamWriter.awrite(): spooled partial %d bytes", res)
|
|
||||||
assert res < sz
|
|
||||||
off += res
|
|
||||||
sz -= res
|
|
||||||
yield IOWrite(self.s)
|
|
||||||
# assert s2.fileno() == self.s.fileno()
|
|
||||||
if DEBUG and __debug__:
|
|
||||||
log.debug("StreamWriter.awrite(): can write more")
|
|
||||||
|
|
||||||
# Write piecewise content from iterable (usually, a generator)
|
|
||||||
def awriteiter(self, iterable):
|
|
||||||
for buf in iterable:
|
|
||||||
yield from self.awrite(buf)
|
|
||||||
|
|
||||||
def aclose(self):
|
|
||||||
yield IOWriteDone(self.s)
|
|
||||||
self.s.close()
|
|
||||||
|
|
||||||
def get_extra_info(self, name, default=None):
|
|
||||||
return self.extra.get(name, default)
|
|
||||||
|
|
||||||
def __repr__(self):
|
|
||||||
return "<StreamWriter %r>" % self.s
|
|
||||||
|
|
||||||
|
|
||||||
def open_connection(host, port, ssl=False):
|
|
||||||
if DEBUG and __debug__:
|
|
||||||
log.debug("open_connection(%s, %s)", host, port)
|
|
||||||
ai = _socket.getaddrinfo(host, port, 0, _socket.SOCK_STREAM)
|
|
||||||
ai = ai[0]
|
|
||||||
s = _socket.socket(ai[0], ai[1], ai[2])
|
|
||||||
s.setblocking(False)
|
|
||||||
try:
|
|
||||||
s.connect(ai[-1])
|
|
||||||
except OSError as e:
|
|
||||||
if e.args[0] != uerrno.EINPROGRESS:
|
|
||||||
raise
|
|
||||||
if DEBUG and __debug__:
|
|
||||||
log.debug("open_connection: After connect")
|
|
||||||
yield IOWrite(s)
|
|
||||||
# if __debug__:
|
|
||||||
# assert s2.fileno() == s.fileno()
|
|
||||||
if DEBUG and __debug__:
|
|
||||||
log.debug("open_connection: After iowait: %s", s)
|
|
||||||
if ssl:
|
|
||||||
print("Warning: uasyncio SSL support is alpha")
|
|
||||||
import ussl
|
|
||||||
|
|
||||||
s.setblocking(True)
|
|
||||||
s2 = ussl.wrap_socket(s)
|
|
||||||
s.setblocking(False)
|
|
||||||
return StreamReader(s, s2), StreamWriter(s2, {})
|
|
||||||
return StreamReader(s), StreamWriter(s, {})
|
|
||||||
|
|
||||||
|
|
||||||
def start_server(client_coro, host, port, backlog=10):
|
|
||||||
if DEBUG and __debug__:
|
|
||||||
log.debug("start_server(%s, %s)", host, port)
|
|
||||||
ai = _socket.getaddrinfo(host, port, 0, _socket.SOCK_STREAM)
|
|
||||||
ai = ai[0]
|
|
||||||
s = _socket.socket(ai[0], ai[1], ai[2])
|
|
||||||
s.setblocking(False)
|
|
||||||
|
|
||||||
s.setsockopt(_socket.SOL_SOCKET, _socket.SO_REUSEADDR, 1)
|
|
||||||
s.bind(ai[-1])
|
|
||||||
s.listen(backlog)
|
|
||||||
while True:
|
|
||||||
if DEBUG and __debug__:
|
|
||||||
log.debug("start_server: Before accept")
|
|
||||||
yield IORead(s)
|
|
||||||
if DEBUG and __debug__:
|
|
||||||
log.debug("start_server: After iowait")
|
|
||||||
s2, client_addr = s.accept()
|
|
||||||
s2.setblocking(False)
|
|
||||||
if DEBUG and __debug__:
|
|
||||||
log.debug("start_server: After accept: %s", s2)
|
|
||||||
extra = {"peername": client_addr}
|
|
||||||
yield client_coro(StreamReader(s2), StreamWriter(s2, extra))
|
|
||||||
|
|
||||||
|
|
||||||
import uasyncio.core
|
|
||||||
|
|
||||||
uasyncio.core._event_loop_class = PollEventLoop
|
|
Ładowanie…
Reference in New Issue