Replace uasyncio with asyncio (code and docs).

pull/64/head
Peter Hinch 2024-05-12 11:19:11 +01:00
rodzic a487992b37
commit ff77aea008
20 zmienionych plików z 406 dodań i 310 usunięć

Wyświetl plik

@ -7,7 +7,7 @@
## Blocking
The suitability of `nanogui` for use with cooperative schedulers such as
`uasyncio` is constrained by the underlying display driver. The GUI supports
`asyncio` is constrained by the underlying display driver. The GUI supports
displays whose driver is subclassed from `framebuf`. Such drivers hold the
frame buffer on the host, transferring its entire contents to the display
hardware, usually via I2C or SPI. Current drivers block for the time taken by
@ -50,7 +50,7 @@ between devices having a `do_refresh` method and those that do not:
while True:
# Update widgets
if hasattr(ssd, 'do_refresh'):
# Option to reduce uasyncio latency
# Option to reduce asyncio latency
await ssd.do_refresh()
else:
# Normal synchronous call

Wyświetl plik

@ -425,10 +425,10 @@ The driver uses the `micropython.viper` decorator. If your platform does not
support this, the Viper code will need to be rewritten with a substantial hit
to performance.
#### Use with uasyncio
#### Use with asyncio
A full refresh blocks for ~200ms. If this is acceptable, no special precautions
are required. However this period may be unacceptable for some `uasyncio`
are required. However this period may be unacceptable for some `asyncio`
applications. The driver provides an asynchronous `do_refresh(split=4)` method.
If this is run the display will be refreshed, but will periodically yield to
the scheduler enabling other tasks to run. This is documented
@ -565,11 +565,11 @@ spi = SPI(1, 30_000_000, sck=Pin(10), mosi=Pin(11), miso=Pin(8))
ssd = SSD(spi, dc=pdc, cs=pcs, rst=prst, disp_mode=PORTRAIT | USD)
```
#### Use with uasyncio
#### Use with asyncio
Running the SPI bus at 60MHz a refresh blocks for 83ms (tested on a Pi Pico at
standard clock frequency). If the blocking period is acceptable, no special
precautions are required. This period may be unacceptable for some `uasyncio`
precautions are required. This period may be unacceptable for some `asyncio`
applications. Some may use lower SPI baudrates either for electrical reasons or
where the host cannot support high speeds, and some platforms may run Python
code at a different speed.
@ -788,10 +788,10 @@ The driver uses the `micropython.viper` decorator. If your platform does not
support this, the Viper code will need to be rewritten with a substantial hit
to performance.
#### Use with uasyncio
#### Use with asyncio
A full refresh blocks for ~220ms. If this is acceptable, no special precautions
are required. However this period may be unacceptable for some `uasyncio`
are required. However this period may be unacceptable for some `asyncio`
applications. The driver provides an asynchronous `do_refresh(split=4)` method.
If this is run the display will be refreshed, but will periodically yield to
the scheduler enabling other tasks to run. This is documented
@ -992,7 +992,7 @@ The standard refresh method blocks (monopolises the CPU) until refresh is
complete, adding an additional 2s delay. This enables the demo scripts to run
unchanged, with the 2s delay allowing the results to be seen before the next
refresh begins. This is fine for simple applications. The drivers also support
concurrency with `uasyncio`. Such applications can perform other tasks while a
concurrency with `asyncio`. Such applications can perform other tasks while a
refresh is in progress. See
[EPD Asynchronous support](./DRIVERS.md#6-epd-asynchronous-support).

Wyświetl plik

@ -82,7 +82,7 @@ display.
#### [The extras directory.](./extras/README.md)
The `extras` directory contains further widgets back-ported from
The `extras` directory contains further widgets back-ported from
[micro-gui](https://github.com/peterhinch/micropython-micro-gui) plus further
demos and information. The aim is to avoid this document becoming over long and
daunting to new users.
@ -106,7 +106,7 @@ without SPIRAM. Running on ESP8266 is possible but frozen bytecode must be used
owing to its restricted RAM - see
[Appendix 1 Freezing bytecode](./README.md#appendix-1-freezing-bytecode).
It uses synchronous code but is compatible with `uasyncio`. Some demo programs
It uses synchronous code but is compatible with `asyncio`. Some demo programs
illustrate this. Code is standard MicroPython, but some device drivers use the
`native` and `viper` decorators.
@ -132,7 +132,7 @@ firmware V1.17 or later.
26 Aug 2021 Support [PR7682](https://github.com/micropython/micropython/pull/7682)
for fast text rendering.
25 Apr 2021 Support TTGO T-Display.
26 Mar 2021 Add ST7789. Alter uasyncio support on ili9341.
26 Mar 2021 Add ST7789. Alter asyncio support on ili9341.
## 1.2 Description
@ -169,7 +169,7 @@ $ cd micropython-nano-gui
```
As supplied, `color_setup.py` assumes a Pyboard (1.x or D) connected to an
Adafruit 1.27" OLED as specified in that file. If that doesn't correspond to
your hardware, it should be edited to suit. See example files in the
your hardware, it should be edited to suit. See example files in the
`setup_examples` directory.
```bash
$ mpremote mount .
@ -291,8 +291,8 @@ Demos for larger displays.
* `aclock.py` Analog clock demo. Cross platform.
* `alevel.py` Spirit level using Pyboard accelerometer.
* `fpt.py` Plot demo. Cross platform.
* `scale.py` A demo of the `Scale` widget. Cross platform. Uses `uasyncio`.
* `asnano_sync.py` Two Pyboard specific demos using the GUI with `uasyncio`.
* `scale.py` A demo of the `Scale` widget. Cross platform. Uses `asyncio`.
* `asnano_sync.py` Two Pyboard specific demos using the GUI with `asyncio`.
* `asnano.py` Could readily be adapted for other targets.
* `tbox.py` Demo `Textbox` class. Cross-platform.
@ -309,8 +309,8 @@ Demos for Sharp displays:
* `clocktest.py` Digital and analog clock demo.
* `clock_batt.py` Low power demo of battery operated clock.
Usage with `uasyncio` is discussed [here](./ASYNC.md). In summary the GUI works
well with `uasyncio` but the blocking which occurs during transfer of the
Usage with `asyncio` is discussed [here](./ASYNC.md). In summary the GUI works
well with `asyncio` but the blocking which occurs during transfer of the
framebuffer to the display may affect more demanding applications. Some display
drivers have an additional asynchronous refresh method. This may optionally be
used to mitigate the resultant latency.
@ -362,7 +362,7 @@ copied to the hardware root as `color_setup.py`. Example files:
* `st7789_ttgo.py` Setup for the TTGO T-Display device.
* `waveshare_pyb.py` 176*274 ePaper display on Pyboard.
* `epd29_pyb_sync.py` Adafruit 2.9 inch ePaper display for synchronous code.
* `epd29_pyb_async.py` Adafruit 2.9 inch ePaper display: `uasyncio` applications.
* `epd29_pyb_async.py` Adafruit 2.9 inch ePaper display: `asyncio` applications.
## 2.2 Dependencies
@ -541,9 +541,9 @@ to transfer an entire buffer over SPI.
On ePaper displays `refresh` is both slow and visually intrusive, with the
display flashing repeatedly. This made them unsatisfactory for displaying
rapidly changing information. There is a new breed of ePaper display supporting
effective partial updates notably
effective partial updates notably
[the Waveshare Pico paper 4.2](https://www.waveshare.com/pico-epaper-4.2.htm).
This can be used in such roles and is discussed in
This can be used in such roles and is discussed in
[EPD Asynchronous support](./DRIVERS.md#6-epd-asynchronous-support).
###### [Contents](./README.md#contents)
@ -959,7 +959,7 @@ A further issue is that, by default, ESP8266 firmware does not support complex
numbers. This rules out the plot module and the `Dial` widget. It is possible
to turn on complex support in the build, but I haven't tried this.
I set out to run the `scale.py` and `textbox.py` demos as these use `uasyncio`
I set out to run the `scale.py` and `textbox.py` demos as these use `asyncio`
to create dynamic content, and the widgets themselves are relatively complex.
I froze a subset of the `drivers` and the `gui` directories. A subset minimises

Wyświetl plik

@ -1,10 +1,10 @@
# color_setup.py Customise for your hardware config
# Released under the MIT License (MIT). See LICENSE.
# Copyright (c) 2020 Peter Hinch
# Copyright (c) 2024 Peter Hinch
# As written, supports:
# ili9341 240x320 displays on Pi Pico
# gc9a01 240x240 circular display on Pi Pico
# Edit the driver import for other displays.
# Demo of initialisation procedure designed to minimise risk of memory fail
@ -27,7 +27,9 @@ import gc
# *** Choose your color display driver here ***
# ili9341 specific driver
from drivers.ili93xx.ili9341 import ILI9341 as SSD
from drivers.gc9a01.gc9a01 import GC9A01 as SSD
# from drivers.ili93xx.ili9341 import ILI9341 as SSD
pdc = Pin(8, Pin.OUT, value=0) # Arbitrary pins
prst = Pin(9, Pin.OUT, value=1)
@ -37,4 +39,5 @@ pcs = Pin(10, Pin.OUT, value=1)
gc.collect() # Precaution before instantiating framebuf
# See DRIVERS.md re overclocking the SPI bus
spi = SPI(0, sck=Pin(6), mosi=Pin(7), miso=Pin(4), baudrate=30_000_000)
ssd = SSD(spi, dc=pdc, cs=pcs, rst=prst)
ssd = SSD(spi, dc=pdc, cs=pcs, rst=prst, lscape=False, usd=False, mirror=False)
# ssd = SSD(spi, dc=pdc, cs=pcs, rst=prst, width=240)

Wyświetl plik

@ -13,10 +13,11 @@
# https://github.com/waveshare/e-Paper/blob/master/RaspberryPi_JetsonNano/python/lib/waveshare_epd/epd2in7.py ("official")
import framebuf
import uasyncio as asyncio
import asyncio
from time import sleep_ms, ticks_ms, ticks_us, ticks_diff
from drivers.boolpalette import BoolPalette
def asyncio_running():
try:
_ = asyncio.current_task()
@ -24,6 +25,7 @@ def asyncio_running():
return False
return True
class EPD(framebuf.FrameBuffer):
# A monochrome approach should be used for coding this. The rgb method ensures
# nothing breaks if users specify colors.
@ -44,7 +46,7 @@ class EPD(framebuf.FrameBuffer):
self.complete = asyncio.Event()
# Dimensions in pixels. Waveshare code is portrait mode.
# Public bound variables required by nanogui.
self.width = 264 if landscape else 176
self.width = 264 if landscape else 176
self.height = 176 if landscape else 264
self.demo_mode = False # Special mode enables demos to run
self._buffer = bytearray(self.height * self.width // 8)
@ -75,92 +77,97 @@ class EPD(framebuf.FrameBuffer):
sleep_ms(200)
# Initialisation
cmd = self._command
cmd(b'\x01', b'\x03\x00\x2B\x2B\x09') # POWER_SETTING: VDS_EN VDG_EN, VCOM_HV VGHL_LV[1] VGHL_LV[0], VDH, VDL, VDHR
cmd(b'\x06', b'\x07\x07\x17') # BOOSTER_SOFT_START
cmd(b'\xf8', b'\x60\xA5') # POWER_OPTIMIZATION
cmd(b'\xf8', b'\x89\xA5')
cmd(b'\xf8', b'\x90\x00')
cmd(b'\xf8', b'\x93\x2A')
cmd(b'\xf8', b'\xA0\xA5')
cmd(b'\xf8', b'\xA1\x00')
cmd(b'\xf8', b'\x73\x41')
cmd(b'\x16', b'\x00') # PARTIAL_DISPLAY_REFRESH
cmd(b'\x04') # POWER_ON
cmd(
b"\x01", b"\x03\x00\x2B\x2B\x09"
) # POWER_SETTING: VDS_EN VDG_EN, VCOM_HV VGHL_LV[1] VGHL_LV[0], VDH, VDL, VDHR
cmd(b"\x06", b"\x07\x07\x17") # BOOSTER_SOFT_START
cmd(b"\xf8", b"\x60\xA5") # POWER_OPTIMIZATION
cmd(b"\xf8", b"\x89\xA5")
cmd(b"\xf8", b"\x90\x00")
cmd(b"\xf8", b"\x93\x2A")
cmd(b"\xf8", b"\xA0\xA5")
cmd(b"\xf8", b"\xA1\x00")
cmd(b"\xf8", b"\x73\x41")
cmd(b"\x16", b"\x00") # PARTIAL_DISPLAY_REFRESH
cmd(b"\x04") # POWER_ON
self.wait_until_ready()
cmd(b'\x00', b'\xAF') # PANEL_SETTING: KW-BF, KWR-AF, BWROTP 0f
cmd(b'\x30', b'\x3A') # PLL_CONTROL: 3A 100HZ, 29 150Hz, 39 200HZ 31 171HZ
cmd(b'\x50', b'\x57') # Vcom and data interval setting (PGH)
cmd(b'\x82', b'\x12') # VCM_DC_SETTING_REGISTER
cmd(b"\x00", b"\xAF") # PANEL_SETTING: KW-BF, KWR-AF, BWROTP 0f
cmd(b"\x30", b"\x3A") # PLL_CONTROL: 3A 100HZ, 29 150Hz, 39 200HZ 31 171HZ
cmd(b"\x50", b"\x57") # Vcom and data interval setting (PGH)
cmd(b"\x82", b"\x12") # VCM_DC_SETTING_REGISTER
sleep_ms(2) # No delay in official code
# Set LUT. Local bytes objects reduce RAM usage.
# Values used by mcauser
#lut_vcom_dc =\
#b'\x00\x00\x00\x0F\x0F\x00\x00\x05\x00\x32\x32\x00\x00\x02\x00'\
#b'\x0F\x0F\x00\x00\x05\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'\
#b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
#lut_ww =\
#b'\x50\x0F\x0F\x00\x00\x05\x60\x32\x32\x00\x00\x02\xA0\x0F\x0F'\
#b'\x00\x00\x05\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'\
#b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' # R21H
#lut_bb =\
#b'\xA0\x0F\x0F\x00\x00\x05\x60\x32\x32\x00\x00\x02\x50\x0F\x0F'\
#b'\x00\x00\x05\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'\
#b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' # R24H b
# lut_vcom_dc =\
# b'\x00\x00\x00\x0F\x0F\x00\x00\x05\x00\x32\x32\x00\x00\x02\x00'\
# b'\x0F\x0F\x00\x00\x05\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'\
# b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
# lut_ww =\
# b'\x50\x0F\x0F\x00\x00\x05\x60\x32\x32\x00\x00\x02\xA0\x0F\x0F'\
# b'\x00\x00\x05\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'\
# b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' # R21H
# lut_bb =\
# b'\xA0\x0F\x0F\x00\x00\x05\x60\x32\x32\x00\x00\x02\x50\x0F\x0F'\
# b'\x00\x00\x05\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'\
# b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' # R24H b
# Values from official code:
lut_vcom_dc =\
b'\x00\x00\x00\x08\x00\x00\x00\x02\x60\x28\x28\x00\x00\x01\x00'\
b'\x14\x00\x00\x00\x01\x00\x12\x12\x00\x00\x01\x00\x00\x00\x00'\
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
lut_ww =\
b'\x40\x08\x00\x00\x00\x02\x90\x28\x28\x00\x00\x01\x40\x14\x00'\
b'\x00\x00\x01\xA0\x12\x12\x00\x00\x01\x00\x00\x00\x00\x00\x00'\
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
lut_bb =\
b'\x80\x08\x00\x00\x00\x02\x90\x28\x28\x00\x00\x01\x80\x14\x00'\
b'\x00\x00\x01\x50\x12\x12\x00\x00\x01\x00\x00\x00\x00\x00\x00'\
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
lut_vcom_dc = (
b"\x00\x00\x00\x08\x00\x00\x00\x02\x60\x28\x28\x00\x00\x01\x00"
b"\x14\x00\x00\x00\x01\x00\x12\x12\x00\x00\x01\x00\x00\x00\x00"
b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
)
lut_ww = (
b"\x40\x08\x00\x00\x00\x02\x90\x28\x28\x00\x00\x01\x40\x14\x00"
b"\x00\x00\x01\xA0\x12\x12\x00\x00\x01\x00\x00\x00\x00\x00\x00"
b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
)
lut_bb = (
b"\x80\x08\x00\x00\x00\x02\x90\x28\x28\x00\x00\x01\x80\x14\x00"
b"\x00\x00\x01\x50\x12\x12\x00\x00\x01\x00\x00\x00\x00\x00\x00"
b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
)
# Both agree on this:
lut_bw = lut_ww # R22H r
lut_wb = lut_bb # R23H w
cmd(b'\x20', lut_vcom_dc) # LUT_FOR_VCOM vcom
cmd(b'\x21', lut_ww) # LUT_WHITE_TO_WHITE ww --
cmd(b'\x22', lut_bw) # LUT_BLACK_TO_WHITE bw r
cmd(b'\x23', lut_bb) # LUT_WHITE_TO_BLACK wb w
cmd(b'\x24', lut_wb) # LUT_BLACK_TO_BLACK bb b
print('Init Done.')
cmd(b"\x20", lut_vcom_dc) # LUT_FOR_VCOM vcom
cmd(b"\x21", lut_ww) # LUT_WHITE_TO_WHITE ww --
cmd(b"\x22", lut_bw) # LUT_BLACK_TO_WHITE bw r
cmd(b"\x23", lut_bb) # LUT_WHITE_TO_BLACK wb w
cmd(b"\x24", lut_wb) # LUT_BLACK_TO_BLACK bb b
print("Init Done.")
def wait_until_ready(self):
sleep_ms(50)
t = ticks_ms()
while not self.ready():
while not self.ready():
sleep_ms(100)
dt = ticks_diff(ticks_ms(), t)
print('wait_until_ready {}ms {:5.1f}mins'.format(dt, dt/60_000))
print("wait_until_ready {}ms {:5.1f}mins".format(dt, dt / 60_000))
# For polling in asynchronous code. Just checks pin state.
# 0 == busy. Comment in official code is wrong. Code is correct.
def ready(self):
return not(self._as_busy or (self._busy() == 0)) # 0 == busy
return not (self._as_busy or (self._busy() == 0)) # 0 == busy
async def _as_show(self, buf1=bytearray(1)):
mvb = self._mvb
send = self._spi.write
cmd = self._command
cmd(b'\x10') # DATA_START_TRANSMISSION_1
cmd(b"\x10") # DATA_START_TRANSMISSION_1
self._dc(1) # For some reason don't need to deassert CS here
buf1[0] = 0xff
buf1[0] = 0xFF
t = ticks_ms()
for i in range(len(mvb)):
self._cs(0) # but do when copying the framebuf
send(buf1)
if not(i & 0x1f) and (ticks_diff(ticks_ms(), t) > 20):
if not (i & 0x1F) and (ticks_diff(ticks_ms(), t) > 20):
await asyncio.sleep_ms(0)
t = ticks_ms()
self._cs(1)
cmd(b'\x13') # DATA_START_TRANSMISSION_2 not in datasheet
cmd(b"\x13") # DATA_START_TRANSMISSION_2 not in datasheet
self._dc(1)
# Necessary to deassert CS after each byte otherwise display does not
@ -184,7 +191,7 @@ class EPD(framebuf.FrameBuffer):
if not vbc:
hpc += 1
idx = iidx + hpc
if not(i & 0x1f) and (ticks_diff(ticks_ms(), t) > 20):
if not (i & 0x1F) and (ticks_diff(ticks_ms(), t) > 20):
await asyncio.sleep_ms(0)
t = ticks_ms()
else:
@ -193,12 +200,12 @@ class EPD(framebuf.FrameBuffer):
buf1[0] = b # INVERSION HACK ~data
send(buf1)
self._cs(1)
if not(i & 0x1f) and (ticks_diff(ticks_ms(), t) > 20):
if not (i & 0x1F) and (ticks_diff(ticks_ms(), t) > 20):
await asyncio.sleep_ms(0)
t = ticks_ms()
self.updated.set() # framebuf has now been copied to the device
cmd(b'\x12') # DISPLAY_REFRESH
cmd(b"\x12") # DISPLAY_REFRESH
await asyncio.sleep(1)
while self._busy() == 0:
await asyncio.sleep_ms(200) # Don't release lock until update is complete
@ -209,7 +216,7 @@ class EPD(framebuf.FrameBuffer):
def show(self, buf1=bytearray(1)):
if asyncio_running():
if self._as_busy:
raise RuntimeError('Cannot refresh: display is busy.')
raise RuntimeError("Cannot refresh: display is busy.")
self._as_busy = True
self.updated.clear()
self.complete.clear()
@ -219,14 +226,14 @@ class EPD(framebuf.FrameBuffer):
mvb = self._mvb
send = self._spi.write
cmd = self._command
cmd(b'\x10') # DATA_START_TRANSMISSION_1
cmd(b"\x10") # DATA_START_TRANSMISSION_1
self._dc(1) # For some reason don't need to deassert CS here
buf1[0] = 0xff
buf1[0] = 0xFF
for i in range(len(mvb)):
self._cs(0) # but do when copying the framebuf
send(buf1)
self._cs(1)
cmd(b'\x13') # DATA_START_TRANSMISSION_2 not in datasheet
cmd(b"\x13") # DATA_START_TRANSMISSION_2 not in datasheet
self._dc(1)
# Necessary to deassert CS after each byte otherwise display does not
@ -256,51 +263,51 @@ class EPD(framebuf.FrameBuffer):
send(buf1)
self._cs(1)
cmd(b'\x12') # DISPLAY_REFRESH
cmd(b"\x12") # DISPLAY_REFRESH
te = ticks_us()
print('show time', ticks_diff(te, t)//1000, 'ms')
print("show time", ticks_diff(te, t) // 1000, "ms")
if not self.demo_mode:
# Immediate return to avoid blocking the whole application.
# User should wait for ready before calling refresh()
return
self.wait_until_ready()
sleep_ms(2000) # Give time for user to see result
# to wake call init()
def sleep(self):
self._as_busy = False
self.wait_until_ready()
cmd = self._command
cmd(b'\x50', b'\xf7') # From Waveshare code
cmd(b'\x02') # POWER_OFF
cmd(b'\x07', b'\xA5') # DEEP_SLEEP (Waveshare and mcauser)
cmd(b"\x50", b"\xf7") # From Waveshare code
cmd(b"\x02") # POWER_OFF
cmd(b"\x07", b"\xA5") # DEEP_SLEEP (Waveshare and mcauser)
self._rst(0) # According to schematic this turns off the power
# Testing connections by toggling pins connected to 40-way connector and checking volts on small connector
# All OK except rst: a 1 level produced only about 1.6V as against 3.3V for all other I/O.
# Further the level on the 40-way connector read 2.9V as agains 3.3V for others. Suspect hardware problem,
# ordered a second unit from Amazon.
#import machine
#import gc
# import machine
# import gc
#pdc = machine.Pin('Y1', machine.Pin.OUT_PP, value=0)
#pcs = machine.Pin('Y2', machine.Pin.OUT_PP, value=1)
#prst = machine.Pin('Y3', machine.Pin.OUT_PP, value=1)
#pbusy = machine.Pin('Y4', machine.Pin.IN)
# pdc = machine.Pin('Y1', machine.Pin.OUT_PP, value=0)
# pcs = machine.Pin('Y2', machine.Pin.OUT_PP, value=1)
# prst = machine.Pin('Y3', machine.Pin.OUT_PP, value=1)
# pbusy = machine.Pin('Y4', machine.Pin.IN)
## baudrate
## From https://github.com/mcauser/micropython-waveshare-epaper/blob/master/examples/2in9-hello-world/test.py 2MHz
## From https://github.com/waveshare/e-Paper/blob/master/RaspberryPi_JetsonNano/python/lib/waveshare_epd/epd2in7.py 4MHz
#spi = machine.SPI(2, baudrate=2_000_000)
#gc.collect() # Precaution before instantiating framebuf
#epd = EPD(spi, pcs, pdc, prst, pbusy) # Create a display instance
#sleep_ms(100)
#epd.init()
#print('Initialised')
#epd.fill(1) # 1 seems to be white
#epd.show()
#sleep_ms(1000)
#epd.fill(0)
#epd.show()
#epd._rst(0)
#epd._dc(0) # Turn off power according to RPI code
# spi = machine.SPI(2, baudrate=2_000_000)
# gc.collect() # Precaution before instantiating framebuf
# epd = EPD(spi, pcs, pdc, prst, pbusy) # Create a display instance
# sleep_ms(100)
# epd.init()
# print('Initialised')
# epd.fill(1) # 1 seems to be white
# epd.show()
# sleep_ms(1000)
# epd.fill(0)
# epd.show()
# epd._rst(0)
# epd._dc(0) # Turn off power according to RPI code

Wyświetl plik

@ -18,11 +18,12 @@
# Adfruit code transposing the axes.
import framebuf
import uasyncio as asyncio
import asyncio
from micropython import const
from time import sleep_ms, sleep_us, ticks_ms, ticks_us, ticks_diff
from drivers.boolpalette import BoolPalette
def asyncio_running():
try:
_ = asyncio.current_task()
@ -30,8 +31,10 @@ def asyncio_running():
return False
return True
MAX_BLOCK = const(20) # Maximum blocking time (ms) for asynchronous show.
class EPD(framebuf.FrameBuffer):
# A monochrome approach should be used for coding this. The rgb method ensures
# nothing breaks if users specify colors.
@ -96,43 +99,43 @@ class EPD(framebuf.FrameBuffer):
cmd = self._command
# Power setting. Data from Adafruit.
# Datasheet default \x03\x00\x26\x26\x03 - slightly different voltages.
cmd(b'\x01', b'\x03\x00\x2b\x2b\x09')
cmd(b"\x01", b"\x03\x00\x2b\x2b\x09")
# Booster soft start. Matches datasheet.
cmd(b'\x06', b'\x17\x17\x17')
cmd(b'\x04') # Power on
cmd(b"\x06", b"\x17\x17\x17")
cmd(b"\x04") # Power on
sleep_ms(200)
# Iss https://github.com/adafruit/Adafruit_CircuitPython_IL0373/issues/16
cmd(b'\x00', b'\x9f')
cmd(b"\x00", b"\x9f")
# CDI: As used by Adafruit. Datasheet is confusing on this.
# See https://github.com/adafruit/Adafruit_CircuitPython_IL0373/issues/11
# With 0x37 got white border on flexible display, black on FeatherWing
# 0xf7 still produced black border on FeatherWing
cmd(b'\x50', b'\x37')
cmd(b"\x50", b"\x37")
# PLL: correct for 150Hz as specified in Adafruit code
cmd(b'\x30', b'\x29')
cmd(b"\x30", b"\x29")
# Resolution 128w * 296h as required by IL0373
cmd(b'\x61', b'\x80\x01\x28') # Note hex(296) == 0x128
cmd(b"\x61", b"\x80\x01\x28") # Note hex(296) == 0x128
# Set VCM_DC. Now clarified with Adafruit.
# https://github.com/adafruit/Adafruit_CircuitPython_IL0373/issues/17
cmd(b'\x82', b'\x12') # Set Vcom to -1.0V
cmd(b"\x82", b"\x12") # Set Vcom to -1.0V
sleep_ms(50)
print('Init Done.')
print("Init Done.")
# For use in synchronous code: blocking wait on ready state.
def wait_until_ready(self):
sleep_ms(50)
while not self.ready():
while not self.ready():
sleep_ms(100)
# Return immediate status. Pin state: 0 == busy.
def ready(self):
return not(self._as_busy or (self._busy() == 0))
return not (self._as_busy or (self._busy() == 0))
async def _as_show(self, buf1=bytearray(1)):
mvb = self._mvb
cmd = self._command
dat = self._data
cmd(b'\x13')
cmd(b"\x13")
t = ticks_ms()
if self._lsc: # Landscape mode
wid = self.width
@ -150,21 +153,21 @@ class EPD(framebuf.FrameBuffer):
if not vbc:
hpc += 1
idx = iidx + hpc
if not(i & 0x0f) and (ticks_diff(ticks_ms(), t) > _MAX_BLOCK):
if not (i & 0x0F) and (ticks_diff(ticks_ms(), t) > _MAX_BLOCK):
await asyncio.sleep_ms(0)
t = ticks_ms()
else:
for i, b in enumerate(mvb):
buf1[0] = ~b
dat(buf1)
if not(i & 0x0f) and (ticks_diff(ticks_ms(), t) > _MAX_BLOCK):
if not (i & 0x0F) and (ticks_diff(ticks_ms(), t) > _MAX_BLOCK):
await asyncio.sleep_ms(0)
t = ticks_ms()
cmd(b'\x11') # Data stop
cmd(b"\x11") # Data stop
self.updated.set()
sleep_us(20) # Allow for data coming back: currently ignore this
cmd(b'\x12') # DISPLAY_REFRESH
cmd(b"\x12") # DISPLAY_REFRESH
# busy goes low now, for ~4.9 seconds.
await asyncio.sleep(1)
while self._busy() == 0:
@ -176,7 +179,7 @@ class EPD(framebuf.FrameBuffer):
def show(self, buf1=bytearray(1)):
if asyncio_running():
if self._as_busy:
raise RuntimeError('Cannot refresh: display is busy.')
raise RuntimeError("Cannot refresh: display is busy.")
self._as_busy = True # Immediate busy flag. Pin goes low much later.
self.updated.clear()
self.complete.clear()
@ -189,7 +192,7 @@ class EPD(framebuf.FrameBuffer):
# DATA_START_TRANSMISSION_2 Datasheet P31 indicates this sets
# busy pin low (True) and that it stays logically True until
# refresh is complete. In my testing this doesn't happen.
cmd(b'\x13')
cmd(b"\x13")
if self._lsc: # Landscape mode
wid = self.width
tbc = self.height // 8 # Vertical bytes per column
@ -211,9 +214,9 @@ class EPD(framebuf.FrameBuffer):
buf1[0] = ~b
dat(buf1)
cmd(b'\x11') # Data stop
cmd(b"\x11") # Data stop
sleep_us(20) # Allow for data coming back: currently ignore this
cmd(b'\x12') # DISPLAY_REFRESH
cmd(b"\x12") # DISPLAY_REFRESH
# 258ms to get here on Pyboard D
# Checking with scope, busy goes low now. For 4.9s.
if not self.demo_mode:
@ -229,8 +232,8 @@ class EPD(framebuf.FrameBuffer):
self.wait_until_ready()
cmd = self._command
# CDI: not sure about value or why we set this here. Copying Adafruit.
cmd(b'\x50', b'\x17')
cmd(b"\x50", b"\x17")
# Set VCM_DC. 0 is datasheet default.
cmd(b'\x82', b'\x00')
cmd(b"\x82", b"\x00")
# POWER_OFF. User code should pull ENA low to power down the display.
cmd(b'\x02')
cmd(b"\x02")

Wyświetl plik

@ -20,7 +20,7 @@
# You can run a demo for this driver by executing the demo script "epd29_sync.py"
import framebuf
import uasyncio as asyncio
import asyncio
from micropython import const
from time import sleep_ms, sleep_us, ticks_ms, ticks_us, ticks_diff
from gui.drivers.boolpalette import BoolPalette
@ -106,33 +106,33 @@ class EPD(framebuf.FrameBuffer):
# print("software reset successful")
# deriver output control
cmd(b'\x01', b'\x27\x01\x01')
cmd(b"\x01", b"\x27\x01\x01")
# data entry mode
cmd(b'\x11', b'\x01')
cmd(b"\x11", b"\x01")
# set ram-x addr start/end pos
cmd(b'\x44', b'\x00\x0F')
cmd(b"\x44", b"\x00\x0F")
# set ram-y addr start/end pos
cmd(b'\x45', b'\x27\x01\x00\x00')
cmd(b"\x45", b"\x27\x01\x00\x00")
# border waveform
cmd(b'\x3c', b'\x05')
cmd(b"\x3c", b"\x05")
# display update control
cmd(b'\x21', b'\x00\x80')
cmd(b"\x21", b"\x00\x80")
# set ram-x addr cnt to 0
cmd(b'\x4e', b'\x00')
cmd(b"\x4e", b"\x00")
# set ram-y addr cnt to 0x127
cmd(b'\x4F', b'\x27\x01')
cmd(b"\x4F", b"\x27\x01")
# set to use internal temperature sensor
cmd(b'\x18', b'\x80')
cmd(b"\x18", b"\x80")
'''
"""
# read from internal temperature sensor
self._dc(0)
self._spi.write(b'\x1B')
print(self._spi.read(2))
self._dc(1)
print(self._spi.read(2))
'''
"""
self.wait_until_ready()
# print('Init Done.')
@ -148,10 +148,13 @@ class EPD(framebuf.FrameBuffer):
return not (self._as_busy or (self._busy() == 1))
# draw the current frame memory.
def show(self, buf1=bytearray(1),
fast_refresh=False, # USELESS for this driver, doesn't work well at all
deepsleep_after_refresh=False,
lightsleep_while_waiting_for_refresh=False):
def show(
self,
buf1=bytearray(1),
fast_refresh=False, # USELESS for this driver, doesn't work well at all
deepsleep_after_refresh=False,
lightsleep_while_waiting_for_refresh=False,
):
if not self.ready():
# Hardware reset to exit deep sleep mode
self.hw_reset()
@ -160,7 +163,7 @@ class EPD(framebuf.FrameBuffer):
cmd = self._command
dat = self._data
cmd(b'\x24')
cmd(b"\x24")
if self._lsc: # Landscape mode
wid = self.width
tbc = self.height // 8 # Vertical bytes per column
@ -183,11 +186,11 @@ class EPD(framebuf.FrameBuffer):
dat(buf1)
if fast_refresh:
cmd(b'\x22', b'\xFF')
cmd(b"\x22", b"\xFF")
else:
cmd(b'\x22', b'\xF7')
cmd(b"\x22", b"\xF7")
sleep_us(20)
cmd(b'\x20') # DISPLAY_REFRESH
cmd(b"\x20") # DISPLAY_REFRESH
if lightsleep_while_waiting_for_refresh:
# set Pin hold=True is needed before entering lightsleep and after you must revert it back to hold=False
@ -198,6 +201,6 @@ class EPD(framebuf.FrameBuffer):
self.wait_until_ready()
if deepsleep_after_refresh:
cmd(b'\x10', b'\x01')
cmd(b"\x10", b"\x01")
else:
cmd(b'\x10', b'\x00')
cmd(b"\x10", b"\x00")

Wyświetl plik

@ -43,7 +43,7 @@
from machine import Pin, SPI
import framebuf
import time
import uasyncio as asyncio
import asyncio
from drivers.boolpalette import BoolPalette
@ -68,7 +68,8 @@ _BUSY_PIN = const(13)
# LUT elements vcom, ww, bw, wb, bb
# ****************************** full screen update LUT********************************* #
lut_full = (b"\x00\x08\x08\x00\x00\x02\x00\x0F\x0F\x00\x00\x01\x00\x08\x08\x00\
lut_full = (
b"\x00\x08\x08\x00\x00\x02\x00\x0F\x0F\x00\x00\x01\x00\x08\x08\x00\
\x00\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\
\x00\x00\x00\x00\x00\x00",
b"\x50\x08\x08\x00\x00\x02\x90\x0F\x0F\x00\x00\x01\xA0\x08\x08\x00\x00\x02\

Wyświetl plik

@ -42,9 +42,10 @@
from machine import Pin, SPI
import framebuf
import time
import uasyncio as asyncio
import asyncio
from drivers.boolpalette import BoolPalette
def asyncio_running():
try:
_ = asyncio.current_task()
@ -52,6 +53,7 @@ def asyncio_running():
return False
return True
# Display resolution
_EPD_WIDTH = const(400)
_BWIDTH = _EPD_WIDTH // 4 # FB width in bytes (2 bits/pixel)
@ -93,7 +95,7 @@ def _lmap(dest: ptr8, source: ptr8, pattern: int, length: int):
for _ in range(2):
t = source[s]
for _ in range(4):
e |= ((pattern >> (t & 3)) & 1)
e |= (pattern >> (t & 3)) & 1
t >>= 2
e <<= 1
s += 1
@ -101,18 +103,20 @@ def _lmap(dest: ptr8, source: ptr8, pattern: int, length: int):
dest[d] = e >> 1
d += 1
# Color mapping.
# There is no LUT - colors.py creates 13 color constants which have 2-bit values determined
# by EPD.rgb(). These 2-bit values are written to the framebuf. The _lmap function produces
# 1-bit colors which are written to two buffers on the hardware. Each buffer is written using
# a different LUT so that grey values appear as 1 in one hardware buffer and 0 in the other.
class EPD(framebuf.FrameBuffer):
# The rgb method maps colors onto a 2-bit greyscale
# colors.py creates color constants with 2-bit colors which are written to FB
@staticmethod
def rgb(r, g, b):
return min((r + g + b) >> 7, 3) # Greyscale in range 0 <= gs <= 3
return min((r + g + b) >> 7, 3) # Greyscale in range 0 <= gs <= 3
# Discard asyn arg: autodetect
def __init__(self, spi=None, cs=None, dc=None, rst=None, busy=None, asyn=False):
@ -120,8 +124,8 @@ class EPD(framebuf.FrameBuffer):
self._busy_pin = Pin(_BUSY_PIN, Pin.IN, Pin.PULL_UP) if busy is None else busy
self._cs = Pin(_CS_PIN, Pin.OUT) if cs is None else cs
self._dc = Pin(_DC_PIN, Pin.OUT) if dc is None else dc
self._spi = SPI(1, sck = Pin(10), mosi = Pin(11), miso = Pin(28)) if spi is None else spi
self._spi.init(baudrate = 10_000_000) # Datasheet allows 10MHz
self._spi = SPI(1, sck=Pin(10), mosi=Pin(11), miso=Pin(28)) if spi is None else spi
self._spi.init(baudrate=10_000_000) # Datasheet allows 10MHz
self._busy = False # Set immediately on .show(). Cleared when busy pin is logically false (physically 1).
# Async API
self.updated = asyncio.Event()
@ -172,7 +176,7 @@ class EPD(framebuf.FrameBuffer):
def display_on(self):
self._command(b"\x12")
time.sleep_ms(100)
time.sleep_ms(100)
self.wait_until_ready()
def init(self):
@ -182,16 +186,19 @@ class EPD(framebuf.FrameBuffer):
self._command(b"\x06", b"\x17\x17\x17") # boost soft start
self._command(b"\x04") # POWER_ON
self.wait_until_ready()
self._command(b"\x00", b"\x3F") # panel setting. Works with BF and 3F, not with 1F or 2F. But black border.
self._command(
b"\x00", b"\x3F"
) # panel setting. Works with BF and 3F, not with 1F or 2F. But black border.
# KW-BF KWR-AF BWROTP 0f BWOTP 1f PGH was 0xBF
self._command(b"\x30", b"\x3C") # PLL setting
self._command(b"\x61", b"\x01\x90\x01\x2C") # resolution setting
self._command(b"\x82", b"\x12") # vcom_DC setting PGH 0x28 in normal driver
self._command(b"\x50", b"\x57") # VCOM AND DATA INTERVAL SETTING PGH 97 black border 57 white border
self._command(
b"\x50", b"\x57"
) # VCOM AND DATA INTERVAL SETTING PGH 97 black border 57 white border
self.set_grey() # Greyscale LUT
def set_grey(self):
self._command(b"\x20", EPD_grey_lut_vcom)
self._command(b"\x21", EPD_grey_lut_ww)
@ -202,12 +209,14 @@ class EPD(framebuf.FrameBuffer):
def wait_until_ready(self):
while not self.ready():
time.sleep_ms(100)
time.sleep_ms(100)
def set_partial(self): # Allow demos to run
pass
def set_full(self):
pass
# For polling in asynchronous code. Just checks pin state.
# 0 == busy. Comment in official code is wrong. Code is correct.
def ready(self):
@ -246,12 +255,12 @@ class EPD(framebuf.FrameBuffer):
self.complete.set()
async def do_refresh(self, split): # For micro-gui
assert (not self._busy), "Refresh while busy"
assert not self._busy, "Refresh while busy"
await self._as_show() # split=5
def show(self): # nanogui
if self._busy:
raise RuntimeError('Cannot refresh: display is busy.')
raise RuntimeError("Cannot refresh: display is busy.")
self._busy = True # Immediate busy flag. Pin goes low much later.
if asyncio_running():
self.updated.clear()
@ -281,6 +290,6 @@ class EPD(framebuf.FrameBuffer):
time.sleep_ms(2000) # Give time for user to see result
def sleep(self):
# self._command(b"\x02") # power off
# self.wait_until_ready()
# self._command(b"\x02") # power off
# self.wait_until_ready()
self._command(b"\x07", b"\xA5") # deep sleep

Wyświetl plik

@ -11,7 +11,7 @@
from time import sleep_ms
import gc
import framebuf
import uasyncio as asyncio
import asyncio
from drivers.boolpalette import BoolPalette

Wyświetl plik

@ -15,7 +15,7 @@
from time import sleep_ms
import gc
import framebuf
import uasyncio as asyncio
import asyncio
from drivers.boolpalette import BoolPalette
# Portrait mode

Wyświetl plik

@ -20,7 +20,7 @@ from time import sleep_ms # , ticks_us, ticks_diff
import framebuf
import gc
import micropython
import uasyncio as asyncio
import asyncio
from drivers.boolpalette import BoolPalette
# User orientation constants

Wyświetl plik

@ -14,7 +14,7 @@ ssd = SSD() #asyn=True) # Create a display instance. See link for meaning of a
"""
from color_setup import ssd
import uasyncio as asyncio
import asyncio
import time
from gui.core.writer import Writer
from gui.core.nanogui import refresh
@ -26,6 +26,7 @@ epaper = hasattr(ssd, "wait_until_ready")
if epaper and not hasattr(ssd, "set_partial"):
raise OSError("ePaper display does not support partial update.")
async def test():
wri = Writer(ssd, font, verbose=False)
wri.set_clip(True, True, False) # Clip to screen, no wrap
@ -54,6 +55,7 @@ async def test():
await ssd.complete.wait()
await asyncio.sleep(10)
try:
asyncio.run(test())
finally:

Wyświetl plik

@ -19,6 +19,7 @@ from color_setup import ssd # Create a display instance
from gui.core.nanogui import refresh
from gui.widgets.label import Label
from gui.widgets.dial import Dial, Pointer
refresh(ssd, True) # Initialise and clear display.
# Now import other modules
@ -26,33 +27,37 @@ import cmath
import time
from gui.core.writer import CWriter
from machine import RTC
import uasyncio as asyncio
import asyncio
import ntptime
import do_connect # WiFi connction script
# Font for CWriter
import gc
gc.collect()
import gui.fonts.freesans20 as font
from gui.core.colors import *
bst = False
def gbtime(now):
global bst
bst = False
year = time.localtime(now)[0] # get current year
# Time of March change to BST
t_march = time.mktime((year, 3, (31 - (int(5*year/4 + 4)) % 7), 1, 0, 0, 0, 0, 0))
t_march = time.mktime((year, 3, (31 - (int(5 * year / 4 + 4)) % 7), 1, 0, 0, 0, 0, 0))
# Time of October change to UTC
t_october = time.mktime((year, 10, (31 - (int(5*year/4 + 1)) % 7), 1, 0, 0, 0, 0, 0))
t_october = time.mktime((year, 10, (31 - (int(5 * year / 4 + 1)) % 7), 1, 0, 0, 0, 0, 0))
if now < t_march: # we are before last sunday of march
gbt = time.localtime(now) # UTC
gbt = time.localtime(now) # UTC
elif now < t_october: # we are before last sunday of october
gbt = time.localtime(now + 3600) # BST: UTC+1H
gbt = time.localtime(now + 3600) # BST: UTC+1H
bst = True
else: # we are after last sunday of october
gbt = time.localtime(now) # UTC
return(gbt)
gbt = time.localtime(now) # UTC
return gbt
async def set_rtc():
rtc = RTC()
@ -62,38 +67,51 @@ async def set_rtc():
try:
t = ntptime.time()
except OSError:
print('ntp timeout')
print("ntp timeout")
await asyncio.sleep(5)
s = gbtime(t) # Convert UTC to local (GB) time
t0 = time.time()
rtc.datetime(s[0:3] + (0,) + s[3:6] + (0,))
print('RTC was set, delta =', time.time() - t0)
print("RTC was set, delta =", time.time() - t0)
await asyncio.sleep(600)
async def ramcheck():
while True:
gc.collect()
print('Free RAM:',gc.mem_free())
print("Free RAM:", gc.mem_free())
await asyncio.sleep(600)
async def aclock():
do_connect.do_connect()
asyncio.create_task(set_rtc())
asyncio.create_task(ramcheck())
uv = lambda phi : cmath.rect(1, phi) # Return a unit vector of phase phi
uv = lambda phi: cmath.rect(1, phi) # Return a unit vector of phase phi
pi = cmath.pi
days = ('Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday',
'Sunday')
months = ('January', 'February', 'March', 'April', 'May', 'June', 'July',
'August', 'September', 'October', 'November', 'December')
days = ("Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday")
months = (
"January",
"February",
"March",
"April",
"May",
"June",
"July",
"August",
"September",
"October",
"November",
"December",
)
# Instantiate CWriter
CWriter.set_textpos(ssd, 0, 0) # In case previous tests have altered it
wri = CWriter(ssd, font, GREEN, BLACK, verbose=False)
wri.set_clip(True, True, False)
# Instantiate displayable objects
dial = Dial(wri, 2, 2, height = 130, ticks = 12, bdcolor=None) # Border in fg color
dial = Dial(wri, 2, 2, height=130, ticks=12, bdcolor=None) # Border in fg color
lbltim = Label(wri, 140, 2, 130)
lblday = Label(wri, 170, 2, 130)
lblmonth = Label(wri, 190, 2, 130)
@ -102,18 +120,18 @@ async def aclock():
mins = Pointer(dial)
secs = Pointer(dial)
hstart = 0 + 0.7j # Pointer lengths and position at top
hstart = 0 + 0.7j # Pointer lengths and position at top
mstart = 0 + 0.92j
sstart = 0 + 0.92j
t = time.localtime()
while True:
hrs.value(hstart * uv(-t[3]*pi/6 - t[4]*pi/360), YELLOW)
mins.value(mstart * uv(-t[4] * pi/30 - t[5] * pi/1800), YELLOW)
secs.value(sstart * uv(-t[5] * pi/30), RED)
lbltim.value('{:02d}.{:02d}.{:02d} {}'.format(t[3], t[4], t[5], 'BST' if bst else 'UTC'))
lblday.value('{}'.format(days[t[6]]))
lblmonth.value('{} {}'.format(t[2], months[t[1] - 1]))
lblyr.value('{}'.format(t[0]))
hrs.value(hstart * uv(-t[3] * pi / 6 - t[4] * pi / 360), YELLOW)
mins.value(mstart * uv(-t[4] * pi / 30 - t[5] * pi / 1800), YELLOW)
secs.value(sstart * uv(-t[5] * pi / 30), RED)
lbltim.value("{:02d}.{:02d}.{:02d} {}".format(t[3], t[4], t[5], "BST" if bst else "UTC"))
lblday.value("{}".format(days[t[6]]))
lblmonth.value("{} {}".format(t[2], months[t[1] - 1]))
lblyr.value("{}".format(t[0]))
refresh(ssd)
st = t
while st == t:
@ -125,4 +143,3 @@ try:
asyncio.run(aclock())
finally:
_ = asyncio.new_event_loop()

Wyświetl plik

@ -1,4 +1,4 @@
# asnano.py Test/demo program for use of nanogui with uasyncio
# asnano.py Test/demo program for use of nanogui with asyncio
# Uses Adafruit ssd1351-based OLED displays (change height to suit)
# Adafruit 1.5" 128*128 OLED display: https://www.adafruit.com/product/1431
# Adafruit 1.27" 128*96 display https://www.adafruit.com/product/1673
@ -9,7 +9,7 @@
# Initialise hardware and framebuf before importing modules.
from color_setup import ssd # Create a display instance
import uasyncio as asyncio
import asyncio
import pyb
import uos
from gui.core.writer import CWriter
@ -29,28 +29,39 @@ CWriter.set_textpos(ssd, 0, 0) # In case previous tests have altered it
wri = CWriter(ssd, arial10, GREEN, BLACK, verbose=False)
wri.set_clip(True, True, False)
color = lambda v : RED if v > 0.7 else YELLOW if v > 0.5 else GREEN
txt = lambda v : 'ovr' if v > 0.7 else 'high' if v > 0.5 else 'ok'
color = lambda v: RED if v > 0.7 else YELLOW if v > 0.5 else GREEN
txt = lambda v: "ovr" if v > 0.7 else "high" if v > 0.5 else "ok"
async def meter(n, x, text, t):
print('Meter {} test.'.format(n))
m = Meter(wri, 5, x, divisions = 4, ptcolor=YELLOW,
label=text, style=Meter.BAR, legends=('0.0', '0.5', '1.0'))
l = LED(wri, ssd.height - 16 - wri.height, x, bdcolor=YELLOW, label ='over')
print("Meter {} test.".format(n))
m = Meter(
wri,
5,
x,
divisions=4,
ptcolor=YELLOW,
label=text,
style=Meter.BAR,
legends=("0.0", "0.5", "1.0"),
)
l = LED(wri, ssd.height - 16 - wri.height, x, bdcolor=YELLOW, label="over")
while True:
v = int.from_bytes(uos.urandom(3),'little')/16777216
v = int.from_bytes(uos.urandom(3), "little") / 16777216
m.value(v, color(v))
l.color(color(v))
l.text(txt(v), fgcolor=color(v))
refresh(ssd)
await asyncio.sleep_ms(t)
async def flash(n, t):
led = pyb.LED(n)
while True:
led.toggle()
await asyncio.sleep_ms(t)
async def killer(tasks):
sw = pyb.Switch()
while not sw():
@ -58,17 +69,19 @@ async def killer(tasks):
for task in tasks:
task.cancel()
async def main():
tasks = []
tasks.append(asyncio.create_task(meter(1, 2, 'left', 700)))
tasks.append(asyncio.create_task(meter(2, 50, 'right', 1000)))
tasks.append(asyncio.create_task(meter(3, 98, 'bass', 1500)))
tasks.append(asyncio.create_task(meter(1, 2, "left", 700)))
tasks.append(asyncio.create_task(meter(2, 50, "right", 1000)))
tasks.append(asyncio.create_task(meter(3, 98, "bass", 1500)))
tasks.append(asyncio.create_task(flash(1, 200)))
tasks.append(asyncio.create_task(flash(2, 233)))
await killer(tasks)
print('Press Pyboard usr button to stop test.')
print("Press Pyboard usr button to stop test.")
try:
asyncio.run(main())
finally: # Reset uasyncio case of KeyboardInterrupt
finally: # Reset asyncio case of KeyboardInterrupt
asyncio.new_event_loop()

Wyświetl plik

@ -1,4 +1,4 @@
# asnano_sync.py Test/demo program for use of nanogui with uasyncio
# asnano_sync.py Test/demo program for use of nanogui with asyncio
# Requires Pyboard for switch and LEDs.
# Tested with Adafruit ssd1351 OLED display.
@ -8,7 +8,7 @@
# Initialise hardware and framebuf before importing modules
from color_setup import ssd # Create a display instance
import uasyncio as asyncio
import asyncio
import pyb
import uos
from gui.core.writer import CWriter
@ -24,22 +24,31 @@ import gui.fonts.freesans20 as freesans20
from gui.core.colors import *
color = lambda v : RED if v > 0.7 else YELLOW if v > 0.5 else GREEN
txt = lambda v : 'ovr' if v > 0.7 else 'high' if v > 0.5 else 'ok'
color = lambda v: RED if v > 0.7 else YELLOW if v > 0.5 else GREEN
txt = lambda v: "ovr" if v > 0.7 else "high" if v > 0.5 else "ok"
class MyMeter(Meter):
def __init__(self, x, text):
CWriter.set_textpos(ssd, 0, 0) # In case previous tests have altered it
wri = CWriter(ssd, arial10, GREEN, BLACK, verbose=False)
wri.set_clip(True, True, False)
super().__init__(wri, 5, x, divisions = 4, ptcolor=YELLOW, label=text,
style=Meter.BAR, legends=('0.0', '0.5', '1.0'))
self.led = LED(wri, ssd.height - 16 - wri.height, x, bdcolor=YELLOW, label ='over')
super().__init__(
wri,
5,
x,
divisions=4,
ptcolor=YELLOW,
label=text,
style=Meter.BAR,
legends=("0.0", "0.5", "1.0"),
)
self.led = LED(wri, ssd.height - 16 - wri.height, x, bdcolor=YELLOW, label="over")
self.task = asyncio.create_task(self._run())
async def _run(self):
while True:
v = int.from_bytes(uos.urandom(3),'little')/16777216
v = int.from_bytes(uos.urandom(3), "little") / 16777216
self.value(v, color(v))
self.led.color(color(v))
self.led.text(txt(v), fgcolor=color(v))
@ -48,6 +57,7 @@ class MyMeter(Meter):
# application as data becomes available).
await asyncio.sleep(v) # Demo variable times
async def flash(n, t):
led = pyb.LED(n)
try:
@ -57,6 +67,7 @@ async def flash(n, t):
except asyncio.CancelledError:
led.off() # Demo tidying up on cancellation.
class Killer:
def __init__(self):
self.sw = pyb.Switch()
@ -69,13 +80,14 @@ class Killer:
t -= 50
return False
# The main task instantiates other tasks then does the display update process.
async def main():
print('Press Pyboard usr button to stop test.')
print("Press Pyboard usr button to stop test.")
# Asynchronously flash Pyboard LED's. Because we can.
leds = [asyncio.create_task(flash(1, 200)), asyncio.create_task(flash(2, 233))]
# Task for each meter and GUI LED
mtasks =[MyMeter(2, 'left').task, MyMeter(50, 'right').task, MyMeter(98, 'bass').task]
mtasks = [MyMeter(2, "left").task, MyMeter(50, "right").task, MyMeter(98, "bass").task]
k = Killer()
while True:
if await k.wait(800): # Switch was pressed
@ -87,11 +99,13 @@ async def main():
ssd.fill(0) # Clear display at end.
refresh(ssd)
def test():
try:
asyncio.run(main())
finally: # Reset uasyncio case of KeyboardInterrupt
finally: # Reset asyncio case of KeyboardInterrupt
asyncio.new_event_loop()
print('asnano_sync.test() to re-run test.')
print("asnano_sync.test() to re-run test.")
test()

Wyświetl plik

@ -5,8 +5,9 @@
# color_setup must set landcsape True, asyn True and must not set demo_mode
from cmath import exp, pi
import uasyncio as asyncio
import asyncio
from color_setup import ssd
# On a monochrome display Writer is more efficient than CWriter.
from gui.core.writer import Writer
from gui.core.nanogui import refresh
@ -21,23 +22,27 @@ import gui.fonts.font6 as small
ssd._asyn = True # HACK to make it config agnostic
# Some ports don't support uos.urandom.
# See https://github.com/peterhinch/micropython-samples/tree/master/random
def xorshift64star(modulo, seed = 0xf9ac6ba4):
def xorshift64star(modulo, seed=0xF9AC6BA4):
x = seed
def func():
nonlocal x
x ^= x >> 12
x ^= ((x << 25) & 0xffffffffffffffff) # modulo 2**64
x ^= (x << 25) & 0xFFFFFFFFFFFFFFFF # modulo 2**64
x ^= x >> 27
return (x * 0x2545F4914F6CDD1D) % modulo
return func
async def compass(evt):
wri = Writer(ssd, arial10, verbose=False)
wri.set_clip(False, False, False)
v1 = 0 + 0.9j
v2 = exp(0 - (pi / 6) * 1j)
dial = Dial(wri, 5, 5, height = 75, ticks = 12, bdcolor=None,
label='Direction', style = Dial.COMPASS)
dial = Dial(
wri, 5, 5, height=75, ticks=12, bdcolor=None, label="Direction", style=Dial.COMPASS
)
ptr = Pointer(dial)
while True:
ptr.value(v1)
@ -53,45 +58,44 @@ async def multi_fields(evt):
dy = small.height() + 10
row = 2
col = 100
width = wri.stringlen('99.990')
for txt in ('X:', 'Y:', 'Z:'):
width = wri.stringlen("99.990")
for txt in ("X:", "Y:", "Z:"):
Label(wri, row, col, txt)
nfields.append(Label(wri, row, col, width, bdcolor=None)) # Draw border
row += dy
random = xorshift64star(2**24 - 1)
random = xorshift64star(2 ** 24 - 1)
while True:
for _ in range(10):
for field in nfields:
value = random() / 167772
field.value('{:5.2f}'.format(value))
field.value("{:5.2f}".format(value))
await evt.wait()
async def meter(evt):
wri = Writer(ssd, arial10, verbose=False)
wri.set_clip(False, False, False)
row = 10
col = 170
args = {'height' : 80,
'width' : 15,
'divisions' : 4,
'style' : Meter.BAR}
m0 = Meter(wri, row, col, legends=('0.0', '0.5', '1.0'), **args)
m1 = Meter(wri, row, col + 40, legends=('-1', '0', '+1'), **args)
m2 = Meter(wri, row, col + 80, legends=('-1', '0', '+1'), **args)
random = xorshift64star(2**24 - 1)
args = {"height": 80, "width": 15, "divisions": 4, "style": Meter.BAR}
m0 = Meter(wri, row, col, legends=("0.0", "0.5", "1.0"), **args)
m1 = Meter(wri, row, col + 40, legends=("-1", "0", "+1"), **args)
m2 = Meter(wri, row, col + 80, legends=("-1", "0", "+1"), **args)
random = xorshift64star(2 ** 24 - 1)
while True:
steps = 10
for n in range(steps + 1):
m0.value(random() / 16777216)
m1.value(n/steps)
m2.value(1 - n/steps)
m1.value(n / steps)
m2.value(1 - n / steps)
await evt.wait()
async def main():
refresh(ssd, True) # Clear display
await ssd.wait()
print('Ready')
print("Ready")
evt = asyncio.Event()
asyncio.create_task(meter(evt))
asyncio.create_task(multi_fields(evt))
@ -106,12 +110,12 @@ async def main():
evt.set()
evt.clear()
await asyncio.sleep(10) # Allow for slow refresh
tstr = '''Test of asynchronous code updating the EPD. This should
tstr = """Test of asynchronous code updating the EPD. This should
not be run for long periods as the EPD should not be updated more
frequently than every 180s.
'''
"""
print(tstr)
@ -119,7 +123,7 @@ try:
asyncio.run(main())
except KeyboardInterrupt:
# Defensive code: avoid leaving EPD hardware in an undefined state.
print('Waiting for display to become idle')
print("Waiting for display to become idle")
ssd.sleep() # Synchronous code. May block for 5s if display is updating.
finally:
_ = asyncio.new_event_loop()

Wyświetl plik

@ -7,7 +7,7 @@
# Copyright (c) 2020-2022 Peter Hinch
# color_setup must set landcsape False, asyn True and must not set demo_mode
import uasyncio as asyncio
import asyncio
from color_setup import ssd
from gui.core.writer import Writer
from gui.core.nanogui import refresh
@ -21,32 +21,36 @@ import gui.fonts.font6 as small
# Some ports don't support uos.urandom.
# See https://github.com/peterhinch/micropython-samples/tree/master/random
def xorshift64star(modulo, seed = 0xf9ac6ba4):
def xorshift64star(modulo, seed=0xF9AC6BA4):
x = seed
def func():
nonlocal x
x ^= x >> 12
x ^= ((x << 25) & 0xffffffffffffffff) # modulo 2**64
x ^= (x << 25) & 0xFFFFFFFFFFFFFFFF # modulo 2**64
x ^= x >> 27
return (x * 0x2545F4914F6CDD1D) % modulo
return func
async def fields(evt):
wri = Writer(ssd, fixed, verbose=False)
wri.set_clip(False, False, False)
textfield = Label(wri, 0, 2, wri.stringlen('longer'))
numfield = Label(wri, 25, 2, wri.stringlen('99.990'), bdcolor=None)
countfield = Label(wri, 0, 90, wri.stringlen('1'))
textfield = Label(wri, 0, 2, wri.stringlen("longer"))
numfield = Label(wri, 25, 2, wri.stringlen("99.990"), bdcolor=None)
countfield = Label(wri, 0, 90, wri.stringlen("1"))
n = 1
random = xorshift64star(65535)
while True:
for s in ('short', 'longer', '1', ''):
for s in ("short", "longer", "1", ""):
textfield.value(s)
numfield.value('{:5.2f}'.format(random() /1000))
countfield.value('{:1d}'.format(n))
numfield.value("{:5.2f}".format(random() / 1000))
countfield.value("{:1d}".format(n))
n += 1
await evt.wait()
async def multi_fields(evt):
wri = Writer(ssd, small, verbose=False)
wri.set_clip(False, False, False)
@ -55,45 +59,44 @@ async def multi_fields(evt):
dy = small.height() + 10
y = 80
col = 20
width = wri.stringlen('99.990')
for txt in ('X:', 'Y:', 'Z:'):
width = wri.stringlen("99.990")
for txt in ("X:", "Y:", "Z:"):
Label(wri, y, 0, txt)
nfields.append(Label(wri, y, col, width, bdcolor=None)) # Draw border
y += dy
random = xorshift64star(2**24 - 1)
random = xorshift64star(2 ** 24 - 1)
while True:
for _ in range(10):
for field in nfields:
value = random() / 167772
field.value('{:5.2f}'.format(value))
field.value("{:5.2f}".format(value))
await evt.wait()
async def meter(evt):
wri = Writer(ssd, arial10, verbose=False)
args = {'height' : 80,
'width' : 15,
'divisions' : 4,
'style' : Meter.BAR}
m0 = Meter(wri, 165, 2, legends=('0.0', '0.5', '1.0'), **args)
m1 = Meter(wri, 165, 62, legends=('-1', '0', '+1'), **args)
m2 = Meter(wri, 165, 122, legends=('-1', '0', '+1'), **args)
random = xorshift64star(2**24 - 1)
args = {"height": 80, "width": 15, "divisions": 4, "style": Meter.BAR}
m0 = Meter(wri, 165, 2, legends=("0.0", "0.5", "1.0"), **args)
m1 = Meter(wri, 165, 62, legends=("-1", "0", "+1"), **args)
m2 = Meter(wri, 165, 122, legends=("-1", "0", "+1"), **args)
random = xorshift64star(2 ** 24 - 1)
while True:
steps = 10
for n in range(steps + 1):
m0.value(random() / 16777216)
m1.value(n/steps)
m2.value(1 - n/steps)
m1.value(n / steps)
m2.value(1 - n / steps)
await evt.wait()
async def main():
# ssd.fill(1)
# ssd.show()
# await ssd.wait()
# ssd.fill(1)
# ssd.show()
# await ssd.wait()
refresh(ssd, True) # Clear display
await ssd.wait()
print('Ready')
print("Ready")
evt = asyncio.Event()
asyncio.create_task(meter(evt))
asyncio.create_task(multi_fields(evt))
@ -108,20 +111,20 @@ async def main():
evt.set()
evt.clear()
await asyncio.sleep(9) # Allow for slow refresh
tstr = '''Runs the following tests, updates every 10s
tstr = """Runs the following tests, updates every 10s
fields() Label test with dynamic data.
multi_fields() More Labels.
meter() Demo of Meter object.
'''
"""
print(tstr)
try:
asyncio.run(main())
except KeyboardInterrupt:
print('Waiting for display to become idle')
print("Waiting for display to become idle")
ssd.wait_until_ready() # Synchronous code
finally:
_ = asyncio.new_event_loop()

Wyświetl plik

@ -7,7 +7,7 @@
# import gui.demos.scale
# Initialise hardware and framebuf before importing modules.
# Uses uasyncio and also the asynchronous do_refresh method if the driver
# Uses asyncio and also the asynchronous do_refresh method if the driver
# supports it.
from color_setup import ssd # Create a display instance
@ -15,7 +15,7 @@ from color_setup import ssd # Create a display instance
from gui.core.nanogui import refresh
from gui.core.writer import CWriter
import uasyncio as asyncio
import asyncio
from gui.core.colors import *
import gui.fonts.arial10 as arial10
from gui.widgets.label import Label
@ -32,10 +32,11 @@ async def radio(scale):
for _ in range(steps):
cv += delta
# Map user variable to -1.0..+1.0
scale.value(2 * (cv - 88)/(108 - 88) - 1)
scale.value(2 * (cv - 88) / (108 - 88) - 1)
await asyncio.sleep_ms(200)
val, cv = v2, v1
async def default(scale, lbl):
cv = -1.0 # Current
val = 1.0
@ -46,9 +47,9 @@ async def default(scale, lbl):
for _ in range(steps):
cv += delta
scale.value(cv)
lbl.value('{:4.3f}'.format(cv))
if hasattr(ssd, 'do_refresh'):
# Option to reduce uasyncio latency
lbl.value("{:4.3f}".format(cv))
if hasattr(ssd, "do_refresh"):
# Option to reduce asyncio latency
await ssd.do_refresh()
else:
# Normal synchronous call
@ -64,23 +65,27 @@ def test():
if f < -0.8:
return BLUE
return c
def legendcb(f):
return '{:2.0f}'.format(88 + ((f + 1) / 2) * (108 - 88))
return "{:2.0f}".format(88 + ((f + 1) / 2) * (108 - 88))
refresh(ssd, True) # Initialise and clear display.
CWriter.set_textpos(ssd, 0, 0) # In case previous tests have altered it
wri = CWriter(ssd, arial10, GREEN, BLACK, verbose=False)
wri.set_clip(True, True, False)
scale1 = Scale(wri, 2, 2, width = 124, legendcb = legendcb,
pointercolor=RED, fontcolor=YELLOW)
scale1 = Scale(wri, 2, 2, width=124, legendcb=legendcb, pointercolor=RED, fontcolor=YELLOW)
asyncio.create_task(radio(scale1))
lbl = Label(wri, ssd.height - wri.height - 2, 2, 50,
bgcolor = DARKGREEN, bdcolor = RED, fgcolor=WHITE)
lbl = Label(
wri, ssd.height - wri.height - 2, 2, 50, bgcolor=DARKGREEN, bdcolor=RED, fgcolor=WHITE
)
# do_refresh is called with arg 4. In landscape mode this splits screen
# into segments of 240/4=60 lines. Here we ensure a scale straddles
# this boundary
scale = Scale(wri, 55, 2, width = 124, tickcb = tickcb,
pointercolor=RED, fontcolor=YELLOW, bdcolor=CYAN)
scale = Scale(
wri, 55, 2, width=124, tickcb=tickcb, pointercolor=RED, fontcolor=YELLOW, bdcolor=CYAN
)
asyncio.run(default(scale, lbl))
test()

Wyświetl plik

@ -12,7 +12,7 @@ from color_setup import ssd # Create a display instance
from gui.core.nanogui import refresh
from gui.core.writer import CWriter
import uasyncio as asyncio
import asyncio
from gui.core.colors import *
import gui.fonts.arial10 as arial10
from gui.widgets.label import Label
@ -23,19 +23,21 @@ from gui.widgets.textbox import Textbox
pargs = (2, 2, 124, 7) # Row, Col, Width, nlines
# Keyword
tbargs = {'fgcolor' : YELLOW,
'bdcolor' : RED,
'bgcolor' : DARKGREEN,
}
tbargs = {
"fgcolor": YELLOW,
"bdcolor": RED,
"bgcolor": DARKGREEN,
}
async def wrap(wri):
s = '''The textbox displays multiple lines of text in a field of fixed dimensions. \
s = """The textbox displays multiple lines of text in a field of fixed dimensions. \
Text may be clipped to the width of the control or may be word-wrapped. If the number \
of lines of text exceeds the height available, scrolling may be performed \
by calling a method.
'''
"""
tb = Textbox(wri, *pargs, clip=False, **tbargs)
tb.append(s, ntrim = 100, line = 0)
tb.append(s, ntrim=100, line=0)
refresh(ssd)
while True:
await asyncio.sleep(1)
@ -43,20 +45,29 @@ by calling a method.
break
refresh(ssd)
async def clip(wri):
ss = ('clip demo', 'short', 'longer line', 'much longer line with spaces',
'antidisestablishmentarianism', 'line with\nline break', 'Done')
ss = (
"clip demo",
"short",
"longer line",
"much longer line with spaces",
"antidisestablishmentarianism",
"line with\nline break",
"Done",
)
tb = Textbox(wri, *pargs, clip=True, **tbargs)
for s in ss:
tb.append(s, ntrim = 100) # Default line=None scrolls to show most recent
tb.append(s, ntrim=100) # Default line=None scrolls to show most recent
refresh(ssd)
await asyncio.sleep(1)
async def main(wri):
await wrap(wri)
await clip(wri)
def test():
refresh(ssd, True) # Initialise and clear display.
CWriter.set_textpos(ssd, 0, 0) # In case previous tests have altered it
@ -64,4 +75,5 @@ def test():
wri.set_clip(True, True, False)
asyncio.run(main(wri))
test()