kopia lustrzana https://github.com/micropython/micropython-lib
micropython/drivers: Move "nrf24l01" radio driver from main repo.
Signed-off-by: Jim Mussared <jim.mussared@gmail.com>pull/525/head
rodzic
f46401f849
commit
c1c0eb0c39
|
@ -0,0 +1 @@
|
|||
module("nrf24l01.py", opt=3)
|
|
@ -0,0 +1,252 @@
|
|||
"""NRF24L01 driver for MicroPython
|
||||
"""
|
||||
|
||||
from micropython import const
|
||||
import utime
|
||||
|
||||
# nRF24L01+ registers
|
||||
CONFIG = const(0x00)
|
||||
EN_RXADDR = const(0x02)
|
||||
SETUP_AW = const(0x03)
|
||||
SETUP_RETR = const(0x04)
|
||||
RF_CH = const(0x05)
|
||||
RF_SETUP = const(0x06)
|
||||
STATUS = const(0x07)
|
||||
RX_ADDR_P0 = const(0x0A)
|
||||
TX_ADDR = const(0x10)
|
||||
RX_PW_P0 = const(0x11)
|
||||
FIFO_STATUS = const(0x17)
|
||||
DYNPD = const(0x1C)
|
||||
|
||||
# CONFIG register
|
||||
EN_CRC = const(0x08) # enable CRC
|
||||
CRCO = const(0x04) # CRC encoding scheme; 0=1 byte, 1=2 bytes
|
||||
PWR_UP = const(0x02) # 1=power up, 0=power down
|
||||
PRIM_RX = const(0x01) # RX/TX control; 0=PTX, 1=PRX
|
||||
|
||||
# RF_SETUP register
|
||||
POWER_0 = const(0x00) # -18 dBm
|
||||
POWER_1 = const(0x02) # -12 dBm
|
||||
POWER_2 = const(0x04) # -6 dBm
|
||||
POWER_3 = const(0x06) # 0 dBm
|
||||
SPEED_1M = const(0x00)
|
||||
SPEED_2M = const(0x08)
|
||||
SPEED_250K = const(0x20)
|
||||
|
||||
# STATUS register
|
||||
RX_DR = const(0x40) # RX data ready; write 1 to clear
|
||||
TX_DS = const(0x20) # TX data sent; write 1 to clear
|
||||
MAX_RT = const(0x10) # max retransmits reached; write 1 to clear
|
||||
|
||||
# FIFO_STATUS register
|
||||
RX_EMPTY = const(0x01) # 1 if RX FIFO is empty
|
||||
|
||||
# constants for instructions
|
||||
R_RX_PL_WID = const(0x60) # read RX payload width
|
||||
R_RX_PAYLOAD = const(0x61) # read RX payload
|
||||
W_TX_PAYLOAD = const(0xA0) # write TX payload
|
||||
FLUSH_TX = const(0xE1) # flush TX FIFO
|
||||
FLUSH_RX = const(0xE2) # flush RX FIFO
|
||||
NOP = const(0xFF) # use to read STATUS register
|
||||
|
||||
|
||||
class NRF24L01:
|
||||
def __init__(self, spi, cs, ce, channel=46, payload_size=16):
|
||||
assert payload_size <= 32
|
||||
|
||||
self.buf = bytearray(1)
|
||||
|
||||
# store the pins
|
||||
self.spi = spi
|
||||
self.cs = cs
|
||||
self.ce = ce
|
||||
|
||||
# init the SPI bus and pins
|
||||
self.init_spi(4000000)
|
||||
|
||||
# reset everything
|
||||
ce.init(ce.OUT, value=0)
|
||||
cs.init(cs.OUT, value=1)
|
||||
|
||||
self.payload_size = payload_size
|
||||
self.pipe0_read_addr = None
|
||||
utime.sleep_ms(5)
|
||||
|
||||
# set address width to 5 bytes and check for device present
|
||||
self.reg_write(SETUP_AW, 0b11)
|
||||
if self.reg_read(SETUP_AW) != 0b11:
|
||||
raise OSError("nRF24L01+ Hardware not responding")
|
||||
|
||||
# disable dynamic payloads
|
||||
self.reg_write(DYNPD, 0)
|
||||
|
||||
# auto retransmit delay: 1750us
|
||||
# auto retransmit count: 8
|
||||
self.reg_write(SETUP_RETR, (6 << 4) | 8)
|
||||
|
||||
# set rf power and speed
|
||||
self.set_power_speed(POWER_3, SPEED_250K) # Best for point to point links
|
||||
|
||||
# init CRC
|
||||
self.set_crc(2)
|
||||
|
||||
# clear status flags
|
||||
self.reg_write(STATUS, RX_DR | TX_DS | MAX_RT)
|
||||
|
||||
# set channel
|
||||
self.set_channel(channel)
|
||||
|
||||
# flush buffers
|
||||
self.flush_rx()
|
||||
self.flush_tx()
|
||||
|
||||
def init_spi(self, baudrate):
|
||||
try:
|
||||
master = self.spi.MASTER
|
||||
except AttributeError:
|
||||
self.spi.init(baudrate=baudrate, polarity=0, phase=0)
|
||||
else:
|
||||
self.spi.init(master, baudrate=baudrate, polarity=0, phase=0)
|
||||
|
||||
def reg_read(self, reg):
|
||||
self.cs(0)
|
||||
self.spi.readinto(self.buf, reg)
|
||||
self.spi.readinto(self.buf)
|
||||
self.cs(1)
|
||||
return self.buf[0]
|
||||
|
||||
def reg_write_bytes(self, reg, buf):
|
||||
self.cs(0)
|
||||
self.spi.readinto(self.buf, 0x20 | reg)
|
||||
self.spi.write(buf)
|
||||
self.cs(1)
|
||||
return self.buf[0]
|
||||
|
||||
def reg_write(self, reg, value):
|
||||
self.cs(0)
|
||||
self.spi.readinto(self.buf, 0x20 | reg)
|
||||
ret = self.buf[0]
|
||||
self.spi.readinto(self.buf, value)
|
||||
self.cs(1)
|
||||
return ret
|
||||
|
||||
def flush_rx(self):
|
||||
self.cs(0)
|
||||
self.spi.readinto(self.buf, FLUSH_RX)
|
||||
self.cs(1)
|
||||
|
||||
def flush_tx(self):
|
||||
self.cs(0)
|
||||
self.spi.readinto(self.buf, FLUSH_TX)
|
||||
self.cs(1)
|
||||
|
||||
# power is one of POWER_x defines; speed is one of SPEED_x defines
|
||||
def set_power_speed(self, power, speed):
|
||||
setup = self.reg_read(RF_SETUP) & 0b11010001
|
||||
self.reg_write(RF_SETUP, setup | power | speed)
|
||||
|
||||
# length in bytes: 0, 1 or 2
|
||||
def set_crc(self, length):
|
||||
config = self.reg_read(CONFIG) & ~(CRCO | EN_CRC)
|
||||
if length == 0:
|
||||
pass
|
||||
elif length == 1:
|
||||
config |= EN_CRC
|
||||
else:
|
||||
config |= EN_CRC | CRCO
|
||||
self.reg_write(CONFIG, config)
|
||||
|
||||
def set_channel(self, channel):
|
||||
self.reg_write(RF_CH, min(channel, 125))
|
||||
|
||||
# address should be a bytes object 5 bytes long
|
||||
def open_tx_pipe(self, address):
|
||||
assert len(address) == 5
|
||||
self.reg_write_bytes(RX_ADDR_P0, address)
|
||||
self.reg_write_bytes(TX_ADDR, address)
|
||||
self.reg_write(RX_PW_P0, self.payload_size)
|
||||
|
||||
# address should be a bytes object 5 bytes long
|
||||
# pipe 0 and 1 have 5 byte address
|
||||
# pipes 2-5 use same 4 most-significant bytes as pipe 1, plus 1 extra byte
|
||||
def open_rx_pipe(self, pipe_id, address):
|
||||
assert len(address) == 5
|
||||
assert 0 <= pipe_id <= 5
|
||||
if pipe_id == 0:
|
||||
self.pipe0_read_addr = address
|
||||
if pipe_id < 2:
|
||||
self.reg_write_bytes(RX_ADDR_P0 + pipe_id, address)
|
||||
else:
|
||||
self.reg_write(RX_ADDR_P0 + pipe_id, address[0])
|
||||
self.reg_write(RX_PW_P0 + pipe_id, self.payload_size)
|
||||
self.reg_write(EN_RXADDR, self.reg_read(EN_RXADDR) | (1 << pipe_id))
|
||||
|
||||
def start_listening(self):
|
||||
self.reg_write(CONFIG, self.reg_read(CONFIG) | PWR_UP | PRIM_RX)
|
||||
self.reg_write(STATUS, RX_DR | TX_DS | MAX_RT)
|
||||
|
||||
if self.pipe0_read_addr is not None:
|
||||
self.reg_write_bytes(RX_ADDR_P0, self.pipe0_read_addr)
|
||||
|
||||
self.flush_rx()
|
||||
self.flush_tx()
|
||||
self.ce(1)
|
||||
utime.sleep_us(130)
|
||||
|
||||
def stop_listening(self):
|
||||
self.ce(0)
|
||||
self.flush_tx()
|
||||
self.flush_rx()
|
||||
|
||||
# returns True if any data available to recv
|
||||
def any(self):
|
||||
return not bool(self.reg_read(FIFO_STATUS) & RX_EMPTY)
|
||||
|
||||
def recv(self):
|
||||
# get the data
|
||||
self.cs(0)
|
||||
self.spi.readinto(self.buf, R_RX_PAYLOAD)
|
||||
buf = self.spi.read(self.payload_size)
|
||||
self.cs(1)
|
||||
# clear RX ready flag
|
||||
self.reg_write(STATUS, RX_DR)
|
||||
|
||||
return buf
|
||||
|
||||
# blocking wait for tx complete
|
||||
def send(self, buf, timeout=500):
|
||||
self.send_start(buf)
|
||||
start = utime.ticks_ms()
|
||||
result = None
|
||||
while result is None and utime.ticks_diff(utime.ticks_ms(), start) < timeout:
|
||||
result = self.send_done() # 1 == success, 2 == fail
|
||||
if result == 2:
|
||||
raise OSError("send failed")
|
||||
|
||||
# non-blocking tx
|
||||
def send_start(self, buf):
|
||||
# power up
|
||||
self.reg_write(CONFIG, (self.reg_read(CONFIG) | PWR_UP) & ~PRIM_RX)
|
||||
utime.sleep_us(150)
|
||||
# send the data
|
||||
self.cs(0)
|
||||
self.spi.readinto(self.buf, W_TX_PAYLOAD)
|
||||
self.spi.write(buf)
|
||||
if len(buf) < self.payload_size:
|
||||
self.spi.write(b"\x00" * (self.payload_size - len(buf))) # pad out data
|
||||
self.cs(1)
|
||||
|
||||
# enable the chip so it can send the data
|
||||
self.ce(1)
|
||||
utime.sleep_us(15) # needs to be >10us
|
||||
self.ce(0)
|
||||
|
||||
# returns None if send still in progress, 1 for success, 2 for fail
|
||||
def send_done(self):
|
||||
if not (self.reg_read(STATUS) & (TX_DS | MAX_RT)):
|
||||
return None # tx not finished
|
||||
|
||||
# either finished or failed: get and clear status flags, power down
|
||||
status = self.reg_write(STATUS, RX_DR | TX_DS | MAX_RT)
|
||||
self.reg_write(CONFIG, self.reg_read(CONFIG) & ~PWR_UP)
|
||||
return 1 if status & TX_DS else 2
|
|
@ -0,0 +1,150 @@
|
|||
"""Test for nrf24l01 module. Portable between MicroPython targets."""
|
||||
|
||||
import usys
|
||||
import ustruct as struct
|
||||
import utime
|
||||
from machine import Pin, SPI
|
||||
from nrf24l01 import NRF24L01
|
||||
from micropython import const
|
||||
|
||||
# Slave pause between receiving data and checking for further packets.
|
||||
_RX_POLL_DELAY = const(15)
|
||||
# Slave pauses an additional _SLAVE_SEND_DELAY ms after receiving data and before
|
||||
# transmitting to allow the (remote) master time to get into receive mode. The
|
||||
# master may be a slow device. Value tested with Pyboard, ESP32 and ESP8266.
|
||||
_SLAVE_SEND_DELAY = const(10)
|
||||
|
||||
if usys.platform == "pyboard":
|
||||
cfg = {"spi": 2, "miso": "Y7", "mosi": "Y8", "sck": "Y6", "csn": "Y5", "ce": "Y4"}
|
||||
elif usys.platform == "esp8266": # Hardware SPI
|
||||
cfg = {"spi": 1, "miso": 12, "mosi": 13, "sck": 14, "csn": 4, "ce": 5}
|
||||
elif usys.platform == "esp32": # Software SPI
|
||||
cfg = {"spi": -1, "miso": 32, "mosi": 33, "sck": 25, "csn": 26, "ce": 27}
|
||||
else:
|
||||
raise ValueError("Unsupported platform {}".format(usys.platform))
|
||||
|
||||
# Addresses are in little-endian format. They correspond to big-endian
|
||||
# 0xf0f0f0f0e1, 0xf0f0f0f0d2
|
||||
pipes = (b"\xe1\xf0\xf0\xf0\xf0", b"\xd2\xf0\xf0\xf0\xf0")
|
||||
|
||||
|
||||
def master():
|
||||
csn = Pin(cfg["csn"], mode=Pin.OUT, value=1)
|
||||
ce = Pin(cfg["ce"], mode=Pin.OUT, value=0)
|
||||
if cfg["spi"] == -1:
|
||||
spi = SPI(-1, sck=Pin(cfg["sck"]), mosi=Pin(cfg["mosi"]), miso=Pin(cfg["miso"]))
|
||||
nrf = NRF24L01(spi, csn, ce, payload_size=8)
|
||||
else:
|
||||
nrf = NRF24L01(SPI(cfg["spi"]), csn, ce, payload_size=8)
|
||||
|
||||
nrf.open_tx_pipe(pipes[0])
|
||||
nrf.open_rx_pipe(1, pipes[1])
|
||||
nrf.start_listening()
|
||||
|
||||
num_needed = 16
|
||||
num_successes = 0
|
||||
num_failures = 0
|
||||
led_state = 0
|
||||
|
||||
print("NRF24L01 master mode, sending %d packets..." % num_needed)
|
||||
|
||||
while num_successes < num_needed and num_failures < num_needed:
|
||||
# stop listening and send packet
|
||||
nrf.stop_listening()
|
||||
millis = utime.ticks_ms()
|
||||
led_state = max(1, (led_state << 1) & 0x0F)
|
||||
print("sending:", millis, led_state)
|
||||
try:
|
||||
nrf.send(struct.pack("ii", millis, led_state))
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
# start listening again
|
||||
nrf.start_listening()
|
||||
|
||||
# wait for response, with 250ms timeout
|
||||
start_time = utime.ticks_ms()
|
||||
timeout = False
|
||||
while not nrf.any() and not timeout:
|
||||
if utime.ticks_diff(utime.ticks_ms(), start_time) > 250:
|
||||
timeout = True
|
||||
|
||||
if timeout:
|
||||
print("failed, response timed out")
|
||||
num_failures += 1
|
||||
|
||||
else:
|
||||
# recv packet
|
||||
(got_millis,) = struct.unpack("i", nrf.recv())
|
||||
|
||||
# print response and round-trip delay
|
||||
print(
|
||||
"got response:",
|
||||
got_millis,
|
||||
"(delay",
|
||||
utime.ticks_diff(utime.ticks_ms(), got_millis),
|
||||
"ms)",
|
||||
)
|
||||
num_successes += 1
|
||||
|
||||
# delay then loop
|
||||
utime.sleep_ms(250)
|
||||
|
||||
print("master finished sending; successes=%d, failures=%d" % (num_successes, num_failures))
|
||||
|
||||
|
||||
def slave():
|
||||
csn = Pin(cfg["csn"], mode=Pin.OUT, value=1)
|
||||
ce = Pin(cfg["ce"], mode=Pin.OUT, value=0)
|
||||
if cfg["spi"] == -1:
|
||||
spi = SPI(-1, sck=Pin(cfg["sck"]), mosi=Pin(cfg["mosi"]), miso=Pin(cfg["miso"]))
|
||||
nrf = NRF24L01(spi, csn, ce, payload_size=8)
|
||||
else:
|
||||
nrf = NRF24L01(SPI(cfg["spi"]), csn, ce, payload_size=8)
|
||||
|
||||
nrf.open_tx_pipe(pipes[1])
|
||||
nrf.open_rx_pipe(1, pipes[0])
|
||||
nrf.start_listening()
|
||||
|
||||
print("NRF24L01 slave mode, waiting for packets... (ctrl-C to stop)")
|
||||
|
||||
while True:
|
||||
if nrf.any():
|
||||
while nrf.any():
|
||||
buf = nrf.recv()
|
||||
millis, led_state = struct.unpack("ii", buf)
|
||||
print("received:", millis, led_state)
|
||||
for led in leds:
|
||||
if led_state & 1:
|
||||
led.on()
|
||||
else:
|
||||
led.off()
|
||||
led_state >>= 1
|
||||
utime.sleep_ms(_RX_POLL_DELAY)
|
||||
|
||||
# Give master time to get into receive mode.
|
||||
utime.sleep_ms(_SLAVE_SEND_DELAY)
|
||||
nrf.stop_listening()
|
||||
try:
|
||||
nrf.send(struct.pack("i", millis))
|
||||
except OSError:
|
||||
pass
|
||||
print("sent response")
|
||||
nrf.start_listening()
|
||||
|
||||
|
||||
try:
|
||||
import pyb
|
||||
|
||||
leds = [pyb.LED(i + 1) for i in range(4)]
|
||||
except:
|
||||
leds = []
|
||||
|
||||
print("NRF24L01 test module loaded")
|
||||
print("NRF24L01 pinout for test:")
|
||||
print(" CE on", cfg["ce"])
|
||||
print(" CSN on", cfg["csn"])
|
||||
print(" SCK on", cfg["sck"])
|
||||
print(" MISO on", cfg["miso"])
|
||||
print(" MOSI on", cfg["mosi"])
|
||||
print("run nrf24l01test.slave() on slave, then nrf24l01test.master() on master")
|
Ładowanie…
Reference in New Issue