RP2: Bidirectional SPI initial commit.

master
Peter Hinch 2025-09-07 18:10:14 +01:00
rodzic 4a68bfd9b5
commit c89510342d
7 zmienionych plików z 156 dodań i 38 usunięć

Wyświetl plik

@ -8,9 +8,12 @@ from machine import Pin, reset
from .spi_slave import SpiSlave
from time import sleep_ms
mosi = Pin(0, Pin.IN)
sck = Pin(1, Pin.IN)
csn = Pin(2, Pin.IN)
# mosi = Pin(0, Pin.IN)
# sck = Pin(1, Pin.IN)
# csn = Pin(2, Pin.IN)
mosi = Pin(12, Pin.IN)
sck = Pin(13, Pin.IN)
csn = Pin(14, Pin.IN)
piospi = SpiSlave(buf=bytearray(300), sm_num=0, mosi=mosi, sck=sck, csn=csn)

Wyświetl plik

@ -7,6 +7,7 @@
# 0-19 MOSI
# 1-18 SCK
# 2-17 CSN
# 3-16 MISO
from machine import Pin
import asyncio
@ -18,6 +19,7 @@ tsf = asyncio.ThreadSafeFlag()
def callback(): # Hard ISR
print("cb")
tsf.set() # Flag user code that transfer is complete
@ -25,31 +27,38 @@ def callback(): # Hard ISR
cs = Pin(17, Pin.OUT, value=1) # Ensure CS/ is False before we try to receive.
pin_sck = Pin(18, Pin.OUT, value=0)
pin_mosi = Pin(19, Pin.OUT, value=0)
spi = SpiMaster(4, 10_000_000, pin_sck, pin_mosi, callback)
pin_miso = Pin(16, Pin.IN)
ibuf = bytearray(20)
spi = SpiMaster(4, 10_000_000, pin_sck, pin_mosi, callback, miso=pin_miso, ibuf=ibuf)
# Pins for slave
mosi = Pin(0, Pin.IN)
sck = Pin(1, Pin.IN)
csn = Pin(2, Pin.IN)
piospi = SpiSlave(buf=bytearray(300), sm_num=0, mosi=mosi, sck=sck, csn=csn)
miso = Pin(3, Pin.OUT, value=0)
piospi = SpiSlave(buf=bytearray(300), sm_num=0, mosi=mosi, sck=sck, csn=csn, miso=miso)
async def send(data):
cs(0) # Assert CS/
spi.write(data) # "Immediate" return: minimal blocking.
print("GH01")
await tsf.wait() # Wait for transfer complete (other tasks run)
print("GH02")
cs(1) # Deassert CS/
await asyncio.sleep_ms(100)
print("Master received", ibuf)
async def receive(piospi):
async for msg in piospi:
print(f"Received: {len(msg)} bytes:")
print(f"Slave received: {len(msg)} bytes:")
print(bytes(msg))
print()
async def test():
obuf = bytearray(range(512)) # Test data
# piospi.write(b"Hello from slave", -1) # Repeat
rt = asyncio.create_task(receive(piospi))
await asyncio.sleep_ms(0) # Ensure receive task is running
print("\nBasic test\n")

Wyświetl plik

@ -9,7 +9,7 @@ from machine import Pin
import asyncio
from .spi_master import SpiMaster
pin_cs = Pin(20, Pin.OUT, value=1)
pin_cs = Pin(17, Pin.OUT, value=1)
pin_sck = Pin(18, Pin.OUT, value=0)
pin_mosi = Pin(19, Pin.OUT, value=0)
pin_miso = Pin(16, Pin.IN)
@ -19,7 +19,6 @@ tsf = asyncio.ThreadSafeFlag()
def callback(): # Hard ISR
tsf.set() # Flag user code that transfer is complete
print("cb")
buf = bytearray(100)
@ -37,9 +36,9 @@ async def main():
src_data = bytearray(b"\xFF\x55\xAA\x00the quick brown fox jumps over the lazy dog")
n = 0
while True:
asyncio.create_task(send(src_data)) # Send as a background task
await send(src_data)
print(n, bytes(buf[: len(src_data)]))
await asyncio.sleep(1)
print(n, buf[: len(src_data)])
n += 1
n &= 0xFF
src_data[0] = n

Wyświetl plik

@ -0,0 +1,44 @@
# master_slave_test.py Test asynchronous interface of SpiSlave and master class
# Released under the MIT License (MIT). See LICENSE.
# Copyright (c) 2025 Peter Hinch
from machine import Pin
import asyncio
from .spi_slave import SpiSlave
# Pins for slave
mosi = Pin(12, Pin.IN)
sck = Pin(13, Pin.IN)
csn = Pin(14, Pin.IN)
miso = Pin(15, Pin.OUT, value=0)
piospi = SpiSlave(buf=bytearray(300), sm_num=0, mosi=mosi, sck=sck, csn=csn, miso=miso)
async def receive(piospi):
n = 0
async for msg in piospi:
piospi.write(f"Message {n} from slave")
n += 1
print(f"Slave received: {len(msg)} bytes:")
print(bytes(msg))
print()
async def test():
obuf = bytearray(range(512)) # Test data
rt = asyncio.create_task(receive(piospi))
await asyncio.sleep_ms(0) # Ensure receive task is running
print("\nBasic test\n")
while True:
# piospi.write("hello")
await asyncio.sleep(3)
print("\nDone")
try:
asyncio.run(test())
finally:
piospi.deinit()
asyncio.new_event_loop()

Wyświetl plik

@ -68,23 +68,20 @@ class SpiMaster:
f = 2 * freq # 2 clock cycles per bit
self._sm = rp2.StateMachine(sm_num, spi_out, freq=f, sideset_base=sck, out_base=mosi)
self._sm.active(1)
else:
# I/O
f = 4 * freq # 4 cycles per bit
self._sm = rp2.StateMachine(
sm_num,
spi_inout,
freq=f,
sideset_base=sck,
out_base=mosi,
in_base=miso,
)
self._idma = rp2.DMA()
dc = dreq(sm_num, True) # Data in
self._ictrl = self._idma.pack_ctrl(
size=0, inc_read=False, irq_quiet=False, treq_sel=dc
)
# self._idma.irq(self._done)
return
# I/O
f = 4 * freq # 4 cycles per bit
self._sm = rp2.StateMachine(
sm_num,
spi_inout,
freq=f,
sideset_base=sck,
out_base=mosi,
in_base=miso,
)
self._idma = rp2.DMA()
dc = dreq(sm_num, True) # Data in
self._ictrl = self._idma.pack_ctrl(size=0, inc_read=False, irq_quiet=False, treq_sel=dc)
self._sm.active(1)
def _done(self, dma): # I/O transfer complete
@ -93,9 +90,7 @@ class SpiMaster:
if self._io:
self._idma.active(0)
self._sm.active(0)
self._cb()
# callback = self._cb
# callback()
self._cb() # User callback.
def write(self, data):
ld = len(data)

Wyświetl plik

@ -23,7 +23,7 @@ def spi_in():
wait(0, pins, 2) # Wait for CS/ True
wrap_target() # Input a byte
set(x, 7)
label("bit")
label("bit") # Await a +ve clock edge or incoming "quit" signal
jmp(pin, "next")
jmp(not_osre, "done") # Data received: quit
jmp("bit")
@ -31,6 +31,7 @@ def spi_in():
in_(pins, 1) # Input a bit MSB first
wait(0, pins, 1) # clk trailing
jmp(y_dec, "continue")
# Last trailing clock edge received
label("overrun") # An overrun would occur. Wait for CS/ ISR to send data.
jmp(not_osre, "done") # Data received
jmp("overrun")
@ -44,27 +45,50 @@ def spi_in():
jmp("escape")
# in_base = MOSI
# out_base = MISO
# DMA feeds OSR via autopull
@rp2.asm_pio(autopull=True, pull_thresh=8, out_init=rp2.PIO.OUT_LOW)
def spi_out():
wait(0, pins, 2) # Wait for CS/ True
wrap_target()
set(x, 7)
label("bit") # Await a +ve clock edge
wait(1, pins, 1)
out(pins, 1) # Stalls here if out of data: IRQ resets SM
wait(0, pins, 1) # Await low clock edge
jmp(x_dec, "bit")
wrap()
class SpiSlave:
def __init__(self, buf=None, callback=None, sm_num=0, *, mosi, sck, csn):
def __init__(self, buf=None, callback=None, sm_num=0, *, mosi, sck, csn, miso=None):
# Get data request channel for a SM: RP2040 datasheet 2.5.3 RP2350 12.6.4.1
def dreq(sm, rx=False):
d = (sm & 3) + ((sm >> 2) << 3)
return 4 + d if rx else d
self._io = miso is not None
self._csn = csn
self._wbuf = None # Write buffer
self._reps = 0 # Write repeat
self._callback = callback
self._docb = False # By default CB des not run
# Set up DMA
# Set up read DMA
self._dma = rp2.DMA()
# Transfer bytes, rather than words, don't increment the read address and pace the transfer.
self._cfg = self._dma.pack_ctrl(size=0, inc_read=False, treq_sel=dreq(sm_num, True))
self._sm_num = sm_num
# Input (MOSI) SM
self._buf = buf
self._nbytes = 0 # Number of received bytes
if buf is not None:
self._mvb = memoryview(buf)
self._tsf = asyncio.ThreadSafeFlag()
self._read_done = False # Synchronisation for .read()
csn.irq(self._done, trigger=Pin.IRQ_RISING, hard=True) # IRQ occurs at end of transfer
# IRQ occurs at end of transfer HACK mem error if hard L184
csn.irq(self._done, trigger=Pin.IRQ_RISING, hard=True)
self._sm = rp2.StateMachine(
sm_num,
spi_in,
@ -73,6 +97,18 @@ class SpiSlave:
in_base=mosi,
jmp_pin=sck,
)
if self._io:
# Output (MISO) SM
# Write DMA
self._wdma = rp2.DMA()
self._wcfg = self._wdma.pack_ctrl(size=0, inc_write=False, treq_sel=dreq(sm_num + 1))
self._osm = rp2.StateMachine(
sm_num + 1,
spi_out,
pull_thresh=8,
in_base=mosi,
out_base=miso,
)
def __aiter__(self): # Asynchronous iterator support
return self
@ -103,13 +139,35 @@ class SpiSlave:
else:
raise ValueError("Missing callback function.")
# Send a buffer on next transfer. reps==-1 is forever (until next write)
def write(self, buf, reps=1):
self._wbuf = buf[:] # Copy in case caller modifies before it gets sent (queue?)
self._reps = reps
def _rinto(self, buf): # .read_into() without callback
buflen = len(buf)
self._buflen = buflen # Save for ISR
self._dma.active(0) # De-activate befor re-configuring
self._sm.active(0)
self._tsf.clear()
self._dma.config(read=self._sm, write=buf, count=buflen, ctrl=self._cfg, trigger=False)
self._dma.config(read=self._sm, write=buf, count=buflen, ctrl=self._cfg)
if self._io:
self._wdma.active(0)
self._osm.active(0)
if self._reps: # Repeat forever if _reps==-1
if self._reps > 0:
self._reps -= 1
# TODO necessary? Can write buffer be bigger than read?
n = min(buflen, len(self._wbuf))
print("sending", n, self._wbuf[:n])
# print(self._dma, self._wdma)
self._wdma.config(
read=self._wbuf, write=self._osm, count=n, ctrl=self._wcfg, trigger=True
)
self._wdma.active(1)
self._osm.restart() # osm waits for CS/ low
self._osm.active(1)
self._dma.active(1)
self._sm.restart()
self._sm.active(1) # Start SM
@ -118,17 +176,21 @@ class SpiSlave:
# Hard ISR for CS/ rising edge.
def _done(self, _): # Get no. of bytes received.
self._dma.active(0)
if self._io:
self._wdma.active(0)
self._sm.put(0) # Request no. of received bits
if not self._sm.rx_fifo(): # Occurs if ._rinto() never called while CSN is low:
# a transmission was missed.
# print("GH")
print("Missed TX?")
return # ISR runs on trailing edge but SM is not running. Nothing to do.
# TODO Next line occasionally memfails if IRQ is hard
sp = self._sm.get() >> 3 # Bits->bytes: space left in buffer or 7ffffff on overflow
self._nbytes = self._buflen - sp if sp != 0x7FFFFFF else self._buflen
self._dma.active(0)
self._sm.active(0)
self._tsf.set()
self._read_done = True
print("GH3")
if self._docb: # Only run CB if user has called .read_into()
self._docb = False
schedule(self._callback, self._nbytes) # Soft ISR
@ -141,4 +203,10 @@ class SpiSlave:
def deinit(self):
self._dma.active(0)
self._dma.close()
self._csn.irq(None)
if self._io:
self._wdma.active(0)
self._wdma.close()
self._osm.active(0)
self._sm.active(0)

Wyświetl plik

@ -12,7 +12,7 @@ pin_miso = Pin(16, Pin.IN) # Not used: keep driver happy
pin_sck = Pin(18, Pin.OUT, value=0)
pin_mosi = Pin(19, Pin.OUT, value=0)
spi = SPI(0, baudrate=10_000_000, sck=pin_sck, mosi=pin_mosi, miso=pin_miso)
spi = SPI(0, baudrate=1_000_000, sck=pin_sck, mosi=pin_mosi, miso=pin_miso)
def send(obuf):