Enhanced refresh lock functionality.

pull/55/head
Peter Hinch 2024-10-01 10:40:15 +01:00
rodzic 868ea26f99
commit 67e1e8ea5b
10 zmienionych plików z 349 dodań i 111 usunięć

Wyświetl plik

@ -3,12 +3,14 @@
This is a lightweight, portable, MicroPython GUI library for displays having
drivers subclassed from `framebuf`. Written in Python it runs under a standard
MicroPython firmware build. Options for data input comprise:
* Two pushbuttons: limited capabilities with some widgets unusable for input.
* Three pushbuttons with full capability.
* Five pushbuttons: full capability, less "modal" interface.
* Two pushbuttons: restricted capabilities with some widgets unusable for input.
* All the following options offer full capability:
* Three pushbuttons.
* Five pushbuttons: extra buttons provide a less "modal" interface.
* A switch-based navigation joystick: another way to implement five buttons.
* Via two pushbuttons and a rotary encoder such as
* Two pushbuttons and a rotary encoder such as
[this one](https://www.adafruit.com/product/377). An intuitive interface.
* A rotary encoder with built-in push switch only.
* On ESP32 physical buttons may be replaced with touchpads.
It is larger and more complex than `nano-gui` owing to the support for input.
@ -65,6 +67,7 @@ target and a C device driver (unless you can acquire a suitable binary).
# Project status
Oct 2024: Oct 2024: Refresh locking can now be handled by device driver.
Sept 2024: Refresh control is now via a `Lock`. See [Realtime applications](./README.md#9-realtime-applications).
This is a breaking change for applications which use refresh control.
Sept 2024: Dropdown and Listbox widgets support dynamically variable lists of elements.
@ -686,6 +689,8 @@ Some of these require larger screens. Required sizes are specified as
* `listbox_var.py` Listbox with dynamically variable elements.
* `dropdown_var.py` Dropdown with dynamically variable elements.
* `dropdown_var_tuple.py ` Dropdown with dynamically variable tuple elements.
* `refresh_lock.py` Specialised demo of an application which controls refresh
behaviour. See [Realtime applications](./README.md#8-realtime-applications).
###### [Contents](./README.md#0-contents)
@ -3174,14 +3179,39 @@ docs on `pushbutton.py` may be found
# 9. Realtime applications
Screen refresh is performed in a continuous loop which yields to the scheduler.
In normal applications this works well, however a significant proportion of
processor time is spent performing a blocking refresh. The `asyncio` scheduler
allocates run time to tasks in round-robin fashion. This means that another task
will normally be scheduled once per screen refresh. This can limit data
throughput. To enable applications to handle this, a means of synchronising
refresh to other tasks is provided. This is via a `Lock` instance. The refresh
task operates as below (code simplified to illustrate this mechanism).
These notes assume an application based on `asyncio` that needs to handle events
occurring in real time. There are two ways in which the GUI might affect real
time performance:
* By imposing latency on the scheduling of tasks.
* By making demands on processing power such that a critical task is starved of
execution.
The GUI uses `asyncio` internally and runs a number of tasks. Most of these are
simple and undemanding, the one exception being refresh. This has to copy the
contents of the frame buffer to the hardware, and runs continuously. The way
this works depends on the display type. On small displays with relatively few
pixels it is a blocking, synchronous method. On bigger screens such a method
would block for many tens of ms which would affect latency which would affect
the responsiveness of the user interface. The drivers for such screens have an
asynchronous `do_refresh` method: this divides the refresh into a small number
of segments, each of which blocks for a short period, preserving responsiveness.
In the great majority of applications this works well. For demanding cases a
user-accessible `Lock` is provided to enable refresh to be paused. This is
`Screen.rfsh_lock`. Further, the behaviour of this `Lock` can be modified. By
default the refresh task will hold the `Lock` for the entire duration of a
refresh. Alternatively the `Lock` can be held for the duration of the update of
one segment. In testing on a Pico with ILI9341 the `Lock` duration was reduced
from 95ms to 11.3ms. If an application has a task which needs to be scheduled at
a high rate, this corresponds to an increase from 10Hz to 88Hz.
The mechanism for controlling lock behaviour is a method of the `ssd` instance:
* `short_lock(v=None)` If `True` is passed, the `Lock` will be held briefly,
`False` will cause it to be held for the entire refresh, `None` makes no change.
The method returns the current state. Note that only the larger display drivers
support this method.
The following (pseudocode, simplified) illustrates this mechanism:
```python
class Screen:
rfsh_lock = Lock() # Refresh pauses until lock is acquired
@ -3189,21 +3219,36 @@ class Screen:
@classmethod
async def auto_refresh(cls):
while True:
async with cls.rfsh_lock:
ssd.show() # Refresh the physical display.
# Flag user code.
await asyncio.sleep_ms(0) # Let user code respond to event
if display_supports_segmented_refresh and short_lock_is_enabled:
# At intervals yield and release the lock
await ssd.do_refresh(split, cls.rfsh_lock)
else: # Lock for the entire refresh
await asyncio.sleep_ms(0) # Let user code respond to event
async with cls.rfsh_lock:
if display_supports_segmented_refresh:
# Yield at intervals (retaining lock)
await ssd.do_refresh(split) # Segmented refresh
else:
ssd.show() # Blocking synchronous refresh on small screen.
```
User code can wait on the lock and, once acquired, perform an operation which
cannot be interrupted by a refresh. This is normally done as follows:
User code can wait on the lock and, once acquired, run asynchronous code which
cannot be interrupted by a refresh. This is normally done with an asynchronous
context manager:
```python
async with Screen.rfsh_lock:
# do something that can't be interrupted with a refresh
```
The demo `gui/demos/audio.py` provides an example, where the `play_song` task
gives priority to maintaining the audio buffer. It does this by holding the lock
for several iterations of buffer filling before releasing the lock to allow a
single refresh.
The demo `refresh_lock.py` illustrates this mechanism, allowing refresh to be
started and stopped. The demo also allows the `short_lock` method to be tested,
with a display of the scheduling rate of a minimal locked task. In a practical
application this rate is dependant on various factors. A number of debugging
aids exist to assist in measuring and optimising this. See
[this doc](https://github.com/peterhinch/micropython-async/blob/master/v3/README.md).
The demo `gui/demos/audio.py`
provides an example, where the `play_song` task gives priority to maintaining
the audio buffer. It does this by holding the lock for several iterations of
buffer filling before releasing the lock to allow a single refresh.
See [Appendix 4 GUI Design notes](./README.md#appendix-4-gui-design-notes) for
the reason for continuous refresh.

Wyświetl plik

@ -73,6 +73,7 @@ class GC9A01(framebuf.FrameBuffer):
self._cs = cs
self._dc = dc
self._rst = rst
self.lock_mode = False # If set, user lock is passed to .do_refresh
self.height = height # Logical dimensions for GUIs
self.width = width
self._spi_init = init_spi
@ -202,7 +203,16 @@ class GC9A01(framebuf.FrameBuffer):
self._spi.write(lb)
self._cs(1)
async def do_refresh(self, split=4):
def short_lock(self, v=None):
if v is not None:
self.lock_mode = v # If set, user lock is passed to .do_refresh
return self.lock_mode
# nanogui apps typically call with no args. ugui and tgui pass split and
# may pass a Lock depending on lock_mode
async def do_refresh(self, split=4, elock=None):
if elock is None:
elock = asyncio.Lock()
async with self._lock:
lines, mod = divmod(self.height, split) # Lines per segment
if mod:
@ -216,12 +226,13 @@ class GC9A01(framebuf.FrameBuffer):
cm = self._gscale # color False, greyscale True
line = 0
for _ in range(split): # For each segment
if self._spi_init: # A callback was passed
self._spi_init(self._spi) # Bus may be shared
self._cs(0)
for start in range(wd * line, wd * (line + lines), wd): # For each line
_lcopy(lb, buf[start:], clut, wd, cm) # Copy and map colors
self._spi.write(lb)
line += lines
self._cs(1) # Allow other tasks to use bus
async with elock:
if self._spi_init: # A callback was passed
self._spi_init(self._spi) # Bus may be shared
self._cs(0)
for start in range(wd * line, wd * (line + lines), wd): # For each line
_lcopy(lb, buf[start:], clut, wd, cm) # Copy and map colors
self._spi.write(lb)
line += lines
self._cs(1) # Allow other tasks to use bus
await asyncio.sleep_ms(0)

Wyświetl plik

@ -60,6 +60,7 @@ class GC9A01(framebuf.FrameBuffer):
self._cs = cs
self._dc = dc
self._rst = rst
self.lock_mode = False # If set, user lock is passed to .do_refresh
self.height = height # Logical dimensions for GUIs
self.width = width
self._spi_init = init_spi
@ -182,7 +183,16 @@ class GC9A01(framebuf.FrameBuffer):
self._spi.write(lb)
self._cs(1)
async def do_refresh(self, split=4):
def short_lock(self, v=None):
if v is not None:
self.lock_mode = v # If set, user lock is passed to .do_refresh
return self.lock_mode
# nanogui apps typically call with no args. ugui and tgui pass split and
# may pass a Lock depending on lock_mode
async def do_refresh(self, split=4, elock=None):
if elock is None:
elock = asyncio.Lock()
async with self._lock:
lines, mod = divmod(self.height, split) # Lines per segment
if mod:
@ -194,12 +204,13 @@ class GC9A01(framebuf.FrameBuffer):
wd = self.width
line = 0
for _ in range(split): # For each segment
if self._spi_init: # A callback was passed
self._spi_init(self._spi) # Bus may be shared
self._cs(0)
for start in range(wd * line, wd * (line + lines), wd): # For each line
_lcopy(lb, buf[start:], wd) # Copy and map colors
self._spi.write(lb)
line += lines
self._cs(1) # Allow other tasks to use bus
async with elock:
if self._spi_init: # A callback was passed
self._spi_init(self._spi) # Bus may be shared
self._cs(0)
for start in range(wd * line, wd * (line + lines), wd): # For each line
_lcopy(lb, buf[start:], wd) # Copy and map colors
self._spi.write(lb)
line += lines
self._cs(1) # Allow other tasks to use bus
await asyncio.sleep_ms(0)

Wyświetl plik

@ -58,6 +58,7 @@ class ILI9341(framebuf.FrameBuffer):
self._cs = cs
self._dc = dc
self._rst = rst
self.lock_mode = False # If set, user lock is passed to .do_refresh
self.height = height
self.width = width
self._spi_init = init_spi
@ -156,7 +157,16 @@ class ILI9341(framebuf.FrameBuffer):
self._spi.write(lb)
self._cs(1)
async def do_refresh(self, split=4):
def short_lock(self, v=None):
if v is not None:
self.lock_mode = v # If set, user lock is passed to .do_refresh
return self.lock_mode
# nanogui apps typically call with no args. ugui and tgui pass split and
# may pass a Lock depending on lock_mode
async def do_refresh(self, split=4, elock=None):
if elock is None:
elock = asyncio.Lock()
async with self._lock:
lines, mod = divmod(self.height, split) # Lines per segment
if mod:
@ -174,12 +184,13 @@ class ILI9341(framebuf.FrameBuffer):
self._dc(1)
line = 0
for _ in range(split): # For each segment
if self._spi_init: # A callback was passed
self._spi_init(self._spi) # Bus may be shared
self._cs(0)
for start in range(wd * line, wd * (line + lines), wd): # For each line
_lcopy(lb, buf[start:], clut, wd, cm) # Copy and map colors
self._spi.write(lb)
line += lines
self._cs(1) # Allow other tasks to use bus
async with elock:
if self._spi_init: # A callback was passed
self._spi_init(self._spi) # Bus may be shared
self._cs(0)
for start in range(wd * line, wd * (line + lines), wd): # For each line
_lcopy(lb, buf[start:], clut, wd, cm) # Copy and map colors
self._spi.write(lb)
line += lines
self._cs(1) # Allow other tasks to use bus
await asyncio.sleep_ms(0)

Wyświetl plik

@ -46,6 +46,7 @@ class ILI9341(framebuf.FrameBuffer):
self._cs = cs
self._dc = dc
self._rst = rst
self.lock_mode = False # If set, user lock is passed to .do_refresh
self.height = height
self.width = width
self._spi_init = init_spi
@ -134,7 +135,16 @@ class ILI9341(framebuf.FrameBuffer):
self._spi.write(lb)
self._cs(1)
async def do_refresh(self, split=4):
def short_lock(self, v=None):
if v is not None:
self.lock_mode = v # If set, user lock is passed to .do_refresh
return self.lock_mode
# nanogui apps typically call with no args. ugui and tgui pass split and
# may pass a Lock depending on lock_mode
async def do_refresh(self, split=4, elock=None):
if elock is None:
elock = asyncio.Lock()
async with self._lock:
lines, mod = divmod(self.height, split) # Lines per segment
if mod:
@ -150,12 +160,13 @@ class ILI9341(framebuf.FrameBuffer):
self._dc(1)
line = 0
for _ in range(split): # For each segment
if self._spi_init: # A callback was passed
self._spi_init(self._spi) # Bus may be shared
self._cs(0)
for start in range(wd * line, wd * (line + lines), wd): # For each line
_lcopy(lb, buf[start:], wd) # Copy and map colors
self._spi.write(lb)
line += lines
self._cs(1) # Allow other tasks to use bus
async with elock:
if self._spi_init: # A callback was passed
self._spi_init(self._spi) # Bus may be shared
self._cs(0)
for start in range(wd * line, wd * (line + lines), wd): # For each line
_lcopy(lb, buf[start:], wd) # Copy and map colors
self._spi.write(lb)
line += lines
self._cs(1) # Allow other tasks to use bus
await asyncio.sleep_ms(0)

Wyświetl plik

@ -86,6 +86,7 @@ class ILI9486(framebuf.FrameBuffer):
self._cs = cs
self._dc = dc
self._rst = rst
self.lock_mode = False # If set, user lock is passed to .do_refresh
self.height = height # Logical dimensions for GUIs
self.width = width
self._long = max(height, width) # Physical dimensions of screen and aspect ratio
@ -180,7 +181,16 @@ class ILI9486(framebuf.FrameBuffer):
self._spi.write(lb)
self._cs(1)
async def do_refresh(self, split=4):
def short_lock(self, v=None):
if v is not None:
self.lock_mode = v # If set, user lock is passed to .do_refresh
return self.lock_mode
# nanogui apps typically call with no args. ugui and tgui pass split and
# may pass a Lock depending on lock_mode
async def do_refresh(self, split=4, elock=None):
if elock is None:
elock = asyncio.Lock()
async with self._lock:
lines, mod = divmod(self._long, split) # Lines per segment
if mod:
@ -195,27 +205,29 @@ class ILI9486(framebuf.FrameBuffer):
wd = self.width // 2
line = 0
for _ in range(split): # For each segment
if self._spi_init: # A callback was passed
self._spi_init(self._spi) # Bus may be shared
self._cs(0)
for start in range(wd * line, wd * (line + lines), wd): # For each line
_lcopy(lb, buf[start:], clut, wd, cm) # Copy and map colors
self._spi.write(lb)
line += lines
self._cs(1) # Allow other tasks to use bus
async with elock:
if self._spi_init: # A callback was passed
self._spi_init(self._spi) # Bus may be shared
self._cs(0)
for start in range(wd * line, wd * (line + lines), wd): # For each line
_lcopy(lb, buf[start:], clut, wd, cm) # Copy and map colors
self._spi.write(lb)
line += lines
self._cs(1) # Allow other tasks to use bus
await asyncio.sleep_ms(0)
else: # Landscape: write sets of cols. lines is no. of cols per segment.
cargs = (self.height << 9) + (self.width << 18) # Viper 4-arg limit
sc = self.width - 1 # Start and end columns
ec = sc - lines # End column
for _ in range(split): # For each segment
if self._spi_init: # A callback was passed
self._spi_init(self._spi) # Bus may be shared
self._cs(0)
for col in range(sc, ec, -1): # For each column of landscape display
_lscopy(lb, buf, clut, col + cargs, cm) # Copy and map colors
self._spi.write(lb)
sc -= lines
ec -= lines
self._cs(1) # Allow other tasks to use bus
async with elock:
if self._spi_init: # A callback was passed
self._spi_init(self._spi) # Bus may be shared
self._cs(0)
for col in range(sc, ec, -1): # For each column of landscape display
_lscopy(lb, buf, clut, col + cargs, cm) # Copy and map colors
self._spi.write(lb)
sc -= lines
ec -= lines
self._cs(1) # Allow other tasks to use bus
await asyncio.sleep_ms(0)

Wyświetl plik

@ -91,6 +91,7 @@ class ST7789(framebuf.FrameBuffer):
self._rst = rst # Pins
self._dc = dc
self._cs = cs
self.lock_mode = False # If set, user lock is passed to .do_refresh
self.height = height # Required by Writer class
self.width = width
self._offset = display[:2] # display arg is (x, y, orientation)
@ -244,8 +245,16 @@ class ST7789(framebuf.FrameBuffer):
self._cs(1)
# print(ticks_diff(ticks_us(), ts))
# Asynchronous refresh with support for reducing blocking time.
async def do_refresh(self, split=5):
def short_lock(self, v=None):
if v is not None:
self.lock_mode = v # If set, user lock is passed to .do_refresh
return self.lock_mode
# nanogui apps typically call with no args. ugui and tgui pass split and
# may pass a Lock depending on lock_mode
async def do_refresh(self, split=4, elock=None):
if elock is None:
elock = asyncio.Lock()
async with self._lock:
lines, mod = divmod(self.height, split) # Lines per segment
if mod:
@ -257,15 +266,16 @@ class ST7789(framebuf.FrameBuffer):
buf = self.mvb
line = 0
for n in range(split):
if self._spi_init: # A callback was passed
self._spi_init(self._spi) # Bus may be shared
self._dc(0)
self._cs(0)
self._spi.write(b"\x3c" if n else b"\x2c") # RAMWR/Write memory continue
self._dc(1)
for start in range(wd * line, wd * (line + lines), wd):
_lcopy(lb, buf[start:], clut, wd, cm) # Copy and map colors
self._spi.write(lb)
line += lines
self._cs(1)
async with elock:
if self._spi_init: # A callback was passed
self._spi_init(self._spi) # Bus may be shared
self._dc(0)
self._cs(0)
self._spi.write(b"\x3c" if n else b"\x2c") # RAMWR/Write memory continue
self._dc(1)
for start in range(wd * line, wd * (line + lines), wd):
_lcopy(lb, buf[start:], clut, wd, cm) # Copy and map colors
self._spi.write(lb)
line += lines
self._cs(1)
await asyncio.sleep(0)

Wyświetl plik

@ -79,6 +79,7 @@ class ST7789(framebuf.FrameBuffer):
self._rst = rst # Pins
self._dc = dc
self._cs = cs
self.lock_mode = False # If set, user lock is passed to .do_refresh
self.height = height # Required by Writer class
self.width = width
self._offset = display[:2] # display arg is (x, y, orientation)
@ -224,8 +225,16 @@ class ST7789(framebuf.FrameBuffer):
self._cs(1)
# print(ticks_diff(ticks_us(), ts))
# Asynchronous refresh with support for reducing blocking time.
async def do_refresh(self, split=5):
def short_lock(self, v=None):
if v is not None:
self.lock_mode = v # If set, user lock is passed to .do_refresh
return self.lock_mode
# nanogui apps typically call with no args. ugui and tgui pass split and
# may pass a Lock depending on lock_mode
async def do_refresh(self, split=4, elock=None):
if elock is None:
elock = asyncio.Lock()
async with self._lock:
lines, mod = divmod(self.height, split) # Lines per segment
if mod:
@ -235,15 +244,16 @@ class ST7789(framebuf.FrameBuffer):
buf = self.mvb
line = 0
for n in range(split):
if self._spi_init: # A callback was passed
self._spi_init(self._spi) # Bus may be shared
self._dc(0)
self._cs(0)
self._spi.write(b"\x3c" if n else b"\x2c") # RAMWR/Write memory continue
self._dc(1)
for start in range(wd * line, wd * (line + lines), wd):
_lcopy(lb, buf[start:], wd) # Copy and map colors
self._spi.write(lb)
line += lines
self._cs(1)
async with elock:
if self._spi_init: # A callback was passed
self._spi_init(self._spi) # Bus may be shared
self._dc(0)
self._cs(0)
self._spi.write(b"\x3c" if n else b"\x2c") # RAMWR/Write memory continue
self._dc(1)
for start in range(wd * line, wd * (line + lines), wd):
_lcopy(lb, buf[start:], wd) # Copy and map colors
self._spi.write(lb)
line += lines
self._cs(1)
await asyncio.sleep(0)

Wyświetl plik

@ -25,7 +25,7 @@ ssd = None
_vb = True
gc.collect()
__version__ = (0, 1, 9)
__version__ = (0, 1, 11)
async def _g():
@ -415,6 +415,7 @@ class Screen:
@classmethod
async def auto_refresh(cls):
arfsh = hasattr(ssd, "do_refresh") # Refresh can be asynchronous.
gran = hasattr(ssd, "lock_mode") # Allow granular locking
if arfsh:
h = ssd.height
split = max(y for y in (1, 2, 3, 5, 7) if not h % y)
@ -422,14 +423,19 @@ class Screen:
arfsh = False
while True:
Screen.show(False) # Update stale controls. No physical refresh.
# Now perform physical refresh. If there is no user locking,
# the lock will be acquired immediately
async with cls.rfsh_lock:
await asyncio.sleep_ms(0) # Allow other tasks to detect lock
if arfsh:
await ssd.do_refresh(split)
else:
ssd.show() # Synchronous (blocking) refresh.
# Now perform physical refresh.
# If there is no user locking, .rfsh_lock will be acquired immediately
if arfsh and gran and ssd.lock_mode: # Async refresh, display driver can handle lock
# User locking is granular: lock is released at intervals during refresh
await ssd.do_refresh(split, cls.rfsh_lock)
else: # Either synchronous refresh or old style device driver
# Lock for the entire refresh period.
async with cls.rfsh_lock:
await asyncio.sleep_ms(0) # Allow other tasks to detect lock
if arfsh:
await ssd.do_refresh(split)
else:
ssd.show() # Synchronous (blocking) refresh.
await asyncio.sleep_ms(0) # Let user code respond to lock release
@classmethod

Wyświetl plik

@ -0,0 +1,111 @@
# refresh_lock.py
# Released under the MIT License (MIT). See LICENSE.
# Copyright (c) 2024 Peter Hinch
# This demo assumes a large display whose drive supports segmented refresh.
import hardware_setup # Create a display instance
try:
from gui.core.tgui import Screen, ssd
except ImportError: # Running under micro-gui
from gui.core.ugui import Screen, ssd
from gui.widgets import Label, Button, ButtonList, CloseButton, LED
from gui.core.writer import CWriter
import gui.fonts.font10 as font
from gui.core.colors import *
import asyncio
from machine import Pin
class BaseScreen(Screen):
def __init__(self):
table = [
{"fgcolor": RED, "shape": RECTANGLE, "text": "Stop", "args": [False]},
{"fgcolor": GREEN, "shape": RECTANGLE, "text": "Start", "args": [True]},
]
table1 = [
{"fgcolor": YELLOW, "shape": RECTANGLE, "text": "Fast", "args": [True]},
{"fgcolor": CYAN, "shape": RECTANGLE, "text": "Slow", "args": [False]},
]
super().__init__()
fixed_speed = not hasattr(ssd, "short_lock")
if fixed_speed:
print("Display does not support short_lock method.")
self.do_refresh = True
self.task_count = 0
wri = CWriter(ssd, font, GREEN, BLACK, verbose=False)
col = 2
row = 2
lb = Label(wri, row, col, "Refresh test")
self.led = LED(wri, row, lb.mcol + 20)
row = 30
bl = ButtonList(self.cb)
for t in table: # Buttons overlay each other at same location
bl.add_button(wri, row, col, **t)
row = 60
bl = ButtonList(self.cbspeed)
bl.greyed_out(fixed_speed)
for t in table1: # Buttons overlay each other at same location
bl.add_button(wri, row, col, **t)
row = 90
lb = Label(wri, row, col, "Scheduling rate:")
self.lblrate = Label(wri, row, lb.mcol + 4, "000", bdcolor=RED, justify=Label.RIGHT)
Label(wri, row, self.lblrate.mcol + 4, "Hz")
self.reg_task(self.flash()) # Flash the LED
self.reg_task(self.toggle()) # Run a task which measures its scheduling rate
self.reg_task(self.report())
self.reg_task(self.rfsh_ctrl()) # Turn refresh on or off
CloseButton(wri) # Quit
def cb(self, _, v): # Star-stop Pushbutton callback
asyncio.create_task(self.dopb(v))
# The long delay here is a slight hack. Allow least one refresh cycle to occur
# before stopping so that the new button state is visible.
async def dopb(self, v):
self.lblrate.value("0")
await asyncio.sleep_ms(200)
self.do_refresh = v
def cbspeed(self, _, v): # Fast-slow pushbutton callback
ssd.short_lock(v)
async def rfsh_ctrl(self):
while True:
if self.do_refresh: # Allow refresh to proceed normally
await asyncio.sleep_ms(100)
else: # Prevent refresh until the button is pressed.
async with Screen.rfsh_lock:
while not self.do_refresh:
await asyncio.sleep_ms(100)
# Proof of stopped refresh: task keeps running but change not visible
async def flash(self):
while True:
self.led.value(not self.led.value())
await asyncio.sleep_ms(300)
async def report(self):
while True:
await asyncio.sleep(1)
self.lblrate.value(f"{self.task_count}")
self.task_count = 0
# Measure the scheduling rate of a minimal task
async def toggle(self):
while True:
async with Screen.rfsh_lock:
self.task_count += 1
await asyncio.sleep_ms(0)
def test():
print("Refresh test.")
Screen.change(BaseScreen)
test()