kopia lustrzana https://github.com/peterhinch/micropython-samples
uasyncio iotest7 added, iotest6 uses ioq_len arg.
rodzic
7a6ac8a45b
commit
f01ec022da
|
@ -5,7 +5,7 @@
|
||||||
# recourse to electronic testgear.
|
# recourse to electronic testgear.
|
||||||
|
|
||||||
# The MyIO class supports .readline() which updates a call counter and clears the
|
# The MyIO class supports .readline() which updates a call counter and clears the
|
||||||
# .ready_read flag. This is set by a timer (emulating the arrival of data from
|
# .ready_rd flag. This is set by a timer (emulating the arrival of data from
|
||||||
# some hardware device).
|
# some hardware device).
|
||||||
# The .dummy method emulates a relatively slow user coro which yields with a zero
|
# The .dummy method emulates a relatively slow user coro which yields with a zero
|
||||||
# delay; the test runs 10 instances of this. Each instance updates a common call
|
# delay; the test runs 10 instances of this. Each instance updates a common call
|
||||||
|
@ -69,11 +69,14 @@ async def receiver(myior):
|
||||||
await sreader.readline()
|
await sreader.readline()
|
||||||
|
|
||||||
def test(fast_io=False):
|
def test(fast_io=False):
|
||||||
loop = asyncio.get_event_loop(fast_io=fast_io)
|
loop = asyncio.get_event_loop(ioq_len = 6 if fast_io else 0)
|
||||||
myior = MyIO()
|
myior = MyIO()
|
||||||
loop.create_task(receiver(myior))
|
loop.create_task(receiver(myior))
|
||||||
for _ in range(10):
|
for _ in range(10):
|
||||||
loop.create_task(myior.dummy())
|
loop.create_task(myior.dummy())
|
||||||
loop.run_until_complete(myior.killer())
|
loop.run_until_complete(myior.killer())
|
||||||
|
|
||||||
|
print('Test case of I/O competing with zero delay tasks.')
|
||||||
|
print('fast_io False: approx I/O count 25, dummy count 510.')
|
||||||
|
print('fast_io True: approx I/O count 510, dummy count 510.')
|
||||||
print('Run test() to check normal I/O, test(True) for fast I/O')
|
print('Run test() to check normal I/O, test(True) for fast I/O')
|
||||||
|
|
|
@ -0,0 +1,72 @@
|
||||||
|
# iotest7.py Test fast_io PR
|
||||||
|
# https://github.com/micropython/micropython-lib/pull/287
|
||||||
|
|
||||||
|
# Test the case where runq is empty
|
||||||
|
|
||||||
|
# The MyIO class supports .readline() which updates a call counter and clears the
|
||||||
|
# .ready_rd flag. This is set by a timer (emulating the arrival of data from
|
||||||
|
# some hardware device).
|
||||||
|
|
||||||
|
|
||||||
|
import io
|
||||||
|
import pyb
|
||||||
|
import utime
|
||||||
|
import uasyncio as asyncio
|
||||||
|
import micropython
|
||||||
|
micropython.alloc_emergency_exception_buf(100)
|
||||||
|
|
||||||
|
MP_STREAM_POLL_RD = const(1)
|
||||||
|
MP_STREAM_POLL_WR = const(4)
|
||||||
|
MP_STREAM_POLL = const(3)
|
||||||
|
MP_STREAM_ERROR = const(-1)
|
||||||
|
|
||||||
|
class MyIO(io.IOBase):
|
||||||
|
def __init__(self, read=False, write=False):
|
||||||
|
self.read_count = 0
|
||||||
|
self.dummy_count = 0
|
||||||
|
self.ready_rd = False
|
||||||
|
pyb.Timer(4, freq = 100, callback = self.do_input)
|
||||||
|
|
||||||
|
# Read callback: emulate asynchronous input from hardware.
|
||||||
|
def do_input(self, t):
|
||||||
|
self.ready_rd = True # Data is ready to read
|
||||||
|
|
||||||
|
def ioctl(self, req, arg):
|
||||||
|
ret = MP_STREAM_ERROR
|
||||||
|
if req == MP_STREAM_POLL:
|
||||||
|
ret = 0
|
||||||
|
if arg & MP_STREAM_POLL_RD:
|
||||||
|
if self.ready_rd:
|
||||||
|
ret |= MP_STREAM_POLL_RD
|
||||||
|
return ret
|
||||||
|
|
||||||
|
def readline(self):
|
||||||
|
self.read_count += 1
|
||||||
|
self.ready_rd = False
|
||||||
|
return b'a\n'
|
||||||
|
|
||||||
|
async def dummy(self):
|
||||||
|
while True:
|
||||||
|
await asyncio.sleep_ms(50)
|
||||||
|
self.dummy_count += 1
|
||||||
|
|
||||||
|
async def killer(self):
|
||||||
|
print('Test runs for 5s')
|
||||||
|
await asyncio.sleep(5)
|
||||||
|
print('I/O count {} Dummy count {}'.format(self.read_count, self.dummy_count))
|
||||||
|
|
||||||
|
async def receiver(myior):
|
||||||
|
sreader = asyncio.StreamReader(myior)
|
||||||
|
while True:
|
||||||
|
await sreader.readline()
|
||||||
|
|
||||||
|
def test(fast_io=False):
|
||||||
|
loop = asyncio.get_event_loop(ioq_len = 6 if fast_io else 0)
|
||||||
|
myior = MyIO()
|
||||||
|
loop.create_task(receiver(myior))
|
||||||
|
loop.create_task(myior.dummy())
|
||||||
|
loop.run_until_complete(myior.killer())
|
||||||
|
|
||||||
|
print('Test case of empty runq: the fast_io option has no effect.')
|
||||||
|
print('I/O and dummy run at expected rates (around 500 and 99 counts respectively.')
|
||||||
|
print('Run test() to check normal I/O, test(True) for fast I/O')
|
Ładowanie…
Reference in New Issue