diff --git a/rp2/RP2.md b/rp2/RP2.md index 09be6c9..09e7b1f 100644 --- a/rp2/RP2.md +++ b/rp2/RP2.md @@ -96,6 +96,17 @@ It has been tested at a clock rate of 24MHz on an RP2040 running at 250MHz. The following files may be found in the `spi` directory: * `spi_slave.py` Main module. +* `tx.py` Transmitter script: provides SPI data for receiver demos below. +These demos use two linked boards, one supplying data, the other receiving. +* `rx.py` Demo of nonblocking receiver. +* `rxb.py` Blocking read demo. +* `arx.py` Asynchronous read demo. +Demos are run from the `rp2` directory by issuing (e.g.): +```python +>>> import spi.rx +``` +Tests. These run on a single board and test special cases such as recovery from +overruns. * `slave_sync_test` Test using synchronous code. * `slave_async_test` Test using asynchronous code. * `master_slave_test.py` Full test of master linked to slave, with the latter @@ -108,7 +119,7 @@ the following pins should be linked: * 1-18 SCK * 2-17 CSN -Tests are run by issuing (e.g.): +Tests are run from the `rp2` directory by issuing (e.g.): ```py >>> import spi.master_slave_test ``` @@ -154,6 +165,8 @@ large enough for the expected message. The method returns immediately. When a message arrives and reception is complete, the callback runs. Its integer arg is the number of bytes received. If a message is too long to fit the buffer, excess bytes are lost. +* `deinit()` Close the interface. This should be done on exit to avoid hanging +at the REPL. The nonblocking `.read_into()` method enables processing to be done while awaiting a complete message. The drawback (compared to `.read()`) is that @@ -213,6 +226,9 @@ phase "ping pong" buffering). Reception is via an asynchronous method ``` As with all asynchronous code, this task pauses while others continue to run. +On application exit `SpiSlave.deinit()` should be called to avoid hanging at the +REPL. + # 2.6 Operation The slave will ignore all interface activity until CS/ is driven low. It then diff --git a/rp2/spi/arx.py b/rp2/spi/arx.py index e7afa93..c905e1b 100644 --- a/rp2/spi/arx.py +++ b/rp2/spi/arx.py @@ -1,3 +1,7 @@ +# spi.arx.py Demo of asynchronous SPI slave + +# Released under the MIT License (MIT). See LICENSE. +# Copyright (c) 2025 Peter Hinch import asyncio from machine import Pin, reset @@ -8,14 +12,17 @@ mosi = Pin(0, Pin.IN) sck = Pin(1, Pin.IN) csn = Pin(2, Pin.IN) +piospi = SpiSlave(buf=bytearray(300), sm_num=0, mosi=mosi, sck=sck, csn=csn) + + async def main(): - piospi = SpiSlave(buf=bytearray(300), sm_num=0, mosi=mosi, sck=sck, csn=csn) async for msg in piospi: print(f"Received: {len(msg)} bytes:") print(bytes(msg)) print() + try: asyncio.run(main()) finally: - reset() + piospi.deinit() diff --git a/rp2/spi/master_slave_test.py b/rp2/spi/master_slave_test.py index a6d3b2d..df65347 100644 --- a/rp2/spi/master_slave_test.py +++ b/rp2/spi/master_slave_test.py @@ -21,7 +21,19 @@ def callback(dma): # Hard ISR tsf.set() # Flag user code that transfer is complete -async def send(cs, spi, data): +# Sender uses nonblocking master +cs = Pin(17, Pin.OUT, value=1) # Ensure CS/ is False before we try to receive. +pin_sck = Pin(18, Pin.OUT, value=0) +pin_mosi = Pin(19, Pin.OUT, value=0) +spi = SpiMaster(4, 10_000_000, pin_sck, pin_mosi, callback) +# Pins for slave +mosi = Pin(0, Pin.IN) +sck = Pin(1, Pin.IN) +csn = Pin(2, Pin.IN) +piospi = SpiSlave(buf=bytearray(300), sm_num=0, mosi=mosi, sck=sck, csn=csn) + + +async def send(data): cs(0) # Assert CS/ spi.write(data) # "Immediate" return: minimal blocking. await tsf.wait() # Wait for transfer complete (other tasks run) @@ -38,24 +50,18 @@ async def receive(piospi): async def test(): obuf = bytearray(range(512)) # Test data - # Master CS/ - cs = Pin(17, Pin.OUT, value=1) # Ensure CS/ is False before we try to receive. - # Pins for slave - mosi = Pin(0, Pin.IN) - sck = Pin(1, Pin.IN) - csn = Pin(2, Pin.IN) - piospi = SpiSlave(buf=bytearray(300), sm_num=0, mosi=mosi, sck=sck, csn=csn) rt = asyncio.create_task(receive(piospi)) await asyncio.sleep_ms(0) # Ensure receive task is running - # Pins for Master - pin_sck = Pin(18, Pin.OUT, value=0) - pin_mosi = Pin(19, Pin.OUT, value=0) - spi = SpiMaster(4, 10_000_000, pin_sck, pin_mosi, callback) print("\nBasic test\n") - await send(cs, spi, obuf[:256]) - await send(cs, spi, obuf[:20]) - await send(cs, spi, b"The quick brown fox jumps over the lazy dog") + await send(obuf[:256]) + await send(obuf[:20]) + await send(b"The quick brown fox jumps over the lazy dog") print("\nDone") -asyncio.run(test()) +try: + asyncio.run(test()) +finally: + piospi.deinit() + spi.deinit() + asyncio.new_event_loop() diff --git a/rp2/spi/rx.py b/rp2/spi/rx.py index 7726a4c..6a90dea 100644 --- a/rp2/spi/rx.py +++ b/rp2/spi/rx.py @@ -1,3 +1,8 @@ +# spi.rx.py Demo of nonblocking SPI reception + +# Released under the MIT License (MIT). See LICENSE. +# Copyright (c) 2025 Peter Hinch + from machine import Pin, reset from .spi_slave import SpiSlave from time import sleep_ms @@ -32,4 +37,4 @@ def test(): try: test() finally: - reset() + piospi.deinit() diff --git a/rp2/spi/rxb.py b/rp2/spi/rxb.py new file mode 100644 index 0000000..ccff543 --- /dev/null +++ b/rp2/spi/rxb.py @@ -0,0 +1,24 @@ +# spi.rxb.py Demo of SPI slave blocking read. + +# Released under the MIT License (MIT). See LICENSE. +# Copyright (c) 2025 Peter Hinch + +from machine import Pin, reset +from .spi_slave import SpiSlave + +mosi = Pin(0, Pin.IN) +sck = Pin(1, Pin.IN) +csn = Pin(2, Pin.IN) + +piospi = SpiSlave(bytearray(300), sm_num=4, mosi=mosi, sck=sck, csn=csn) + + +def test(): + while True: + print(bytes(piospi.read())) # Message received. Process it. + + +try: + test() +finally: + piospi.deinit() diff --git a/rp2/spi/slave_async_test.py b/rp2/spi/slave_async_test.py index d57ca32..865c1ca 100644 --- a/rp2/spi/slave_async_test.py +++ b/rp2/spi/slave_async_test.py @@ -12,8 +12,21 @@ from machine import Pin, SPI import asyncio from .spi_slave import SpiSlave +# Pins for Master +cs = Pin(17, Pin.OUT, value=1) # Ensure CS/ is False before we try to receive. +pin_miso = Pin(16, Pin.IN) # Not used: keep driver happy +pin_sck = Pin(18, Pin.OUT, value=0) +pin_mosi = Pin(19, Pin.OUT, value=0) +spi = SPI(0, baudrate=10_000_000, sck=pin_sck, mosi=pin_mosi, miso=pin_miso) -async def send(cs, spi, obuf): +# Pins for slave +mosi = Pin(0, Pin.IN) +sck = Pin(1, Pin.IN) +csn = Pin(2, Pin.IN) +piospi = SpiSlave(buf=bytearray(300), sm_num=0, mosi=mosi, sck=sck, csn=csn) + + +async def send(obuf): cs(0) spi.write(obuf) cs(1) @@ -36,38 +49,30 @@ async def get_msg(piospi): async def test(): obuf = bytearray(range(512)) # Test data - # Master CS/ - cs = Pin(17, Pin.OUT, value=1) # Ensure CS/ is False before we try to receive. - # Pins for slave - mosi = Pin(0, Pin.IN) - sck = Pin(1, Pin.IN) - csn = Pin(2, Pin.IN) - piospi = SpiSlave(buf=bytearray(300), sm_num=0, mosi=mosi, sck=sck, csn=csn) rt = asyncio.create_task(receive(piospi)) await asyncio.sleep_ms(0) # Ensure receive task is running - # Pins for Master - pin_miso = Pin(16, Pin.IN) # Not used: keep driver happy - pin_sck = Pin(18, Pin.OUT, value=0) - pin_mosi = Pin(19, Pin.OUT, value=0) - spi = SPI(0, baudrate=10_000_000, sck=pin_sck, mosi=pin_mosi, miso=pin_miso) print(spi) print("\nBasic test\n") - await send(cs, spi, obuf[:256]) - await send(cs, spi, obuf[:20]) + await send(obuf[:256]) + await send(obuf[:20]) print("\nOverrun test: send 512 bytes, rx buffer is 300 bytes.\n") - await send(cs, spi, obuf) + await send(obuf) print("\nTest subsequent transfers\n") - await send(cs, spi, b"The quick brown fox jumps over the lazy dog") - await send(cs, spi, b"A short message") - await send(cs, spi, b"A longer message") + await send(b"The quick brown fox jumps over the lazy dog") + await send(b"A short message") + await send(b"A longer message") rt.cancel() # Terminate the read task await asyncio.sleep_ms(0) print("\nAsynchronous read into user supplied buffer\n") asyncio.create_task(get_msg(piospi)) # Set up for a single read await asyncio.sleep_ms(0) # Ensure above task gets to run - await send(cs, spi, b"Received by .as_read_into()") + await send(b"Received by .as_read_into()") await asyncio.sleep_ms(100) print("\nDone") -asyncio.run(test()) +try: + asyncio.run(test()) +finally: + piospi.deinit() + asyncio.new_event_loop() diff --git a/rp2/spi/slave_sync_test.py b/rp2/spi/slave_sync_test.py index 6c4853d..06c0aba 100644 --- a/rp2/spi/slave_sync_test.py +++ b/rp2/spi/slave_sync_test.py @@ -12,53 +12,59 @@ from machine import Pin, SPI, SoftSPI from .spi_slave import SpiSlave from time import sleep_ms +# Create synchronous master using standard library +cs = Pin(17, Pin.OUT, value=1) # Ensure CS/ is False before we try to receive. +pin_miso = Pin(16, Pin.IN) # Not used: keep driver happy +pin_sck = Pin(18, Pin.OUT, value=0) +pin_mosi = Pin(19, Pin.OUT, value=0) +spi = SPI(0, baudrate=10_000_000, sck=pin_sck, mosi=pin_mosi, miso=pin_miso) + +# Callback for receiver +buf = bytearray(300) # Read buffer + + +def receive(nbytes): # Soft ISR runs when data received. + print(f"Received: {nbytes} bytes:") + print(bytes(buf[:nbytes])) + print() + + +# Pins for slave +mosi = Pin(0, Pin.IN) +sck = Pin(1, Pin.IN) +csn = Pin(2, Pin.IN) +piospi = SpiSlave(callback=receive, sm_num=4, mosi=mosi, sck=sck, csn=csn) + # SPI send a passed buffer -def send(cs, spi, obuf): +def send(obuf): cs(0) spi.write(obuf) cs(1) sleep_ms(100) -buf = bytearray(300) # Read buffer - -# Callback runs when transfer complete (soft ISR context) -def receive(nbytes): - print(f"Received: {nbytes} bytes:") - print(bytes(buf[:nbytes])) - print() - - def test(): obuf = bytearray(range(512)) # Test data - cs = Pin(17, Pin.OUT, value=1) # Ensure CS/ is False before we try to receive. - # Pins for slave - mosi = Pin(0, Pin.IN) - sck = Pin(1, Pin.IN) - csn = Pin(2, Pin.IN) - piospi = SpiSlave(callback=receive, sm_num=4, mosi=mosi, sck=sck, csn=csn) - # Pins for master - pin_miso = Pin(16, Pin.IN) # Not used: keep driver happy - pin_sck = Pin(18, Pin.OUT, value=0) - pin_mosi = Pin(19, Pin.OUT, value=0) - spi = SPI(0, baudrate=10_000_000, sck=pin_sck, mosi=pin_mosi, miso=pin_miso) print("\nBasic test\n") piospi.read_into(buf) - send(cs, spi, obuf[:256]) + send(obuf[:256]) piospi.read_into(buf) - send(cs, spi, obuf[:20]) + send(obuf[:20]) print("\nOverrun test: send 512 bytes, rx buffer is 300 bytes.\n") piospi.read_into(buf) - send(cs, spi, obuf) + send(obuf) print("\nTest subsequent transfers\n") piospi.read_into(buf) - send(cs, spi, b"The quick brown fox jumps over the lazy dog") + send(b"The quick brown fox jumps over the lazy dog") piospi.read_into(buf) - send(cs, spi, b"A short message") + send(b"A short message") piospi.read_into(buf) - send(cs, spi, b"A longer message") + send(b"A longer message") print("\nDone") -test() +try: + test() +finally: + piospi.deinit() diff --git a/rp2/spi/spi_slave.py b/rp2/spi/spi_slave.py index 48d9a4b..439db5f 100644 --- a/rp2/spi/spi_slave.py +++ b/rp2/spi/spi_slave.py @@ -93,7 +93,7 @@ class SpiSlave: self._rinto(self._buf) while not self._read_done: pass - return self._buf[: self._nbytes] + return self._mvb[: self._nbytes] # Initiate a nonblocking read into a buffer. Immediate return. def read_into(self, buf): @@ -120,6 +120,7 @@ class SpiSlave: self._dma.active(0) self._sm.put(0) # Request no. of received bits if not self._sm.rx_fifo(): # Occurs if ._rinto() never called while CSN is low: + # a transmission was missed. # print("GH") return # ISR runs on trailing edge but SM is not running. Nothing to do. sp = self._sm.get() >> 3 # Bits->bytes: space left in buffer or 7ffffff on overflow @@ -137,3 +138,7 @@ class SpiSlave: self._rinto(buf) # Start the read await self._tsf.wait() # Wait for CS/ high (master signals transfer complete) return self._nbytes + + def deinit(self): + self._dma.active(0) + self._sm.active(0) diff --git a/rp2/spi/tx.py b/rp2/spi/tx.py index be79824..0d7e12f 100644 --- a/rp2/spi/tx.py +++ b/rp2/spi/tx.py @@ -1,3 +1,8 @@ +# spi.tx.py Send data to SPI slave + +# Released under the MIT License (MIT). See LICENSE. +# Copyright (c) 2025 Peter Hinch + from machine import Pin, SPI from time import sleep_ms @@ -13,9 +18,7 @@ spi = SPI(0, baudrate=10_000_000, sck=pin_sck, mosi=pin_mosi, miso=pin_miso) def send(obuf): cs(0) spi.write(obuf) - # sleep_ms(10) cs(1) - # print("sent", obuf) sleep_ms(1000)