RP2.spi: Add deinit method, adapt tests to use it.

master
Peter Hinch 2025-08-20 17:45:24 +01:00
rodzic e54fed1a75
commit 73d84bd5a6
9 zmienionych plików z 149 dodań i 72 usunięć

Wyświetl plik

@ -96,6 +96,17 @@ It has been tested at a clock rate of 24MHz on an RP2040 running at 250MHz.
The following files may be found in the `spi` directory:
* `spi_slave.py` Main module.
* `tx.py` Transmitter script: provides SPI data for receiver demos below.
These demos use two linked boards, one supplying data, the other receiving.
* `rx.py` Demo of nonblocking receiver.
* `rxb.py` Blocking read demo.
* `arx.py` Asynchronous read demo.
Demos are run from the `rp2` directory by issuing (e.g.):
```python
>>> import spi.rx
```
Tests. These run on a single board and test special cases such as recovery from
overruns.
* `slave_sync_test` Test using synchronous code.
* `slave_async_test` Test using asynchronous code.
* `master_slave_test.py` Full test of master linked to slave, with the latter
@ -108,7 +119,7 @@ the following pins should be linked:
* 1-18 SCK
* 2-17 CSN
Tests are run by issuing (e.g.):
Tests are run from the `rp2` directory by issuing (e.g.):
```py
>>> import spi.master_slave_test
```
@ -154,6 +165,8 @@ large enough for the expected message. The method returns immediately. When a
message arrives and reception is complete, the callback runs. Its integer arg is
the number of bytes received. If a message is too long to fit the buffer, excess
bytes are lost.
* `deinit()` Close the interface. This should be done on exit to avoid hanging
at the REPL.
The nonblocking `.read_into()` method enables processing to be done while
awaiting a complete message. The drawback (compared to `.read()`) is that
@ -213,6 +226,9 @@ phase "ping pong" buffering). Reception is via an asynchronous method
```
As with all asynchronous code, this task pauses while others continue to run.
On application exit `SpiSlave.deinit()` should be called to avoid hanging at the
REPL.
# 2.6 Operation
The slave will ignore all interface activity until CS/ is driven low. It then

Wyświetl plik

@ -1,3 +1,7 @@
# spi.arx.py Demo of asynchronous SPI slave
# Released under the MIT License (MIT). See LICENSE.
# Copyright (c) 2025 Peter Hinch
import asyncio
from machine import Pin, reset
@ -8,14 +12,17 @@ mosi = Pin(0, Pin.IN)
sck = Pin(1, Pin.IN)
csn = Pin(2, Pin.IN)
async def main():
piospi = SpiSlave(buf=bytearray(300), sm_num=0, mosi=mosi, sck=sck, csn=csn)
async def main():
async for msg in piospi:
print(f"Received: {len(msg)} bytes:")
print(bytes(msg))
print()
try:
asyncio.run(main())
finally:
reset()
piospi.deinit()

Wyświetl plik

@ -21,7 +21,19 @@ def callback(dma): # Hard ISR
tsf.set() # Flag user code that transfer is complete
async def send(cs, spi, data):
# Sender uses nonblocking master
cs = Pin(17, Pin.OUT, value=1) # Ensure CS/ is False before we try to receive.
pin_sck = Pin(18, Pin.OUT, value=0)
pin_mosi = Pin(19, Pin.OUT, value=0)
spi = SpiMaster(4, 10_000_000, pin_sck, pin_mosi, callback)
# Pins for slave
mosi = Pin(0, Pin.IN)
sck = Pin(1, Pin.IN)
csn = Pin(2, Pin.IN)
piospi = SpiSlave(buf=bytearray(300), sm_num=0, mosi=mosi, sck=sck, csn=csn)
async def send(data):
cs(0) # Assert CS/
spi.write(data) # "Immediate" return: minimal blocking.
await tsf.wait() # Wait for transfer complete (other tasks run)
@ -38,24 +50,18 @@ async def receive(piospi):
async def test():
obuf = bytearray(range(512)) # Test data
# Master CS/
cs = Pin(17, Pin.OUT, value=1) # Ensure CS/ is False before we try to receive.
# Pins for slave
mosi = Pin(0, Pin.IN)
sck = Pin(1, Pin.IN)
csn = Pin(2, Pin.IN)
piospi = SpiSlave(buf=bytearray(300), sm_num=0, mosi=mosi, sck=sck, csn=csn)
rt = asyncio.create_task(receive(piospi))
await asyncio.sleep_ms(0) # Ensure receive task is running
# Pins for Master
pin_sck = Pin(18, Pin.OUT, value=0)
pin_mosi = Pin(19, Pin.OUT, value=0)
spi = SpiMaster(4, 10_000_000, pin_sck, pin_mosi, callback)
print("\nBasic test\n")
await send(cs, spi, obuf[:256])
await send(cs, spi, obuf[:20])
await send(cs, spi, b"The quick brown fox jumps over the lazy dog")
await send(obuf[:256])
await send(obuf[:20])
await send(b"The quick brown fox jumps over the lazy dog")
print("\nDone")
try:
asyncio.run(test())
finally:
piospi.deinit()
spi.deinit()
asyncio.new_event_loop()

Wyświetl plik

@ -1,3 +1,8 @@
# spi.rx.py Demo of nonblocking SPI reception
# Released under the MIT License (MIT). See LICENSE.
# Copyright (c) 2025 Peter Hinch
from machine import Pin, reset
from .spi_slave import SpiSlave
from time import sleep_ms
@ -32,4 +37,4 @@ def test():
try:
test()
finally:
reset()
piospi.deinit()

24
rp2/spi/rxb.py 100644
Wyświetl plik

@ -0,0 +1,24 @@
# spi.rxb.py Demo of SPI slave blocking read.
# Released under the MIT License (MIT). See LICENSE.
# Copyright (c) 2025 Peter Hinch
from machine import Pin, reset
from .spi_slave import SpiSlave
mosi = Pin(0, Pin.IN)
sck = Pin(1, Pin.IN)
csn = Pin(2, Pin.IN)
piospi = SpiSlave(bytearray(300), sm_num=4, mosi=mosi, sck=sck, csn=csn)
def test():
while True:
print(bytes(piospi.read())) # Message received. Process it.
try:
test()
finally:
piospi.deinit()

Wyświetl plik

@ -12,8 +12,21 @@ from machine import Pin, SPI
import asyncio
from .spi_slave import SpiSlave
# Pins for Master
cs = Pin(17, Pin.OUT, value=1) # Ensure CS/ is False before we try to receive.
pin_miso = Pin(16, Pin.IN) # Not used: keep driver happy
pin_sck = Pin(18, Pin.OUT, value=0)
pin_mosi = Pin(19, Pin.OUT, value=0)
spi = SPI(0, baudrate=10_000_000, sck=pin_sck, mosi=pin_mosi, miso=pin_miso)
async def send(cs, spi, obuf):
# Pins for slave
mosi = Pin(0, Pin.IN)
sck = Pin(1, Pin.IN)
csn = Pin(2, Pin.IN)
piospi = SpiSlave(buf=bytearray(300), sm_num=0, mosi=mosi, sck=sck, csn=csn)
async def send(obuf):
cs(0)
spi.write(obuf)
cs(1)
@ -36,38 +49,30 @@ async def get_msg(piospi):
async def test():
obuf = bytearray(range(512)) # Test data
# Master CS/
cs = Pin(17, Pin.OUT, value=1) # Ensure CS/ is False before we try to receive.
# Pins for slave
mosi = Pin(0, Pin.IN)
sck = Pin(1, Pin.IN)
csn = Pin(2, Pin.IN)
piospi = SpiSlave(buf=bytearray(300), sm_num=0, mosi=mosi, sck=sck, csn=csn)
rt = asyncio.create_task(receive(piospi))
await asyncio.sleep_ms(0) # Ensure receive task is running
# Pins for Master
pin_miso = Pin(16, Pin.IN) # Not used: keep driver happy
pin_sck = Pin(18, Pin.OUT, value=0)
pin_mosi = Pin(19, Pin.OUT, value=0)
spi = SPI(0, baudrate=10_000_000, sck=pin_sck, mosi=pin_mosi, miso=pin_miso)
print(spi)
print("\nBasic test\n")
await send(cs, spi, obuf[:256])
await send(cs, spi, obuf[:20])
await send(obuf[:256])
await send(obuf[:20])
print("\nOverrun test: send 512 bytes, rx buffer is 300 bytes.\n")
await send(cs, spi, obuf)
await send(obuf)
print("\nTest subsequent transfers\n")
await send(cs, spi, b"The quick brown fox jumps over the lazy dog")
await send(cs, spi, b"A short message")
await send(cs, spi, b"A longer message")
await send(b"The quick brown fox jumps over the lazy dog")
await send(b"A short message")
await send(b"A longer message")
rt.cancel() # Terminate the read task
await asyncio.sleep_ms(0)
print("\nAsynchronous read into user supplied buffer\n")
asyncio.create_task(get_msg(piospi)) # Set up for a single read
await asyncio.sleep_ms(0) # Ensure above task gets to run
await send(cs, spi, b"Received by .as_read_into()")
await send(b"Received by .as_read_into()")
await asyncio.sleep_ms(100)
print("\nDone")
try:
asyncio.run(test())
finally:
piospi.deinit()
asyncio.new_event_loop()

Wyświetl plik

@ -12,53 +12,59 @@ from machine import Pin, SPI, SoftSPI
from .spi_slave import SpiSlave
from time import sleep_ms
# Create synchronous master using standard library
cs = Pin(17, Pin.OUT, value=1) # Ensure CS/ is False before we try to receive.
pin_miso = Pin(16, Pin.IN) # Not used: keep driver happy
pin_sck = Pin(18, Pin.OUT, value=0)
pin_mosi = Pin(19, Pin.OUT, value=0)
spi = SPI(0, baudrate=10_000_000, sck=pin_sck, mosi=pin_mosi, miso=pin_miso)
# Callback for receiver
buf = bytearray(300) # Read buffer
def receive(nbytes): # Soft ISR runs when data received.
print(f"Received: {nbytes} bytes:")
print(bytes(buf[:nbytes]))
print()
# Pins for slave
mosi = Pin(0, Pin.IN)
sck = Pin(1, Pin.IN)
csn = Pin(2, Pin.IN)
piospi = SpiSlave(callback=receive, sm_num=4, mosi=mosi, sck=sck, csn=csn)
# SPI send a passed buffer
def send(cs, spi, obuf):
def send(obuf):
cs(0)
spi.write(obuf)
cs(1)
sleep_ms(100)
buf = bytearray(300) # Read buffer
# Callback runs when transfer complete (soft ISR context)
def receive(nbytes):
print(f"Received: {nbytes} bytes:")
print(bytes(buf[:nbytes]))
print()
def test():
obuf = bytearray(range(512)) # Test data
cs = Pin(17, Pin.OUT, value=1) # Ensure CS/ is False before we try to receive.
# Pins for slave
mosi = Pin(0, Pin.IN)
sck = Pin(1, Pin.IN)
csn = Pin(2, Pin.IN)
piospi = SpiSlave(callback=receive, sm_num=4, mosi=mosi, sck=sck, csn=csn)
# Pins for master
pin_miso = Pin(16, Pin.IN) # Not used: keep driver happy
pin_sck = Pin(18, Pin.OUT, value=0)
pin_mosi = Pin(19, Pin.OUT, value=0)
spi = SPI(0, baudrate=10_000_000, sck=pin_sck, mosi=pin_mosi, miso=pin_miso)
print("\nBasic test\n")
piospi.read_into(buf)
send(cs, spi, obuf[:256])
send(obuf[:256])
piospi.read_into(buf)
send(cs, spi, obuf[:20])
send(obuf[:20])
print("\nOverrun test: send 512 bytes, rx buffer is 300 bytes.\n")
piospi.read_into(buf)
send(cs, spi, obuf)
send(obuf)
print("\nTest subsequent transfers\n")
piospi.read_into(buf)
send(cs, spi, b"The quick brown fox jumps over the lazy dog")
send(b"The quick brown fox jumps over the lazy dog")
piospi.read_into(buf)
send(cs, spi, b"A short message")
send(b"A short message")
piospi.read_into(buf)
send(cs, spi, b"A longer message")
send(b"A longer message")
print("\nDone")
try:
test()
finally:
piospi.deinit()

Wyświetl plik

@ -93,7 +93,7 @@ class SpiSlave:
self._rinto(self._buf)
while not self._read_done:
pass
return self._buf[: self._nbytes]
return self._mvb[: self._nbytes]
# Initiate a nonblocking read into a buffer. Immediate return.
def read_into(self, buf):
@ -120,6 +120,7 @@ class SpiSlave:
self._dma.active(0)
self._sm.put(0) # Request no. of received bits
if not self._sm.rx_fifo(): # Occurs if ._rinto() never called while CSN is low:
# a transmission was missed.
# print("GH")
return # ISR runs on trailing edge but SM is not running. Nothing to do.
sp = self._sm.get() >> 3 # Bits->bytes: space left in buffer or 7ffffff on overflow
@ -137,3 +138,7 @@ class SpiSlave:
self._rinto(buf) # Start the read
await self._tsf.wait() # Wait for CS/ high (master signals transfer complete)
return self._nbytes
def deinit(self):
self._dma.active(0)
self._sm.active(0)

Wyświetl plik

@ -1,3 +1,8 @@
# spi.tx.py Send data to SPI slave
# Released under the MIT License (MIT). See LICENSE.
# Copyright (c) 2025 Peter Hinch
from machine import Pin, SPI
from time import sleep_ms
@ -13,9 +18,7 @@ spi = SPI(0, baudrate=10_000_000, sck=pin_sck, mosi=pin_mosi, miso=pin_miso)
def send(obuf):
cs(0)
spi.write(obuf)
# sleep_ms(10)
cs(1)
# print("sent", obuf)
sleep_ms(1000)