diff --git a/uasyncio.queues/metadata.txt b/uasyncio.queues/metadata.txt new file mode 100644 index 00000000..9e4c1f19 --- /dev/null +++ b/uasyncio.queues/metadata.txt @@ -0,0 +1,5 @@ +srctype = micropython-lib +type = package +version = 0.1 +long_desc = Port of asyncio.queues to uasyncio. +depends = uasyncio.core, collections.deque diff --git a/uasyncio.queues/setup.py b/uasyncio.queues/setup.py new file mode 100644 index 00000000..22b0660a --- /dev/null +++ b/uasyncio.queues/setup.py @@ -0,0 +1,19 @@ +import sys +# Remove current dir from sys.path, otherwise setuptools will peek up our +# module instead of system. +sys.path.pop(0) +from setuptools import setup + + +setup(name='micropython-uasyncio.queues', + version='0.1', + description='uasyncio.queues module for MicroPython', + long_description='Port of asyncio.queues to uasyncio.', + url='https://github.com/micropython/micropython/issues/405', + author='MicroPython Developers', + author_email='micro-python@googlegroups.com', + maintainer='MicroPython Developers', + maintainer_email='micro-python@googlegroups.com', + license='MIT', + packages=['uasyncio'], + install_requires=['micropython-uasyncio.core', 'micropython-collections.deque']) diff --git a/uasyncio.queues/tests/test.py b/uasyncio.queues/tests/test.py new file mode 100644 index 00000000..72b3d85c --- /dev/null +++ b/uasyncio.queues/tests/test.py @@ -0,0 +1,57 @@ +from unittest import TestCase, run_class +import sys +sys.path.insert(0, '../uasyncio') +import queues + + +class QueueTestCase(TestCase): + + def _val(self, gen): + """Returns val from generator.""" + while True: + try: + gen.send(None) + except StopIteration as e: + return e.value + + def test_get_put(self): + q = queues.Queue(maxsize=1) + self._val(q.put(42)) + self.assertEqual(self._val(q.get()), 42) + + def test_get_put_nowait(self): + q = queues.Queue(maxsize=1) + q.put_nowait(12) + try: + q.put_nowait(42) + self.assertTrue(False) + except Exception as e: + self.assertEqual(type(e), queues.QueueFull) + self.assertEqual(q.get_nowait(), 12) + try: + q.get_nowait() + self.assertTrue(False) + except Exception as e: + self.assertEqual(type(e), queues.QueueEmpty) + + def test_qsize(self): + q = queues.Queue() + for n in range(10): + q.put_nowait(10) + self.assertEqual(q.qsize(), 10) + + def test_empty(self): + q = queues.Queue() + self.assertTrue(q.empty()) + q.put_nowait(10) + self.assertFalse(q.empty()) + + def test_full(self): + q = queues.Queue(maxsize=1) + self.assertFalse(q.full()) + q.put_nowait(10) + self.assertTrue(q.full()) + + +if __name__ == '__main__': + run_class(QueueTestCase) diff --git a/uasyncio.queues/uasyncio/queues.py b/uasyncio.queues/uasyncio/queues.py new file mode 100644 index 00000000..4a8ae5fe --- /dev/null +++ b/uasyncio.queues/uasyncio/queues.py @@ -0,0 +1,94 @@ +from collections.deque import deque +from uasyncio.core import sleep + + +class QueueEmpty(Exception): + """Exception raised by get_nowait().""" + + +class QueueFull(Exception): + """Exception raised by put_nowait().""" + + +class Queue: + """A queue, useful for coordinating producer and consumer coroutines. + + If maxsize is less than or equal to zero, the queue size is infinite. If it + is an integer greater than 0, then "yield from put()" will block when the + queue reaches maxsize, until an item is removed by get(). + + Unlike the standard library Queue, you can reliably know this Queue's size + with qsize(), since your single-threaded uasyncio application won't be + interrupted between calling qsize() and doing an operation on the Queue. + """ + _attempt_delay = 0.1 + + def __init__(self, maxsize=0): + self.maxsize = maxsize + self._queue = deque() + + def _get(self): + return self._queue.popleft() + + def get(self): + """Returns generator, which can be used for getting (and removing) + an item from a queue. + + Usage:: + + item = yield from queue.get() + """ + while not self._queue: + yield from sleep(self._attempt_delay) + return self._get() + + def get_nowait(self): + """Remove and return an item from the queue. + + Return an item if one is immediately available, else raise QueueEmpty. + """ + if not self._queue: + raise QueueEmpty() + return self._get() + + def _put(self, val): + self._queue.append(val) + + def put(self, val): + """Returns generator which can be used for putting item in a queue. + + Usage:: + + yield from queue.put(item) + """ + while self.qsize() > self.maxsize and self.maxsize: + yield from sleep(self._attempt_delay) + self._put(val) + + def put_nowait(self, val): + """Put an item into the queue without blocking. + + If no free slot is immediately available, raise QueueFull. + """ + if self.qsize() >= self.maxsize and self.maxsize: + raise QueueFull() + self._put(val) + + def qsize(self): + """Number of items in the queue.""" + return len(self._queue) + + def empty(self): + """Return True if the queue is empty, False otherwise.""" + return not self._queue + + def full(self): + """Return True if there are maxsize items in the queue. + + Note: if the Queue was initialized with maxsize=0 (the default), + then full() is never True. + """ + if self.maxsize <= 0: + return False + else: + return self.qsize() >= self.maxsize