uasyncio Revert name of task queue instance.

pull/12/head
Peter Hinch 2019-12-02 16:35:03 +00:00
rodzic 6853171823
commit 7bc6f92dd7
2 zmienionych plików z 22 dodań i 22 usunięć

Wyświetl plik

@ -1,15 +1,15 @@
# Changes to usayncio # Changes to usayncio
This archive contains suggestions for changes to new `uasyncio`. Item 3 below This archive contains suggestions for changes to new `uasyncio`. Item 3 below
added 2 Dec. added 2 Dec, task queue name reverted to `_queue` as this can now be private.
1. Implement as a Python package. 1. Implement as a Python package.
2. Implement synchronisation primitives as package modules to conserve RAM. 2. Implement synchronisation primitives as package modules to conserve RAM.
3. `Primitive` class has methods common to most synchronisation primitives. 3. `Primitive` class has methods common to most synchronisation primitives.
Avoids the need for primitives to access the task queue directly.
4. Add `.priority` method to `Stream` class. Enables I/O to be handled at high 4. Add `.priority` method to `Stream` class. Enables I/O to be handled at high
priority on a per-device basis. priority on a per-device basis.
5. Rename task queue class `TQueue` to avoid name clash with Queue primitive. 5. Rename task queue class `TQueue` to avoid name clash with Queue primitive.
6. Rename task queue instance to `tqueue` as it might be used by primitives.
## Minor changes ## Minor changes
@ -22,7 +22,7 @@ added 2 Dec.
# CPython-compatible synchronisation primitives # CPython-compatible synchronisation primitives
These aim to work efficiently with the new version. All are separate modules to These aim to work efficiently with the new version. All are separate modules to
conserve RAM. conserve RAM. Items 1-4 use classes based on `uasyncio.Primitive`.
1. `Event`: just moved to separate module. 1. `Event`: just moved to separate module.
2. `Lock`: Kevin Köck's solution. 2. `Lock`: Kevin Köck's solution.

Wyświetl plik

@ -17,11 +17,11 @@ class Primitive:
def run_next(self): def run_next(self):
awt = self.waiting.next awt = self.waiting.next
if awt: # Schedule next task waiting on primitive if awt: # Schedule next task waiting on primitive
tqueue.push_head(self.waiting.pop_head()) _queue.push_head(self.waiting.pop_head())
return awt return awt
def run_all(self): def run_all(self):
while self.waiting.next: # Schedule all tasks waiting on primitive while self.waiting.next: # Schedule all tasks waiting on primitive
tqueue.push_head(self.waiting.pop_head()) _queue.push_head(self.waiting.pop_head())
def save_current(self): # Postpone currently running task def save_current(self): # Postpone currently running task
self.waiting.push_head(cur_task) self.waiting.push_head(cur_task)
# Set calling task's data to this event that it waits on, to double-link it # Set calling task's data to this event that it waits on, to double-link it
@ -128,15 +128,15 @@ class Task:
if hasattr(self.data, 'waiting'): if hasattr(self.data, 'waiting'):
self.data.waiting.remove(self) self.data.waiting.remove(self)
else: else:
tqueue.remove(self) _queue.remove(self)
tqueue.push_error(self, CancelledError) _queue.push_error(self, CancelledError)
return True return True
# Create and schedule a new task from a coroutine # Create and schedule a new task from a coroutine
def create_task(coro): def create_task(coro):
assert not isinstance(coro, type_genf), 'Coroutine arg expected.' # upy issue #3241 assert not isinstance(coro, type_genf), 'Coroutine arg expected.' # upy issue #3241
t = Task(coro) t = Task(coro)
tqueue.push_head(t) _queue.push_head(t)
return t return t
# "Yield" once, then raise StopIteration # "Yield" once, then raise StopIteration
@ -157,7 +157,7 @@ class SingletonGenerator:
# Pause task execution for the given time (integer in milliseconds, uPy extension) # Pause task execution for the given time (integer in milliseconds, uPy extension)
# Use a SingletonGenerator to do it without allocating on the heap # Use a SingletonGenerator to do it without allocating on the heap
def sleep_ms(t, sgen=SingletonGenerator()): def sleep_ms(t, sgen=SingletonGenerator()):
tqueue.push_sorted(cur_task, ticks_add(ticks(), t)) _queue.push_sorted(cur_task, ticks_add(ticks(), t))
sgen.state = 1 sgen.state = 1
return sgen return sgen
@ -188,7 +188,7 @@ async def wait_for(aw, timeout):
# Ignore CancelledError from aw, it's probably due to timeout # Ignore CancelledError from aw, it's probably due to timeout
pass pass
finally: finally:
tqueue.remove(cancel_task) _queue.remove(cancel_task)
if cancel_task.coro is None: if cancel_task.coro is None:
# Cancel task ran to completion, ie there was a timeout # Cancel task ran to completion, ie there was a timeout
raise TimeoutError raise TimeoutError
@ -265,15 +265,15 @@ class IOQueue:
#print('poll', s, sm, ev, err) #print('poll', s, sm, ev, err)
if ev & select.POLLIN or (err and sm[0] is not None): if ev & select.POLLIN or (err and sm[0] is not None):
if fast: if fast:
tqueue.push_priority(sm[0]) _queue.push_priority(sm[0])
else: else:
tqueue.push_head(sm[0]) _queue.push_head(sm[0])
sm[0] = None sm[0] = None
if ev & select.POLLOUT or (err and sm[1] is not None): if ev & select.POLLOUT or (err and sm[1] is not None):
if fast: if fast:
tqueue.push_priority(sm[1]) _queue.push_priority(sm[1])
else: else:
tqueue.push_head(sm[1]) _queue.push_head(sm[1])
sm[1] = None sm[1] = None
if sm[0] is None and sm[1] is None: if sm[0] is None and sm[1] is None:
self._dequeue(s) self._dequeue(s)
@ -391,7 +391,7 @@ async def start_server(cb, host, port, backlog=5):
# Main run loop # Main run loop
# Queue of Task instances # Queue of Task instances
tqueue = TQueue() _queue = TQueue()
# Task queue and poller for stream IO # Task queue and poller for stream IO
_io_queue = IOQueue() _io_queue = IOQueue()
@ -402,15 +402,15 @@ def run_until_complete(main_task=None):
excs_all = (CancelledError, Exception) # To prevent heap allocation in loop excs_all = (CancelledError, Exception) # To prevent heap allocation in loop
excs_stop = (CancelledError, StopIteration) # To prevent heap allocation in loop excs_stop = (CancelledError, StopIteration) # To prevent heap allocation in loop
while True: while True:
# Wait until the head of tqueue is ready to run # Wait until the head of _queue is ready to run
dt = 1 dt = 1
while dt > 0: while dt > 0:
dt = -1 dt = -1
if tqueue.next: if _queue.next:
# A task waiting on tqueue # A task waiting on _queue
if isinstance(tqueue.next.data, int): if isinstance(_queue.next.data, int):
# "data" is time to schedule task at # "data" is time to schedule task at
dt = max(0, ticks_diff(tqueue.next.data, ticks())) dt = max(0, ticks_diff(_queue.next.data, ticks()))
else: else:
# "data" is an exception to throw into the task # "data" is an exception to throw into the task
dt = 0 dt = 0
@ -421,7 +421,7 @@ def run_until_complete(main_task=None):
_io_queue.wait_io_event(dt) _io_queue.wait_io_event(dt)
# Get next task to run and continue it # Get next task to run and continue it
t = tqueue.pop_head() t = _queue.pop_head()
cur_task = t cur_task = t
try: try:
# Continue running the coroutine, it's responsible for rescheduling itself # Continue running the coroutine, it's responsible for rescheduling itself
@ -439,7 +439,7 @@ def run_until_complete(main_task=None):
waiting = False waiting = False
if hasattr(t, 'waiting'): if hasattr(t, 'waiting'):
while t.waiting.next: while t.waiting.next:
tqueue.push_head(t.waiting.pop_head()) _queue.push_head(t.waiting.pop_head())
waiting = True waiting = True
t.waiting = None # Free waiting queue head t.waiting = None # Free waiting queue head
_io_queue.remove(t) # Remove task from the IO queue (if it's on it) _io_queue.remove(t) # Remove task from the IO queue (if it's on it)