Add RP2 examples.

master
Peter Hinch 2025-07-24 17:09:18 +01:00
rodzic 96b7bfe476
commit 8f958d0a69
12 zmienionych plików z 832 dodań i 39 usunięć

Wyświetl plik

@ -50,7 +50,8 @@ Please also see the [official examples](https://github.com/micropython/micropyth
4.17 [2D array indexing](./README.md#417-2d-array-indexing) Use `[1:3, 20]` syntax to address a 2D array.
4.18 [Astronomy](./README.md#418-astronomy) Derive Sun and Moon rise and set times, moon phase.
4.19 [Tone detection](./README.md#419-tone-detection) Goertzel algorithm.
5. [Module Index](./README.md#5-module-index) Supported code. Device drivers, GUI's, utilities.
4.20 [Using RP2XXX hardware](./README.md#420-using-rp2xxx-hardware) Introduction to PIO, DMA and dual-core code.
5. [Module Index](./README.md#5-module-index) Index to my fully supported modules. Device drivers, GUI's, utilities.
5.1 [asyncio](./README.md#51-asyncio) Tutorial and drivers for asynchronous coding.
5.2 [Memory Device Drivers](./README.md#52-memory-device-drivers) Drivers for nonvolatile memory devices.
5.3 [Inertial Measurement Units](./README.md#53-inertial-measurement-units) Gravity, gyro and magnetic sensors.
@ -402,6 +403,21 @@ This module may be used for detection of audio tones. It uses the Goertzel
algorithm which is effectively a single-bin Fourier transform. See
[docs](./goertzel/README.md).
## 4.20 Using RP2XXX hardware
The RP2040 (Pico) and RP2350 (Pico 2) have some powerful hardware:
* The PIO comprising a set of programmable, high speed, state machines.
* Python-programmable DMA that can be linked to the SM's.
* A second core.
The following samples are provided to illustrate the use of these facilities:
* A nonblocking SPI master capable of very high throughput.
* A nonblocking SPI slave.
* A means of measuring the characteristics of an incoming pulse train.
* An analog of the ESP32 RMT module: nonblocking output of complex pulse trains.
See [the docs](./rp2/RP2.md).
# 5. Module index
This index references applications and device drivers that I have developed, in

281
rp2/RP2.md 100644
Wyświetl plik

@ -0,0 +1,281 @@
# Code samples for RP2040 and RP2350 (Pico and Pico 2)
These are intended to demonstrate the use of Pico-specific hardware.
1. [Nonblocking SPI master](./RP2#1-nonblocking-spi-master) High speed bulk data output.
1.1 [Class RP2_SPI_DMA_MASTER](./RP2#11-rp2_spi_dma_master)
1.2 [Constructor](./RP2#12-constructor)
1.3 [Methods](./RP2#13-methods)
1.4 [CS](./RP2#14-cs) How to control the CS/ pin.
2. [Nonblocking SPI slave](./RP2#2-nonblocking-spi-slave) High speed bulk data input.
2.1 [Introduction](./RP2#21-introduction)
2.2 [SpiSlave class](./RP2#22-spislave-class)
2.3 [Constructor](./RP2#23-constructor)
2.4 [Synchronous Interface](./RP2#24-synchronous-interface)
2.5 [Asynchronous Interface](./RP2#25-asynchronous-interface)
2.6 [Test Scripts](./RP2#26-test-scripts)
3. [Pulse Measurement](./RP2#3-pulse-Measurement) Measure incoming pulses.
4. [Pulse train output](./RP2#4-pulse-train-output) Output arbitrary pulse trains as per ESP32 RMT.
4.1 [The RP2_RMT class](./RP2#41-the-rp2_rmt-class)
4.2 [Constructor](./RP2#42-constructor)
4.3 [Methods](./RP2#43-methods)
     4.3.1 [send](./RP2#431-send)
     4.3.2 [busy](./RP2#432-busy)
4.4 [Design](./RP2#44-design)
4.5 [Limitations](./RP2#45-limitations)
# 1. Nonblocking SPI master
The module `spi_master` provides a class `RP2_SPI_DMA_MASTER` which uses DMA to
perform fast SPI output. A typical use case is to transfer the contents of a
frame buffer to a display as a background task. The following files are provided
in the `spi` directory:
* `spi_master.py` The main module.
* `master_test.py` Test script - requires a scope or LA to check SPI output.
* `master_slave_test.py` Full test of master linked to slave, with the latter
printing results.
This module supports the most common SPI case, being the `machine.SPI` default:
polarity=0, phase=0, bits=8, firstbit=SPI.MSB. Benefits over the official
`machine.SPI` are the nonblocking write and the fact that baudrates are precise;
those on the official class are highly quantised, with (for example) 20MHz
manifesting as 12MHz. The module has been tested to 30MHz, but higher rates
should be possible.
To run the full test, the following pins should be linked:
* 0-19 MOSI
* 1-18 SCK
* 2-17 CSN
To run a test issue (e.g.):
```py
>>> import spi.master_slave_test
```
## 1.1 Class RP2_SPI_DMA_MASTER
## 1.2 Constructor
This takes the following positional args:
* `sm_num` State machine no. (0..7 on RP2040, 0..11 on RP2350)
* `freq` SPI clock frequency in Hz.
* `sck` A `Pin` instance for `sck`. Pins are arbitrary.
* `mosi` A `Pin` instance for `mosi`.
* `callback` A callback to run when DMA is complete. This is run in a hard IRQ
context. Typical use is to set a `ThreadSafeFlag`. Note that the DMA completes
before transmission ends due to bytes stored in the SM FIFO. This is unlikely to
have practical consequences because of MicroPython latency: in particular
response to a `ThreadSafeFlag` typically takes >200μs.
## 1.3 Methods
* `write(data : bytes)` arg a `bytes` or `bytearray` of data to transmit. Return
is rapid with transmission running in the background. Returns `None`.
* `deinit()` Disables the DMA and the SM. Returns `None`.
## 1.4 CS/
The application should assert CS/ (set to 0) prior to transmission and deassert
it after transmission is complete.
# 2. Nonblocking SPI slave
This module requires incoming data to conform to the most common SPI case, being
the `machine.SPI` default: polarity=0, phase=0, bits=8, firstbit=SPI.MSB.
It has been tested at a clock rate of 24MHz on an RP2040 running at 250MHz.
The following files may be found in the `spi` directory:
* `spi_slave.py` Main module.
* `slave_sync_test` Test using synchronous code.
* `slave_async_test` Test using asynchronous code.
* `master_slave_test.py` Full test of master linked to slave, with the latter
printing results.
Tests operate by using the official SPI library to transmit with the module
receiving (`master_slave_test` uses the nonblocking master). To run the tests
the following pins should be linked:
* 0-19 MOSI
* 1-18 SCK
* 2-17 CSN
Tests are run by issuing (e.g.):
```py
>>> import spi.master_slave_test
```
# 2.1 Introduction
This uses a PIO state machine (SM) with DMA to enable an RP2 to receive SPI
transfers from a host. Reception is non-blocking, enabling code to run while a
transfer is in progress. The principal application area is for fast transfer of
large messages.
# 2.2 SpiSlave class
The class presents synchronous and asynchronous APIs. In use the class is set
up to read data. The master sets `CS/` low, transmits data, then sets `CS/`
high, terminating the transfer.
## 2.3 Constructor
This takes the following positional args:
* `buf=None` Optional `bytearray` for incoming data. This is required if using
the asynchronous iterator interface, otherwise it is unused.
* `callback=None` Optional callback for use with the synchronous API. It takes
a single arg, being the number of bytes received.
* `sm_num=0` State machine number.
Keyword args: Pin instances, initialised as input. GPIO nos. must be consecutive
starting from `mosi`.
* `mosi` Pin for MOSI.
* `sck` SCK Pin.
* `csn` CSN Pin.
# 2.4 Synchronous Interface
This is via `SpiSlave.read_into(buffer)` where `buffer` is a user supplied
`bytearray`. This must be large enough for the expected message. The method
returns immediately. When a message arrives and reception is complete, the
callback runs. Its integer arg is the number of bytes received.
If a message is too long to fit the buffer, when the buffer fills, subsequent
characters are lost.
# 2.5 Asynchronous Interface
There are two ways to use this. The simplest uses a single buffer passed to the
constructor. This should be sized to accommodate the longest expected message.
Reception is via an asynchronous iterator:
```py
async def receive(piospi): # Arg is an SpiSlave instantiated with a buffer
async for msg in piospi: # msg is a memoryview of the buffer
print(bytes(msg))
```
This prints incoming messages as they arrive.
An alternative approach enables the use of multiple buffers (for example two
phase "ping pong" buffering). Reception is via an asynchronous method
`SpiSlave.as_read_into(buffer)`:
```py
rbuf = bytearray(200)
nbytes = await piospi.as_read_into(rbuf) # Wait for a message, get no. of received bytes
print(bytes(rbuf[:nbytes]))
```
As with all asynchronous code, this task pauses while others continue to run.
# 2.6 Operation
The slave will ignore all interface activity until CS/ is driven low. It then
receives data with the end of message identified by a low to high transition on
CS/.
# 3. Pulse Measurement
The file `measure_pulse.py` is a simple demo of using the PIO to measure a pulse
waveform. As written it runs a PWM to provide a signal. To run the demo link
pins GPIO16 and GPIO17. It measures period/frequency and mark (hence space and
mark/space can readily be derived).
As written the native clock rate is used (125MHz on Pico).
# 4. Pulse train output
The `RP2_RMT` class provides functionality similar to that of the ESP32 `RMT`
class. It enables pulse trains to be output using a non-blocking driver. By
default the train occurs once. Alternatively it can repeat a defined number of
times, or can be repeated continuously.
The class was designed for my [IR blaster](https://github.com/peterhinch/micropython_ir)
and [433MHz remote](https://github.com/peterhinch/micropython_remote)
libraries. It supports an optional carrier frequency, where each high pulse can
appear as a burst of a defined carrier frequency. The class can support both
forms concurrently on a pair of pins: one pin produces pulses while a second
produces carrier bursts.
Pulse trains are specified as arrays with each element being a duration in μs.
Arrays may be of integers or half-words depending on the range of times to be
covered. The duration of a "tick" is 1μs by default, but this can be changed.
To run the test issue:
```py
>>> import rmt.rp2_rmt_test
```
Output is on pins 16 and/or 17: see below and test source.
# 4.1 The RP2_RMT class
## 4.2 Constructor
This takes the following args:
1. `pin_pulse=None` If an output `Pin` instance is provided, pulses will be
output on it.
2. `carrier=None` To output a carrier, a 3-tuple should be provided comprising
`(pin, freq, duty)` where `pin` is an output pin instance, `freq` is the
carrier frequency in Hz and `duty` is the duty ratio in %.
3. `sm_no=0` State machine no.
4. `sm_freq=1_000_000` Clock frequency for SM. Defines the unit for pulse
durations.
## 4.3 Methods
### 4.3.1 send
This returns "immediately" with a pulse train being emitted as a background
process. Args:
1. `ar` A zero terminated array of pulse durations in μs. See notes below.
2. `reps=1` No. of repetions. 0 indicates continuous output.
3. `check=True` By default ensures that the pulse train ends in the inactive
state.
In normal operation, between pulse trains, the pulse pin is low and the carrier
is off. A pulse train ends when a 0 pulse width is encountered: this allows
pulse trains to be shorter than the array length, for example where a
pre-allocated array stores pulse trains of varying lengths. In RF transmitter
applications ensuring the carrier is off between pulse trains may be a legal
requirement, so by default the `send` method enforces this.
The first element of the array defines the duration of the first high going
pulse, with the second being the duration of the first `off` period. If there
are an even number of elements prior to the terminating 0, the signal will end
in the `off` state. If the `check` arg is `True`, `.send()` will check for an
odd number of elements; in this case it will overwrite the last element with 0
to enforce a final `off` state.
This check may be skipped by setting `check=False`. This provides a means of
inverting the normal sense of the driver: if the first pulse train has an odd
number of elements and `check=False` the pin will be left high (and the carrier
on). Subsequent normal pulsetrains will start and end in the high state.
### 4.3.2 busy
No args. Returns `True` if a pulse train is being emitted.
### 4.3.3 cancel
No args. If a pulse train is being emitted it will continue to the end but no
further repetitions will take place.
# 4.4 Design
The class constructor installs one of two PIO scripts depending on whether a
`pin_pulse` is specified. If it is, the `pulsetrain` script is loaded which
drives the pin directly from the PIO. If no `pin_pulse` is required, the
`irqtrain` script is loaded. Both scripts cause an IRQ to be raised at times
when a pulse would start or end.
The `send` method loads the transmit FIFO with initial pulse durations and
starts the state machine. The `._cb` ISR keeps the FIFO loaded with data until
a 0 entry is encountered. It also turns the carrier on and off (using a PWM
instance). This means that there is some latency between the pulse and the
carrier. However latencies at start and end are effectively identical, so the
duration of a carrier burst is correct.
# 4.5 Limitations
While the tick interval can be reduced to provide timing precision better than
1μs, the design of this class will not support very high pulse repetition
frequencies. This is because each pulse causes an interrupt: MicroPython is
unable to support high IRQ rates.
[This library](https://github.com/robert-hh/RP2040-Examples/tree/master/pulses)
is more capable in this regard.

Wyświetl plik

@ -0,0 +1,68 @@
# measure_pulse.py Measure a pulse train with PIO
# Released under the MIT License (MIT). See LICENSE.
# Copyright (c) 2025 Peter Hinch
# Link GPIO16-GPIO17 to test
from machine import Pin, PWM
import rp2
import time
@rp2.asm_pio(set_init=rp2.PIO.IN_LOW, autopush=True, push_thresh=32)
def period():
wrap_target()
set(x, 0)
wait(0, pin, 0) # Wait for pin to go low
wait(1, pin, 0) # Low to high transition
label("low_high")
jmp(x_dec, "next")[1] # unconditional
label("next")
jmp(pin, "low_high") # while pin is high
label("low") # pin is low
jmp(x_dec, "nxt")
label("nxt")
jmp(pin, "done") # pin has gone high: all done
jmp("low")
label("done")
in_(x, 32) # Auto push: SM stalls if FIFO full
wrap()
@rp2.asm_pio(set_init=rp2.PIO.IN_LOW, autopush=True, push_thresh=32)
def mark():
wrap_target()
set(x, 0)
wait(0, pin, 0) # Wait for pin to go low
wait(1, pin, 0) # Low to high transition
label("low_high")
jmp(x_dec, "next")[1] # unconditional
label("next")
jmp(pin, "low_high") # while pin is high
in_(x, 32) # Auto push: SM stalls if FIFO full
wrap()
ck = 100_000_000 # Clock rate in Hz.
pin16 = Pin(16, Pin.IN, Pin.PULL_UP)
sm0 = rp2.StateMachine(0, period, in_base=pin16, jmp_pin=pin16, freq=ck)
sm0.active(1)
sm1 = rp2.StateMachine(1, mark, in_base=pin16, jmp_pin=pin16, freq=ck)
sm1.active(1)
# Clock is 100MHz. 3 cycles per iteration, so unit is 30.0ns
def scale(v):
return (1 + (v ^ 0xFFFFFFFF)) * 3000 / ck # Scale to ms
# ***** TEST WITH PWM *****
pwm = PWM(Pin(17))
pwm.freq(1000)
pwm.duty_u16(0xFFFF // 3)
while True:
period = scale(sm0.get())
mark = scale(sm1.get())
print(f"Period {period}ms Mark {mark}ms m/s {100*mark / (period - mark):5.2f}%")
time.sleep(0.2)

107
rp2/rmt/rp2_rmt.py 100644
Wyświetl plik

@ -0,0 +1,107 @@
# rp2_rmt.py A RMT-like class for the RP2.
# Released under the MIT License (MIT). See LICENSE.
# Copyright (c) 2021 Peter Hinch
from machine import Pin, PWM
import rp2
@rp2.asm_pio(set_init=rp2.PIO.OUT_LOW, autopull=True, pull_thresh=32)
def pulsetrain():
wrap_target()
out(x, 32) # No of 1MHz ticks. Block if FIFO MT at end.
irq(rel(0))
set(pins, 1) # Set pin high
label("loop")
jmp(x_dec, "loop")
irq(rel(0))
set(pins, 0) # Set pin low
out(y, 32) # Low time.
label("loop_lo")
jmp(y_dec, "loop_lo")
wrap()
@rp2.asm_pio(autopull=True, pull_thresh=32)
def irqtrain():
wrap_target()
out(x, 32) # No of 1MHz ticks. Block if FIFO MT at end.
irq(rel(0))
label("loop")
jmp(x_dec, "loop")
wrap()
class DummyPWM:
def duty_u16(self, _):
pass
class RP2_RMT:
def __init__(self, pin_pulse=None, carrier=None, sm_no=0, sm_freq=1_000_000):
if carrier is None:
self.pwm = DummyPWM()
self.duty = (0, 0)
else:
pin_car, freq, duty = carrier
self.pwm = PWM(pin_car) # Set up PWM with carrier off.
self.pwm.freq(freq)
self.pwm.duty_u16(0)
self.duty = (int(0xFFFF * duty // 100), 0)
if pin_pulse is None:
self.sm = rp2.StateMachine(sm_no, irqtrain, freq=sm_freq)
else:
self.sm = rp2.StateMachine(sm_no, pulsetrain, freq=sm_freq, set_base=pin_pulse)
self.apt = 0 # Array index
self.arr = None # Array
self.ict = None # Current IRQ count
self.icm = 0 # End IRQ count
self.reps = 0 # 0 == forever n == no. of reps
rp2.PIO(0).irq(self._cb)
# IRQ callback. Because of FIFO IRQ's keep arriving after STOP.
def _cb(self, pio):
self.pwm.duty_u16(self.duty[self.ict & 1])
self.ict += 1
if d := self.arr[self.apt]: # If data available feed FIFO
self.sm.put(d)
self.apt += 1
else:
if r := self.reps != 1: # All done if reps == 1
if r: # 0 == run forever
self.reps -= 1
self.sm.put(self.arr[0])
self.apt = 1 # Set pointer and count to state
self.ict = 1 # after 1st IRQ
# Arg is an array of times in μs terminated by 0.
def send(self, ar, reps=1, check=True):
self.sm.active(0)
self.reps = reps
ar[-1] = 0 # Ensure at least one STOP
for x, d in enumerate(ar): # Find 1st STOP
if d == 0:
break
if check:
# Discard any trailing mark which would leave carrier on.
if x & 1:
x -= 1
ar[x] = 0
self.icm = x # index of 1st STOP
mv = memoryview(ar)
n = min(x, 4) # Fill FIFO if there are enough data points.
self.sm.put(mv[0:n])
self.arr = ar # Initial conditions for ISR
self.apt = n # Point to next data value
self.ict = 0 # IRQ count
self.sm.active(1)
def busy(self):
if self.ict is None:
return False # Just instantiated
return self.ict < self.icm
def cancel(self):
self.reps = 1

Wyświetl plik

@ -0,0 +1,37 @@
# rp2_rmt_test.py Demo for rp2_rmt
# Released under the MIT License (MIT). See LICENSE.
# Copyright (c) 2021 Peter Hinch
from time import sleep_ms
import array
from machine import Pin
from .rp2_rmt import RP2_RMT
def test():
ar = array.array("H", [800, 400, 800, 400, 800, 400, 800, 400, 0])
# Pulse and carrier
rmt = RP2_RMT(Pin(16, Pin.OUT), (Pin(17), 38000, 33))
# Pulse only
# rmt = RP2_RMT(Pin(16, Pin.OUT))
# Carrier only
# rmt = RP2_RMT(None, (Pin(17), 38000, 33))
rmt.send(ar, 0)
sleep_ms(10_000)
rmt.cancel()
while rmt.busy():
sleep_ms(10)
while True:
sleep_ms(400)
rmt.send(ar, 1)
test()
# Format of IR array: on/off times (μs). Realistic minimum 440/440
# ar = array.array("H", [400 if x & 1 else 800 for x in range(9)])
# ar[-1] = 0 # STOP
# Fastest IRQ rate ~444μs (Philips RC6)

Wyświetl plik

@ -0,0 +1,61 @@
# master_slave_test.py Test asynchronous interface of SpiSlave and master class
# Released under the MIT License (MIT). See LICENSE.
# Copyright (c) 2025 Peter Hinch
# Link pins
# 0-19 MOSI
# 1-18 SCK
# 2-17 CSN
from machine import Pin
import asyncio
from .spi_slave import SpiSlave
from .spi_master import RP2_SPI_DMA_MASTER
tsf = asyncio.ThreadSafeFlag()
def callback(dma): # Hard ISR
tsf.set() # Flag user code that transfer is complete
async def send(cs, spi, data):
cs(0) # Assert CS/
spi.write(data) # "Immediate" return: minimal blocking.
await tsf.wait() # Wait for transfer complete (other tasks run)
cs(1) # Deassert CS/
await asyncio.sleep_ms(100)
async def receive(piospi):
async for msg in piospi:
print(f"Received: {len(msg)} bytes:")
print(bytes(msg))
print()
async def test():
obuf = bytearray(range(512)) # Test data
# Master CS/
cs = Pin(17, Pin.OUT, value=1) # Ensure CS/ is False before we try to receive.
# Pins for slave
mosi = Pin(0, Pin.IN)
sck = Pin(1, Pin.IN)
csn = Pin(2, Pin.IN)
piospi = SpiSlave(buf=bytearray(300), sm_num=0, mosi=mosi, sck=sck, csn=csn)
rt = asyncio.create_task(receive(piospi))
await asyncio.sleep_ms(0) # Ensure receive task is running
# Pins for Master
pin_sck = Pin(18, Pin.OUT, value=0)
pin_mosi = Pin(19, Pin.OUT, value=0)
spi = RP2_SPI_DMA_MASTER(4, 10_000_000, pin_sck, pin_mosi, callback)
print("\nBasic test\n")
await send(cs, spi, obuf[:256])
await send(cs, spi, obuf[:20])
await send(cs, spi, b"The quick brown fox jumps over the lazy dog")
print("\nDone")
asyncio.run(test())

Wyświetl plik

@ -1,10 +1,13 @@
# spi_dma_test.py Test script for spi_dma.py
# master_test.py Test script for spi_dma.py
# Released under the MIT License (MIT). See LICENSE.
# Copyright (c) 2025 Peter Hinch
# Performs SPI output: check on scope or LA.
from machine import Pin
import asyncio
from spi_dma import RP2_SPI_DMA_MASTER
from .spi_master import RP2_SPI_DMA_MASTER
pin_cs = Pin(20, Pin.OUT, value=1)
pin_sck = Pin(18, Pin.OUT, value=0)
@ -19,6 +22,7 @@ def callback(dma): # Hard ISR
spi = RP2_SPI_DMA_MASTER(6, 1_000_000, pin_sck, pin_mosi, callback)
async def send(data):
pin_cs(0) # Assert CS/
spi.write(data) # "Immediate" return: minimal blocking.

Wyświetl plik

@ -0,0 +1,73 @@
# slave_async_test.py Test asynchronous interface of SpiSlave class
# Released under the MIT License (MIT). See LICENSE.
# Copyright (c) 2025 Peter Hinch
# Link pins
# 0-19 MOSI
# 1-18 SCK
# 2-17 CSN
from machine import Pin, SPI
import asyncio
from .spi_slave import SpiSlave
async def send(cs, spi, obuf):
cs(0)
spi.write(obuf)
cs(1)
await asyncio.sleep_ms(100)
async def receive(piospi):
async for msg in piospi:
print(f"Received: {len(msg)} bytes:")
print(bytes(msg))
print()
async def get_msg(piospi):
rbuf = bytearray(200)
nbytes = await piospi.as_read_into(rbuf)
print(bytes(rbuf[:nbytes]))
print(f"Received: {nbytes} bytes")
async def test():
obuf = bytearray(range(512)) # Test data
# Master CS/
cs = Pin(17, Pin.OUT, value=1) # Ensure CS/ is False before we try to receive.
# Pins for slave
mosi = Pin(0, Pin.IN)
sck = Pin(1, Pin.IN)
csn = Pin(2, Pin.IN)
piospi = SpiSlave(buf=bytearray(300), sm_num=0, mosi=mosi, sck=sck, csn=csn)
rt = asyncio.create_task(receive(piospi))
await asyncio.sleep_ms(0) # Ensure receive task is running
# Pins for Master
pin_miso = Pin(16, Pin.IN) # Not used: keep driver happy
pin_sck = Pin(18, Pin.OUT, value=0)
pin_mosi = Pin(19, Pin.OUT, value=0)
spi = SPI(0, baudrate=10_000_000, sck=pin_sck, mosi=pin_mosi, miso=pin_miso)
print(spi)
print("\nBasic test\n")
await send(cs, spi, obuf[:256])
await send(cs, spi, obuf[:20])
print("\nOverrun test: send 512 bytes, rx buffer is 300 bytes.\n")
await send(cs, spi, obuf)
print("\nTest subsequent transfers\n")
await send(cs, spi, b"The quick brown fox jumps over the lazy dog")
await send(cs, spi, b"A short message")
await send(cs, spi, b"A longer message")
rt.cancel() # Terminate the read task
await asyncio.sleep_ms(0)
print("\nAsynchronous read into user supplied buffer\n")
asyncio.create_task(get_msg(piospi)) # Set up for a single read
await asyncio.sleep_ms(0) # Ensure above task gets to run
await send(cs, spi, b"Received by .as_read_into()")
await asyncio.sleep_ms(100)
print("\nDone")
asyncio.run(test())

Wyświetl plik

@ -0,0 +1,64 @@
# slave_sync_test.py Test synchronous interface of SpiSlave class
# Released under the MIT License (MIT). See LICENSE.
# Copyright (c) 2025 Peter Hinch
# Link pins
# 0-19 MOSI
# 1-18 SCK
# 2-17 CSN
from machine import Pin, SPI, SoftSPI
from .spi_slave import SpiSlave
from time import sleep_ms
# SPI send a passed buffer
def send(cs, spi, obuf):
cs(0)
spi.write(obuf)
cs(1)
sleep_ms(100)
buf = bytearray(300) # Read buffer
# Callback runs when transfer complete (soft ISR context)
def receive(nbytes):
print(f"Received: {nbytes} bytes:")
print(bytes(buf[:nbytes]))
print()
def test():
obuf = bytearray(range(512)) # Test data
cs = Pin(17, Pin.OUT, value=1) # Ensure CS/ is False before we try to receive.
# Pins for slave
mosi = Pin(0, Pin.IN)
sck = Pin(1, Pin.IN)
csn = Pin(2, Pin.IN)
piospi = SpiSlave(callback=receive, sm_num=4, mosi=mosi, sck=sck, csn=csn)
# Pins for master
pin_miso = Pin(16, Pin.IN) # Not used: keep driver happy
pin_sck = Pin(18, Pin.OUT, value=0)
pin_mosi = Pin(19, Pin.OUT, value=0)
spi = SPI(0, baudrate=10_000_000, sck=pin_sck, mosi=pin_mosi, miso=pin_miso)
print("\nBasic test\n")
piospi.read_into(buf)
send(cs, spi, obuf[:256])
piospi.read_into(buf)
send(cs, spi, obuf[:20])
print("\nOverrun test: send 512 bytes, rx buffer is 300 bytes.\n")
piospi.read_into(buf)
send(cs, spi, obuf)
print("\nTest subsequent transfers\n")
piospi.read_into(buf)
send(cs, spi, b"The quick brown fox jumps over the lazy dog")
piospi.read_into(buf)
send(cs, spi, b"A short message")
piospi.read_into(buf)
send(cs, spi, b"A longer message")
print("\nDone")
test()

Wyświetl plik

@ -1,4 +1,4 @@
# spi_dma.py A nonblocking SPI master for RP2040/RP2350
# spi_mater.py A nonblocking SPI master for RP2040/RP2350
# Inspired by
# https://github.com/raspberrypi/pico-micropython-examples/blob/master/pio/pio_spi.py

Wyświetl plik

@ -0,0 +1,117 @@
# spi_slave.py An asynchronous, DMA driven, SPI slave using the PIO.
# Released under the MIT License (MIT). See LICENSE.
# Copyright (c) 2025 Peter Hinch
import rp2
from machine import Pin, mem32
import asyncio
from micropython import schedule, alloc_emergency_exception_buf
alloc_emergency_exception_buf(100)
# pin numbers (offset from start_pin)
# 0 MOSI
# 1 Clk
# 2 CS\
@rp2.asm_pio(autopush=True, autopull=True)
def spi_in():
label("escape") # Just started, transfer ended or overrun attempt.
out(y, 32) # Get maximum byte count (blocking wait)
wait(0, pins, 2) # Wait for CS/ True
wrap_target() # Input a byte
set(x, 7)
label("bit")
jmp(pin, "next")
jmp(not_osre, "done") # Data received: quit
jmp("bit")
label("next") # clk leading edge
in_(pins, 1) # Input a bit MSB first
wait(0, pins, 1) # clk trailing
jmp(y_dec, "continue")
label("overrun") # An overrun would occur. Wait for CS/ ISR to send data.
jmp(not_osre, "done") # Data received
jmp("overrun")
label("continue")
jmp(x_dec, "bit") # Post decrement
wrap() # Next byte
label("done") # ISR has sent data
out(x, 32) # Discard it
in_(y, 30) # Return amount of unfilled buffer truncated to 30 bits
# Truncation ensures that overrun returns a short int
jmp("escape")
class SpiSlave:
def __init__(self, buf=None, callback=None, sm_num=0, *, mosi, sck, csn):
# Get data request channel for a SM: RP2040 datasheet 2.5.3 RP2350 12.6.4.1
def dreq(sm, rx=False):
d = (sm & 3) + ((sm >> 2) << 3)
return 4 + d if rx else d
self._callback = callback
# Set up DMA
self._dma = rp2.DMA()
# Transfer bytes, rather than words, don't increment the read address and pace the transfer.
self._cfg = self._dma.pack_ctrl(size=0, inc_read=False, treq_sel=dreq(sm_num, True))
self._sm_num = sm_num
self._buf = buf
self._nbytes = 0 # Number of received bytes
if buf is not None:
self._mvb = memoryview(buf)
self._tsf = asyncio.ThreadSafeFlag()
csn.irq(self._done, trigger=Pin.IRQ_RISING, hard=True) # IRQ occurs at end of transfer
self._sm = rp2.StateMachine(
sm_num,
spi_in,
in_shiftdir=rp2.PIO.SHIFT_LEFT,
push_thresh=8,
in_base=mosi,
jmp_pin=sck,
)
def __aiter__(self): # Asynchronous iterator support
return self
async def __anext__(self):
if self._buf is None:
raise OSError("No buffer provided to constructor.")
self.read_into(self._buf) # Initiate DMA and return.
await self._tsf.wait() # Wait for CS/ high (master signals transfer complete)
return self._mvb[: self._nbytes]
# Initiate a read into a buffer. Immediate return.
def read_into(self, buf):
buflen = len(buf)
self._buflen = buflen # Save for ISR
self._dma.active(0) # De-activate befor re-configuring
self._sm.active(0)
self._tsf.clear()
self._dma.config(read=self._sm, write=buf, count=buflen, ctrl=self._cfg, trigger=False)
self._dma.active(1)
self._sm.restart()
self._sm.active(1) # Start SM
self._sm.put(buflen, 3) # Number of expected bits
# Hard ISR for CS/ rising edge.
def _done(self, _): # Get no. of bytes received.
self._dma.active(0)
self._sm.put(0) # Request no. of received bits
if not self._sm.rx_fifo(): # Occurs if .read_into() never called while CSN is low:
return # ISR runs on trailing edge but SM is not running. Nothing to do.
sp = self._sm.get() >> 3 # Bits->bytes: space left in buffer or 7ffffff on overflow
self._nbytes = self._buflen - sp if sp != 0x7FFFFFF else self._buflen
self._dma.active(0)
self._sm.active(0)
self._tsf.set()
if self._callback is not None:
schedule(self._callback, self._nbytes) # Soft ISR
# Await a read into a user-supplied buffer. Return no. of bytes read.
async def as_read_into(self, buf):
self.read_into(buf) # Start the read
await self._tsf.wait() # Wait for CS/ high (master signals transfer complete)
return self._nbytes

Wyświetl plik

@ -1,35 +0,0 @@
# A nonblocking SPI master for RP2
The module `spi_dma` provides a class `RP2_SPI_DMA_MASTER` which uses DMA to
perform fast SPI output. A typical use case is to transfer the contents of a
frame buffer to a display as a background task.
## Constructor
This takes the following args:
* `sm_num` State machine no. (0..7 on RP2040, 0..11 on RP2350)
* `freq` SPI clock frequency in Hz.
* `sck` A `Pin` instance for `sck`. Pins are arbitrary.
* `mosi` A `Pin` instance for `mosi`.
* `callback` A callback to run when DMA is complete. This is run in a hard IRQ
context. Typical use is to set a `ThreadSafeFlag`. Note that the DMA completes
before transmission ends due to bytes stored in the SM FIFO. This is unlikely to
have practical consequences because of MicroPython latency: in particular
response to a `ThreadSafeFlag` typically takes >200μs.
## Methods
* `write(data : bytes)` arg a `bytes` or `bytearray` of data to transmit. Return
is rapid with transmission running in the background. Returns `None`.
* `deinit()` Disables the DMA and the SM. Returns `None`.
## CS/
The application should assert CS/ (set to 0) prior to transmission and deassert
it after transmission is complete.
## Test script and performance
The file `spi_dma_test.py` illustrates usage with `asyncio`. The module has
been tested at 30MHz, but higher frequencies may be practical with care
given to wiring.