kopia lustrzana https://github.com/peterhinch/micropython-samples
74 wiersze
2.2 KiB
Python
74 wiersze
2.2 KiB
Python
# slave_async_test.py Test asynchronous interface of SpiSlave class
|
|
|
|
# Released under the MIT License (MIT). See LICENSE.
|
|
# Copyright (c) 2025 Peter Hinch
|
|
|
|
# Link pins
|
|
# 0-19 MOSI
|
|
# 1-18 SCK
|
|
# 2-17 CSN
|
|
|
|
from machine import Pin, SPI
|
|
import asyncio
|
|
from .spi_slave import SpiSlave
|
|
|
|
|
|
async def send(cs, spi, obuf):
|
|
cs(0)
|
|
spi.write(obuf)
|
|
cs(1)
|
|
await asyncio.sleep_ms(100)
|
|
|
|
|
|
async def receive(piospi):
|
|
async for msg in piospi:
|
|
print(f"Received: {len(msg)} bytes:")
|
|
print(bytes(msg))
|
|
print()
|
|
|
|
|
|
async def get_msg(piospi):
|
|
rbuf = bytearray(200)
|
|
nbytes = await piospi.as_read_into(rbuf)
|
|
print(bytes(rbuf[:nbytes]))
|
|
print(f"Received: {nbytes} bytes")
|
|
|
|
|
|
async def test():
|
|
obuf = bytearray(range(512)) # Test data
|
|
# Master CS/
|
|
cs = Pin(17, Pin.OUT, value=1) # Ensure CS/ is False before we try to receive.
|
|
# Pins for slave
|
|
mosi = Pin(0, Pin.IN)
|
|
sck = Pin(1, Pin.IN)
|
|
csn = Pin(2, Pin.IN)
|
|
piospi = SpiSlave(buf=bytearray(300), sm_num=0, mosi=mosi, sck=sck, csn=csn)
|
|
rt = asyncio.create_task(receive(piospi))
|
|
await asyncio.sleep_ms(0) # Ensure receive task is running
|
|
# Pins for Master
|
|
pin_miso = Pin(16, Pin.IN) # Not used: keep driver happy
|
|
pin_sck = Pin(18, Pin.OUT, value=0)
|
|
pin_mosi = Pin(19, Pin.OUT, value=0)
|
|
spi = SPI(0, baudrate=10_000_000, sck=pin_sck, mosi=pin_mosi, miso=pin_miso)
|
|
print(spi)
|
|
print("\nBasic test\n")
|
|
await send(cs, spi, obuf[:256])
|
|
await send(cs, spi, obuf[:20])
|
|
print("\nOverrun test: send 512 bytes, rx buffer is 300 bytes.\n")
|
|
await send(cs, spi, obuf)
|
|
print("\nTest subsequent transfers\n")
|
|
await send(cs, spi, b"The quick brown fox jumps over the lazy dog")
|
|
await send(cs, spi, b"A short message")
|
|
await send(cs, spi, b"A longer message")
|
|
rt.cancel() # Terminate the read task
|
|
await asyncio.sleep_ms(0)
|
|
print("\nAsynchronous read into user supplied buffer\n")
|
|
asyncio.create_task(get_msg(piospi)) # Set up for a single read
|
|
await asyncio.sleep_ms(0) # Ensure above task gets to run
|
|
await send(cs, spi, b"Received by .as_read_into()")
|
|
await asyncio.sleep_ms(100)
|
|
print("\nDone")
|
|
|
|
|
|
asyncio.run(test())
|