micropython-samples/uasyncio_iostream/tests/iotest.py

70 wiersze
1.7 KiB
Python

# iotest.py Test PR #3836 timing using GPIO pins.
import io, pyb
import uasyncio as asyncio
import micropython
micropython.alloc_emergency_exception_buf(100)
MP_STREAM_POLL = const(3)
MP_STREAM_POLL_RD = const(1)
y1 = pyb.Pin('Y1', pyb.Pin.OUT)
class MyIO(io.IOBase):
def __init__(self):
self.ready = False
self.count = 0
tim = pyb.Timer(4)
tim.init(freq=1) # 1Hz - 1 simulated input line per sec.
tim.callback(self.setready)
def ioctl(self, req, arg):
if req == MP_STREAM_POLL and (arg & MP_STREAM_POLL_RD):
state = pyb.disable_irq()
r = self.ready
self.ready = False
pyb.enable_irq(state)
return r
return 0
def readline(self): # Y1 goes low when I/O is serviced
y1.value(0)
return '{}\n'.format(self.count)
def setready(self, t): # Y1 goes high when I/O becomes ready
self.count += 1
y1.value(1)
self.ready = True
myio = MyIO()
async def foo(p): # Toggle a pin when scheduled
print('start foo', p)
pin = pyb.Pin(p, pyb.Pin.OUT)
while True:
pin.value(1)
await asyncio.sleep(0)
pin.value(0)
await asyncio.sleep(0)
async def receiver():
last = None
nmissed = 0
sreader = asyncio.StreamReader(myio)
while True:
res = await sreader.readline()
print('Recieved {} Missed {}'.format(res, nmissed))
ires = int(res)
if last is not None:
if last != ires -1:
print('Missed {}'.format(ires - 1))
nmissed += 1
last = ires
loop = asyncio.get_event_loop()
loop.create_task(receiver())
loop.create_task(foo('Y2'))
loop.create_task(foo('Y3'))
loop.run_forever()