diff --git a/uasyncio_iostream/prim_test.py b/uasyncio_iostream/prim_test.py index c4f70ab..9f84589 100644 --- a/uasyncio_iostream/prim_test.py +++ b/uasyncio_iostream/prim_test.py @@ -407,9 +407,19 @@ async def queue_go(): myq = asyncio.Queue(5) asyncio.create_task(fillq(myq)) await mtq(myq) + t = asyncio.create_task(fillq(myq)) + await asyncio.sleep(1) + print('Queue filled. Cancelling fill task. Queue should be full.') + t.cancel() + await mtq(myq) + t = asyncio.create_task(myq.get()) + await asyncio.sleep(1) + print('Cancelling attempt to get from empty queue.') + t.cancel() + print('Queue size:', myq.qsize()) def queue_test(): - printexp('''Running (runtime = 3s): + printexp('''Running (runtime = 7s): Waiting to put item 0 on queue Waiting to put item 1 on queue Waiting to put item 2 on queue @@ -426,5 +436,19 @@ Retrieved 4 from queue Retrieved 5 from queue Retrieved 6 from queue Retrieved 7 from queue -''', 3) +Waiting to put item 0 on queue +Waiting to put item 1 on queue +Waiting to put item 2 on queue +Waiting to put item 3 on queue +Waiting to put item 4 on queue +Waiting to put item 5 on queue +Queue filled. Cancelling fill task. Queue should be full. +Retrieved 0 from queue +Retrieved 1 from queue +Retrieved 2 from queue +Retrieved 3 from queue +Retrieved 4 from queue +Cancelling attempt to get from empty queue. +Queue size: 0 +''', 7) asyncio.get_event_loop().run_until_complete(queue_go()) diff --git a/uasyncio_iostream/uasyncio/queue.py b/uasyncio_iostream/uasyncio/queue.py index 43c0ca1..87ad058 100644 --- a/uasyncio_iostream/uasyncio/queue.py +++ b/uasyncio_iostream/uasyncio/queue.py @@ -28,8 +28,7 @@ class Queue: def __init__(self, maxsize=0): self.maxsize = maxsize self._queue = deque((), maxsize) - self.p_tasks = uasyncio.TQueue() # Queue of Tasks waiting to put to queue - self.g_tasks = uasyncio.TQueue() # Queue of Tasks waiting to get from queue + self.waiting = uasyncio.TQueue() # Linked list of Tasks waiting on queue def _get(self): return self._queue.popleft() @@ -38,17 +37,13 @@ class Queue: if not self._queue: # Queue is empty, put the calling Task on the waiting queue task = uasyncio.cur_task - self.g_tasks.push_head(task) + self.waiting.push_head(task) # Set calling task's data to double-link it task.data = self - try: - yield - except asyncio.CancelledError: - self.g_tasks.remove(task) - raise - if self.p_tasks.next: + yield + if self.waiting.next: # Task(s) waiting to put on queue, schedule first Task - uasyncio.tqueue.push_head(self.p_tasks.pop_head()) + uasyncio.tqueue.push_head(self.waiting.pop_head()) return self._get() def get_nowait(self): # Remove and return an item from the queue. @@ -64,17 +59,13 @@ class Queue: if self.qsize() >= self.maxsize and self.maxsize: # Queue full, put the calling Task on the waiting queue task = uasyncio.cur_task - self.p_tasks.push_head(task) + self.waiting.push_head(task) # Set calling task's data to double-link it uasyncio.cur_task.data = self - try: - yield - except asyncio.CancelledError: - self.p_tasks.remove(task) - raise - if self.g_tasks.next: + yield + if self.waiting.next: # Task(s) waiting to get from queue, schedule first Task - uasyncio.tqueue.push_head(self.g_tasks.pop_head()) + uasyncio.tqueue.push_head(self.waiting.pop_head()) self._put(val) def put_nowait(self, val): # Put an item into the queue without blocking.