Update spi_dma.py

master
Peter Hinch 2025-07-12 18:04:29 +01:00
rodzic fb62f50103
commit d69cc9fcaa
1 zmienionych plików z 72 dodań i 46 usunięć

Wyświetl plik

@ -1,65 +1,91 @@
# Derived from https://github.com/raspberrypi/pico-micropython-examples/blob/master/pio/pio_spi.py # Derived from https://github.com/raspberrypi/pico-micropython-examples/blob/master/pio/pio_spi.py
import rp2 import rp2
from machine import Pin from machine import Pin
import time import asyncio
from micropython import alloc_emergency_exception_buf
# ***** Define connfiguration ***** alloc_emergency_exception_buf(100)
sm_num = 0
pio_num = 0
freq = 1_000_000
pincs = Pin(20, Pin.OUT)
pin_sck = Pin(18)
pin_mosi = Pin(19)
# ***** Transfer complete callback *****
def callback(dma):
pincs(1)
# Flag user code that transfer is complete
# ***** @rp2.asm_pio(autopull=True, pull_thresh=8, sideset_init=rp2.PIO.OUT_LOW, out_init=rp2.PIO.OUT_LOW)
@rp2.asm_pio(autopull=True, pull_thresh=8, sideset_init=(rp2.PIO.OUT_LOW,), out_init=rp2.PIO.OUT_LOW)
def spi_cpha0(): def spi_cpha0():
# Note X must be preinitialised by setup code before first byte, we reload after sending each byte # Note X must be preinitialised by setup code before first byte, we reload after sending each byte
set(x, 6) set(x, 6)
# Actual program body follows # Actual program body follows
wrap_target() wrap_target()
pull(ifempty) .side(0x0) [1] pull(ifempty).side(0x0)[1]
label("bitloop") label("bitloop")
out(pins, 1) .side(0x0) [1] out(pins, 1).side(0x0)[1]
#mov(y, y) .side(0x1) jmp(x_dec, "bitloop").side(0x1)[1]
jmp(x_dec, "bitloop") .side(0x1) [1]
out(pins, 1) .side(0x0) out(pins, 1).side(0x0)
set(x, 6) .side(0x0) set(x, 6).side(0x0)
mov(y, y) .side(0x1) # NOP mov(y, y).side(0x1) # NOP
jmp(not_osre, "bitloop") .side(0x1) # Fallthru if TXFIFO empties jmp(not_osre, "bitloop").side(0x1) # Fallthru if TXFIFO empties
wrap() wrap()
piospi_sm = rp2.StateMachine(sm_num, spi_cpha0, freq=4*freq, sideset_base=pin_sck, out_base=pin_mosi)
piospi_sm.active(1)
DATA_REQUEST_INDEX = (pio_num << 3) + sm_num # Get data request channel for a SM: RP2040 datasheet 2.5.3 RP2350 12.6.4.1
def dreq(sm, rx=False):
d = (sm & 3) + ((sm >> 2) << 3)
return 4 + d if rx else d
class RP2_SPI_DMA_MASTER:
def __init__(self, sm_num, freq, sck, mosi, callback):
self._sm_num = sm_num
self._dma = rp2.DMA()
self._dma.irq(handler=callback, hard=True) # Assign callback
dc = dreq(sm_num) # Data request channel
# Transfer bytes, don't increment the write address, irq at end, and pace the transfer.
self._ctrl = self._dma.pack_ctrl(size=0, inc_write=False, irq_quiet=False, treq_sel=dc)
f = 4 * freq # 4 clock cycles per bit
self._sm = rp2.StateMachine(sm_num, spi_cpha0, freq=f, sideset_base=sck, out_base=mosi)
self._sm.active(1)
def close(self):
self._dma.active(0)
self._sm.active(0)
def write(self, data):
self._dma.config(read=data, write=self._sm, count=len(data), ctrl=self._ctrl, trigger=True)
d = rp2.DMA()
d.irq(handler=callback, hard=True) # Assign callback
# Transfer bytes, rather than words, don't increment the write address, irq at end, and pace the transfer.
c = d.pack_ctrl(size=0, inc_write=False, irq_quiet=False, treq_sel=DATA_REQUEST_INDEX)
def send(data):
pincs(0) # Assert CS/
d.config(
read=data,
write=piospi_sm,
count=len(data),
ctrl=c,
trigger=True
)
# ***** Test Script ***** # ***** Test Script *****
n = 0 pin_cs = Pin(20, Pin.OUT, value=1)
while True: pin_sck = Pin(18, Pin.OUT, value=0)
src_data = b'the quick brown fox jumps over the lazy dog' pin_mosi = Pin(19, Pin.OUT, value=0)
send(src_data) # "Immediate" return.
time.sleep(1) tsf = asyncio.ThreadSafeFlag()
print(n)
n += 1
def callback(dma): # Hard ISR
tsf.set() # Flag user code that transfer is complete
spi = RP2_SPI_DMA_MASTER(0, 1_000_000, pin_sck, pin_mosi, callback)
async def eot(): # Handle end of transfer
while True:
await tsf.wait()
pin_cs(1)
async def main():
src_data = b"the quick brown fox jumps over the lazy dog"
asyncio.create_task(eot())
n = 0
while True:
pin_cs(0)
spi.write(src_data) # "Immediate" return.
await asyncio.sleep(1)
print(n)
n += 1
try:
asyncio.run(main())
except KeyboardInterrupt:
spi.close()
asyncio.new_event_loop()