diff --git a/uasyncio_iostream/README.md b/uasyncio_iostream/README.md index 6ac839e..1cfad6e 100644 --- a/uasyncio_iostream/README.md +++ b/uasyncio_iostream/README.md @@ -1,34 +1,40 @@ # Changes to usayncio +This archive contains suggestions for changes to new `uasyncio`: + 1. Implement as a Python package. 2. Implement synchronisation primitives as package modules to conserve RAM. - 3. Add .priority method to Stream class. Enables I/O to be handled at high + 3. Add `.priority` method to `Stream` class. Enables I/O to be handled at high priority on a per-device basis. - 4. Rename task queue class TQueue to avoid name clash with Queue primitive. - 5. Rename task queue instance to tqueue as it is used by primitives. + 4. Rename task queue class `TQueue` to avoid name clash with Queue primitive. + 5. Rename task queue instance to `tqueue` as it is used by primitives. ## Minor changes - 1. Move StreamReader and StreamWriter assignments out of legacy section of code. - 2. CreateTask produces an assertion fail if called with a generator function. + 1. Move `StreamReader` and `StreamWriter` assignments out of legacy section of + code: these classes exist in `asyncio` 3.8. + 2. `.CreateTask` produces an assertion fail if called with a generator function. Avoids obscure traceback if someone omits the parens. - 3. Add machine readable version. + 3. Add machine readable version info. Useful in testing. # CPython-compatible synchronisation primitives -These have been adapted to work efficiently with the new version. +The ones I implemented are adapted to work efficiently with the new version. +All are separate modules to conserve RAM. - 1. `Event`: moved to separate module for consistency with other primitives. + 1. `Event`: just moved to separate module. 2. `Lock`: Kevin Köck's solution. 3. `Queue`: Paul's solution adapted for efficiency. - 4. `Semaphore`: Also implements BoundedSemaphore. + 4. `Semaphore`: Also implements `BoundedSemaphore`. 5. `Condition`. # Other primitives +Included as examples of user-contributed primitives. + 1. `Message`: Awaitable `Event` subclass with a data payload. - 2. `Barrier`: Multiple tasks wait until all reach a Barrier instance. Or some tasks - wait until others have triggered the Barrier instance. + 2. `Barrier`: Multiple tasks wait until all reach a Barrier instance. Or some + tasks wait until others have triggered the Barrier instance. # Test scripts @@ -40,6 +46,18 @@ Hopefully these are self-documenting on import. 3. `ms_timer.py` and `ms_timer_test.py` A practical use of priority scheduling to implement a timer with higher precision than `asyncio.sleep_ms`. Runs on Pyboard. -# Note +# CPython compatibility -Use of I/O is still incompatible with Unix. +`prim_test.py` runs on MicroPython or CPython 3.8, demonstrating that MicroPython +primitives behave similarly to the native CPython ones. + +`Message` is common to CPython and MicroPython. +There are two implementations of `Barrier` with the same functionality: a CPython +version and a MicroPython version with specific optimisations. The `Barrier` class +is loosely based on +[a Microsoft concept](https://docs.microsoft.com/en-us/windows/win32/sync/synchronization-barriers). + +## Directory structure + +MicroPython optimised primitives are in `uasyncio/`. Primitives compatible with +`asyncio` are in `primitives/`. diff --git a/uasyncio_iostream/prim_test.py b/uasyncio_iostream/prim_test.py index 34f28c1..c4f70ab 100644 --- a/uasyncio_iostream/prim_test.py +++ b/uasyncio_iostream/prim_test.py @@ -23,13 +23,22 @@ # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. -import uasyncio as asyncio -import uasyncio.lock -import uasyncio.event -import uasyncio.barrier -import uasyncio.semaphore -import uasyncio.condition -from uasyncio.queue import Queue # Name collision in __init__.py +try: + import asyncio +except ImportError: + # Specific imports under MicroPython to conserve RAM + import uasyncio as asyncio + import uasyncio.lock + import uasyncio.event + import uasyncio.semaphore + import uasyncio.condition + import uasyncio.queue + from uasyncio.barrier import Barrier # MicroPython optimised +else: + from primitives.barrier import Barrier # CPython generic +from primitives.message import Message # Portable + + def print_tests(): st = '''Available functions: @@ -69,7 +78,7 @@ async def event_wait(message, ack_event, n): ack_event.set() async def run_ack(): - message = asyncio.Message() + message = Message() ack1 = asyncio.Event() ack2 = asyncio.Event() count = 0 @@ -121,7 +130,7 @@ Cleared message I've seen attack ships burn on the shoulder of Orion... Time to die... ''', 10) - asyncio.run(ack_coro(10)) + asyncio.get_event_loop().run_until_complete(ack_coro(10)) # ************ Test Message class ************ @@ -132,7 +141,7 @@ async def wait_message(message): print('Got message {}'.format(msg)) async def run_message_test(): - message = asyncio.Message() + message = Message() asyncio.create_task(wait_message(message)) await asyncio.sleep(1) message.set('Hello world') @@ -143,7 +152,7 @@ def message_test(): Waiting for message Got message Hello world ''', 2) - asyncio.run(run_message_test()) + asyncio.get_event_loop().run_until_complete(run_message_test()) # ************ Test Lock and Event classes ************ @@ -199,11 +208,14 @@ got event Event status OK Tasks complete ''', 5) - asyncio.run(run_event_test()) + asyncio.get_event_loop().run_until_complete(run_event_test()) # ************ Barrier test ************ -async def killer(duration): +async def main(duration): + barrier = Barrier(3, callback, ('Synch',)) + for _ in range(3): + asyncio.create_task(report(barrier)) await asyncio.sleep(duration) def callback(text): @@ -221,10 +233,7 @@ def barrier_test(): 3 3 3 Synch 4 4 4 Synch ''') - barrier = asyncio.Barrier(3, callback, ('Synch',)) - for _ in range(3): - asyncio.create_task(report(barrier)) - asyncio.run(killer(2)) + asyncio.get_event_loop().run_until_complete(main(2)) # ************ Semaphore test ************ @@ -239,7 +248,7 @@ async def run_sema(n, sema, barrier): async def run_sema_test(bounded): num_coros = 5 - barrier = asyncio.Barrier(num_coros + 1) + barrier = Barrier(num_coros + 1) if bounded: semaphore = asyncio.BoundedSemaphore(3) else: @@ -291,7 +300,7 @@ run_sema 4 has released semaphore Exact sequence of acquisition may vary when 3 and 4 compete for semaphore.''' printexp(exp, 3) - asyncio.run(run_sema_test(bounded)) + asyncio.get_event_loop().run_until_complete(run_sema_test(bounded)) # ************ Condition test ************ @@ -343,20 +352,20 @@ async def cond04(n, cond, barrier): async def cond_go(): cond = asyncio.Condition() ntasks = 7 - barrier = asyncio.Barrier(ntasks + 1) + barrier = Barrier(ntasks + 1) t1 = asyncio.create_task(cond01_new(cond)) t3 = asyncio.create_task(cond03_new()) for n in range(ntasks): asyncio.create_task(cond02(n, cond, barrier)) await barrier # All instances of cond02 have completed # Test wait_for - barrier = asyncio.Barrier(2) + barrier = Barrier(2) asyncio.create_task(cond04(99, cond, barrier)) await barrier # cancel continuously running coros. t1.cancel() t3.cancel() - await asyncio.sleep_ms(0) + await asyncio.sleep(0) print('Done.') def condition_test(): @@ -378,7 +387,7 @@ cond04 99 Awaiting notification and predicate. cond04 99 triggered. tim = 9 Done. ''', 13) - asyncio.run(cond_go()) + asyncio.get_event_loop().run_until_complete(cond_go()) # ************ Queue test ************ @@ -395,7 +404,7 @@ async def mtq(myq): await asyncio.sleep(0.2) async def queue_go(): - myq = Queue(5) + myq = asyncio.Queue(5) asyncio.create_task(fillq(myq)) await mtq(myq) @@ -418,4 +427,4 @@ Retrieved 5 from queue Retrieved 6 from queue Retrieved 7 from queue ''', 3) - asyncio.run(queue_go()) + asyncio.get_event_loop().run_until_complete(queue_go()) diff --git a/uasyncio_iostream/primitives/__init__.py b/uasyncio_iostream/primitives/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/uasyncio_iostream/primitives/barrier.py b/uasyncio_iostream/primitives/barrier.py new file mode 100644 index 0000000..2cb91ee --- /dev/null +++ b/uasyncio_iostream/primitives/barrier.py @@ -0,0 +1,73 @@ +# Generic Barrier class: runs under CPython 3.8 + +# A Barrier synchronises N coros. In normal use each issues await barrier. +# Execution pauses until all other participant coros are waiting on it. +# At that point the callback is executed. Then the barrier is 'opened' and +# execution of all participants resumes. +# .trigger enables a coro to signal it has passed the barrier without waiting. + +import asyncio + +# Ignore "coroutine '_g' was never awaited" warning. +async def _g(): + pass +type_coro = type(_g()) + +# If a callback is passed, run it and return. +# If a coro is passed initiate it and return. +# coros are passed by name i.e. not using function call syntax. + +def launch(func, tup_args): + res = func(*tup_args) + if isinstance(res, type_coro): + asyncio.create_task(res) + + + +class Barrier(): + def __init__(self, participants, func=None, args=()): + self._participants = participants + self._func = func + self._args = args + self._reset(True) + + def __await__(self): + self._update() + if self._at_limit(): # All other threads are also at limit + if self._func is not None: + launch(self._func, self._args) + self._reset(not self._down) # Toggle direction to release others + return + + direction = self._down + while True: # Wait until last waiting thread changes the direction + if direction != self._down: + return + yield + + def trigger(self): + self._update() + if self._at_limit(): # All other threads are also at limit + if self._func is not None: + launch(self._func, self._args) + self._reset(not self._down) # Toggle direction to release others + + def _reset(self, down): + self._down = down + self._count = self._participants if down else 0 + + def busy(self): + if self._down: + done = self._count == self._participants + else: + done = self._count == 0 + return not done + + def _at_limit(self): # Has count reached up or down limit? + limit = 0 if self._down else self._participants + return self._count == limit + + def _update(self): + self._count += -1 if self._down else 1 + if self._count < 0 or self._count > self._participants: + raise ValueError('Too many tasks accessing Barrier') diff --git a/uasyncio_iostream/primitives/message.py b/uasyncio_iostream/primitives/message.py new file mode 100644 index 0000000..86eb9d6 --- /dev/null +++ b/uasyncio_iostream/primitives/message.py @@ -0,0 +1,34 @@ +# message.py +# A coro waiting on a message issues msg = await message_instance +# A coro rasing the message issues message_instance.set(msg) +# When all waiting coros have run +# message_instance.clear() should be issued + +try: + import asyncio +except ImportError: + import uasyncio as asyncio + + +class Message(asyncio.Event): + def __init__(self): + super().__init__() + self._data = None + + def clear(self): + super().clear() + + def __await__(self): + yield from self.wait().__await__() # CPython + return self._data + + def __iter__(self): + yield from self.wait() # MicroPython + return self._data + + def set(self, data=None): + super().set() + self._data = data + + def value(self): + return self._data diff --git a/uasyncio_iostream/uasyncio/barrier.py b/uasyncio_iostream/uasyncio/barrier.py index 827fdb9..09d574e 100644 --- a/uasyncio_iostream/uasyncio/barrier.py +++ b/uasyncio_iostream/uasyncio/barrier.py @@ -1,11 +1,16 @@ -# barrier.py +# barrier.py MicroPython optimised version -# A Barrier synchronises N coros. Each issues await barrier. +# A Barrier synchronises N coros. In normal use each issues await barrier. # Execution pauses until all other participant coros are waiting on it. # At that point the callback is executed. Then the barrier is 'opened' and # execution of all participants resumes. +# .trigger enables a coro to signal it has passed the barrier without waiting. -import uasyncio +try: + import asyncio + raise RuntimeError('This version of barrier is MicroPython specific') +except ImportError: + import uasyncio async def _g(): pass @@ -37,7 +42,7 @@ class Barrier(): while self.waiting.next: uasyncio.tqueue.push_head(self.waiting.pop_head()) - def __iter__(self): + def __iter__(self): # MicroPython self._update() if self._at_limit(): # All other coros are also at limit if self._func is not None: @@ -72,5 +77,3 @@ class Barrier(): self._count += -1 if self._down else 1 if self._count < 0 or self._count > self._participants: raise ValueError('Too many tasks accessing Barrier') - -uasyncio.Barrier = Barrier diff --git a/uasyncio_iostream/uasyncio/event.py b/uasyncio_iostream/uasyncio/event.py index 610067a..e5a5304 100644 --- a/uasyncio_iostream/uasyncio/event.py +++ b/uasyncio_iostream/uasyncio/event.py @@ -24,28 +24,3 @@ class Event: return True uasyncio.Event = Event - -# A coro waiting on a message issues msg = await Message_instance -# A coro rasing the message issues event.set(msg) -# When all waiting coros have run -# Message.clear() should be issued -class Message(uasyncio.Event): - def __init__(self, delay_ms=0): - super().__init__() - self._data = None - - def clear(self): - super().clear() - - def __iter__(self): - await self.wait() - return self._data - - def set(self, data=None): - super().set() - self._data = data - - def value(self): - return self._data - -uasyncio.Message = Message