async demos updated for uasyncio V3

ili9341
Peter Hinch 2020-06-12 15:08:59 +01:00
rodzic 23db6c337b
commit b9b1c5c860
3 zmienionych plików z 99 dodań i 120 usunięć

Wyświetl plik

@ -21,13 +21,16 @@ applications performing fast concurrent input over devices such as UARTs.
# Demo scripts # Demo scripts
These require uasyncio V3. This is incorporated in daily builds and will be
available in release builds starting with MicroPython V1.13. The demos assume
a Pyboard.
* `asnano.py` Runs until the usr button is pressed. In this demo each meter * `asnano.py` Runs until the usr button is pressed. In this demo each meter
updates independently and mutually asynchronously to test the response to updates independently and mutually asynchronously to test the response to
repeated display refreshes. repeated display refreshes.
* `asnano_sync.py` Provides a less hectic visual by only updating the display * `asnano_sync.py` Provides a less hectic visual. Display objects update
when all data has been acquired. Uses a `Barrier` object to achieve the themselves as data becomes available but screen updates occur asynchronously
necessary synchronisation. at a low frequency. An asynchronous iterator is used to stop the demo when the
pyboard usr button is pressed.
These demos require [asyn.py](https://github.com/peterhinch/micropython-async/blob/master/asyn.py).
###### [Main README](../README.md) ###### [Main README](../README.md)

Wyświetl plik

@ -3,27 +3,8 @@
# Adafruit 1.5" 128*128 OLED display: https://www.adafruit.com/product/1431 # Adafruit 1.5" 128*128 OLED display: https://www.adafruit.com/product/1431
# Adafruit 1.27" 128*96 display https://www.adafruit.com/product/1673 # Adafruit 1.27" 128*96 display https://www.adafruit.com/product/1673
# The MIT License (MIT) # Copyright (c) 2020 Peter Hinch
# Released under the MIT License (MIT) - see LICENSE file
# Copyright (c) 2018 Peter Hinch
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
# WIRING (Adafruit pin nos and names) # WIRING (Adafruit pin nos and names)
# Pyb SSD # Pyb SSD
@ -43,15 +24,18 @@ import gc
from ssd1351 import SSD1351 as SSD from ssd1351 import SSD1351 as SSD
# Initialise hardware and framebuf before importing modules # Initialise hardware and framebuf before importing modules
pdc = machine.Pin('X1', machine.Pin.OUT_PP, value=0) #pdc = machine.Pin('X1', machine.Pin.OUT_PP, value=0)
pcs = machine.Pin('X2', machine.Pin.OUT_PP, value=1) #pcs = machine.Pin('X2', machine.Pin.OUT_PP, value=1)
prst = machine.Pin('X3', machine.Pin.OUT_PP, value=1) #prst = machine.Pin('X3', machine.Pin.OUT_PP, value=1)
spi = machine.SPI(1) #spi = machine.SPI(1)
pdc = machine.Pin('Y1', machine.Pin.OUT_PP, value=0)
pcs = machine.Pin('Y2', machine.Pin.OUT_PP, value=1)
prst = machine.Pin('Y3', machine.Pin.OUT_PP, value=1)
spi = machine.SPI(2)
gc.collect() # Precaution befor instantiating framebuf gc.collect() # Precaution befor instantiating framebuf
ssd = SSD(spi, pcs, pdc, prst, height) # Create a display instance ssd = SSD(spi, pcs, pdc, prst, height) # Create a display instance
import uasyncio as asyncio import uasyncio as asyncio
import asyn
import pyb import pyb
import uos import uos
from writer import CWriter from writer import CWriter
@ -73,7 +57,6 @@ wri.set_clip(True, True, False)
color = lambda v : RED if v > 0.7 else YELLOW if v > 0.5 else GREEN color = lambda v : RED if v > 0.7 else YELLOW if v > 0.5 else GREEN
txt = lambda v : 'ovr' if v > 0.7 else 'high' if v > 0.5 else 'ok' txt = lambda v : 'ovr' if v > 0.7 else 'high' if v > 0.5 else 'ok'
@asyn.cancellable
async def meter(n, x, text, t): async def meter(n, x, text, t):
print('Meter {} test.'.format(n)) print('Meter {} test.'.format(n))
m = Meter(wri, 5, x, divisions = 4, ptcolor=YELLOW, m = Meter(wri, 5, x, divisions = 4, ptcolor=YELLOW,
@ -87,29 +70,30 @@ async def meter(n, x, text, t):
refresh(ssd) refresh(ssd)
await asyncio.sleep_ms(t) await asyncio.sleep_ms(t)
@asyn.cancellable
async def flash(n, t): async def flash(n, t):
led = pyb.LED(n) led = pyb.LED(n)
try:
while True: while True:
led.toggle() led.toggle()
await asyncio.sleep_ms(t) await asyncio.sleep_ms(t)
except asyn.StopTask:
led.off()
print('LED {} was cancelled.'.format(n))
async def killer(): async def killer(tasks):
sw = pyb.Switch() sw = pyb.Switch()
while not sw(): while not sw():
await asyncio.sleep_ms(100) await asyncio.sleep_ms(100)
await asyn.Cancellable.cancel_all() for task in tasks:
task.cancel()
async def main():
tasks = []
tasks.append(asyncio.create_task(meter(1, 2, 'left', 700)))
tasks.append(asyncio.create_task(meter(2, 50, 'right', 1000)))
tasks.append(asyncio.create_task(meter(3, 98, 'bass', 1500)))
tasks.append(asyncio.create_task(flash(1, 200)))
tasks.append(asyncio.create_task(flash(2, 233)))
await killer(tasks)
print('Press Pyboard usr button to stop test.') print('Press Pyboard usr button to stop test.')
loop = asyncio.get_event_loop() try:
loop.create_task(asyn.Cancellable(meter, 1, 2, 'left', 700)()) asyncio.run(main())
loop.create_task(asyn.Cancellable(meter, 2, 50, 'right', 1000)()) finally: # Reset uasyncio case of KeyboardInterrupt
loop.create_task(asyn.Cancellable(meter, 3, 98, 'bass', 1500)()) asyncio.new_event_loop()
loop.create_task(asyn.Cancellable(flash, 1, 200)())
loop.create_task(asyn.Cancellable(flash, 2, 233)())
loop.run_until_complete(killer())

Wyświetl plik

@ -3,27 +3,8 @@
# Adafruit 1.5" 128*128 OLED display: https://www.adafruit.com/product/1431 # Adafruit 1.5" 128*128 OLED display: https://www.adafruit.com/product/1431
# Adafruit 1.27" 128*96 display https://www.adafruit.com/product/1673 # Adafruit 1.27" 128*96 display https://www.adafruit.com/product/1673
# The MIT License (MIT) # Copyright (c) 2020 Peter Hinch
# Released under the MIT License (MIT) - see LICENSE file
# Copyright (c) 2018 Peter Hinch
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
# WIRING (Adafruit pin nos and names) # WIRING (Adafruit pin nos and names)
# Pyb SSD # Pyb SSD
@ -43,15 +24,18 @@ import gc
from ssd1351 import SSD1351 as SSD from ssd1351 import SSD1351 as SSD
# Initialise hardware and framebuf before importing modules # Initialise hardware and framebuf before importing modules
pdc = machine.Pin('X1', machine.Pin.OUT_PP, value=0) #pdc = machine.Pin('X1', machine.Pin.OUT_PP, value=0)
pcs = machine.Pin('X2', machine.Pin.OUT_PP, value=1) #pcs = machine.Pin('X2', machine.Pin.OUT_PP, value=1)
prst = machine.Pin('X3', machine.Pin.OUT_PP, value=1) #prst = machine.Pin('X3', machine.Pin.OUT_PP, value=1)
spi = machine.SPI(1) #spi = machine.SPI(1)
pdc = machine.Pin('Y1', machine.Pin.OUT_PP, value=0)
pcs = machine.Pin('Y2', machine.Pin.OUT_PP, value=1)
prst = machine.Pin('Y3', machine.Pin.OUT_PP, value=1)
spi = machine.SPI(2)
gc.collect() # Precaution befor instantiating framebuf gc.collect() # Precaution befor instantiating framebuf
ssd = SSD(spi, pcs, pdc, prst, height) # Create a display instance ssd = SSD(spi, pcs, pdc, prst, height) # Create a display instance
import uasyncio as asyncio import uasyncio as asyncio
import asyn
import pyb import pyb
import uos import uos
from writer import CWriter from writer import CWriter
@ -66,66 +50,74 @@ RED = SSD.rgb(255, 0, 0)
YELLOW = SSD.rgb(255, 255, 0) YELLOW = SSD.rgb(255, 255, 0)
BLACK = 0 BLACK = 0
CWriter.set_textpos(ssd, 0, 0) # In case previous tests have altered it
wri = CWriter(ssd, arial10, GREEN, BLACK, verbose=False)
wri.set_clip(True, True, False)
color = lambda v : RED if v > 0.7 else YELLOW if v > 0.5 else GREEN color = lambda v : RED if v > 0.7 else YELLOW if v > 0.5 else GREEN
txt = lambda v : 'ovr' if v > 0.7 else 'high' if v > 0.5 else 'ok' txt = lambda v : 'ovr' if v > 0.7 else 'high' if v > 0.5 else 'ok'
@asyn.cancellable class MyMeter(Meter):
async def meter(bar, x, text): def __init__(self, x, text):
m = Meter(wri, 5, x, divisions = 4, ptcolor=YELLOW, CWriter.set_textpos(ssd, 0, 0) # In case previous tests have altered it
label=text, style=Meter.BAR, legends=('0.0', '0.5', '1.0')) wri = CWriter(ssd, arial10, GREEN, BLACK, verbose=False)
l = LED(wri, ssd.height - 16 - wri.height, x, bdcolor=YELLOW, label ='over') wri.set_clip(True, True, False)
super().__init__(wri, 5, x, divisions = 4, ptcolor=YELLOW, label=text,
style=Meter.BAR, legends=('0.0', '0.5', '1.0'))
self.led = LED(wri, ssd.height - 16 - wri.height, x, bdcolor=YELLOW, label ='over')
self.task = asyncio.create_task(self._run())
async def _run(self):
while True: while True:
v = int.from_bytes(uos.urandom(3),'little')/16777216 v = int.from_bytes(uos.urandom(3),'little')/16777216
m.value(v, color(v)) self.value(v, color(v))
l.color(color(v)) self.led.color(color(v))
l.text(txt(v), fgcolor=color(v)) self.led.text(txt(v), fgcolor=color(v))
# Demo would work if data acquisition was slow. But even with fast # Slow asynchronous data acquisition might occur here. Note
# running we pause here until all meters have updated and the main # that meters update themselves asynchronously (in a real
# task is ready to refresh the display. # application as data becomes available).
await bar await asyncio.sleep(v) # Demo variable times
@asyn.cancellable
async def flash(n, t): async def flash(n, t):
led = pyb.LED(n) led = pyb.LED(n)
try: try:
while True: while True:
led.toggle() led.toggle()
await asyncio.sleep_ms(t) await asyncio.sleep_ms(t)
except asyn.StopTask: except asyncio.CancelledError:
led.off() # Demo tidying up on cancellation. led.off() # Demo tidying up on cancellation.
class Killer:
def __init__(self):
self.sw = pyb.Switch()
async def wait(self, t):
while t >= 0:
if self.sw():
return True
await asyncio.sleep_ms(50)
t -= 50
return False
# The main task instantiates other tasks then does the display update process. # The main task instantiates other tasks then does the display update process.
@asyn.cancellable async def main():
async def main(loop):
print('Press Pyboard usr button to stop test.') print('Press Pyboard usr button to stop test.')
bar = asyn.Barrier(4, refresh, (ssd,))
# Asynchronously flash Pyboard LED's. Because we can. # Asynchronously flash Pyboard LED's. Because we can.
loop.create_task(asyn.Cancellable(flash, 1, 200)()) leds = [asyncio.create_task(flash(1, 200)), asyncio.create_task(flash(2, 233))]
loop.create_task(asyn.Cancellable(flash, 2, 233)())
# Task for each meter and GUI LED # Task for each meter and GUI LED
loop.create_task(asyn.Cancellable(meter, bar, 2, 'left')()) mtasks =[MyMeter(2, 'left').task, MyMeter(50, 'right').task, MyMeter(98, 'bass').task]
loop.create_task(asyn.Cancellable(meter, bar, 50, 'right')()) k = Killer()
loop.create_task(asyn.Cancellable(meter, bar, 98, 'bass')())
try:
while True: while True:
await asyncio.sleep_ms(800) if await k.wait(800): # Switch was pressed
# If necessary wait until all meters have updated. break
# Barrier callback updates display. refresh(ssd)
await bar for task in mtasks + leds:
except asyn.StopTask: task.cancel()
await asyncio.sleep_ms(0)
ssd.fill(0) # Clear display at end. ssd.fill(0) # Clear display at end.
refresh(ssd) refresh(ssd)
async def killer(): def test():
sw = pyb.Switch() try:
while not sw(): asyncio.run(main())
await asyncio.sleep_ms(100) finally: # Reset uasyncio case of KeyboardInterrupt
await asyn.Cancellable.cancel_all() asyncio.new_event_loop()
print('asnano_sync.test() to re-run test.')
loop = asyncio.get_event_loop() test()
loop.create_task(asyn.Cancellable(main, loop)())
loop.run_until_complete(killer())