uasyncio: implement as package, add primitives.

pull/12/head
Peter Hinch 2019-11-20 11:18:38 +00:00
parent 1a2084425a
commit 9a697a8591
12 changed files with 962 additions and 374 deletions

@@ -0,0 +1,45 @@
# Changes to uasyncio
1. Implement as a Python package.
2. Implement synchronisation primitives as package modules to conserve RAM.
3. Add a `.priority` method to the `Stream` class. This enables I/O to be handled at high
priority on a per-device basis (see the sketch below).
4. Rename the task queue class to `TQueue` to avoid a name clash with the `Queue` primitive.
5. Rename the task queue instance to `tqueue` as it is used by the primitives.
## Minor changes
1. Move the `StreamReader` and `StreamWriter` assignments out of the legacy section of the code.
2. `create_task` raises an `AssertionError` if called with a generator function rather than a
coroutine. This avoids an obscure traceback if someone omits the parens (see the sketch below).
3. Add a machine-readable version number.
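A minimal sketch of the `.priority` method and the `create_task` check. Here `my_device` is a
hypothetical object supporting the MicroPython stream protocol; `ms_timer.py` below shows a real
example of such a class.
```python
import uasyncio as asyncio

async def reader(my_device):               # my_device: hypothetical stream device
    sreader = asyncio.StreamReader(my_device)
    sreader.priority(True)                 # poll this device's I/O at high priority
    while True:
        line = await sreader.readline()
        print(line)

async def main(my_device):
    asyncio.create_task(reader(my_device)) # pass a coroutine...
    # asyncio.create_task(reader)          # ...not the bare function: AssertionError
    await asyncio.sleep(10)

# asyncio.run(main(my_device))
```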
# CPython-compatible synchronisation primitives
These have been adapted to work efficiently with the new version (import pattern shown below).
1. `Event`: moved to separate module for consistency with other primitives.
2. `Lock`: Kevin Köck's solution.
3. `Queue`: Paul's solution adapted for efficiency.
4. `Semaphore`: Also implements BoundedSemaphore.
5. `Condition`.
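To conserve RAM the primitives are not loaded automatically: importing a primitive's module binds
its class into the `uasyncio` namespace. A typical set of imports, following `prim_test.py` below,
looks like this:
```python
import uasyncio as asyncio
import uasyncio.lock              # binds asyncio.Lock
import uasyncio.event             # binds asyncio.Event and asyncio.Message
import uasyncio.barrier           # binds asyncio.Barrier
import uasyncio.semaphore         # binds asyncio.Semaphore and asyncio.BoundedSemaphore
import uasyncio.condition         # binds asyncio.Condition
from uasyncio.queue import Queue  # Queue is imported directly (name collision in __init__.py)

lock = asyncio.Lock()
sema = asyncio.BoundedSemaphore(3)
q = Queue(5)
```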
# Other primitives
1. `Message`: an awaitable `Event` subclass with a data payload.
2. `Barrier`: multiple tasks wait until all have reached a `Barrier` instance, or some tasks
wait until others have triggered the `Barrier` instance (see the sketch below).
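A brief sketch of both, based on their use in `prim_test.py` (task and variable names are
illustrative):
```python
import uasyncio as asyncio
import uasyncio.event      # binds asyncio.Message (and asyncio.Event)
import uasyncio.barrier    # binds asyncio.Barrier

async def waiter(message, barrier):
    msg = await message    # pauses until the Message is set
    print('Got', msg)
    barrier.trigger()      # registers arrival without waiting on the Barrier

async def main():
    message = asyncio.Message()
    barrier = asyncio.Barrier(2)
    asyncio.create_task(waiter(message, barrier))
    message.set('payload')  # the payload is delivered to all waiting tasks
    await barrier           # resumes when both participants have arrived
    message.clear()         # clear once all waiting tasks have run

asyncio.run(main())
```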
# Test scripts
Hopefully these are self-documenting on import.
1. `prim_test.py` Tests for synchronisation primitives.
2. `test_fast_scheduling.py` Demonstrates difference between normal and priority
I/O scheduling. Runs on Pyboard.
3. `ms_timer.py` and `ms_timer_test.py` A practical use of priority scheduling to
implement a timer with higher precision than `asyncio.sleep_ms`. Runs on Pyboard.
# Note
Use of I/O is still incompatible with Unix.

@@ -1,284 +0,0 @@
import uerrno
import uselect as select
import usocket as _socket
from uasyncio.core import *
DEBUG = 0
log = None
def set_debug(val):
global DEBUG, log
DEBUG = val
if val:
import logging
log = logging.getLogger("uasyncio")
# add_writer causes read failure if passed the same sock instance as was passed
# to add_reader. Can we fix this by maintaining two object maps?
class PollEventLoop(EventLoop):
def __init__(self, runq_len=16, waitq_len=16):
EventLoop.__init__(self, runq_len, waitq_len)
self.poller = select.poll()
self.rdobjmap = {}
self.wrobjmap = {}
self.flags = {}
# Remove registration of sock for reading or writing.
def _unregister(self, sock, objmap, flag):
# If StreamWriter.awrite() wrote entire buf on 1st pass sock will never
# have been registered. So test for presence in .flags.
if id(sock) in self.flags:
flags = self.flags[id(sock)]
if flags & flag: # flag is currently registered
flags &= ~flag
if flags:
self.flags[id(sock)] = flags
self.poller.register(sock, flags)
else:
del self.flags[id(sock)]
self.poller.unregister(sock)
del objmap[id(sock)]
# Additively register sock for reading or writing
def _register(self, sock, flag):
if id(sock) in self.flags:
self.flags[id(sock)] |= flag
else:
self.flags[id(sock)] = flag
self.poller.register(sock, self.flags[id(sock)])
def add_reader(self, sock, cb, *args):
if DEBUG and __debug__:
log.debug("add_reader%s", (sock, cb, args))
self._register(sock, select.POLLIN)
if args:
self.rdobjmap[id(sock)] = (cb, args)
else:
self.rdobjmap[id(sock)] = cb
def remove_reader(self, sock):
if DEBUG and __debug__:
log.debug("remove_reader(%s)", sock)
self._unregister(sock, self.rdobjmap, select.POLLIN)
def add_writer(self, sock, cb, *args):
if DEBUG and __debug__:
log.debug("add_writer%s", (sock, cb, args))
self._register(sock, select.POLLOUT)
if args:
self.wrobjmap[id(sock)] = (cb, args)
else:
self.wrobjmap[id(sock)] = cb
def remove_writer(self, sock):
if DEBUG and __debug__:
log.debug("remove_writer(%s)", sock)
self._unregister(sock, self.wrobjmap, select.POLLOUT)
def wait(self, delay):
if DEBUG and __debug__:
log.debug("poll.wait(%d)", delay)
# We need one-shot behavior (second arg of 1 to .poll())
res = self.poller.ipoll(delay, 1)
#log.debug("poll result: %s", res)
for sock, ev in res:
if ev & select.POLLOUT:
cb = self.wrobjmap[id(sock)]
# Test code. Invalidate objmap: this ensures an exception is thrown
# rather than exhibiting weird behaviour when testing.
self.wrobjmap[id(sock)] = None # TEST
if DEBUG and __debug__:
log.debug("Calling IO callback: %r", cb)
if isinstance(cb, tuple):
cb[0](*cb[1])
else:
cb.pend_throw(None)
self.call_soon(cb)
if ev & select.POLLIN:
cb = self.rdobjmap[id(sock)]
self.rdobjmap[id(sock)] = None # TEST
if ev & (select.POLLHUP | select.POLLERR):
# These events are returned even if not requested, and
# are sticky, i.e. will be returned again and again.
# If the caller doesn't do proper error handling and
# unregister this sock, we'll busy-loop on it, so we
# may as well unregister it now "just in case".
self.remove_reader(sock)
if DEBUG and __debug__:
log.debug("Calling IO callback: %r", cb)
if isinstance(cb, tuple):
cb[0](*cb[1])
else:
cb.pend_throw(None)
self.call_soon(cb)
class StreamReader:
def __init__(self, polls, ios=None):
if ios is None:
ios = polls
self.polls = polls
self.ios = ios
def read(self, n=-1):
while True:
yield IORead(self.polls)
res = self.ios.read(n)
if res is not None:
break
# This should not happen for real sockets, but can easily
# happen for stream wrappers (ssl, websockets, etc.)
#log.warn("Empty read")
yield IOReadDone(self.polls)
return res
def readexactly(self, n):
buf = b""
while n:
yield IORead(self.polls)
res = self.ios.read(n)
assert res is not None
if not res:
break
buf += res
n -= len(res)
yield IOReadDone(self.polls)
return buf
def readline(self):
if DEBUG and __debug__:
log.debug("StreamReader.readline()")
buf = b""
while True:
yield IORead(self.polls)
res = self.ios.readline()
assert res is not None
if not res:
break
buf += res
if buf[-1] == 0x0a:
break
if DEBUG and __debug__:
log.debug("StreamReader.readline(): %s", buf)
yield IOReadDone(self.polls)
return buf
def aclose(self):
yield IOReadDone(self.polls)
self.ios.close()
def __repr__(self):
return "<StreamReader %r %r>" % (self.polls, self.ios)
class StreamWriter:
def __init__(self, s, extra):
self.s = s
self.extra = extra
def awrite(self, buf, off=0, sz=-1):
# This method is called awrite (async write) to not proliferate
# incompatibility with original asyncio. Unlike original asyncio
# whose .write() method is both not a coroutine and guaranteed
# to return immediately (which means it has to buffer all the
# data), this method is a coroutine.
if sz == -1:
sz = len(buf) - off
if DEBUG and __debug__:
log.debug("StreamWriter.awrite(): spooling %d bytes", sz)
while True:
res = self.s.write(buf, off, sz)
# If we spooled everything, return immediately
if res == sz:
if DEBUG and __debug__:
log.debug("StreamWriter.awrite(): completed spooling %d bytes", res)
yield IOWriteDone(self.s)
return
if res is None:
res = 0
if DEBUG and __debug__:
log.debug("StreamWriter.awrite(): spooled partial %d bytes", res)
assert res < sz
off += res
sz -= res
yield IOWrite(self.s)
#assert s2.fileno() == self.s.fileno()
if DEBUG and __debug__:
log.debug("StreamWriter.awrite(): can write more")
# Write piecewise content from iterable (usually, a generator)
def awriteiter(self, iterable):
for buf in iterable:
yield from self.awrite(buf)
def aclose(self):
yield IOWriteDone(self.s)
self.s.close()
def get_extra_info(self, name, default=None):
return self.extra.get(name, default)
def __repr__(self):
return "<StreamWriter %r>" % self.s
def open_connection(host, port, ssl=False):
if DEBUG and __debug__:
log.debug("open_connection(%s, %s)", host, port)
ai = _socket.getaddrinfo(host, port, 0, _socket.SOCK_STREAM)
ai = ai[0]
s = _socket.socket(ai[0], ai[1], ai[2])
s.setblocking(False)
try:
s.connect(ai[-1])
except OSError as e:
if e.args[0] != uerrno.EINPROGRESS:
raise
if DEBUG and __debug__:
log.debug("open_connection: After connect")
yield IOWrite(s)
# if __debug__:
# assert s2.fileno() == s.fileno()
if DEBUG and __debug__:
log.debug("open_connection: After iowait: %s", s)
if ssl:
print("Warning: uasyncio SSL support is alpha")
import ussl
s.setblocking(True)
s2 = ussl.wrap_socket(s)
s.setblocking(False)
return StreamReader(s, s2), StreamWriter(s2, {})
return StreamReader(s), StreamWriter(s, {})
def start_server(client_coro, host, port, backlog=10):
if DEBUG and __debug__:
log.debug("start_server(%s, %s)", host, port)
ai = _socket.getaddrinfo(host, port, 0, _socket.SOCK_STREAM)
ai = ai[0]
s = _socket.socket(ai[0], ai[1], ai[2])
s.setblocking(False)
s.setsockopt(_socket.SOL_SOCKET, _socket.SO_REUSEADDR, 1)
s.bind(ai[-1])
s.listen(backlog)
while True:
if DEBUG and __debug__:
log.debug("start_server: Before accept")
yield IORead(s)
if DEBUG and __debug__:
log.debug("start_server: After iowait")
s2, client_addr = s.accept()
s2.setblocking(False)
if DEBUG and __debug__:
log.debug("start_server: After accept: %s", s2)
extra = {"peername": client_addr}
yield client_coro(StreamReader(s2), StreamWriter(s2, extra))
import uasyncio.core
uasyncio.core._event_loop_class = PollEventLoop

@@ -0,0 +1,34 @@
# ms_timer.py A relatively high precision delay class for the fast_io version
# of uasyncio
import uasyncio as asyncio
import utime
import io
from micropython import const  # const() is provided by the micropython module
MP_STREAM_POLL_RD = const(1)
MP_STREAM_POLL = const(3)
MP_STREAM_ERROR = const(-1)
class MillisecTimer(io.IOBase):
def __init__(self, fast=True):
self.end = 0
self.sreader = asyncio.StreamReader(self)
self.sreader.priority(fast)
def __iter__(self):
await self.sreader.readline()
def __call__(self, ms):
self.end = utime.ticks_add(utime.ticks_ms(), ms)
return self
def readline(self):
return b'\n'
def ioctl(self, req, arg):
ret = MP_STREAM_ERROR
if req == MP_STREAM_POLL:
ret = 0
if arg & MP_STREAM_POLL_RD:
if utime.ticks_diff(utime.ticks_ms(), self.end) >= 0:
ret |= MP_STREAM_POLL_RD
return ret

@@ -0,0 +1,42 @@
# ms_timer_test.py Test/demo program for MillisecTimer. Adapted for new uasyncio.
import uasyncio as asyncio
import utime
import ms_timer
async def timer_test(n, fast):
timer = ms_timer.MillisecTimer(fast)
while True:
t = utime.ticks_ms()
await timer(30)
print('Task {} time {}ms'.format(n, utime.ticks_diff(utime.ticks_ms(), t)))
await asyncio.sleep(0.5 + n/5)
async def foo():
while True:
await asyncio.sleep(0)
utime.sleep_ms(10) # Emulate slow processing
async def main(fast=True):
for _ in range(10):
asyncio.create_task(foo())
for n in range(3):
asyncio.create_task(timer_test(n, fast))
await asyncio.sleep(10)
def test(fast=True):
asyncio.run(main(fast))
s = '''This test creates ten tasks each of which blocks for 10ms.
It also creates three tasks each of which runs a MillisecTimer for 30ms,
timing the period which elapses while it runs. With fast I/O scheduling
the elapsed time is ~30ms as expected. With normal scheduling it is
about 130ms because of competition from the blocking coros.
Run test() to test fast I/O, test(False) to test normal I/O.
Test prints the task number followed by the actual elapsed time in ms.
Test runs for 10s.'''
print(s)

@@ -0,0 +1,421 @@
# prim_test.py Test/demo of the 'micro' synchronisation primitives
# for the new uasyncio
# The MIT License (MIT)
#
# Copyright (c) 2017-2019 Peter Hinch
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import uasyncio as asyncio
import uasyncio.lock
import uasyncio.event
import uasyncio.barrier
import uasyncio.semaphore
import uasyncio.condition
from uasyncio.queue import Queue # Name collision in __init__.py
def print_tests():
st = '''Available functions:
print_tests() Print this list.
ack_test() Test event acknowledge and Message class.
message_test() Test Message class.
event_test() Test Event and Lock objects.
barrier_test() Test the Barrier class.
semaphore_test(bounded=False) Test Semaphore or BoundedSemaphore.
condition_test() Test the Condition class.
queue_test() Test the Queue class
Recommended to issue ctrl-D after running each test.
'''
print('\x1b[32m')
print(st)
print('\x1b[39m')
print_tests()
def printexp(exp, runtime=0):
print('Expected output:')
print('\x1b[32m')
print(exp)
print('\x1b[39m')
if runtime:
print('Running (runtime = {}s):'.format(runtime))
else:
print('Running (runtime < 1s):')
# ************ Test event acknowledge and Message class ************
# Demo use of acknowledge event
async def event_wait(message, ack_event, n):
await message
print('Eventwait {} got message with value {}'.format(n, message.value()))
ack_event.set()
async def run_ack():
message = asyncio.Message()
ack1 = asyncio.Event()
ack2 = asyncio.Event()
count = 0
while True:
asyncio.create_task(event_wait(message, ack1, 1))
asyncio.create_task(event_wait(message, ack2, 2))
message.set(count)
count += 1
print('message was set')
await ack1.wait()
ack1.clear()
print('Cleared ack1')
await ack2.wait()
ack2.clear()
print('Cleared ack2')
message.clear()
print('Cleared message')
await asyncio.sleep(1)
async def ack_coro(delay):
asyncio.create_task(run_ack())
await asyncio.sleep(delay)
print("I've seen attack ships burn on the shoulder of Orion...")
print("Time to die...")
def ack_test():
printexp('''Running (runtime = 10s):
message was set
Eventwait 1 got message with value 0
Eventwait 2 got message with value 0
Cleared ack1
Cleared ack2
Cleared message
message was set
Eventwait 1 got message with value 1
Eventwait 2 got message with value 1
Cleared ack1
Cleared ack2
Cleared message
message was set
... text omitted ...
Eventwait 1 got message with value 9
Eventwait 2 got message with value 9
Cleared ack1
Cleared ack2
Cleared message
I've seen attack ships burn on the shoulder of Orion...
Time to die...
''', 10)
asyncio.run(ack_coro(10))
# ************ Test Message class ************
async def wait_message(message):
print('Waiting for message')
msg = await message
message.clear()
print('Got message {}'.format(msg))
async def run_message_test():
message = asyncio.Message()
asyncio.create_task(wait_message(message))
await asyncio.sleep(1)
message.set('Hello world')
await asyncio.sleep(1)
def message_test():
printexp('''Running (runtime = 2s):
Waiting for message
Got message Hello world
''', 2)
asyncio.run(run_message_test())
# ************ Test Lock and Event classes ************
async def run_lock(n, lock):
print('run_lock {} waiting for lock'.format(n))
await lock.acquire()
print('run_lock {} acquired lock'.format(n))
await asyncio.sleep(1) # Delay to demo other coros waiting for lock
lock.release()
print('run_lock {} released lock'.format(n))
async def eventset(event):
print('Waiting 5 secs before setting event')
await asyncio.sleep(5)
event.set()
print('event was set')
async def eventwait(event):
print('waiting for event')
await event.wait()
print('got event')
event.clear()
async def run_event_test():
print('Test Lock class')
lock = asyncio.Lock()
asyncio.create_task(run_lock(1, lock))
asyncio.create_task(run_lock(2, lock))
asyncio.create_task(run_lock(3, lock))
print('Test Event class')
event = asyncio.Event()
asyncio.create_task(eventset(event))
await eventwait(event) # run_event_test runs fast until this point
print('Event status {}'.format('Incorrect' if event.is_set() else 'OK'))
print('Tasks complete')
def event_test():
printexp('''Test Lock class
Test Event class
waiting for event
run_lock 1 waiting for lock
run_lock 1 acquired lock
run_lock 2 waiting for lock
run_lock 3 waiting for lock
Waiting 5 secs before setting event
run_lock 1 released lock
run_lock 2 acquired lock
run_lock 2 released lock
run_lock 3 acquired lock
run_lock 3 released lock
event was set
got event
Event status OK
Tasks complete
''', 5)
asyncio.run(run_event_test())
# ************ Barrier test ************
async def killer(duration):
await asyncio.sleep(duration)
def callback(text):
print(text)
async def report(barrier):
for i in range(5):
print('{} '.format(i), end='')
await barrier
def barrier_test():
printexp('''0 0 0 Synch
1 1 1 Synch
2 2 2 Synch
3 3 3 Synch
4 4 4 Synch
''')
barrier = asyncio.Barrier(3, callback, ('Synch',))
for _ in range(3):
asyncio.create_task(report(barrier))
asyncio.run(killer(2))
# ************ Semaphore test ************
async def run_sema(n, sema, barrier):
print('run_sema {} trying to access semaphore'.format(n))
async with sema:
print('run_sema {} acquired semaphore'.format(n))
# Delay demonstrates other coros waiting for semaphore
await asyncio.sleep(1 + n/10) # n/10 ensures deterministic printout
print('run_sema {} has released semaphore'.format(n))
barrier.trigger()
async def run_sema_test(bounded):
num_coros = 5
barrier = asyncio.Barrier(num_coros + 1)
if bounded:
semaphore = asyncio.BoundedSemaphore(3)
else:
semaphore = asyncio.Semaphore(3)
for n in range(num_coros):
asyncio.create_task(run_sema(n, semaphore, barrier))
await barrier # Quit when all coros complete
try:
semaphore.release()
except ValueError:
print('Bounded semaphore exception test OK')
def semaphore_test(bounded=False):
if bounded:
exp = '''run_sema 0 trying to access semaphore
run_sema 0 acquired semaphore
run_sema 1 trying to access semaphore
run_sema 1 acquired semaphore
run_sema 2 trying to access semaphore
run_sema 2 acquired semaphore
run_sema 3 trying to access semaphore
run_sema 4 trying to access semaphore
run_sema 0 has released semaphore
run_sema 4 acquired semaphore
run_sema 1 has released semaphore
run_sema 3 acquired semaphore
run_sema 2 has released semaphore
run_sema 4 has released semaphore
run_sema 3 has released semaphore
Bounded semaphore exception test OK
Exact sequence of acquisition may vary when 3 and 4 compete for semaphore.'''
else:
exp = '''run_sema 0 trying to access semaphore
run_sema 0 acquired semaphore
run_sema 1 trying to access semaphore
run_sema 1 acquired semaphore
run_sema 2 trying to access semaphore
run_sema 2 acquired semaphore
run_sema 3 trying to access semaphore
run_sema 4 trying to access semaphore
run_sema 0 has released semaphore
run_sema 3 acquired semaphore
run_sema 1 has released semaphore
run_sema 4 acquired semaphore
run_sema 2 has released semaphore
run_sema 3 has released semaphore
run_sema 4 has released semaphore
Exact sequence of acquisition may vary when 3 and 4 compete for semaphore.'''
printexp(exp, 3)
asyncio.run(run_sema_test(bounded))
# ************ Condition test ************
tim = 0
# cond01 and cond03 below are legacy versions; cond_go() uses cond01_new and cond03_new.
async def cond01():
while True:
await asyncio.sleep(2)
with await cond:
cond.notify(2) # Notify 2 tasks
async def cond03(): # Maintain a count of seconds
global tim
await asyncio.sleep(0.5)
while True:
await asyncio.sleep(1)
tim += 1
async def cond01_new(cond):
while True:
await asyncio.sleep(2)
async with cond:
cond.notify(2) # Notify 2 tasks
async def cond03_new(): # Maintain a count of seconds
global tim
await asyncio.sleep(0.5)
while True:
await asyncio.sleep(1)
tim += 1
async def cond02(n, cond, barrier):
async with cond:
print('cond02', n, 'Awaiting notification.')
await cond.wait()
print('cond02', n, 'triggered. tim =', tim)
barrier.trigger()
def predicate():
return tim >= 8 # 12
async def cond04(n, cond, barrier):
async with cond:
print('cond04', n, 'Awaiting notification and predicate.')
await cond.wait_for(predicate)
print('cond04', n, 'triggered. tim =', tim)
barrier.trigger()
async def cond_go():
cond = asyncio.Condition()
ntasks = 7
barrier = asyncio.Barrier(ntasks + 1)
t1 = asyncio.create_task(cond01_new(cond))
t3 = asyncio.create_task(cond03_new())
for n in range(ntasks):
asyncio.create_task(cond02(n, cond, barrier))
await barrier # All instances of cond02 have completed
# Test wait_for
barrier = asyncio.Barrier(2)
asyncio.create_task(cond04(99, cond, barrier))
await barrier
# cancel continuously running coros.
t1.cancel()
t3.cancel()
await asyncio.sleep_ms(0)
print('Done.')
def condition_test():
printexp('''cond02 0 Awaiting notification.
cond02 1 Awaiting notification.
cond02 2 Awaiting notification.
cond02 3 Awaiting notification.
cond02 4 Awaiting notification.
cond02 5 Awaiting notification.
cond02 6 Awaiting notification.
cond02 5 triggered. tim = 1
cond02 6 triggered. tim = 1
cond02 3 triggered. tim = 3
cond02 4 triggered. tim = 3
cond02 1 triggered. tim = 5
cond02 2 triggered. tim = 5
cond02 0 triggered. tim = 7
cond04 99 Awaiting notification and predicate.
cond04 99 triggered. tim = 9
Done.
''', 13)
asyncio.run(cond_go())
# ************ Queue test ************
async def fillq(myq):
for x in range(8):
print('Waiting to put item {} on queue'.format(x))
await myq.put(x)
async def mtq(myq):
await asyncio.sleep(1) # let q fill
while myq.qsize():
res = await myq.get()
print('Retrieved {} from queue'.format(res))
await asyncio.sleep(0.2)
async def queue_go():
myq = Queue(5)
asyncio.create_task(fillq(myq))
await mtq(myq)
def queue_test():
printexp('''Running (runtime = 3s):
Waiting to put item 0 on queue
Waiting to put item 1 on queue
Waiting to put item 2 on queue
Waiting to put item 3 on queue
Waiting to put item 4 on queue
Waiting to put item 5 on queue
Retrieved 0 from queue
Waiting to put item 6 on queue
Retrieved 1 from queue
Waiting to put item 7 on queue
Retrieved 2 from queue
Retrieved 3 from queue
Retrieved 4 from queue
Retrieved 5 from queue
Retrieved 6 from queue
Retrieved 7 from queue
''', 3)
asyncio.run(queue_go())

@@ -6,10 +6,12 @@ MIT license; Copyright (c) 2019 Damien P. George
from time import ticks_ms as ticks, ticks_diff, ticks_add
import sys, select
################################################################################
# Queue class
type_genf = type((lambda: (yield))) # Type of a generator function upy iss #3241
class Queue:
################################################################################
# Task Queue class renamed to avoid conflict with Queue class
class TQueue:
def __init__(self):
self.next = None
self.last = None
@@ -85,7 +87,7 @@ class Task:
def __iter__(self):
if not hasattr(self, 'waiting'):
# Lazily allocated head of linked list of Tasks waiting on completion of this task
self.waiting = Queue()
self.waiting = TQueue()
return self
def send(self, v):
if not self.coro:
@@ -106,14 +108,15 @@ class Task:
if hasattr(self.data, 'waiting'):
self.data.waiting.remove(self)
else:
_queue.remove(self)
_queue.push_error(self, CancelledError)
tqueue.remove(self)
tqueue.push_error(self, CancelledError)
return True
# Create and schedule a new task from a coroutine
def create_task(coro):
assert not isinstance(coro, type_genf), 'Coroutine arg expected.' # upy issue #3241
t = Task(coro)
_queue.push_head(t)
tqueue.push_head(t)
return t
# "Yield" once, then raise StopIteration
@@ -134,7 +137,7 @@ class SingletonGenerator:
# Pause task execution for the given time (integer in milliseconds, uPy extension)
# Use a SingletonGenerator to do it without allocating on the heap
def sleep_ms(t, sgen=SingletonGenerator()):
_queue.push_sorted(cur_task, ticks_add(ticks(), t))
tqueue.push_sorted(cur_task, ticks_add(ticks(), t))
sgen.state = 1
return sgen
@@ -165,7 +168,7 @@ async def wait_for(aw, timeout):
# Ignore CancelledError from aw, it's probably due to timeout
pass
finally:
_queue.remove(cancel_task)
tqueue.remove(cancel_task)
if cancel_task.coro is None:
# Cancel task ran to completion, ie there was a timeout
raise TimeoutError
@@ -191,72 +194,6 @@ async def gather(*aws, return_exceptions=False):
raise er
return ts
################################################################################
# Lock (optional component)
# Lock class for primitive mutex capability
class Lock:
def __init__(self):
self.state = 0 # 0=unlocked; 1=unlocked but waiting task pending resume; 2=locked
self.waiting = Queue() # Queue of Tasks waiting to acquire this Lock
def locked(self):
return self.state == 2
def release(self):
if self.state != 2:
raise RuntimeError
if self.waiting.next:
# Task(s) waiting on lock, schedule first Task
_queue.push_head(self.waiting.pop_head())
self.state = 1
else:
# No Task waiting so unlock
self.state = 0
async def acquire(self):
if self.state != 0 or self.waiting.next:
# Lock unavailable, put the calling Task on the waiting queue
self.waiting.push_head(cur_task)
# Set calling task's data to double-link it
cur_task.data = self
try:
yield
except CancelledError:
if self.state == 1:
# Cancelled while pending on resume, schedule next waiting Task
self.state = 2
self.release()
raise
# Lock available, set it as locked
self.state = 2
return True
async def __aenter__(self):
return await self.acquire()
async def __aexit__(self, exc_type, exc, tb):
return self.release()
################################################################################
# Event (optional component)
# Event class for primitive events that can be waited on, set, and cleared
class Event:
def __init__(self):
self.state = 0 # 0=unset; 1=set
self.waiting = Queue() # Queue of Tasks waiting on completion of this event
def set(self):
# Event becomes set, schedule any tasks waiting on it
while self.waiting.next:
_queue.push_head(self.waiting.pop_head())
self.state = 1
def clear(self):
self.state = 0
async def wait(self):
if self.state == 0:
# Event not set, put the calling task on the event's waiting queue
self.waiting.push_head(cur_task)
# Set calling task's data to this event that it waits on, to double-link it
cur_task.data = self
yield
return True
################################################################################
# General streams
@@ -308,15 +245,15 @@ class IOQueue:
#print('poll', s, sm, ev, err)
if ev & select.POLLIN or (err and sm[0] is not None):
if fast:
_queue.push_priority(sm[0])
tqueue.push_priority(sm[0])
else:
_queue.push_head(sm[0])
tqueue.push_head(sm[0])
sm[0] = None
if ev & select.POLLOUT or (err and sm[1] is not None):
if fast:
_queue.push_priority(sm[0])
tqueue.push_priority(sm[1])
else:
_queue.push_head(sm[0])
tqueue.push_head(sm[1])
sm[1] = None
if sm[0] is None and sm[1] is None:
self._dequeue(s)
@@ -434,7 +371,7 @@ async def start_server(cb, host, port, backlog=5):
# Main run loop
# Queue of Task instances
_queue = Queue()
tqueue = TQueue()
# Task queue and poller for stream IO
_io_queue = IOQueue()
@@ -445,15 +382,15 @@ def run_until_complete(main_task=None):
excs_all = (CancelledError, Exception) # To prevent heap allocation in loop
excs_stop = (CancelledError, StopIteration) # To prevent heap allocation in loop
while True:
# Wait until the head of _queue is ready to run
# Wait until the head of tqueue is ready to run
dt = 1
while dt > 0:
dt = -1
if _queue.next:
# A task waiting on _queue
if isinstance(_queue.next.data, int):
if tqueue.next:
# A task waiting on tqueue
if isinstance(tqueue.next.data, int):
# "data" is time to schedule task at
dt = max(0, ticks_diff(_queue.next.data, ticks()))
dt = max(0, ticks_diff(tqueue.next.data, ticks()))
else:
# "data" is an exception to throw into the task
dt = 0
@@ -464,7 +401,7 @@ def run_until_complete(main_task=None):
_io_queue.wait_io_event(dt)
# Get next task to run and continue it
t = _queue.pop_head()
t = tqueue.pop_head()
cur_task = t
try:
# Continue running the coroutine, it's responsible for rescheduling itself
@@ -482,7 +419,7 @@ def run_until_complete(main_task=None):
waiting = False
if hasattr(t, 'waiting'):
while t.waiting.next:
_queue.push_head(t.waiting.pop_head())
tqueue.push_head(t.waiting.pop_head())
waiting = True
t.waiting = None # Free waiting queue head
_io_queue.remove(t) # Remove task from the IO queue (if it's on it)
@@ -492,6 +429,9 @@ def run_until_complete(main_task=None):
print('task raised exception:', t.coro)
sys.print_exception(er)
StreamReader = Stream
StreamWriter = Stream # CPython 3.8 compatibility
################################################################################
# Legacy uasyncio compatibility
@@ -508,9 +448,6 @@ Stream.aclose = Stream.wait_closed
Stream.awrite = stream_awrite
Stream.awritestr = stream_awrite # TODO explicitly convert to bytes?
StreamReader = Stream
StreamWriter = Stream
class Loop:
def create_task(self, coro):
return create_task(coro)
@@ -524,3 +461,5 @@ class Loop:
def get_event_loop(runq_len=0, waitq_len=0):
return Loop()
version = (3, 0, 0)

@@ -0,0 +1,76 @@
# barrier.py
# A Barrier synchronises N coros. Each issues await barrier.
# Execution pauses until all other participant coros are waiting on it.
# At that point the callback is executed. Then the barrier is 'opened' and
# execution of all participants resumes.
import uasyncio
async def _g():
pass
type_coro = type(_g())
# If a callback is passed, run it and return.
# If a coro is passed initiate it and return.
# coros are passed by name i.e. not using function call syntax.
def launch(func, tup_args):
res = func(*tup_args)
if isinstance(res, type_coro):
uasyncio.create_task(res)
class Barrier():
def __init__(self, participants, func=None, args=()):
self._participants = participants
self._func = func
self._args = args
self.waiting = uasyncio.TQueue() # Linked list of Tasks waiting on completion of barrier
self._reset(True)
def trigger(self):
self._update()
if self._at_limit(): # All other coros are also at limit
if self._func is not None:
launch(self._func, self._args)
self._reset(not self._down) # Toggle direction and release others
while self.waiting.next:
uasyncio.tqueue.push_head(self.waiting.pop_head())
def __iter__(self):
self._update()
if self._at_limit(): # All other coros are also at limit
if self._func is not None:
launch(self._func, self._args)
self._reset(not self._down) # Toggle direction and release others
while self.waiting.next:
uasyncio.tqueue.push_head(self.waiting.pop_head())
return
direction = self._down
# Other tasks have not reached barrier, put the calling task on the barrier's waiting queue
self.waiting.push_head(uasyncio.cur_task)
# Set calling task's data to this barrier that it waits on, to double-link it
uasyncio.cur_task.data = self
yield
def _reset(self, down):
self._down = down
self._count = self._participants if down else 0
def busy(self):
if self._down:
done = self._count == self._participants
else:
done = self._count == 0
return not done
def _at_limit(self): # Has count reached up or down limit?
limit = 0 if self._down else self._participants
return self._count == limit
def _update(self):
self._count += -1 if self._down else 1
if self._count < 0 or self._count > self._participants:
raise ValueError('Too many tasks accessing Barrier')
uasyncio.Barrier = Barrier

@@ -0,0 +1,60 @@
import uasyncio
import uasyncio.lock
import uasyncio.event
class Condition():
def __init__(self, lock=None):
self.lock = uasyncio.Lock() if lock is None else lock
self.events = []
async def acquire(self):
await self.lock.acquire()
# enable this syntax:
# with await condition [as cond]:
#def __iter__(self):
#await self.lock.acquire()
#return self
async def __aenter__(self):
await self.lock.acquire()
return self
async def __aexit__(self, *_):
self.lock.release()
def locked(self):
return self.lock.locked()
def release(self):
self.lock.release() # Will raise RuntimeError if not locked
def notify(self, n=1): # Caller controls lock
if not self.lock.locked():
raise RuntimeError('Condition notify with lock not acquired.')
for _ in range(min(n, len(self.events))):
ev = self.events.pop()
ev.set()
def notify_all(self):
self.notify(len(self.events))
async def wait(self):
if not self.lock.locked():
raise RuntimeError('Condition wait with lock not acquired.')
ev = uasyncio.Event()
self.events.append(ev)
self.lock.release()
await ev.wait()
await self.lock.acquire()
assert ev not in self.events, 'condition wait assertion fail'
return True # CPython compatibility
async def wait_for(self, predicate):
result = predicate()
while not result:
await self.wait()
result = predicate()
return result
uasyncio.Condition = Condition
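An illustrative usage sketch of the class above, following `cond_go()` in `prim_test.py` (names
here are placeholders, not part of the module):
```python
import uasyncio as asyncio
import uasyncio.condition   # binds asyncio.Condition

async def waiter(n, cond):
    async with cond:              # acquire the underlying Lock
        await cond.wait()         # the Lock is released while waiting
        print('waiter', n, 'notified')

async def notifier(cond):
    await asyncio.sleep(1)
    async with cond:
        cond.notify(2)            # wake up to two waiting tasks

async def main():
    cond = asyncio.Condition()
    for n in range(2):
        asyncio.create_task(waiter(n, cond))
    asyncio.create_task(notifier(cond))
    await asyncio.sleep(2)

asyncio.run(main())
```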

@@ -0,0 +1,51 @@
import uasyncio
# Event class for primitive events that can be waited on, set, and cleared
class Event:
def __init__(self):
self.state = 0 # 0=unset; 1=set
self.waiting = uasyncio.TQueue() # Queue of Tasks waiting on completion of this event
def set(self):
# Event becomes set, schedule any tasks waiting on it
while self.waiting.next:
uasyncio.tqueue.push_head(self.waiting.pop_head())
self.state = 1
def clear(self):
self.state = 0
def is_set(self):
return self.state # CPython compatibility
async def wait(self):
if self.state == 0:
# Event not set, put the calling task on the event's waiting queue
self.waiting.push_head(uasyncio.cur_task)
# Set calling task's data to this event that it waits on, to double-link it
uasyncio.cur_task.data = self
yield
return True
uasyncio.Event = Event
# A coro waiting on a message issues msg = await Message_instance
# A coro raising the message issues Message_instance.set(msg)
# When all waiting coros have run
# Message.clear() should be issued
class Message(uasyncio.Event):
def __init__(self, delay_ms=0):
super().__init__()
self._data = None
def clear(self):
super().clear()
def __iter__(self):
await self.wait()
return self._data
def set(self, data=None):
super().set()
self._data = data
def value(self):
return self._data
uasyncio.Message = Message

@@ -0,0 +1,58 @@
import uasyncio
################################################################################
# Lock (optional component)
# Lock class for primitive mutex capability
class Lock:
def __init__(self):
self._locked = False
self.waiting = uasyncio.TQueue() # Linked list of Tasks waiting to acquire this Lock
self._awt = None # task that is going to acquire the lock. Needed to prevent race
# condition between pushing the next waiting task and the task actually acquiring
# the lock because during that time another newly started task could acquire the
# lock out-of-order instead of being pushed to the waiting list.
# Also needed to not release another waiting Task if multiple Tasks are cancelled.
async def acquire(self):
if self._locked or self._awt:
# Lock set or just released but has tasks waiting on it,
# put the calling task on the Lock's waiting queue and yield
self.waiting.push_head(uasyncio.cur_task)
uasyncio.cur_task.data = self
try:
yield
except uasyncio.CancelledError:
if self._awt is uasyncio.cur_task:
# Task that was going to acquire got cancelled after being scheduled.
# Schedule next waiting task
self._locked = True
self.release()
raise
self._locked = True
return True
async def __aenter__(self):
await self.acquire()
return self
def locked(self):
return self._locked
def release(self):
if not self._locked:
raise RuntimeError("Lock is not acquired.")
self._locked = False
self._awt = self.waiting.next # Task which will get lock
if self.waiting.next:
# Lock becomes available, schedule next task waiting on it
uasyncio.tqueue.push_head(self.waiting.pop_head())
async def __aexit__(self, *args):
return self.release()
uasyncio.Lock = Lock
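A minimal usage sketch, equivalent to `run_lock()` in `prim_test.py` (names are illustrative):
```python
import uasyncio as asyncio
import uasyncio.lock        # binds asyncio.Lock

async def worker(n, lock):
    async with lock:              # waits here until the Lock is acquired
        print('worker', n, 'has the lock')
        await asyncio.sleep(1)
    print('worker', n, 'released the lock')

async def main():
    lock = asyncio.Lock()
    for n in range(3):
        asyncio.create_task(worker(n, lock))
    await asyncio.sleep(4)

asyncio.run(main())
```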

@@ -0,0 +1,101 @@
# queue.py: adapted from uasyncio V2
from ucollections import deque
import uasyncio
# Exception raised by get_nowait().
class QueueEmpty(Exception):
pass
# Exception raised by put_nowait().
class QueueFull(Exception):
pass
# A queue, useful for coordinating producer and consumer coroutines.
# If maxsize is less than or equal to zero, the queue size is infinite. If it
# is an integer greater than 0, then "await put()" will block when the
# queue reaches maxsize, until an item is removed by get().
# Unlike the standard library Queue, you can reliably know this Queue's size
# with qsize(), since your single-threaded uasyncio application won't be
# interrupted between calling qsize() and doing an operation on the Queue.
class Queue:
def __init__(self, maxsize=0):
self.maxsize = maxsize
self._queue = deque((), maxsize)
self.p_tasks = uasyncio.TQueue() # Queue of Tasks waiting to put to queue
self.g_tasks = uasyncio.TQueue() # Queue of Tasks waiting to get from queue
def _get(self):
return self._queue.popleft()
async def get(self): # Usage: item = await queue.get()
if not self._queue:
# Queue is empty, put the calling Task on the waiting queue
task = uasyncio.cur_task
self.g_tasks.push_head(task)
# Set calling task's data to double-link it
task.data = self
try:
yield
except uasyncio.CancelledError:
self.g_tasks.remove(task)
raise
if self.p_tasks.next:
# Task(s) waiting to put on queue, schedule first Task
uasyncio.tqueue.push_head(self.p_tasks.pop_head())
return self._get()
def get_nowait(self): # Remove and return an item from the queue.
# Return an item if one is immediately available, else raise QueueEmpty.
if not self._queue:
raise QueueEmpty()
return self._get()
def _put(self, val):
self._queue.append(val)
async def put(self, val): # Usage: await queue.put(item)
if self.qsize() >= self.maxsize and self.maxsize:
# Queue full, put the calling Task on the waiting queue
task = uasyncio.cur_task
self.p_tasks.push_head(task)
# Set calling task's data to double-link it
uasyncio.cur_task.data = self
try:
yield
except uasyncio.CancelledError:
self.p_tasks.remove(task)
raise
if self.g_tasks.next:
# Task(s) waiting to get from queue, schedule first Task
uasyncio.tqueue.push_head(self.g_tasks.pop_head())
self._put(val)
def put_nowait(self, val): # Put an item into the queue without blocking.
if self.qsize() >= self.maxsize and self.maxsize:
raise QueueFull()
self._put(val)
def qsize(self): # Number of items in the queue.
return len(self._queue)
def empty(self): # Return True if the queue is empty, False otherwise.
return not self._queue
def full(self): # Return True if there are maxsize items in the queue.
# Note: if the Queue was initialized with maxsize=0 (the default),
# then full() is never True.
if self.maxsize <= 0:
return False
else:
return self.qsize() >= self.maxsize
# Name collision fixed
uasyncio.Queue = Queue
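A short producer/consumer sketch of the class above; it mirrors `queue_go()` in `prim_test.py`
and the names are illustrative:
```python
import uasyncio as asyncio
from uasyncio.queue import Queue

async def producer(q):
    for x in range(8):
        await q.put(x)            # blocks once the queue holds maxsize items

async def consumer(q):
    while True:
        item = await q.get()      # blocks while the queue is empty
        print('Got', item)
        await asyncio.sleep(0.2)

async def main():
    q = Queue(5)                  # maxsize=5
    asyncio.create_task(producer(q))
    asyncio.create_task(consumer(q))
    await asyncio.sleep(3)

asyncio.run(main())
```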

@@ -0,0 +1,45 @@
# semaphore.py
import uasyncio
class Semaphore():
def __init__(self, value=1):
self._count = value
self.waiting = uasyncio.TQueue() # Linked list of Tasks waiting on this Semaphore
async def __aenter__(self):
await self.acquire()
return self
async def __aexit__(self, exc_type, exc, tb):
self.release()
async def acquire(self):
if self._count == 0:
# Semaphore unavailable, put the calling Task on the waiting queue
self.waiting.push_head(uasyncio.cur_task)
# Set calling task's data to double-link it
uasyncio.cur_task.data = self
yield
self._count -= 1
def release(self):
self._count += 1
if self.waiting.next:
# Task(s) waiting on semaphore, schedule first Task
uasyncio.tqueue.push_head(self.waiting.pop_head())
class BoundedSemaphore(Semaphore):
def __init__(self, value=1):
super().__init__(value)
self._initial_value = value
def release(self):
if self._count < self._initial_value:
super().release()
else:
raise ValueError('Semaphore released more than acquired')
uasyncio.Semaphore = Semaphore
uasyncio.BoundedSemaphore = BoundedSemaphore