kopia lustrzana https://github.com/peterhinch/micropython-samples
Update spi_dma.
rodzic
d69cc9fcaa
commit
96b7bfe476
|
@ -0,0 +1,35 @@
|
||||||
|
# A nonblocking SPI master for RP2
|
||||||
|
|
||||||
|
The module `spi_dma` provides a class `RP2_SPI_DMA_MASTER` which uses DMA to
|
||||||
|
perform fast SPI output. A typical use case is to transfer the contents of a
|
||||||
|
frame buffer to a display as a background task.
|
||||||
|
|
||||||
|
## Constructor
|
||||||
|
|
||||||
|
This takes the following args:
|
||||||
|
* `sm_num` State machine no. (0..7 on RP2040, 0..11 on RP2350)
|
||||||
|
* `freq` SPI clock frequency in Hz.
|
||||||
|
* `sck` A `Pin` instance for `sck`. Pins are arbitrary.
|
||||||
|
* `mosi` A `Pin` instance for `mosi`.
|
||||||
|
* `callback` A callback to run when DMA is complete. This is run in a hard IRQ
|
||||||
|
context. Typical use is to set a `ThreadSafeFlag`. Note that the DMA completes
|
||||||
|
before transmission ends due to bytes stored in the SM FIFO. This is unlikely to
|
||||||
|
have practical consequences because of MicroPython latency: in particular
|
||||||
|
response to a `ThreadSafeFlag` typically takes >200μs.
|
||||||
|
|
||||||
|
## Methods
|
||||||
|
|
||||||
|
* `write(data : bytes)` arg a `bytes` or `bytearray` of data to transmit. Return
|
||||||
|
is rapid with transmission running in the background. Returns `None`.
|
||||||
|
* `deinit()` Disables the DMA and the SM. Returns `None`.
|
||||||
|
|
||||||
|
## CS/
|
||||||
|
|
||||||
|
The application should assert CS/ (set to 0) prior to transmission and deassert
|
||||||
|
it after transmission is complete.
|
||||||
|
|
||||||
|
## Test script and performance
|
||||||
|
|
||||||
|
The file `spi_dma_test.py` illustrates usage with `asyncio`. The module has
|
||||||
|
been tested at 30MHz, but higher frequencies may be practical with care
|
||||||
|
given to wiring.
|
|
@ -1,27 +1,20 @@
|
||||||
# Derived from https://github.com/raspberrypi/pico-micropython-examples/blob/master/pio/pio_spi.py
|
# spi_dma.py A nonblocking SPI master for RP2040/RP2350
|
||||||
import rp2
|
# Inspired by
|
||||||
from machine import Pin
|
# https://github.com/raspberrypi/pico-micropython-examples/blob/master/pio/pio_spi.py
|
||||||
import asyncio
|
|
||||||
from micropython import alloc_emergency_exception_buf
|
|
||||||
|
|
||||||
alloc_emergency_exception_buf(100)
|
# Released under the MIT License (MIT). See LICENSE.
|
||||||
|
# Copyright (c) 2025 Peter Hinch
|
||||||
|
|
||||||
|
import rp2
|
||||||
|
|
||||||
|
|
||||||
@rp2.asm_pio(autopull=True, pull_thresh=8, sideset_init=rp2.PIO.OUT_LOW, out_init=rp2.PIO.OUT_LOW)
|
@rp2.asm_pio(autopull=True, pull_thresh=8, sideset_init=rp2.PIO.OUT_LOW, out_init=rp2.PIO.OUT_LOW)
|
||||||
def spi_cpha0():
|
def spi_out():
|
||||||
# Note X must be preinitialised by setup code before first byte, we reload after sending each byte
|
|
||||||
set(x, 6)
|
|
||||||
# Actual program body follows
|
|
||||||
wrap_target()
|
wrap_target()
|
||||||
pull(ifempty).side(0x0)[1]
|
set(x, 7).side(0x0)
|
||||||
label("bitloop")
|
label("bitloop")
|
||||||
out(pins, 1).side(0x0)[1]
|
out(pins, 1).side(0x0) # Stalls with CLK low while FIFO is empty
|
||||||
jmp(x_dec, "bitloop").side(0x1)[1]
|
jmp(x_dec, "bitloop").side(0x1)
|
||||||
|
|
||||||
out(pins, 1).side(0x0)
|
|
||||||
set(x, 6).side(0x0)
|
|
||||||
mov(y, y).side(0x1) # NOP
|
|
||||||
jmp(not_osre, "bitloop").side(0x1) # Fallthru if TXFIFO empties
|
|
||||||
wrap()
|
wrap()
|
||||||
|
|
||||||
|
|
||||||
|
@ -31,6 +24,8 @@ def dreq(sm, rx=False):
|
||||||
return 4 + d if rx else d
|
return 4 + d if rx else d
|
||||||
|
|
||||||
|
|
||||||
|
# The callback runs when DMA is complete. This may be up to four byte times prior to
|
||||||
|
# the SM running out of data (FIFO depth).
|
||||||
class RP2_SPI_DMA_MASTER:
|
class RP2_SPI_DMA_MASTER:
|
||||||
def __init__(self, sm_num, freq, sck, mosi, callback):
|
def __init__(self, sm_num, freq, sck, mosi, callback):
|
||||||
self._sm_num = sm_num
|
self._sm_num = sm_num
|
||||||
|
@ -39,53 +34,13 @@ class RP2_SPI_DMA_MASTER:
|
||||||
dc = dreq(sm_num) # Data request channel
|
dc = dreq(sm_num) # Data request channel
|
||||||
# Transfer bytes, don't increment the write address, irq at end, and pace the transfer.
|
# Transfer bytes, don't increment the write address, irq at end, and pace the transfer.
|
||||||
self._ctrl = self._dma.pack_ctrl(size=0, inc_write=False, irq_quiet=False, treq_sel=dc)
|
self._ctrl = self._dma.pack_ctrl(size=0, inc_write=False, irq_quiet=False, treq_sel=dc)
|
||||||
f = 4 * freq # 4 clock cycles per bit
|
f = 2 * freq # 2 clock cycles per bit
|
||||||
self._sm = rp2.StateMachine(sm_num, spi_cpha0, freq=f, sideset_base=sck, out_base=mosi)
|
self._sm = rp2.StateMachine(sm_num, spi_out, freq=f, sideset_base=sck, out_base=mosi)
|
||||||
self._sm.active(1)
|
self._sm.active(1)
|
||||||
|
|
||||||
def close(self):
|
def deinit(self):
|
||||||
self._dma.active(0)
|
self._dma.active(0)
|
||||||
self._sm.active(0)
|
self._sm.active(0)
|
||||||
|
|
||||||
def write(self, data):
|
def write(self, data):
|
||||||
self._dma.config(read=data, write=self._sm, count=len(data), ctrl=self._ctrl, trigger=True)
|
self._dma.config(read=data, write=self._sm, count=len(data), ctrl=self._ctrl, trigger=True)
|
||||||
|
|
||||||
|
|
||||||
# ***** Test Script *****
|
|
||||||
pin_cs = Pin(20, Pin.OUT, value=1)
|
|
||||||
pin_sck = Pin(18, Pin.OUT, value=0)
|
|
||||||
pin_mosi = Pin(19, Pin.OUT, value=0)
|
|
||||||
|
|
||||||
tsf = asyncio.ThreadSafeFlag()
|
|
||||||
|
|
||||||
|
|
||||||
def callback(dma): # Hard ISR
|
|
||||||
tsf.set() # Flag user code that transfer is complete
|
|
||||||
|
|
||||||
|
|
||||||
spi = RP2_SPI_DMA_MASTER(0, 1_000_000, pin_sck, pin_mosi, callback)
|
|
||||||
|
|
||||||
|
|
||||||
async def eot(): # Handle end of transfer
|
|
||||||
while True:
|
|
||||||
await tsf.wait()
|
|
||||||
pin_cs(1)
|
|
||||||
|
|
||||||
|
|
||||||
async def main():
|
|
||||||
src_data = b"the quick brown fox jumps over the lazy dog"
|
|
||||||
asyncio.create_task(eot())
|
|
||||||
n = 0
|
|
||||||
while True:
|
|
||||||
pin_cs(0)
|
|
||||||
spi.write(src_data) # "Immediate" return.
|
|
||||||
await asyncio.sleep(1)
|
|
||||||
print(n)
|
|
||||||
n += 1
|
|
||||||
|
|
||||||
|
|
||||||
try:
|
|
||||||
asyncio.run(main())
|
|
||||||
except KeyboardInterrupt:
|
|
||||||
spi.close()
|
|
||||||
asyncio.new_event_loop()
|
|
||||||
|
|
|
@ -0,0 +1,43 @@
|
||||||
|
# spi_dma_test.py Test script for spi_dma.py
|
||||||
|
# Released under the MIT License (MIT). See LICENSE.
|
||||||
|
# Copyright (c) 2025 Peter Hinch
|
||||||
|
|
||||||
|
from machine import Pin
|
||||||
|
import asyncio
|
||||||
|
from spi_dma import RP2_SPI_DMA_MASTER
|
||||||
|
|
||||||
|
pin_cs = Pin(20, Pin.OUT, value=1)
|
||||||
|
pin_sck = Pin(18, Pin.OUT, value=0)
|
||||||
|
pin_mosi = Pin(19, Pin.OUT, value=0)
|
||||||
|
|
||||||
|
tsf = asyncio.ThreadSafeFlag()
|
||||||
|
|
||||||
|
|
||||||
|
def callback(dma): # Hard ISR
|
||||||
|
tsf.set() # Flag user code that transfer is complete
|
||||||
|
|
||||||
|
|
||||||
|
spi = RP2_SPI_DMA_MASTER(6, 1_000_000, pin_sck, pin_mosi, callback)
|
||||||
|
|
||||||
|
async def send(data):
|
||||||
|
pin_cs(0) # Assert CS/
|
||||||
|
spi.write(data) # "Immediate" return: minimal blocking.
|
||||||
|
await tsf.wait() # Wait for transfer complete (other tasks run)
|
||||||
|
pin_cs(1) # Deassert CS/
|
||||||
|
|
||||||
|
|
||||||
|
async def main():
|
||||||
|
src_data = b"\xFF\x55\xAA\x00the quick brown fox jumps over the lazy dog"
|
||||||
|
n = 0
|
||||||
|
while True:
|
||||||
|
asyncio.create_task(send(src_data)) # Send as a background task
|
||||||
|
await asyncio.sleep(1)
|
||||||
|
print(n)
|
||||||
|
n += 1
|
||||||
|
|
||||||
|
|
||||||
|
try:
|
||||||
|
asyncio.run(main())
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
spi.deinit()
|
||||||
|
asyncio.new_event_loop()
|
Ładowanie…
Reference in New Issue