From bfbb9eb4a347a1a3419f8c819d8dbc835058f288 Mon Sep 17 00:00:00 2001 From: Peter Hinch Date: Wed, 20 Jun 2018 16:48:25 +0100 Subject: [PATCH] iotest6.py added: test for uasyncio fast_io --- uasyncio_iostream/tests/iotest.py | 8 +-- uasyncio_iostream/tests/iotest6.py | 78 ++++++++++++++++++++++++++++++ 2 files changed, 82 insertions(+), 4 deletions(-) create mode 100644 uasyncio_iostream/tests/iotest6.py diff --git a/uasyncio_iostream/tests/iotest.py b/uasyncio_iostream/tests/iotest.py index 0e7c2b3..cbe1e4d 100644 --- a/uasyncio_iostream/tests/iotest.py +++ b/uasyncio_iostream/tests/iotest.py @@ -15,7 +15,7 @@ class MyIO(io.IOBase): self.ready = False self.count = 0 tim = pyb.Timer(4) - tim.init(freq=1) + tim.init(freq=1) # 1Hz - 1 simulated input line per sec. tim.callback(self.setready) def ioctl(self, req, arg): @@ -27,18 +27,18 @@ class MyIO(io.IOBase): return r return 0 - def readline(self): + def readline(self): # Y1 goes low when I/O is serviced y1.value(0) return '{}\n'.format(self.count) - def setready(self, t): + def setready(self, t): # Y1 goes high when I/O becomes ready self.count += 1 y1.value(1) self.ready = True myio = MyIO() -async def foo(p): +async def foo(p): # Toggle a pin when scheduled print('start foo', p) pin = pyb.Pin(p, pyb.Pin.OUT) while True: diff --git a/uasyncio_iostream/tests/iotest6.py b/uasyncio_iostream/tests/iotest6.py new file mode 100644 index 0000000..28894b2 --- /dev/null +++ b/uasyncio_iostream/tests/iotest6.py @@ -0,0 +1,78 @@ +# iotest6.py Test fast_io PR + +# Test is designed to quantify the difference between fast and normal I/O without +# recourse to electronic testgear. + +# The MyIO class supports .readline() which updates a call counter and clears the +# .ready_read flag. This is set by a timer (emulating the arrival of data from +# some hardware device). +# The .dummy method emulates a relatively slow user coro which yields with a zero +# delay; the test runs 10 instances of this. Each instance updates a common call +# counter. +# The receiver coro awaits .readline continuously. With normal scheduling each is +# scheduled after the ten .dummy instances have run. With fast scheduling and a +# timer period <= 10ms readline and dummy alternate. If timer period is increased +# readline is sheduled progressively less frequently. + +import io +import pyb +import utime +import uasyncio as asyncio +import micropython +micropython.alloc_emergency_exception_buf(100) + +MP_STREAM_POLL_RD = const(1) +MP_STREAM_POLL_WR = const(4) +MP_STREAM_POLL = const(3) +MP_STREAM_ERROR = const(-1) + +class MyIO(io.IOBase): + def __init__(self, read=False, write=False): + self.read_count = 0 + self.dummy_count = 0 + self.ready_rd = False + pyb.Timer(4, freq = 100, callback = self.do_input) + + # Read callback: emulate asynchronous input from hardware. + def do_input(self, t): + self.ready_rd = True # Data is ready to read + + def ioctl(self, req, arg): + ret = MP_STREAM_ERROR + if req == MP_STREAM_POLL: + ret = 0 + if arg & MP_STREAM_POLL_RD: + if self.ready_rd: + ret |= MP_STREAM_POLL_RD + return ret + + def readline(self): + self.read_count += 1 + self.ready_rd = False + return b'a\n' + + async def dummy(self): + while True: + await asyncio.sleep(0) + self.dummy_count += 1 + utime.sleep_ms(10) # Emulate time consuming user code + + async def killer(self): + print('Test runs for 5s') + await asyncio.sleep(5) + print('I/O count {} Dummy count {}'.format(self.read_count, self.dummy_count)) + +async def receiver(myior): + sreader = asyncio.StreamReader(myior) + while True: + await sreader.readline() + +def test(fast_io=False): + loop = asyncio.get_event_loop(fast_io=fast_io) + myior = MyIO() + loop.create_task(receiver(myior)) + for _ in range(10): + loop.create_task(myior.dummy()) + loop.run_until_complete(myior.killer()) + +print('Run test() to check normal I/O, test(True) for fast I/O')