# micropython-samples/rp2/spi/slave_async_test.py
# 74 lines, 2.2 KiB, Python

# slave_async_test.py Test asynchronous interface of SpiSlave class
# Released under the MIT License (MIT). See LICENSE.
# Copyright (c) 2025 Peter Hinch
# Link pins (slave GPIO - master GPIO):
# 0-19 MOSI
# 1-18 SCK
# 2-17 CSN
from machine import Pin, SPI
import asyncio
from .spi_slave import SpiSlave
# Send one message from the master.
# cs: called with 0 before the transfer and with 1 after it (chip select).
# spi: object whose write() method transmits the data.
# obuf: bytes-like object holding the message to transmit.
# The coroutine pauses for 100ms after the transfer before returning.
async def send(cs, spi, obuf):
    pause_ms = 100  # Delay applied after every message
    cs(0)  # Start of message
    spi.write(obuf)
    cs(1)  # End of message
    await asyncio.sleep_ms(pause_ms)
# Print each message produced by asynchronously iterating over piospi:
# first its length, then its content as bytes, then a blank separator line.
async def receive(piospi):
    async for msg in piospi:
        print(f"Received: {len(msg)} bytes:")
        # The second newline produces the blank line between messages.
        print(bytes(msg), end="\n\n")
# Perform a single read via piospi.as_read_into() into a locally allocated
# 200 byte buffer, then print the received data followed by the byte count.
async def get_msg(piospi):
    rx = bytearray(200)
    nbytes = await piospi.as_read_into(rx)  # Awaited result is used as the count
    received = rx[:nbytes]  # Only the part of the buffer that was filled
    print(bytes(received))
    print(f"Received: {nbytes} bytes")
# Exercise the asynchronous interface of SpiSlave. A hardware SPI master and
# the SpiSlave instance are both created here; messages written by the master
# are printed by the receive() task and, finally, by a single get_msg() read.
async def test():
    # NOTE(review): range(512) contains values above 255, which CPython's
    # bytearray rejects; presumably MicroPython wraps them to 8 bits - confirm.
    obuf = bytearray(range(512))  # Test data
    # Master chip select: created high so that it is inactive before the
    # slave starts to receive.
    cs = Pin(17, Pin.OUT, value=1)
    # Slave pins
    slave_mosi = Pin(0, Pin.IN)
    slave_sck = Pin(1, Pin.IN)
    slave_csn = Pin(2, Pin.IN)
    piospi = SpiSlave(
        buf=bytearray(300), sm_num=0, mosi=slave_mosi, sck=slave_sck, csn=slave_csn
    )
    reader = asyncio.create_task(receive(piospi))
    await asyncio.sleep_ms(0)  # Yield so that the receive task starts running
    # Master pins
    master_miso = Pin(16, Pin.IN)  # Not used: keep driver happy
    master_sck = Pin(18, Pin.OUT, value=0)
    master_mosi = Pin(19, Pin.OUT, value=0)
    spi = SPI(0, baudrate=10_000_000, sck=master_sck, mosi=master_mosi, miso=master_miso)
    print(spi)

    print("\nBasic test\n")
    for length in (256, 20):
        await send(cs, spi, obuf[:length])

    print("\nOverrun test: send 512 bytes, rx buffer is 300 bytes.\n")
    await send(cs, spi, obuf)

    print("\nTest subsequent transfers\n")
    for text in (
        b"The quick brown fox jumps over the lazy dog",
        b"A short message",
        b"A longer message",
    ):
        await send(cs, spi, text)

    reader.cancel()  # Terminate the read task
    await asyncio.sleep_ms(0)

    print("\nAsynchronous read into user supplied buffer\n")
    asyncio.create_task(get_msg(piospi))  # Set up for a single read
    await asyncio.sleep_ms(0)  # Yield so that the task above gets to run
    await send(cs, spi, b"Received by .as_read_into()")
    # Extra pause before finishing (presumably lets get_msg print - confirm).
    await asyncio.sleep_ms(100)
    print("\nDone")
# Module-level entry point: runs test() to completion as soon as this module
# is executed or imported.
asyncio.run(test())