Simplify queue.py, prim_test.py tests queue cancellation.

pull/12/head
Peter Hinch 2019-12-02 08:22:52 +00:00
rodzic 888d017762
commit 7b021ad645
2 zmienionych plików z 35 dodań i 20 usunięć

Wyświetl plik

@ -407,9 +407,19 @@ async def queue_go():
myq = asyncio.Queue(5)
asyncio.create_task(fillq(myq))
await mtq(myq)
t = asyncio.create_task(fillq(myq))
await asyncio.sleep(1)
print('Queue filled. Cancelling fill task. Queue should be full.')
t.cancel()
await mtq(myq)
t = asyncio.create_task(myq.get())
await asyncio.sleep(1)
print('Cancelling attempt to get from empty queue.')
t.cancel()
print('Queue size:', myq.qsize())
def queue_test():
printexp('''Running (runtime = 3s):
printexp('''Running (runtime = 7s):
Waiting to put item 0 on queue
Waiting to put item 1 on queue
Waiting to put item 2 on queue
@ -426,5 +436,19 @@ Retrieved 4 from queue
Retrieved 5 from queue
Retrieved 6 from queue
Retrieved 7 from queue
''', 3)
Waiting to put item 0 on queue
Waiting to put item 1 on queue
Waiting to put item 2 on queue
Waiting to put item 3 on queue
Waiting to put item 4 on queue
Waiting to put item 5 on queue
Queue filled. Cancelling fill task. Queue should be full.
Retrieved 0 from queue
Retrieved 1 from queue
Retrieved 2 from queue
Retrieved 3 from queue
Retrieved 4 from queue
Cancelling attempt to get from empty queue.
Queue size: 0
''', 7)
asyncio.get_event_loop().run_until_complete(queue_go())

Wyświetl plik

@ -28,8 +28,7 @@ class Queue:
def __init__(self, maxsize=0):
self.maxsize = maxsize
self._queue = deque((), maxsize)
self.p_tasks = uasyncio.TQueue() # Queue of Tasks waiting to put to queue
self.g_tasks = uasyncio.TQueue() # Queue of Tasks waiting to get from queue
self.waiting = uasyncio.TQueue() # Linked list of Tasks waiting on queue
def _get(self):
return self._queue.popleft()
@ -38,17 +37,13 @@ class Queue:
if not self._queue:
# Queue is empty, put the calling Task on the waiting queue
task = uasyncio.cur_task
self.g_tasks.push_head(task)
self.waiting.push_head(task)
# Set calling task's data to double-link it
task.data = self
try:
yield
except asyncio.CancelledError:
self.g_tasks.remove(task)
raise
if self.p_tasks.next:
yield
if self.waiting.next:
# Task(s) waiting to put on queue, schedule first Task
uasyncio.tqueue.push_head(self.p_tasks.pop_head())
uasyncio.tqueue.push_head(self.waiting.pop_head())
return self._get()
def get_nowait(self): # Remove and return an item from the queue.
@ -64,17 +59,13 @@ class Queue:
if self.qsize() >= self.maxsize and self.maxsize:
# Queue full, put the calling Task on the waiting queue
task = uasyncio.cur_task
self.p_tasks.push_head(task)
self.waiting.push_head(task)
# Set calling task's data to double-link it
uasyncio.cur_task.data = self
try:
yield
except asyncio.CancelledError:
self.p_tasks.remove(task)
raise
if self.g_tasks.next:
yield
if self.waiting.next:
# Task(s) waiting to get from queue, schedule first Task
uasyncio.tqueue.push_head(self.g_tasks.pop_head())
uasyncio.tqueue.push_head(self.waiting.pop_head())
self._put(val)
def put_nowait(self, val): # Put an item into the queue without blocking.