kopia lustrzana https://github.com/micropython/micropython-lib
uasyncio.queues: Add simple implementation of asynchronous queues for uasyncio
rodzic
1387950bb0
commit
4dbb10e39d
|
@ -0,0 +1,5 @@
|
|||
srctype = micropython-lib
|
||||
type = package
|
||||
version = 0.1
|
||||
long_desc = Port of asyncio.queues to uasyncio.
|
||||
depends = uasyncio.core, collections.deque
|
|
@ -0,0 +1,19 @@
|
|||
import sys
|
||||
# Remove current dir from sys.path, otherwise setuptools will peek up our
|
||||
# module instead of system.
|
||||
sys.path.pop(0)
|
||||
from setuptools import setup
|
||||
|
||||
|
||||
setup(name='micropython-uasyncio.queues',
|
||||
version='0.1',
|
||||
description='uasyncio.queues module for MicroPython',
|
||||
long_description='Port of asyncio.queues to uasyncio.',
|
||||
url='https://github.com/micropython/micropython/issues/405',
|
||||
author='MicroPython Developers',
|
||||
author_email='micro-python@googlegroups.com',
|
||||
maintainer='MicroPython Developers',
|
||||
maintainer_email='micro-python@googlegroups.com',
|
||||
license='MIT',
|
||||
packages=['uasyncio'],
|
||||
install_requires=['micropython-uasyncio.core', 'micropython-collections.deque'])
|
|
@ -0,0 +1,57 @@
|
|||
from unittest import TestCase, run_class
|
||||
import sys
|
||||
sys.path.insert(0, '../uasyncio')
|
||||
import queues
|
||||
|
||||
|
||||
class QueueTestCase(TestCase):
|
||||
|
||||
def _val(self, gen):
|
||||
"""Returns val from generator."""
|
||||
while True:
|
||||
try:
|
||||
gen.send(None)
|
||||
except StopIteration as e:
|
||||
return e.value
|
||||
|
||||
def test_get_put(self):
|
||||
q = queues.Queue(maxsize=1)
|
||||
self._val(q.put(42))
|
||||
self.assertEqual(self._val(q.get()), 42)
|
||||
|
||||
def test_get_put_nowait(self):
|
||||
q = queues.Queue(maxsize=1)
|
||||
q.put_nowait(12)
|
||||
try:
|
||||
q.put_nowait(42)
|
||||
self.assertTrue(False)
|
||||
except Exception as e:
|
||||
self.assertEqual(type(e), queues.QueueFull)
|
||||
self.assertEqual(q.get_nowait(), 12)
|
||||
try:
|
||||
q.get_nowait()
|
||||
self.assertTrue(False)
|
||||
except Exception as e:
|
||||
self.assertEqual(type(e), queues.QueueEmpty)
|
||||
|
||||
def test_qsize(self):
|
||||
q = queues.Queue()
|
||||
for n in range(10):
|
||||
q.put_nowait(10)
|
||||
self.assertEqual(q.qsize(), 10)
|
||||
|
||||
def test_empty(self):
|
||||
q = queues.Queue()
|
||||
self.assertTrue(q.empty())
|
||||
q.put_nowait(10)
|
||||
self.assertFalse(q.empty())
|
||||
|
||||
def test_full(self):
|
||||
q = queues.Queue(maxsize=1)
|
||||
self.assertFalse(q.full())
|
||||
q.put_nowait(10)
|
||||
self.assertTrue(q.full())
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
run_class(QueueTestCase)
|
|
@ -0,0 +1,94 @@
|
|||
from collections.deque import deque
|
||||
from uasyncio.core import sleep
|
||||
|
||||
|
||||
class QueueEmpty(Exception):
|
||||
"""Exception raised by get_nowait()."""
|
||||
|
||||
|
||||
class QueueFull(Exception):
|
||||
"""Exception raised by put_nowait()."""
|
||||
|
||||
|
||||
class Queue:
|
||||
"""A queue, useful for coordinating producer and consumer coroutines.
|
||||
|
||||
If maxsize is less than or equal to zero, the queue size is infinite. If it
|
||||
is an integer greater than 0, then "yield from put()" will block when the
|
||||
queue reaches maxsize, until an item is removed by get().
|
||||
|
||||
Unlike the standard library Queue, you can reliably know this Queue's size
|
||||
with qsize(), since your single-threaded uasyncio application won't be
|
||||
interrupted between calling qsize() and doing an operation on the Queue.
|
||||
"""
|
||||
_attempt_delay = 0.1
|
||||
|
||||
def __init__(self, maxsize=0):
|
||||
self.maxsize = maxsize
|
||||
self._queue = deque()
|
||||
|
||||
def _get(self):
|
||||
return self._queue.popleft()
|
||||
|
||||
def get(self):
|
||||
"""Returns generator, which can be used for getting (and removing)
|
||||
an item from a queue.
|
||||
|
||||
Usage::
|
||||
|
||||
item = yield from queue.get()
|
||||
"""
|
||||
while not self._queue:
|
||||
yield from sleep(self._attempt_delay)
|
||||
return self._get()
|
||||
|
||||
def get_nowait(self):
|
||||
"""Remove and return an item from the queue.
|
||||
|
||||
Return an item if one is immediately available, else raise QueueEmpty.
|
||||
"""
|
||||
if not self._queue:
|
||||
raise QueueEmpty()
|
||||
return self._get()
|
||||
|
||||
def _put(self, val):
|
||||
self._queue.append(val)
|
||||
|
||||
def put(self, val):
|
||||
"""Returns generator which can be used for putting item in a queue.
|
||||
|
||||
Usage::
|
||||
|
||||
yield from queue.put(item)
|
||||
"""
|
||||
while self.qsize() > self.maxsize and self.maxsize:
|
||||
yield from sleep(self._attempt_delay)
|
||||
self._put(val)
|
||||
|
||||
def put_nowait(self, val):
|
||||
"""Put an item into the queue without blocking.
|
||||
|
||||
If no free slot is immediately available, raise QueueFull.
|
||||
"""
|
||||
if self.qsize() >= self.maxsize and self.maxsize:
|
||||
raise QueueFull()
|
||||
self._put(val)
|
||||
|
||||
def qsize(self):
|
||||
"""Number of items in the queue."""
|
||||
return len(self._queue)
|
||||
|
||||
def empty(self):
|
||||
"""Return True if the queue is empty, False otherwise."""
|
||||
return not self._queue
|
||||
|
||||
def full(self):
|
||||
"""Return True if there are maxsize items in the queue.
|
||||
|
||||
Note: if the Queue was initialized with maxsize=0 (the default),
|
||||
then full() is never True.
|
||||
"""
|
||||
if self.maxsize <= 0:
|
||||
return False
|
||||
else:
|
||||
return self.qsize() >= self.maxsize
|
Ładowanie…
Reference in New Issue