diff --git a/README.md b/README.md index 62d65cf..04a6f6d 100644 --- a/README.md +++ b/README.md @@ -50,7 +50,8 @@ Please also see the [official examples](https://github.com/micropython/micropyth 4.17 [2D array indexing](./README.md#417-2d-array-indexing) Use `[1:3, 20]` syntax to address a 2D array. 4.18 [Astronomy](./README.md#418-astronomy) Derive Sun and Moon rise and set times, moon phase. 4.19 [Tone detection](./README.md#419-tone-detection) Goertzel algorithm. - 5. [Module Index](./README.md#5-module-index) Supported code. Device drivers, GUI's, utilities. + 4.20 [Using RP2XXX hardware](./README.md#420-using-rp2xxx-hardware) Introduction to PIO, DMA and dual-core code. + 5. [Module Index](./README.md#5-module-index) Index to my fully supported modules. Device drivers, GUI's, utilities. 5.1 [asyncio](./README.md#51-asyncio) Tutorial and drivers for asynchronous coding. 5.2 [Memory Device Drivers](./README.md#52-memory-device-drivers) Drivers for nonvolatile memory devices. 5.3 [Inertial Measurement Units](./README.md#53-inertial-measurement-units) Gravity, gyro and magnetic sensors. @@ -402,6 +403,21 @@ This module may be used for detection of audio tones. It uses the Goertzel algorithm which is effectively a single-bin Fourier transform. See [docs](./goertzel/README.md). +## 4.20 Using RP2XXX hardware + +The RP2040 (Pico) and RP2350 (Pico 2) have some powerful hardware: +* The PIO comprising a set of programmable, high speed, state machines. +* Python-programmable DMA that can be linked to the SM's. +* A second core. + +The following samples are provided to illustrate the use of these facilities: +* A nonblocking SPI master capable of very high throughput. +* A nonblocking SPI slave. +* A means of measuring the characteristics of an incoming pulse train. +* An analog of the ESP32 RMT module: nonblocking output of complex pulse trains. + +See [the docs](./rp2/RP2.md). + # 5. Module index This index references applications and device drivers that I have developed, in diff --git a/rp2/RP2.md b/rp2/RP2.md new file mode 100644 index 0000000..a265683 --- /dev/null +++ b/rp2/RP2.md @@ -0,0 +1,281 @@ +# Code samples for RP2040 and RP2350 (Pico and Pico 2) + +These are intended to demonstrate the use of Pico-specific hardware. + +1. [Nonblocking SPI master](./RP2#1-nonblocking-spi-master) High speed bulk data output. + 1.1 [Class RP2_SPI_DMA_MASTER](./RP2#11-rp2_spi_dma_master) + 1.2 [Constructor](./RP2#12-constructor) + 1.3 [Methods](./RP2#13-methods) + 1.4 [CS](./RP2#14-cs) How to control the CS/ pin. +2. [Nonblocking SPI slave](./RP2#2-nonblocking-spi-slave) High speed bulk data input. + 2.1 [Introduction](./RP2#21-introduction) + 2.2 [SpiSlave class](./RP2#22-spislave-class) + 2.3 [Constructor](./RP2#23-constructor) + 2.4 [Synchronous Interface](./RP2#24-synchronous-interface) + 2.5 [Asynchronous Interface](./RP2#25-asynchronous-interface) + 2.6 [Test Scripts](./RP2#26-test-scripts) +3. [Pulse Measurement](./RP2#3-pulse-Measurement) Measure incoming pulses. +4. [Pulse train output](./RP2#4-pulse-train-output) Output arbitrary pulse trains as per ESP32 RMT. + 4.1 [The RP2_RMT class](./RP2#41-the-rp2_rmt-class) + 4.2 [Constructor](./RP2#42-constructor) + 4.3 [Methods](./RP2#43-methods) +      4.3.1 [send](./RP2#431-send) +      4.3.2 [busy](./RP2#432-busy) + 4.4 [Design](./RP2#44-design) + 4.5 [Limitations](./RP2#45-limitations) + +# 1. Nonblocking SPI master + +The module `spi_master` provides a class `RP2_SPI_DMA_MASTER` which uses DMA to +perform fast SPI output. A typical use case is to transfer the contents of a +frame buffer to a display as a background task. The following files are provided +in the `spi` directory: +* `spi_master.py` The main module. +* `master_test.py` Test script - requires a scope or LA to check SPI output. +* `master_slave_test.py` Full test of master linked to slave, with the latter +printing results. + +This module supports the most common SPI case, being the `machine.SPI` default: +polarity=0, phase=0, bits=8, firstbit=SPI.MSB. Benefits over the official +`machine.SPI` are the nonblocking write and the fact that baudrates are precise; +those on the official class are highly quantised, with (for example) 20MHz +manifesting as 12MHz. The module has been tested to 30MHz, but higher rates +should be possible. + +To run the full test, the following pins should be linked: +* 0-19 MOSI +* 1-18 SCK +* 2-17 CSN + +To run a test issue (e.g.): +```py +>>> import spi.master_slave_test +``` + +## 1.1 Class RP2_SPI_DMA_MASTER + +## 1.2 Constructor + +This takes the following positional args: +* `sm_num` State machine no. (0..7 on RP2040, 0..11 on RP2350) +* `freq` SPI clock frequency in Hz. +* `sck` A `Pin` instance for `sck`. Pins are arbitrary. +* `mosi` A `Pin` instance for `mosi`. +* `callback` A callback to run when DMA is complete. This is run in a hard IRQ +context. Typical use is to set a `ThreadSafeFlag`. Note that the DMA completes +before transmission ends due to bytes stored in the SM FIFO. This is unlikely to +have practical consequences because of MicroPython latency: in particular +response to a `ThreadSafeFlag` typically takes >200μs. + +## 1.3 Methods + +* `write(data : bytes)` arg a `bytes` or `bytearray` of data to transmit. Return +is rapid with transmission running in the background. Returns `None`. +* `deinit()` Disables the DMA and the SM. Returns `None`. + +## 1.4 CS/ + +The application should assert CS/ (set to 0) prior to transmission and deassert +it after transmission is complete. + +# 2. Nonblocking SPI slave + +This module requires incoming data to conform to the most common SPI case, being +the `machine.SPI` default: polarity=0, phase=0, bits=8, firstbit=SPI.MSB. + +It has been tested at a clock rate of 24MHz on an RP2040 running at 250MHz. + +The following files may be found in the `spi` directory: +* `spi_slave.py` Main module. +* `slave_sync_test` Test using synchronous code. +* `slave_async_test` Test using asynchronous code. +* `master_slave_test.py` Full test of master linked to slave, with the latter +printing results. + +Tests operate by using the official SPI library to transmit with the module +receiving (`master_slave_test` uses the nonblocking master). To run the tests +the following pins should be linked: +* 0-19 MOSI +* 1-18 SCK +* 2-17 CSN + +Tests are run by issuing (e.g.): +```py +>>> import spi.master_slave_test +``` + +# 2.1 Introduction + +This uses a PIO state machine (SM) with DMA to enable an RP2 to receive SPI +transfers from a host. Reception is non-blocking, enabling code to run while a +transfer is in progress. The principal application area is for fast transfer of +large messages. + +# 2.2 SpiSlave class + +The class presents synchronous and asynchronous APIs. In use the class is set +up to read data. The master sets `CS/` low, transmits data, then sets `CS/` +high, terminating the transfer. + +## 2.3 Constructor + +This takes the following positional args: +* `buf=None` Optional `bytearray` for incoming data. This is required if using +the asynchronous iterator interface, otherwise it is unused. +* `callback=None` Optional callback for use with the synchronous API. It takes +a single arg, being the number of bytes received. +* `sm_num=0` State machine number. + +Keyword args: Pin instances, initialised as input. GPIO nos. must be consecutive +starting from `mosi`. +* `mosi` Pin for MOSI. +* `sck` SCK Pin. +* `csn` CSN Pin. + +# 2.4 Synchronous Interface + +This is via `SpiSlave.read_into(buffer)` where `buffer` is a user supplied +`bytearray`. This must be large enough for the expected message. The method +returns immediately. When a message arrives and reception is complete, the +callback runs. Its integer arg is the number of bytes received. + +If a message is too long to fit the buffer, when the buffer fills, subsequent +characters are lost. + +# 2.5 Asynchronous Interface + +There are two ways to use this. The simplest uses a single buffer passed to the +constructor. This should be sized to accommodate the longest expected message. +Reception is via an asynchronous iterator: +```py +async def receive(piospi): # Arg is an SpiSlave instantiated with a buffer + async for msg in piospi: # msg is a memoryview of the buffer + print(bytes(msg)) +``` +This prints incoming messages as they arrive. + +An alternative approach enables the use of multiple buffers (for example two +phase "ping pong" buffering). Reception is via an asynchronous method +`SpiSlave.as_read_into(buffer)`: +```py + rbuf = bytearray(200) + nbytes = await piospi.as_read_into(rbuf) # Wait for a message, get no. of received bytes + print(bytes(rbuf[:nbytes])) +``` +As with all asynchronous code, this task pauses while others continue to run. + +# 2.6 Operation + +The slave will ignore all interface activity until CS/ is driven low. It then +receives data with the end of message identified by a low to high transition on +CS/. + +# 3. Pulse Measurement + +The file `measure_pulse.py` is a simple demo of using the PIO to measure a pulse +waveform. As written it runs a PWM to provide a signal. To run the demo link +pins GPIO16 and GPIO17. It measures period/frequency and mark (hence space and +mark/space can readily be derived). + +As written the native clock rate is used (125MHz on Pico). + +# 4. Pulse train output + +The `RP2_RMT` class provides functionality similar to that of the ESP32 `RMT` +class. It enables pulse trains to be output using a non-blocking driver. By +default the train occurs once. Alternatively it can repeat a defined number of +times, or can be repeated continuously. + +The class was designed for my [IR blaster](https://github.com/peterhinch/micropython_ir) +and [433MHz remote](https://github.com/peterhinch/micropython_remote) +libraries. It supports an optional carrier frequency, where each high pulse can +appear as a burst of a defined carrier frequency. The class can support both +forms concurrently on a pair of pins: one pin produces pulses while a second +produces carrier bursts. + +Pulse trains are specified as arrays with each element being a duration in μs. +Arrays may be of integers or half-words depending on the range of times to be +covered. The duration of a "tick" is 1μs by default, but this can be changed. + +To run the test issue: +```py +>>> import rmt.rp2_rmt_test +``` +Output is on pins 16 and/or 17: see below and test source. + +# 4.1 The RP2_RMT class + +## 4.2 Constructor + +This takes the following args: + 1. `pin_pulse=None` If an output `Pin` instance is provided, pulses will be + output on it. + 2. `carrier=None` To output a carrier, a 3-tuple should be provided comprising + `(pin, freq, duty)` where `pin` is an output pin instance, `freq` is the + carrier frequency in Hz and `duty` is the duty ratio in %. + 3. `sm_no=0` State machine no. + 4. `sm_freq=1_000_000` Clock frequency for SM. Defines the unit for pulse + durations. + +## 4.3 Methods + +### 4.3.1 send + +This returns "immediately" with a pulse train being emitted as a background +process. Args: + 1. `ar` A zero terminated array of pulse durations in μs. See notes below. + 2. `reps=1` No. of repetions. 0 indicates continuous output. + 3. `check=True` By default ensures that the pulse train ends in the inactive + state. + +In normal operation, between pulse trains, the pulse pin is low and the carrier +is off. A pulse train ends when a 0 pulse width is encountered: this allows +pulse trains to be shorter than the array length, for example where a +pre-allocated array stores pulse trains of varying lengths. In RF transmitter +applications ensuring the carrier is off between pulse trains may be a legal +requirement, so by default the `send` method enforces this. + +The first element of the array defines the duration of the first high going +pulse, with the second being the duration of the first `off` period. If there +are an even number of elements prior to the terminating 0, the signal will end +in the `off` state. If the `check` arg is `True`, `.send()` will check for an +odd number of elements; in this case it will overwrite the last element with 0 +to enforce a final `off` state. + +This check may be skipped by setting `check=False`. This provides a means of +inverting the normal sense of the driver: if the first pulse train has an odd +number of elements and `check=False` the pin will be left high (and the carrier +on). Subsequent normal pulsetrains will start and end in the high state. + +### 4.3.2 busy + +No args. Returns `True` if a pulse train is being emitted. + +### 4.3.3 cancel + +No args. If a pulse train is being emitted it will continue to the end but no +further repetitions will take place. + +# 4.4 Design + +The class constructor installs one of two PIO scripts depending on whether a +`pin_pulse` is specified. If it is, the `pulsetrain` script is loaded which +drives the pin directly from the PIO. If no `pin_pulse` is required, the +`irqtrain` script is loaded. Both scripts cause an IRQ to be raised at times +when a pulse would start or end. + +The `send` method loads the transmit FIFO with initial pulse durations and +starts the state machine. The `._cb` ISR keeps the FIFO loaded with data until +a 0 entry is encountered. It also turns the carrier on and off (using a PWM +instance). This means that there is some latency between the pulse and the +carrier. However latencies at start and end are effectively identical, so the +duration of a carrier burst is correct. + +# 4.5 Limitations + +While the tick interval can be reduced to provide timing precision better than +1μs, the design of this class will not support very high pulse repetition +frequencies. This is because each pulse causes an interrupt: MicroPython is +unable to support high IRQ rates. +[This library](https://github.com/robert-hh/RP2040-Examples/tree/master/pulses) +is more capable in this regard. diff --git a/rp2/measure_pulse/measure_pulse.py b/rp2/measure_pulse/measure_pulse.py new file mode 100644 index 0000000..b399d81 --- /dev/null +++ b/rp2/measure_pulse/measure_pulse.py @@ -0,0 +1,68 @@ +# measure_pulse.py Measure a pulse train with PIO + +# Released under the MIT License (MIT). See LICENSE. +# Copyright (c) 2025 Peter Hinch + +# Link GPIO16-GPIO17 to test + +from machine import Pin, PWM +import rp2 +import time + + +@rp2.asm_pio(set_init=rp2.PIO.IN_LOW, autopush=True, push_thresh=32) +def period(): + wrap_target() + set(x, 0) + wait(0, pin, 0) # Wait for pin to go low + wait(1, pin, 0) # Low to high transition + label("low_high") + jmp(x_dec, "next")[1] # unconditional + label("next") + jmp(pin, "low_high") # while pin is high + label("low") # pin is low + jmp(x_dec, "nxt") + label("nxt") + jmp(pin, "done") # pin has gone high: all done + jmp("low") + label("done") + in_(x, 32) # Auto push: SM stalls if FIFO full + wrap() + + +@rp2.asm_pio(set_init=rp2.PIO.IN_LOW, autopush=True, push_thresh=32) +def mark(): + wrap_target() + set(x, 0) + wait(0, pin, 0) # Wait for pin to go low + wait(1, pin, 0) # Low to high transition + label("low_high") + jmp(x_dec, "next")[1] # unconditional + label("next") + jmp(pin, "low_high") # while pin is high + in_(x, 32) # Auto push: SM stalls if FIFO full + wrap() + + +ck = 100_000_000 # Clock rate in Hz. +pin16 = Pin(16, Pin.IN, Pin.PULL_UP) +sm0 = rp2.StateMachine(0, period, in_base=pin16, jmp_pin=pin16, freq=ck) +sm0.active(1) +sm1 = rp2.StateMachine(1, mark, in_base=pin16, jmp_pin=pin16, freq=ck) +sm1.active(1) + +# Clock is 100MHz. 3 cycles per iteration, so unit is 30.0ns +def scale(v): + return (1 + (v ^ 0xFFFFFFFF)) * 3000 / ck # Scale to ms + + +# ***** TEST WITH PWM ***** +pwm = PWM(Pin(17)) +pwm.freq(1000) +pwm.duty_u16(0xFFFF // 3) + +while True: + period = scale(sm0.get()) + mark = scale(sm1.get()) + print(f"Period {period}ms Mark {mark}ms m/s {100*mark / (period - mark):5.2f}%") + time.sleep(0.2) diff --git a/rp2/rmt/rp2_rmt.py b/rp2/rmt/rp2_rmt.py new file mode 100644 index 0000000..9feb6c6 --- /dev/null +++ b/rp2/rmt/rp2_rmt.py @@ -0,0 +1,107 @@ +# rp2_rmt.py A RMT-like class for the RP2. + +# Released under the MIT License (MIT). See LICENSE. + +# Copyright (c) 2021 Peter Hinch + +from machine import Pin, PWM +import rp2 + + +@rp2.asm_pio(set_init=rp2.PIO.OUT_LOW, autopull=True, pull_thresh=32) +def pulsetrain(): + wrap_target() + out(x, 32) # No of 1MHz ticks. Block if FIFO MT at end. + irq(rel(0)) + set(pins, 1) # Set pin high + label("loop") + jmp(x_dec, "loop") + irq(rel(0)) + set(pins, 0) # Set pin low + out(y, 32) # Low time. + label("loop_lo") + jmp(y_dec, "loop_lo") + wrap() + + +@rp2.asm_pio(autopull=True, pull_thresh=32) +def irqtrain(): + wrap_target() + out(x, 32) # No of 1MHz ticks. Block if FIFO MT at end. + irq(rel(0)) + label("loop") + jmp(x_dec, "loop") + wrap() + + +class DummyPWM: + def duty_u16(self, _): + pass + + +class RP2_RMT: + def __init__(self, pin_pulse=None, carrier=None, sm_no=0, sm_freq=1_000_000): + if carrier is None: + self.pwm = DummyPWM() + self.duty = (0, 0) + else: + pin_car, freq, duty = carrier + self.pwm = PWM(pin_car) # Set up PWM with carrier off. + self.pwm.freq(freq) + self.pwm.duty_u16(0) + self.duty = (int(0xFFFF * duty // 100), 0) + if pin_pulse is None: + self.sm = rp2.StateMachine(sm_no, irqtrain, freq=sm_freq) + else: + self.sm = rp2.StateMachine(sm_no, pulsetrain, freq=sm_freq, set_base=pin_pulse) + self.apt = 0 # Array index + self.arr = None # Array + self.ict = None # Current IRQ count + self.icm = 0 # End IRQ count + self.reps = 0 # 0 == forever n == no. of reps + rp2.PIO(0).irq(self._cb) + + # IRQ callback. Because of FIFO IRQ's keep arriving after STOP. + def _cb(self, pio): + self.pwm.duty_u16(self.duty[self.ict & 1]) + self.ict += 1 + if d := self.arr[self.apt]: # If data available feed FIFO + self.sm.put(d) + self.apt += 1 + else: + if r := self.reps != 1: # All done if reps == 1 + if r: # 0 == run forever + self.reps -= 1 + self.sm.put(self.arr[0]) + self.apt = 1 # Set pointer and count to state + self.ict = 1 # after 1st IRQ + + # Arg is an array of times in μs terminated by 0. + def send(self, ar, reps=1, check=True): + self.sm.active(0) + self.reps = reps + ar[-1] = 0 # Ensure at least one STOP + for x, d in enumerate(ar): # Find 1st STOP + if d == 0: + break + if check: + # Discard any trailing mark which would leave carrier on. + if x & 1: + x -= 1 + ar[x] = 0 + self.icm = x # index of 1st STOP + mv = memoryview(ar) + n = min(x, 4) # Fill FIFO if there are enough data points. + self.sm.put(mv[0:n]) + self.arr = ar # Initial conditions for ISR + self.apt = n # Point to next data value + self.ict = 0 # IRQ count + self.sm.active(1) + + def busy(self): + if self.ict is None: + return False # Just instantiated + return self.ict < self.icm + + def cancel(self): + self.reps = 1 diff --git a/rp2/rmt/rp2_rmt_test.py b/rp2/rmt/rp2_rmt_test.py new file mode 100644 index 0000000..2471e6c --- /dev/null +++ b/rp2/rmt/rp2_rmt_test.py @@ -0,0 +1,37 @@ +# rp2_rmt_test.py Demo for rp2_rmt + +# Released under the MIT License (MIT). See LICENSE. +# Copyright (c) 2021 Peter Hinch + + +from time import sleep_ms +import array +from machine import Pin +from .rp2_rmt import RP2_RMT + + +def test(): + ar = array.array("H", [800, 400, 800, 400, 800, 400, 800, 400, 0]) + # Pulse and carrier + rmt = RP2_RMT(Pin(16, Pin.OUT), (Pin(17), 38000, 33)) + # Pulse only + # rmt = RP2_RMT(Pin(16, Pin.OUT)) + # Carrier only + # rmt = RP2_RMT(None, (Pin(17), 38000, 33)) + rmt.send(ar, 0) + sleep_ms(10_000) + rmt.cancel() + while rmt.busy(): + sleep_ms(10) + + while True: + sleep_ms(400) + rmt.send(ar, 1) + + +test() + +# Format of IR array: on/off times (μs). Realistic minimum 440/440 +# ar = array.array("H", [400 if x & 1 else 800 for x in range(9)]) +# ar[-1] = 0 # STOP +# Fastest IRQ rate ~444μs (Philips RC6) diff --git a/rp2/spi/master_slave_test.py b/rp2/spi/master_slave_test.py new file mode 100644 index 0000000..c7a3204 --- /dev/null +++ b/rp2/spi/master_slave_test.py @@ -0,0 +1,61 @@ +# master_slave_test.py Test asynchronous interface of SpiSlave and master class + +# Released under the MIT License (MIT). See LICENSE. +# Copyright (c) 2025 Peter Hinch + +# Link pins +# 0-19 MOSI +# 1-18 SCK +# 2-17 CSN + +from machine import Pin +import asyncio +from .spi_slave import SpiSlave +from .spi_master import RP2_SPI_DMA_MASTER + + +tsf = asyncio.ThreadSafeFlag() + + +def callback(dma): # Hard ISR + tsf.set() # Flag user code that transfer is complete + + +async def send(cs, spi, data): + cs(0) # Assert CS/ + spi.write(data) # "Immediate" return: minimal blocking. + await tsf.wait() # Wait for transfer complete (other tasks run) + cs(1) # Deassert CS/ + await asyncio.sleep_ms(100) + + +async def receive(piospi): + async for msg in piospi: + print(f"Received: {len(msg)} bytes:") + print(bytes(msg)) + print() + + +async def test(): + obuf = bytearray(range(512)) # Test data + # Master CS/ + cs = Pin(17, Pin.OUT, value=1) # Ensure CS/ is False before we try to receive. + # Pins for slave + mosi = Pin(0, Pin.IN) + sck = Pin(1, Pin.IN) + csn = Pin(2, Pin.IN) + piospi = SpiSlave(buf=bytearray(300), sm_num=0, mosi=mosi, sck=sck, csn=csn) + rt = asyncio.create_task(receive(piospi)) + await asyncio.sleep_ms(0) # Ensure receive task is running + # Pins for Master + pin_sck = Pin(18, Pin.OUT, value=0) + pin_mosi = Pin(19, Pin.OUT, value=0) + spi = RP2_SPI_DMA_MASTER(4, 10_000_000, pin_sck, pin_mosi, callback) + print("\nBasic test\n") + await send(cs, spi, obuf[:256]) + await send(cs, spi, obuf[:20]) + await send(cs, spi, b"The quick brown fox jumps over the lazy dog") + print("\nDone") + + +asyncio.run(test()) diff --git a/temp/spi_dma_test.py b/rp2/spi/master_test.py similarity index 88% rename from temp/spi_dma_test.py rename to rp2/spi/master_test.py index 4ed1f3c..720e841 100644 --- a/temp/spi_dma_test.py +++ b/rp2/spi/master_test.py @@ -1,10 +1,13 @@ -# spi_dma_test.py Test script for spi_dma.py +# master_test.py Test script for spi_dma.py + # Released under the MIT License (MIT). See LICENSE. # Copyright (c) 2025 Peter Hinch +# Performs SPI output: check on scope or LA. + from machine import Pin import asyncio -from spi_dma import RP2_SPI_DMA_MASTER +from .spi_master import RP2_SPI_DMA_MASTER pin_cs = Pin(20, Pin.OUT, value=1) pin_sck = Pin(18, Pin.OUT, value=0) @@ -19,6 +22,7 @@ def callback(dma): # Hard ISR spi = RP2_SPI_DMA_MASTER(6, 1_000_000, pin_sck, pin_mosi, callback) + async def send(data): pin_cs(0) # Assert CS/ spi.write(data) # "Immediate" return: minimal blocking. diff --git a/rp2/spi/slave_async_test.py b/rp2/spi/slave_async_test.py new file mode 100644 index 0000000..d57ca32 --- /dev/null +++ b/rp2/spi/slave_async_test.py @@ -0,0 +1,73 @@ +# slave_async_test.py Test asynchronous interface of SpiSlave class + +# Released under the MIT License (MIT). See LICENSE. +# Copyright (c) 2025 Peter Hinch + +# Link pins +# 0-19 MOSI +# 1-18 SCK +# 2-17 CSN + +from machine import Pin, SPI +import asyncio +from .spi_slave import SpiSlave + + +async def send(cs, spi, obuf): + cs(0) + spi.write(obuf) + cs(1) + await asyncio.sleep_ms(100) + + +async def receive(piospi): + async for msg in piospi: + print(f"Received: {len(msg)} bytes:") + print(bytes(msg)) + print() + + +async def get_msg(piospi): + rbuf = bytearray(200) + nbytes = await piospi.as_read_into(rbuf) + print(bytes(rbuf[:nbytes])) + print(f"Received: {nbytes} bytes") + + +async def test(): + obuf = bytearray(range(512)) # Test data + # Master CS/ + cs = Pin(17, Pin.OUT, value=1) # Ensure CS/ is False before we try to receive. + # Pins for slave + mosi = Pin(0, Pin.IN) + sck = Pin(1, Pin.IN) + csn = Pin(2, Pin.IN) + piospi = SpiSlave(buf=bytearray(300), sm_num=0, mosi=mosi, sck=sck, csn=csn) + rt = asyncio.create_task(receive(piospi)) + await asyncio.sleep_ms(0) # Ensure receive task is running + # Pins for Master + pin_miso = Pin(16, Pin.IN) # Not used: keep driver happy + pin_sck = Pin(18, Pin.OUT, value=0) + pin_mosi = Pin(19, Pin.OUT, value=0) + spi = SPI(0, baudrate=10_000_000, sck=pin_sck, mosi=pin_mosi, miso=pin_miso) + print(spi) + print("\nBasic test\n") + await send(cs, spi, obuf[:256]) + await send(cs, spi, obuf[:20]) + print("\nOverrun test: send 512 bytes, rx buffer is 300 bytes.\n") + await send(cs, spi, obuf) + print("\nTest subsequent transfers\n") + await send(cs, spi, b"The quick brown fox jumps over the lazy dog") + await send(cs, spi, b"A short message") + await send(cs, spi, b"A longer message") + rt.cancel() # Terminate the read task + await asyncio.sleep_ms(0) + print("\nAsynchronous read into user supplied buffer\n") + asyncio.create_task(get_msg(piospi)) # Set up for a single read + await asyncio.sleep_ms(0) # Ensure above task gets to run + await send(cs, spi, b"Received by .as_read_into()") + await asyncio.sleep_ms(100) + print("\nDone") + + +asyncio.run(test()) diff --git a/rp2/spi/slave_sync_test.py b/rp2/spi/slave_sync_test.py new file mode 100644 index 0000000..6c4853d --- /dev/null +++ b/rp2/spi/slave_sync_test.py @@ -0,0 +1,64 @@ +# slave_sync_test.py Test synchronous interface of SpiSlave class + +# Released under the MIT License (MIT). See LICENSE. +# Copyright (c) 2025 Peter Hinch + +# Link pins +# 0-19 MOSI +# 1-18 SCK +# 2-17 CSN + +from machine import Pin, SPI, SoftSPI +from .spi_slave import SpiSlave +from time import sleep_ms + +# SPI send a passed buffer +def send(cs, spi, obuf): + cs(0) + spi.write(obuf) + cs(1) + sleep_ms(100) + + +buf = bytearray(300) # Read buffer + +# Callback runs when transfer complete (soft ISR context) +def receive(nbytes): + print(f"Received: {nbytes} bytes:") + print(bytes(buf[:nbytes])) + print() + + +def test(): + obuf = bytearray(range(512)) # Test data + cs = Pin(17, Pin.OUT, value=1) # Ensure CS/ is False before we try to receive. + # Pins for slave + mosi = Pin(0, Pin.IN) + sck = Pin(1, Pin.IN) + csn = Pin(2, Pin.IN) + piospi = SpiSlave(callback=receive, sm_num=4, mosi=mosi, sck=sck, csn=csn) + # Pins for master + pin_miso = Pin(16, Pin.IN) # Not used: keep driver happy + pin_sck = Pin(18, Pin.OUT, value=0) + pin_mosi = Pin(19, Pin.OUT, value=0) + spi = SPI(0, baudrate=10_000_000, sck=pin_sck, mosi=pin_mosi, miso=pin_miso) + + print("\nBasic test\n") + piospi.read_into(buf) + send(cs, spi, obuf[:256]) + piospi.read_into(buf) + send(cs, spi, obuf[:20]) + print("\nOverrun test: send 512 bytes, rx buffer is 300 bytes.\n") + piospi.read_into(buf) + send(cs, spi, obuf) + print("\nTest subsequent transfers\n") + piospi.read_into(buf) + send(cs, spi, b"The quick brown fox jumps over the lazy dog") + piospi.read_into(buf) + send(cs, spi, b"A short message") + piospi.read_into(buf) + send(cs, spi, b"A longer message") + print("\nDone") + + +test() diff --git a/temp/spi_dma.py b/rp2/spi/spi_master.py similarity index 96% rename from temp/spi_dma.py rename to rp2/spi/spi_master.py index bb64411..a47e34d 100644 --- a/temp/spi_dma.py +++ b/rp2/spi/spi_master.py @@ -1,4 +1,4 @@ -# spi_dma.py A nonblocking SPI master for RP2040/RP2350 +# spi_mater.py A nonblocking SPI master for RP2040/RP2350 # Inspired by # https://github.com/raspberrypi/pico-micropython-examples/blob/master/pio/pio_spi.py diff --git a/rp2/spi/spi_slave.py b/rp2/spi/spi_slave.py new file mode 100644 index 0000000..869c290 --- /dev/null +++ b/rp2/spi/spi_slave.py @@ -0,0 +1,117 @@ +# spi_slave.py An asynchronous, DMA driven, SPI slave using the PIO. + +# Released under the MIT License (MIT). See LICENSE. +# Copyright (c) 2025 Peter Hinch + +import rp2 +from machine import Pin, mem32 +import asyncio +from micropython import schedule, alloc_emergency_exception_buf + +alloc_emergency_exception_buf(100) + +# pin numbers (offset from start_pin) +# 0 MOSI +# 1 Clk +# 2 CS\ + + +@rp2.asm_pio(autopush=True, autopull=True) +def spi_in(): + label("escape") # Just started, transfer ended or overrun attempt. + out(y, 32) # Get maximum byte count (blocking wait) + wait(0, pins, 2) # Wait for CS/ True + wrap_target() # Input a byte + set(x, 7) + label("bit") + jmp(pin, "next") + jmp(not_osre, "done") # Data received: quit + jmp("bit") + label("next") # clk leading edge + in_(pins, 1) # Input a bit MSB first + wait(0, pins, 1) # clk trailing + jmp(y_dec, "continue") + label("overrun") # An overrun would occur. Wait for CS/ ISR to send data. + jmp(not_osre, "done") # Data received + jmp("overrun") + label("continue") + jmp(x_dec, "bit") # Post decrement + wrap() # Next byte + label("done") # ISR has sent data + out(x, 32) # Discard it + in_(y, 30) # Return amount of unfilled buffer truncated to 30 bits + # Truncation ensures that overrun returns a short int + jmp("escape") + + +class SpiSlave: + def __init__(self, buf=None, callback=None, sm_num=0, *, mosi, sck, csn): + # Get data request channel for a SM: RP2040 datasheet 2.5.3 RP2350 12.6.4.1 + def dreq(sm, rx=False): + d = (sm & 3) + ((sm >> 2) << 3) + return 4 + d if rx else d + + self._callback = callback + # Set up DMA + self._dma = rp2.DMA() + # Transfer bytes, rather than words, don't increment the read address and pace the transfer. + self._cfg = self._dma.pack_ctrl(size=0, inc_read=False, treq_sel=dreq(sm_num, True)) + self._sm_num = sm_num + self._buf = buf + self._nbytes = 0 # Number of received bytes + if buf is not None: + self._mvb = memoryview(buf) + self._tsf = asyncio.ThreadSafeFlag() + csn.irq(self._done, trigger=Pin.IRQ_RISING, hard=True) # IRQ occurs at end of transfer + self._sm = rp2.StateMachine( + sm_num, + spi_in, + in_shiftdir=rp2.PIO.SHIFT_LEFT, + push_thresh=8, + in_base=mosi, + jmp_pin=sck, + ) + + def __aiter__(self): # Asynchronous iterator support + return self + + async def __anext__(self): + if self._buf is None: + raise OSError("No buffer provided to constructor.") + + self.read_into(self._buf) # Initiate DMA and return. + await self._tsf.wait() # Wait for CS/ high (master signals transfer complete) + return self._mvb[: self._nbytes] + + # Initiate a read into a buffer. Immediate return. + def read_into(self, buf): + buflen = len(buf) + self._buflen = buflen # Save for ISR + self._dma.active(0) # De-activate befor re-configuring + self._sm.active(0) + self._tsf.clear() + self._dma.config(read=self._sm, write=buf, count=buflen, ctrl=self._cfg, trigger=False) + self._dma.active(1) + self._sm.restart() + self._sm.active(1) # Start SM + self._sm.put(buflen, 3) # Number of expected bits + + # Hard ISR for CS/ rising edge. + def _done(self, _): # Get no. of bytes received. + self._dma.active(0) + self._sm.put(0) # Request no. of received bits + if not self._sm.rx_fifo(): # Occurs if .read_into() never called while CSN is low: + return # ISR runs on trailing edge but SM is not running. Nothing to do. + sp = self._sm.get() >> 3 # Bits->bytes: space left in buffer or 7ffffff on overflow + self._nbytes = self._buflen - sp if sp != 0x7FFFFFF else self._buflen + self._dma.active(0) + self._sm.active(0) + self._tsf.set() + if self._callback is not None: + schedule(self._callback, self._nbytes) # Soft ISR + + # Await a read into a user-supplied buffer. Return no. of bytes read. + async def as_read_into(self, buf): + self.read_into(buf) # Start the read + await self._tsf.wait() # Wait for CS/ high (master signals transfer complete) + return self._nbytes diff --git a/temp/SPI_DMA.md b/temp/SPI_DMA.md deleted file mode 100644 index 3dab1eb..0000000 --- a/temp/SPI_DMA.md +++ /dev/null @@ -1,35 +0,0 @@ -# A nonblocking SPI master for RP2 - -The module `spi_dma` provides a class `RP2_SPI_DMA_MASTER` which uses DMA to -perform fast SPI output. A typical use case is to transfer the contents of a -frame buffer to a display as a background task. - -## Constructor - -This takes the following args: -* `sm_num` State machine no. (0..7 on RP2040, 0..11 on RP2350) -* `freq` SPI clock frequency in Hz. -* `sck` A `Pin` instance for `sck`. Pins are arbitrary. -* `mosi` A `Pin` instance for `mosi`. -* `callback` A callback to run when DMA is complete. This is run in a hard IRQ -context. Typical use is to set a `ThreadSafeFlag`. Note that the DMA completes -before transmission ends due to bytes stored in the SM FIFO. This is unlikely to -have practical consequences because of MicroPython latency: in particular -response to a `ThreadSafeFlag` typically takes >200μs. - -## Methods - -* `write(data : bytes)` arg a `bytes` or `bytearray` of data to transmit. Return -is rapid with transmission running in the background. Returns `None`. -* `deinit()` Disables the DMA and the SM. Returns `None`. - -## CS/ - -The application should assert CS/ (set to 0) prior to transmission and deassert -it after transmission is complete. - -## Test script and performance - -The file `spi_dma_test.py` illustrates usage with `asyncio`. The module has -been tested at 30MHz, but higher frequencies may be practical with care -given to wiring.