From 7bc6f92dd7183d6dbab5e2dbb4e479d8000db1da Mon Sep 17 00:00:00 2001 From: Peter Hinch Date: Mon, 2 Dec 2019 16:35:03 +0000 Subject: [PATCH] uasyncio Revert name of task queue instance. --- uasyncio_iostream/README.md | 6 ++-- uasyncio_iostream/uasyncio/__init__.py | 38 +++++++++++++------------- 2 files changed, 22 insertions(+), 22 deletions(-) diff --git a/uasyncio_iostream/README.md b/uasyncio_iostream/README.md index 9bba541..1b31acc 100644 --- a/uasyncio_iostream/README.md +++ b/uasyncio_iostream/README.md @@ -1,15 +1,15 @@ # Changes to usayncio This archive contains suggestions for changes to new `uasyncio`. Item 3 below -added 2 Dec. +added 2 Dec, task queue name reverted to `_queue` as this can now be private. 1. Implement as a Python package. 2. Implement synchronisation primitives as package modules to conserve RAM. 3. `Primitive` class has methods common to most synchronisation primitives. + Avoids the need for primitives to access the task queue directly. 4. Add `.priority` method to `Stream` class. Enables I/O to be handled at high priority on a per-device basis. 5. Rename task queue class `TQueue` to avoid name clash with Queue primitive. - 6. Rename task queue instance to `tqueue` as it might be used by primitives. ## Minor changes @@ -22,7 +22,7 @@ added 2 Dec. # CPython-compatible synchronisation primitives These aim to work efficiently with the new version. All are separate modules to -conserve RAM. +conserve RAM. Items 1-4 use classes based on `uasyncio.Primitive`. 1. `Event`: just moved to separate module. 2. `Lock`: Kevin Köck's solution. diff --git a/uasyncio_iostream/uasyncio/__init__.py b/uasyncio_iostream/uasyncio/__init__.py index 40e2312..14a8468 100644 --- a/uasyncio_iostream/uasyncio/__init__.py +++ b/uasyncio_iostream/uasyncio/__init__.py @@ -17,11 +17,11 @@ class Primitive: def run_next(self): awt = self.waiting.next if awt: # Schedule next task waiting on primitive - tqueue.push_head(self.waiting.pop_head()) + _queue.push_head(self.waiting.pop_head()) return awt def run_all(self): while self.waiting.next: # Schedule all tasks waiting on primitive - tqueue.push_head(self.waiting.pop_head()) + _queue.push_head(self.waiting.pop_head()) def save_current(self): # Postpone currently running task self.waiting.push_head(cur_task) # Set calling task's data to this event that it waits on, to double-link it @@ -128,15 +128,15 @@ class Task: if hasattr(self.data, 'waiting'): self.data.waiting.remove(self) else: - tqueue.remove(self) - tqueue.push_error(self, CancelledError) + _queue.remove(self) + _queue.push_error(self, CancelledError) return True # Create and schedule a new task from a coroutine def create_task(coro): assert not isinstance(coro, type_genf), 'Coroutine arg expected.' # upy issue #3241 t = Task(coro) - tqueue.push_head(t) + _queue.push_head(t) return t # "Yield" once, then raise StopIteration @@ -157,7 +157,7 @@ class SingletonGenerator: # Pause task execution for the given time (integer in milliseconds, uPy extension) # Use a SingletonGenerator to do it without allocating on the heap def sleep_ms(t, sgen=SingletonGenerator()): - tqueue.push_sorted(cur_task, ticks_add(ticks(), t)) + _queue.push_sorted(cur_task, ticks_add(ticks(), t)) sgen.state = 1 return sgen @@ -188,7 +188,7 @@ async def wait_for(aw, timeout): # Ignore CancelledError from aw, it's probably due to timeout pass finally: - tqueue.remove(cancel_task) + _queue.remove(cancel_task) if cancel_task.coro is None: # Cancel task ran to completion, ie there was a timeout raise TimeoutError @@ -265,15 +265,15 @@ class IOQueue: #print('poll', s, sm, ev, err) if ev & select.POLLIN or (err and sm[0] is not None): if fast: - tqueue.push_priority(sm[0]) + _queue.push_priority(sm[0]) else: - tqueue.push_head(sm[0]) + _queue.push_head(sm[0]) sm[0] = None if ev & select.POLLOUT or (err and sm[1] is not None): if fast: - tqueue.push_priority(sm[1]) + _queue.push_priority(sm[1]) else: - tqueue.push_head(sm[1]) + _queue.push_head(sm[1]) sm[1] = None if sm[0] is None and sm[1] is None: self._dequeue(s) @@ -391,7 +391,7 @@ async def start_server(cb, host, port, backlog=5): # Main run loop # Queue of Task instances -tqueue = TQueue() +_queue = TQueue() # Task queue and poller for stream IO _io_queue = IOQueue() @@ -402,15 +402,15 @@ def run_until_complete(main_task=None): excs_all = (CancelledError, Exception) # To prevent heap allocation in loop excs_stop = (CancelledError, StopIteration) # To prevent heap allocation in loop while True: - # Wait until the head of tqueue is ready to run + # Wait until the head of _queue is ready to run dt = 1 while dt > 0: dt = -1 - if tqueue.next: - # A task waiting on tqueue - if isinstance(tqueue.next.data, int): + if _queue.next: + # A task waiting on _queue + if isinstance(_queue.next.data, int): # "data" is time to schedule task at - dt = max(0, ticks_diff(tqueue.next.data, ticks())) + dt = max(0, ticks_diff(_queue.next.data, ticks())) else: # "data" is an exception to throw into the task dt = 0 @@ -421,7 +421,7 @@ def run_until_complete(main_task=None): _io_queue.wait_io_event(dt) # Get next task to run and continue it - t = tqueue.pop_head() + t = _queue.pop_head() cur_task = t try: # Continue running the coroutine, it's responsible for rescheduling itself @@ -439,7 +439,7 @@ def run_until_complete(main_task=None): waiting = False if hasattr(t, 'waiting'): while t.waiting.next: - tqueue.push_head(t.waiting.pop_head()) + _queue.push_head(t.waiting.pop_head()) waiting = True t.waiting = None # Free waiting queue head _io_queue.remove(t) # Remove task from the IO queue (if it's on it)