SPI master supports MISO transfers.

master
Peter Hinch 2025-08-30 14:27:35 +01:00
rodzic bbea5ce197
commit 4a68bfd9b5
4 zmienionych plików z 112 dodań i 24 usunięć

Wyświetl plik

@ -7,6 +7,7 @@ These are intended to demonstrate the use of Pico-specific hardware.
1.2 [Constructor](./RP2.md#12-constructor)
1.3 [Methods](./RP2.md#13-methods)
1.4 [CS](./RP2.md#14-cs) How to control the CS/ pin.
1.5 [Callback](./RP2.md#15-callback) Notification of transfer complete.
2. [Nonblocking SPI slave](./RP2.md#2-nonblocking-spi-slave) High speed bulk data input.
2.1 [Introduction](./RP2.md#21-introduction)
2.2 [SpiSlave class](./RP2.md#22-spislave-class)
@ -36,11 +37,12 @@ They will be installed in directories:
# 1. Nonblocking SPI master
The module `spi_master` provides a class `SpiMaster` which uses DMA to perform
fast SPI output. A typical use case is to transfer the contents of a frame
buffer to a display as a background task. The following files are provided in
the `spi` directory:
fast SPI I/O. A typical use case is to transfer the contents of a frame buffer
to a display as a background task. The following files are provided in the `spi`
directory:
* `spi_master.py` The main module.
* `master_test.py` Test script - requires a scope or LA to check SPI output.
* `master_test.py` Test script - uses a loopback (MOSI linked to MISO) to verify
the master.
* `master_slave_test.py` Full test of master linked to slave, with the latter
printing results.
@ -51,10 +53,11 @@ those on the official class are highly quantised, with (for example) 20MHz
manifesting as 12MHz. The module has been tested to 30MHz, but higher rates
should be possible.
To run the full test, the following pins should be linked:
* 0-19 MOSI
To run the tests, the following pins should be linked:
* 0-19 MOSI (master-slave links).
* 1-18 SCK
* 2-17 CSN
* 16-19 MOSI-MISO (only needed for master_test.py).
To run a test issue (e.g.):
```py
@ -68,13 +71,15 @@ To run a test issue (e.g.):
This takes the following positional args:
* `sm_num` State machine no. (0..7 on RP2040, 0..11 on RP2350)
* `freq` SPI clock frequency in Hz.
* `sck` A `Pin` instance for `sck`. Pins are arbitrary.
* `mosi` A `Pin` instance for `mosi`.
* `callback` A callback to run when DMA is complete. This is run in a hard IRQ
context. Typical use is to set a `ThreadSafeFlag`. Note that the DMA completes
before transmission ends due to bytes stored in the SM FIFO. This is unlikely to
have practical consequences because of MicroPython latency: in particular
response to a `ThreadSafeFlag` typically takes >200μs.
* `sck` An output `Pin` instance for `sck`. Pins are arbitrary.
* `mosi` An output `Pin` instance for `mosi`.
* `callback` A callback to run when DMA is complete. See [below](./RP2.md#15-callback).
Optional keyword args, for the case where data coming from the slave must be
acquired (MISO):
* `miso` A `Pin` instance for MISO (defined as input).
* `ibuf` A `bytearray` for MISO data. If the quantity of data exceeds the length
of the buffer it will be truncated.
## 1.3 Methods
@ -89,6 +94,16 @@ it after transmission is complete. An external pullup resistor to 3.3V should be
provided to ensure that the receiving device sees CS/ `False` in the interval
between power-up and application start-up. A value of a few KΩ is suggested.
## 1.5 Callback
This runs when the DMA is complete. It takes no args and runs in a hard IRQ
context. Typical use is to set a `ThreadSafeFlag`, allowing a pending task to
resume. Typically this will deassert `CS/` and initiate processing of received
data. Note that the DMA completes before transmission ends due to bytes stored
in the SM FIFO. This is unlikely to have practical consequences because of
MicroPython latency: the master executes several MP instructions before the
callback runs, and the response to a `ThreadSafeFlag` typically takes >200μs.
# 2. Nonblocking SPI slave
This module requires incoming data to conform to the most common SPI case, being

Wyświetl plik

@ -17,7 +17,7 @@ from .spi_master import SpiMaster
tsf = asyncio.ThreadSafeFlag()
def callback(dma): # Hard ISR
def callback(): # Hard ISR
tsf.set() # Flag user code that transfer is complete

Wyświetl plik

@ -12,15 +12,18 @@ from .spi_master import SpiMaster
pin_cs = Pin(20, Pin.OUT, value=1)
pin_sck = Pin(18, Pin.OUT, value=0)
pin_mosi = Pin(19, Pin.OUT, value=0)
pin_miso = Pin(16, Pin.IN)
tsf = asyncio.ThreadSafeFlag()
def callback(dma): # Hard ISR
def callback(): # Hard ISR
tsf.set() # Flag user code that transfer is complete
print("cb")
spi = SpiMaster(6, 1_000_000, pin_sck, pin_mosi, callback)
buf = bytearray(100)
spi = SpiMaster(6, 1_000_000, pin_sck, pin_mosi, callback, miso=pin_miso, ibuf=buf)
async def send(data):
@ -31,13 +34,15 @@ async def send(data):
async def main():
src_data = b"\xFF\x55\xAA\x00the quick brown fox jumps over the lazy dog"
src_data = bytearray(b"\xFF\x55\xAA\x00the quick brown fox jumps over the lazy dog")
n = 0
while True:
asyncio.create_task(send(src_data)) # Send as a background task
await asyncio.sleep(1)
print(n)
print(n, buf[: len(src_data)])
n += 1
n &= 0xFF
src_data[0] = n
try:

Wyświetl plik

@ -5,9 +5,16 @@
# Released under the MIT License (MIT). See LICENSE.
# Copyright (c) 2025 Peter Hinch
# API
# .write() is the only way to send data. If constucted with an input buffer, any data
# received from the slave (MISO) will be captured. Incoming data is truncated to fit buffer.
import rp2
from micropython import alloc_emergency_exception_buf
alloc_emergency_exception_buf(100)
# Output-only SM
@rp2.asm_pio(autopull=True, pull_thresh=8, sideset_init=rp2.PIO.OUT_LOW, out_init=rp2.PIO.OUT_LOW)
def spi_out():
wrap_target()
@ -18,6 +25,26 @@ def spi_out():
wrap()
# Version accepts incoming data
@rp2.asm_pio(
autopull=True,
autopush=True,
pull_thresh=8,
push_thresh=8,
sideset_init=rp2.PIO.OUT_LOW,
out_init=rp2.PIO.OUT_LOW,
)
def spi_inout():
wrap_target()
set(x, 7).side(0x0)
label("bitloop")
out(pins, 1).side(0x0) # Stalls with CLK low while FIFO is empty
mov(y, y).side(0x1) # Set clock high
in_(pins, 1).side(0x1)
jmp(x_dec, "bitloop").side(0x0)
wrap()
# Get data request channel for a SM: RP2040 datasheet 2.5.3 RP2350 12.6.4.1
def dreq(sm, rx=False):
d = (sm & 3) + ((sm >> 2) << 3)
@ -27,20 +54,61 @@ def dreq(sm, rx=False):
# The callback runs when DMA is complete. This may be up to four byte times prior to
# the SM running out of data (FIFO depth).
class SpiMaster:
def __init__(self, sm_num, freq, sck, mosi, callback):
def __init__(self, sm_num, freq, sck, mosi, callback, *, miso=None, ibuf=None):
self._sm_num = sm_num
self._cb = callback
self._dma = rp2.DMA()
self._dma.irq(handler=callback, hard=True) # Assign callback
self._dma.irq(handler=self._done, hard=True) # Assign callback
dc = dreq(sm_num) # Data request channel
# Transfer bytes, don't increment the write address, irq at end, and pace the transfer.
self._ctrl = self._dma.pack_ctrl(size=0, inc_write=False, irq_quiet=False, treq_sel=dc)
f = 2 * freq # 2 clock cycles per bit
self._sm = rp2.StateMachine(sm_num, spi_out, freq=f, sideset_base=sck, out_base=mosi)
self._io = 0 if ibuf is None else len(ibuf)
self._ibuf = ibuf
if ibuf is None: # Output only
f = 2 * freq # 2 clock cycles per bit
self._sm = rp2.StateMachine(sm_num, spi_out, freq=f, sideset_base=sck, out_base=mosi)
self._sm.active(1)
else:
# I/O
f = 4 * freq # 4 cycles per bit
self._sm = rp2.StateMachine(
sm_num,
spi_inout,
freq=f,
sideset_base=sck,
out_base=mosi,
in_base=miso,
)
self._idma = rp2.DMA()
dc = dreq(sm_num, True) # Data in
self._ictrl = self._idma.pack_ctrl(
size=0, inc_read=False, irq_quiet=False, treq_sel=dc
)
# self._idma.irq(self._done)
self._sm.active(1)
def deinit(self):
def _done(self, dma): # I/O transfer complete
self._dma.active(0)
self._sm.restart()
if self._io:
self._idma.active(0)
self._sm.active(0)
self._cb()
# callback = self._cb
# callback()
def write(self, data):
self._dma.config(read=data, write=self._sm, count=len(data), ctrl=self._ctrl, trigger=True)
ld = len(data)
self._dma.config(read=data, write=self._sm, count=ld, ctrl=self._ctrl)
self._dma.active(1)
if self._io:
cnt = min(ld, self._io) # Prevent overrun of ibuf
self._idma.config(read=self._sm, write=self._ibuf, count=cnt, ctrl=self._ictrl)
self._idma.active(1)
self._sm.active(1) # Start SM
def deinit(self):
self._dma.close()
self._sm.active(0)
if self._io:
self._idma.close()