lora: Add lora modem drivers for SX127x and SX126x.

Includes:
- component oriented driver, to only install the parts that are needed
- synchronous operation
- async wrapper class for asynchronous operation
- two examples with async & synchronous versions
- documentation

This work was funded through GitHub Sponsors.

Signed-off-by: Angus Gratton <angus@redyak.com.au>
pull/501/head
Angus Gratton 2022-06-29 09:50:15 +10:00 zatwierdzone przez Damien George
rodzic 7128d423c2
commit 1957f24020
22 zmienionych plików z 4955 dodań i 0 usunięć

Plik diff jest za duży Load Diff

Wyświetl plik

@ -0,0 +1,93 @@
# LoRa Reliable Delivery Example
This example shows a basic custom protocol for reliable one way communication
from low-power remote devices to a central base device:
- A single "receiver" device, running on mains power, listens continuously for
messages from one or more "sender" devices. Messages are payloads inside LoRa packets,
with some additional framing and address in the LoRa packet payload.
- "Sender" devices are remote sensor nodes, possibly battery powered. These wake
up periodically, read some data from a sensor, and send it in a message to the receiver.
- Messages are transmitted "reliably" with some custom header information,
meaning the receiver will acknowledge it received each message and the sender
will retry sending if it doesn't receive the acknowledgement.
## Source Files
* `lora_rd_settings.py` contains some common settings that are imported by
sender and receiver. These settings will need to be modified for the correct
frequency and other settings, before running the examples.
* `receiver.py` and `receiver_async.py` contain a synchronous (low-level API)
and asynchronous (iterator API) implementation of the same receiver program,
respectively. These two programs should work the same, they are intended show
different ways the driver can be used.
* `sender.py` and `sender_async.py` contain a synchronous (simple API) and
asynchronous (async API) implementation of the same sender program,
respectively. Because the standard async API resembles the Simple API, these
implementations are *very* similar. The two programs should work the same,
they are intended to show different ways the driver can be used.
## Running the examples
One way to run this example interactively:
1. Install or "freeze in" the necessary lora modem driver package (`lora-sx127x`
or `lora-sx126x`) and optionally the `lora-async` package if using the async
examples (see main lora `README.md` in the above directory for details).
2. Edit the `lora_rd_settings.py` file to set the frequency and other protocol
settings for your region and hardware (see main lora `README.md`).
3. Edit the program you plan to run and fill in the `get_modem()` function with
the correct modem type, pin assignments, etc. for your board (see top-level
README). Note the `get_modem()` function should use the existing `lora_cfg`
variable, which holds the settings imported from `lora_rd_settings.py`.
4. Change to this directory in a terminal.
5. Run `mpremote mount . exec receiver.py` on one board and `mpremote mount
. exec sender.py` on another (or swap in `receiver_async.py` and/or
`sender_async.py` as desired).
Consult the [mpremote
documentation](https://docs.micropython.org/en/latest/reference/mpremote.html)
for an explanation of these commands and the options needed to run two copies of
`mpremote` on different serial ports at the same time.
## Automatic Performance Tuning
- When sending an ACK, the receiver includes the RSSI of the received
packet. Senders will automatically modify their output_power to minimize the
power consumption required to reach the receiver. Similarly, if no ACK is
received then they will increase their output power and also re-run Image
calibration in order to maximize RX performance.
## Message payloads
Messages are LoRa packets, set up as follows:
LoRA implicit header mode, CRCs enabled.
* Each remote device has a unique sixteen-bit ID (range 00x0000 to 0xFFFE). ID
0xFFFF is reserved for the single receiver device.
* An eight-bit message counter is used to identify duplicate messages
* Data message format is:
- Sender ID (two bytes, little endian)
- Counter byte (incremented on each new message, not incremented on retry).
- Message length (1 byte)
- Message (variable length)
- Checksum byte (sum of all proceeding bytes in message, modulo 256). The LoRa
packet has its own 16-bit CRC, this is included as an additional way to
disambiguate other LoRa packets that might appear the same.
* After receiving a valid data message, the receiver device should send
an acknowledgement message 25ms after the modem receive completed.
Acknowledgement message format:
- 0xFFFF (receiver station ID as two bytes)
- Sender's Device ID from received message (two bytes, little endian)
- Counter byte from received message
- Checksum byte from received message
- RSSI value as received by radio (one signed byte)
* If the remote device doesn't receive a packet with the acknowledgement
message, it retries up to a configurable number of times (default 4) with a
basic exponential backoff formula.

Wyświetl plik

@ -0,0 +1,38 @@
# MicroPython lora reliable_delivery example - common protocol settings
# MIT license; Copyright (c) 2023 Angus Gratton
#
######
# To be able to be able to communicate, most of these settings need to match on both radios.
# Consult the example README for more information about how to use the example.
######
# LoRa protocol configuration
#
# Currently configured for relatively slow & low bandwidth settings, which
# gives more link budget and possible range.
#
# These settings should match on receiver.
#
# Check the README and local regulations to know what configuration settings
# are available.
lora_cfg = {
"freq_khz": 916000,
"sf": 10,
"bw": "62.5", # kHz
"coding_rate": 8,
"preamble_len": 12,
"output_power": 10, # dBm
}
# Single receiver has a fixed 16-bit ID value (senders each have a unique value).
RECEIVER_ID = 0xFFFF
# Length of an ACK message in bytes.
ACK_LENGTH = 7
# Send the ACK this many milliseconds after receiving a valid message
#
# This can be quite a bit lower (25ms or so) if wakeup times are short
# and _DEBUG is turned off on the modems (logging to UART delays everything).
ACK_DELAY_MS = 100

Wyświetl plik

@ -0,0 +1,163 @@
# MicroPython lora reliable_delivery example - synchronous receiver program
# MIT license; Copyright (c) 2023 Angus Gratton
import struct
import time
import machine
from machine import SPI, Pin
from micropython import const
from lora import RxPacket
from lora_rd_settings import RECEIVER_ID, ACK_LENGTH, ACK_DELAY_MS, lora_cfg
# Change _DEBUG to const(True) to get some additional debugging output
# about timing, RSSI, etc.
#
# For a lot more debugging detail, go to the modem driver and set _DEBUG there to const(True)
_DEBUG = const(False)
# Keep track of the last counter value we got from each known sender
# this allows us to tell if packets are being lost
last_counters = {}
def get_modem():
# from lora import SX1276
# return SX1276(
# spi=SPI(1, baudrate=2000_000, polarity=0, phase=0,
# miso=Pin(19), mosi=Pin(27), sck=Pin(5)),
# cs=Pin(18),
# dio0=Pin(26),
# dio1=Pin(35),
# reset=Pin(14),
# lora_cfg=lora_cfg,
# )
raise NotImplementedError("Replace this function with one that returns a lora modem instance")
def main():
print("Initializing...")
modem = get_modem()
print("Main loop started")
receiver = Receiver(modem)
while True:
# With wait=True, this function blocks until something is received and always
# returns non-None
sender_id, data = receiver.recv(wait=True)
# Do something with the data!
print(f"Received {data} from {sender_id:#x}")
class Receiver:
def __init__(self, modem):
self.modem = modem
self.last_counters = {} # Track the last counter value we got from each sender ID
self.rx_packet = None # Reuse RxPacket object when possible, save allocation
self.ack_buffer = bytearray(ACK_LENGTH) # reuse the same buffer for ACK packets
self.skipped_packets = 0 # Counter of skipped packets
modem.calibrate()
# Start receiving immediately. We expect the modem to receive continuously
self.will_irq = modem.start_recv(continuous=True)
print("Modem initialized and started receive...")
def recv(self, wait=True):
# Receive a packet from the sender, including sending an ACK.
#
# Returns a tuple of the 16-bit sender id and the sensor data payload.
#
# This function should be called very frequently from the main loop (at
# least every ACK_DELAY_MS milliseconds), to avoid not sending ACKs in time.
#
# If 'wait' argument is True (default), the function blocks indefinitely
# until a packet is received. If False then it will return None
# if no packet is available.
#
# Note that because we called start_recv(continuous=True), the modem
# will keep receiving on its own - even if when we call send() to
# send an ACK.
while True:
rx = self.modem.poll_recv(rx_packet=self.rx_packet)
if isinstance(rx, RxPacket): # value will be True or an RxPacket instance
decoded = self._handle_rx(rx)
if decoded:
return decoded # valid LoRa packet and valid for this application
if not wait:
return None
# Otherwise, wait for an IRQ (or have a short sleep) and then poll recv again
# (receiver is not a low power node, so don't bother with sleep modes.)
if self.will_irq:
while not self.modem.irq_triggered():
machine.idle()
else:
time.sleep_ms(1)
def _handle_rx(self, rx):
# Internal function to handle a received packet and either send an ACK
# and return the sender and the payload, or return None if packet
# payload is invalid or a duplicate.
if len(rx) < 5: # 4 byte header plus 1 byte checksum
print("Invalid packet length")
return None
sender_id, counter, data_len = struct.unpack("<HBB", rx)
csum = rx[-1]
if len(rx) != data_len + 5:
print("Invalid length in payload header")
return None
calc_csum = sum(b for b in rx[:-1]) & 0xFF
if csum != calc_csum:
print(f"Invalid checksum. calc={calc_csum:#x} received={csum:#x}")
return None
# Packet is valid!
if _DEBUG:
print(f"RX {data_len} byte message RSSI {rx.rssi} at timestamp {rx.ticks_ms}")
# Send the ACK
struct.pack_into(
"<HHBBb", self.ack_buffer, 0, RECEIVER_ID, sender_id, counter, csum, rx.rssi
)
# Time send to start as close to ACK_DELAY_MS after message was received as possible
tx_at_ms = time.ticks_add(rx.ticks_ms, ACK_DELAY_MS)
tx_done = self.modem.send(self.ack_buffer, tx_at_ms=tx_at_ms)
if _DEBUG:
tx_time = time.ticks_diff(tx_done, tx_at_ms)
expected = self.modem.get_time_on_air_us(ACK_LENGTH) / 1000
print(f"ACK TX {tx_at_ms}ms -> {tx_done}ms took {tx_time}ms expected {expected}")
# Check if the data we received is fresh or stale
if sender_id not in self.last_counters:
print(f"New device id {sender_id:#x}")
elif self.last_counters[sender_id] == counter:
print(f"Duplicate packet received from {sender_id:#x}")
return None
elif counter != 1:
# If the counter from this sender has gone up by more than 1 since
# last time we got a packet, we know there is some packet loss.
#
# (ignore the case where the new counter is 1, as this probably
# means a reset.)
delta = (counter - 1 - self.last_counters[sender_id]) & 0xFF
if delta:
print(f"Skipped/lost {delta} packets from {sender_id:#x}")
self.skipped_packets += delta
self.last_counters[sender_id] = counter
return sender_id, rx[4:-1]
if __name__ == "__main__":
main()

Wyświetl plik

@ -0,0 +1,121 @@
# MicroPython lora reliable_delivery example - asynchronous receiver program
# MIT license; Copyright (c) 2023 Angus Gratton
import struct
import time
import asyncio
from machine import SPI, Pin
from micropython import const
from lora_rd_settings import RECEIVER_ID, ACK_LENGTH, ACK_DELAY_MS, lora_cfg
# Change _DEBUG to const(True) to get some additional debugging output
# about timing, RSSI, etc.
#
# For a lot more debugging detail, go to the modem driver and set _DEBUG there to const(True)
_DEBUG = const(False)
# Keep track of the last counter value we got from each known sender
# this allows us to tell if packets are being lost
last_counters = {}
def get_async_modem():
# from lora import AsyncSX1276
# return AsyncSX1276(
# spi=SPI(1, baudrate=2000_000, polarity=0, phase=0,
# miso=Pin(19), mosi=Pin(27), sck=Pin(5)),
# cs=Pin(18),
# dio0=Pin(26),
# dio1=Pin(35),
# reset=Pin(14),
# lora_cfg=lora_cfg,
# )
raise NotImplementedError("Replace this function with one that returns a lora modem instance")
def main():
# Initializing the modem.
#
print("Initializing...")
modem = get_async_modem()
asyncio.run(recv_continuous(modem, rx_callback))
async def rx_callback(sender_id, data):
# Do something with the data!
print(f"Received {data} from {sender_id:#x}")
async def recv_continuous(modem, callback):
# Async task which receives packets from the AsyncModem recv_continuous()
# iterator, checks if they are valid, and send back an ACK if needed.
#
# On each successful message, we await callback() to allow the application
# to do something with the data. Callback args are sender_id (as int) and the bytes
# of the message payload.
last_counters = {} # Track the last counter value we got from each sender ID
ack_buffer = bytearray(ACK_LENGTH) # reuse the same buffer for ACK packets
skipped_packets = 0 # Counter of skipped packets
modem.calibrate()
async for rx in modem.recv_continuous():
# Filter 'rx' packet to determine if it's valid for our application
if len(rx) < 5: # 4 byte header plus 1 byte checksum
print("Invalid packet length")
continue
sender_id, counter, data_len = struct.unpack("<HBB", rx)
csum = rx[-1]
if len(rx) != data_len + 5:
print("Invalid length in payload header")
continue
calc_csum = sum(b for b in rx[:-1]) & 0xFF
if csum != calc_csum:
print(f"Invalid checksum. calc={calc_csum:#x} received={csum:#x}")
continue
# Packet is valid!
if _DEBUG:
print(f"RX {data_len} byte message RSSI {rx.rssi} at timestamp {rx.ticks_ms}")
# Send the ACK
struct.pack_into("<HHBBb", ack_buffer, 0, RECEIVER_ID, sender_id, counter, csum, rx.rssi)
# Time send to start as close to ACK_DELAY_MS after message was received as possible
tx_at_ms = time.ticks_add(rx.ticks_ms, ACK_DELAY_MS)
tx_done = await modem.send(ack_buffer, tx_at_ms=tx_at_ms)
if _DEBUG:
tx_time = time.ticks_diff(tx_done, tx_at_ms)
expected = modem.get_time_on_air_us(ACK_LENGTH) / 1000
print(f"ACK TX {tx_at_ms}ms -> {tx_done}ms took {tx_time}ms expected {expected}")
# Check if the data we received is fresh or stale
if sender_id not in last_counters:
print(f"New device id {sender_id:#x}")
elif last_counters[sender_id] == counter:
print(f"Duplicate packet received from {sender_id:#x}")
continue
elif counter != 1:
# If the counter from this sender has gone up by more than 1 since
# last time we got a packet, we know there is some packet loss.
#
# (ignore the case where the new counter is 1, as this probably
# means a reset.)
delta = (counter - 1 - last_counters[sender_id]) & 0xFF
if delta:
print(f"Skipped/lost {delta} packets from {sender_id:#x}")
skipped_packets += delta
last_counters[sender_id] = counter
await callback(sender_id, rx[4:-1])
if __name__ == "__main__":
main()

Wyświetl plik

@ -0,0 +1,213 @@
# MicroPython lora reliable_delivery example - synchronous sender program
# MIT license; Copyright (c) 2023 Angus Gratton
import machine
from machine import SPI, Pin
import random
import struct
import time
from lora_rd_settings import RECEIVER_ID, ACK_LENGTH, ACK_DELAY_MS, lora_cfg
SLEEP_BETWEEN_MS = 5000 # Main loop should sleep this long between sending data to the receiver
MAX_RETRIES = 4 # Retry each message this often if no ACK is received
# Initial retry is after this long. Increases by 1.25x each subsequent retry.
BASE_RETRY_TIMEOUT_MS = 1000
# Add random jitter to each retry period, up to this long. Useful to prevent two
# devices ending up in sync.
RETRY_JITTER_MS = 1500
# If reported RSSI value is lower than this, increase
# output power 1dBm
RSSI_WEAK_THRESH = -110
# If reported RSSI value is higher than this, decrease
# output power 1dBm
RSSI_STRONG_THRESH = -70
# IMPORTANT: Set this to the maximum output power in dBm that is permitted in
# your regulatory environment.
OUTPUT_MAX_DBM = 15
OUTPUT_MIN_DBM = -20
def get_modem():
# from lora import SX1276
# return SX1276(
# spi=SPI(1, baudrate=2000_000, polarity=0, phase=0,
# miso=Pin(19), mosi=Pin(27), sck=Pin(5)),
# cs=Pin(18),
# dio0=Pin(26),
# dio1=Pin(35),
# reset=Pin(14),
# lora_cfg=lora_cfg,
# )
raise NotImplementedError("Replace this function with one that returns a lora modem instance")
def main():
modem = get_modem()
# Unique ID of this sender, 16-bit number. This method of generating an ID is pretty crummy,
# if using this in a real application then probably better to store these in the filesystem or
# something like that
DEVICE_ID = sum(b for b in machine.unique_id()) & 0xFFFF
sender = Sender(modem, DEVICE_ID)
while True:
sensor_data = get_sensor_data()
sender.send(sensor_data)
# Sleep until the next time we should read the sensor data and send it to
# the receiver.
#
# The goal for the device firmware is to spend most of its time in the lowest
# available sleep state, to save power.
#
# Note that if the sensor(s) in a real program generates events, these can be
# hooked to interrupts and used to wake Micropython up to send data,
# instead.
modem.sleep()
time.sleep_ms(SLEEP_BETWEEN_MS) # TODO see if this can be machine.lightsleep()
def get_sensor_data():
# Return a bytes object with the latest sensor data to send to the receiver.
#
# As this is just an example, we send a dummy payload which is just a string
# containing our ticks_ms() timestamp.
#
# In a real application the sensor data should usually be binary data and
# not a string, to save transmission size.
return f"Hello, ticks_ms={time.ticks_ms()}".encode()
class Sender:
def __init__(self, modem, device_id):
self.modem = modem
self.device_id = device_id
self.counter = 0
self.output_power = lora_cfg["output_power"] # start with common settings power level
self.rx_ack = None # reuse the ack message object when we can
print(f"Sender initialized with ID {device_id:#x}")
random.seed(device_id)
self.adjust_output_power(0) # set the initial value within MIN/MAX
modem.calibrate()
def send(self, sensor_data, adjust_output_power=True):
# Send a packet of sensor data to the receiver reliably.
#
# Returns True if data was successfully sent and ACKed, False otherwise.
#
# If adjust_output_power==True then increase or decrease output power
# according to the RSSI reported in the ACK packet.
self.counter = (self.counter + 1) & 0xFF
# Prepare the simple payload with header and checksum
# See README for a summary of the simple data message format
payload = bytearray(len(sensor_data) + 5)
struct.pack_into("<HBB", payload, 0, self.device_id, self.counter, len(sensor_data))
payload[4:-1] = sensor_data
payload[-1] = sum(b for b in payload) & 0xFF
# Calculate the time on air (in milliseconds) for an ACK packet
ack_packet_ms = self.modem.get_time_on_air_us(ACK_LENGTH) // 1000 + 1
timeout = BASE_RETRY_TIMEOUT_MS
print(f"Sending {len(payload)} bytes")
# Send the payload, until we receive an acknowledgement or run out of retries
for _ in range(MAX_RETRIES):
# Using simple API here.
#
# We could reduce power consumption a little by going to sleep
# instead of waiting for the send/recv to complete, but doing
# this well involves setting port-specific wakeup settings. Provided
# "dio" pin is assigned then the MCU will mostly be in a lower power
# idle state while the radio sends, at least.
sent_at = self.modem.send(payload)
# We expect the receiver of a valid message to start sending the ACK
# approximately ACK_DELAY_MS after receiving the message (to allow
# the sender time to reconfigure the modem.)
#
# We start receiving as soon as we can, but allow up to
# ACK_DELAY_MS*2 of total timing leeway - plus the time on air for
# the message itself
maybe_ack = self.modem.recv(ack_packet_ms + ACK_DELAY_MS * 2, rx_packet=self.rx_ack)
# Check if the packet we received is a valid ACK
rssi = self._ack_is_valid(maybe_ack, payload[-1])
if rssi is not None: # ACK is valid
self.rx_ack == maybe_ack
delta = time.ticks_diff(maybe_ack.ticks_ms, sent_at)
print(
f"ACKed with RSSI {rssi}, {delta}ms after sent "
+ f"(skew {delta-ACK_DELAY_MS-ack_packet_ms}ms)"
)
if adjust_output_power:
if rssi > RSSI_STRONG_THRESH:
self.adjust_output_power(-1)
elif rssi < RSSI_WEAK_THRESH:
self.adjust_output_power(1)
return True
# Otherwise, prepare to sleep briefly and then retry
next_try_at = time.ticks_add(sent_at, timeout)
sleep_time = time.ticks_diff(next_try_at, time.ticks_ms()) + random.randrange(
RETRY_JITTER_MS
)
if sleep_time > 0:
self.modem.sleep()
time.sleep_ms(sleep_time) # TODO: see if this can be machine.lightsleep
# add 25% timeout for next iteration
timeout = (timeout * 5) // 4
print(f"Failed, no ACK after {MAX_RETRIES} retries.")
if adjust_output_power:
self.adjust_output_power(2)
self.modem.calibrate_image() # try and improve the RX sensitivity for next time
return False
def _ack_is_valid(self, maybe_ack, csum):
# Private function to verify if the RxPacket held in 'maybe_ack' is a valid ACK for the
# current device_id and counter value, and provided csum value.
#
# If it is, returns the reported RSSI value from the packet.
# If not, returns None
if (not maybe_ack) or len(maybe_ack) != ACK_LENGTH:
return None
base_id, ack_id, ack_counter, ack_csum, rssi = struct.unpack("<HHBBb", maybe_ack)
if (
base_id != RECEIVER_ID
or ack_id != self.device_id
or ack_counter != self.counter
or ack_csum != csum
):
return None
return rssi
def adjust_output_power(self, delta_dbm):
# Adjust the modem output power by +/-delta_dbm, max of OUTPUT_MAX_DBM
#
# (note: the radio may also apply its own power limit internally.)
new = max(min(self.output_power + delta_dbm, OUTPUT_MAX_DBM), OUTPUT_MIN_DBM)
self.output_power = new
print(f"New output_power {new}/{OUTPUT_MAX_DBM} (delta {delta_dbm})")
self.modem.configure({"output_power": self.output_power})
if __name__ == "__main__":
main()

Wyświetl plik

@ -0,0 +1,205 @@
# MicroPython lora reliable_delivery example - asynchronous sender program
# MIT license; Copyright (c) 2023 Angus Gratton
import machine
from machine import SPI, Pin
import random
import struct
import time
import asyncio
from lora_rd_settings import RECEIVER_ID, ACK_LENGTH, ACK_DELAY_MS, lora_cfg
SLEEP_BETWEEN_MS = 5000 # Main loop should sleep this long between sending data to the receiver
MAX_RETRIES = 4 # Retry each message this often if no ACK is received
# Initial retry is after this long. Increases by 1.25x each subsequent retry.
BASE_RETRY_TIMEOUT_MS = 1000
# Add random jitter to each retry period, up to this long. Useful to prevent two
# devices ending up in sync.
RETRY_JITTER_MS = 1500
# If reported RSSI value is lower than this, increase
# output power 1dBm
RSSI_WEAK_THRESH = -110
# If reported RSSI value is higher than this, decrease
# output power 1dBm
RSSI_STRONG_THRESH = -70
# IMPORTANT: Set this to the maximum output power in dBm that is permitted in
# your regulatory environment.
OUTPUT_MAX_DBM = 15
OUTPUT_MIN_DBM = -20
def get_async_modem():
# from lora import AsyncSX1276
# return AsyncSX1276(
# spi=SPI(1, baudrate=2000_000, polarity=0, phase=0,
# miso=Pin(19), mosi=Pin(27), sck=Pin(5)),
# cs=Pin(18),
# dio0=Pin(26),
# dio1=Pin(35),
# reset=Pin(14),
# lora_cfg=lora_cfg,
# )
raise NotImplementedError("Replace this function with one that returns a lora modem instance")
def main():
modem = get_async_modem()
asyncio.run(sender_task(modem))
async def sender_task(modem):
# Unique ID of this sender, 16-bit number. This method of generating an ID is pretty crummy,
# if using this in a real application then probably better to store these in the filesystem or
# something like that
DEVICE_ID = sum(b for b in machine.unique_id()) & 0xFFFF
sender = AsyncSender(modem, DEVICE_ID)
while True:
sensor_data = await get_sensor_data()
await sender.send(sensor_data)
# Sleep until the next time we should read the sensor data and send it to
# the receiver. awaiting here means other tasks will run.
modem.sleep()
await asyncio.sleep_ms(SLEEP_BETWEEN_MS)
async def get_sensor_data():
# Return a bytes object with the latest sensor data to send to the receiver.
#
# As this is just an example, we send a dummy payload which is just a string
# containing our ticks_ms() timestamp.
#
# In a real application the sensor data should usually be binary data and
# not a string, to save transmission size.
return f"Hello, ticks_ms={time.ticks_ms()}".encode()
class AsyncSender:
def __init__(self, modem, device_id):
self.modem = modem
self.device_id = device_id
self.counter = 0
self.output_power = lora_cfg["output_power"] # start with common settings power level
self.rx_ack = None # reuse the ack message object when we can
print(f"Sender initialized with ID {device_id:#x}")
random.seed(device_id)
self.adjust_output_power(0) # set the initial value within MIN/MAX
modem.calibrate()
async def send(self, sensor_data, adjust_output_power=True):
# Send a packet of sensor data to the receiver reliably.
#
# Returns True if data was successfully sent and ACKed, False otherwise.
#
# If adjust_output_power==True then increase or decrease output power
# according to the RSSI reported in the ACK packet.
self.counter = (self.counter + 1) & 0xFF
# Prepare the simple payload with header and checksum
# See README for a summary of the simple data message format
payload = bytearray(len(sensor_data) + 5)
struct.pack_into("<HBB", payload, 0, self.device_id, self.counter, len(sensor_data))
payload[4:-1] = sensor_data
payload[-1] = sum(b for b in payload) & 0xFF
# Calculate the time on air (in milliseconds) for an ACK packet
ack_packet_ms = self.modem.get_time_on_air_us(ACK_LENGTH) // 1000 + 1
timeout = BASE_RETRY_TIMEOUT_MS
print(f"Sending {len(payload)} bytes")
# Send the payload, until we receive an acknowledgement or run out of retries
for _ in range(MAX_RETRIES):
sent_at = await self.modem.send(payload)
# We expect the receiver of a valid message to start sending the ACK
# approximately ACK_DELAY_MS after receiving the message (to allow
# the sender time to reconfigure the modem.)
#
# We start receiving as soon as we can, but allow up to
# ACK_DELAY_MS*2 of total timing leeway - plus the time on air for
# the packet itself
maybe_ack = await self.modem.recv(
ack_packet_ms + ACK_DELAY_MS * 2, rx_packet=self.rx_ack
)
# Check if the packet we received is a valid ACK
rssi = self._ack_is_valid(maybe_ack, payload[-1])
if rssi is not None: # ACK is valid
self.rx_ack == maybe_ack
delta = time.ticks_diff(maybe_ack.ticks_ms, sent_at)
print(
f"ACKed with RSSI {rssi}, {delta}ms after sent "
+ f"(skew {delta-ACK_DELAY_MS-ack_packet_ms}ms)"
)
if adjust_output_power:
if rssi > RSSI_STRONG_THRESH:
self.adjust_output_power(-1)
elif rssi < RSSI_WEAK_THRESH:
self.adjust_output_power(1)
return True
# Otherwise, prepare to sleep briefly and then retry
next_try_at = time.ticks_add(sent_at, timeout)
sleep_time = time.ticks_diff(next_try_at, time.ticks_ms()) + random.randrange(
RETRY_JITTER_MS
)
if sleep_time > 0:
self.modem.sleep()
await asyncio.sleep_ms(sleep_time)
# add 25% timeout for next iteration
timeout = (timeout * 5) // 4
print(f"Failed, no ACK after {MAX_RETRIES} retries.")
if adjust_output_power:
self.adjust_output_power(2)
self.modem.calibrate_image() # try and improve the RX sensitivity for next time
return False
def _ack_is_valid(self, maybe_ack, csum):
# Private function to verify if the RxPacket held in 'maybe_ack' is a valid ACK for the
# current device_id and counter value, and provided csum value.
#
# If it is, returns the reported RSSI value from the packet.
# If not, returns None
if (not maybe_ack) or len(maybe_ack) != ACK_LENGTH:
return None
base_id, ack_id, ack_counter, ack_csum, rssi = struct.unpack("<HHBBb", maybe_ack)
if (
base_id != RECEIVER_ID
or ack_id != self.device_id
or ack_counter != self.counter
or ack_csum != csum
):
return None
return rssi
def adjust_output_power(self, delta_dbm):
# Adjust the modem output power by +/-delta_dbm, max of OUTPUT_MAX_DBM
#
# (note: the radio may also apply its own power limit internally.)
new = max(min(self.output_power + delta_dbm, OUTPUT_MAX_DBM), OUTPUT_MIN_DBM)
self.output_power = new
print(f"New output_power {new}/{OUTPUT_MAX_DBM} (delta {delta_dbm})")
self.modem.configure({"output_power": self.output_power})
if __name__ == "__main__":
main()

Wyświetl plik

@ -0,0 +1,26 @@
# LoRa Simple RX/TX Example
## Source Files
* `simple_rxtx.py` is a very simple implementation of a program to alternately
receive and send LoRa packets.
* `simple_rxtx_async.py` is the same program implemented using async Python.
## Running the examples
One way to run this example interactively:
1. Install or "freeze in" the necessary lora modem driver package (`lora-sx127x`
or `lora-sx126x`) and optionally the `lora-async` package if using the async
examples (see main lora `README.md` in the above directory for details).
3. Edit the program you plan to run and fill in the `get_modem()` function with
the correct modem type, pin assignments, etc. for your board (see top-level
README).
4. Change to this directory in a terminal.
5. Run `mpremote run simple_rxtx.py` or `mpremote run
simple_rxtx_async.py` as applicable.
Consult the [mpremote
documentation](https://docs.micropython.org/en/latest/reference/mpremote.html)
for an explanation of these commands and the options needed to run two copies of
`mpremote` on different serial ports at the same time.

Wyświetl plik

@ -0,0 +1,51 @@
# MicroPython lora simple_rxtx example - synchronous API version
# MIT license; Copyright (c) 2023 Angus Gratton
import time
from machine import Pin, SPI
def get_modem():
# from lora import SX1276
#
# lora_cfg = {
# "freq_khz": 916000,
# "sf": 8,
# "bw": "500", # kHz
# "coding_rate": 8,
# "preamble_len": 12,
# "output_power": 0, # dBm
# }
#
# return SX1276(
# spi=SPI(1, baudrate=2000_000, polarity=0, phase=0,
# miso=Pin(19), mosi=Pin(27), sck=Pin(5)),
# cs=Pin(18),
# dio0=Pin(26),
# dio1=Pin(35),
# reset=Pin(14),
# lora_cfg=lora_cfg,
# )
raise NotImplementedError("Replace this function with one that returns a lora modem instance")
def main():
print("Initializing...")
modem = get_modem()
counter = 0
while True:
print("Sending...")
modem.send(f"Hello world from MicroPython #{counter}".encode())
print("Receiving...")
rx = modem.recv(timeout_ms=5000)
if rx:
print(f"Received: {repr(rx)}")
else:
print("Timeout!")
time.sleep(2)
counter += 1
if __name__ == "__main__":
main()

Wyświetl plik

@ -0,0 +1,60 @@
# MicroPython lora simple_rxtx example - asynchronous API version
# MIT license; Copyright (c) 2023 Angus Gratton
import asyncio
from machine import Pin, SPI
def get_async_modem():
# from lora import AsyncSX1276
#
# lora_cfg = {
# "freq_khz": 916000,
# "sf": 8,
# "bw": "500", # kHz
# "coding_rate": 8,
# "preamble_len": 12,
# "output_power": 0, # dBm
# }
#
# return AsyncSX1276(
# spi=SPI(1, baudrate=2000_000, polarity=0, phase=0,
# miso=Pin(19), mosi=Pin(27), sck=Pin(5)),
# cs=Pin(18),
# dio0=Pin(26),
# dio1=Pin(35),
# reset=Pin(14),
# lora_cfg=lora_cfg,
# )
raise NotImplementedError("Replace this function with one that returns a lora modem instance")
async def main_task():
modem = get_async_modem()
await asyncio.gather(
asyncio.create_task(send_coro(modem)),
asyncio.create_task(recv_coro(modem)),
)
async def recv_coro(modem):
while True:
print("Receiving...")
rx = await modem.recv(2000)
if rx:
print(f"Received: {repr(rx)}")
else:
print("Receive timeout!")
async def send_coro(modem):
counter = 0
while True:
print("Sending...")
await modem.send(f"Hello world from async MicroPython #{counter}".encode())
print("Sent!")
await asyncio.sleep(5)
counter += 1
if __name__ == "__main__":
asyncio.run(main_task())

Wyświetl plik

@ -0,0 +1,144 @@
# MicroPython LoRa async modem driver
# MIT license; Copyright (c) 2023 Angus Gratton
#
# LoRa is a registered trademark or service mark of Semtech Corporation or its affiliates.
import asyncio
import time
from micropython import const
# Set to True to get some additional printed debug output.
_DEBUG = const(False)
# Some "belts and braces" timeouts when using IRQs, to wake up if an IRQ hasn't
# fired as expected. Also a timeout for how rapidly to poll the modem during TX
# if no IRQ is enabled.
#
# All in milliseconds
_RX_POLL_WITH_IRQ = const(1000)
_RX_POLL_NO_IRQ = const(50)
_TX_POLL_LATE_IRQ = const(10)
class AsyncModem:
# Mixin-like base class that provides coroutines for modem send, recv & recv_continuous.
#
# Don't instantiate this class directly, instantiate one of the 'AsyncXYZ'
# modem classes defined in the lora module.
def _after_init(self):
# Separate flags for (rx, tx) as only one task can block on a flag
self._flags = (asyncio.ThreadSafeFlag(), asyncio.ThreadSafeFlag())
self.set_irq_callback(self._callback)
async def recv(self, timeout_ms=None, rx_length=0xFF, rx_packet=None):
# Async function to receive a single LoRa packet, with an optional timeout
#
# Same API as the "Simple API" synchronous function
self._flags[0].clear()
will_irq = self.start_recv(timeout_ms, False, rx_length)
return await self._recv(will_irq, rx_packet)
def recv_continuous(self, rx_length=0xFF, rx_packet=None):
# Returns an Async Iterator that continuously yields LoRa packets, until
# the modem is told to sleep() or standby().
#
# Once MicroPython has PEP525 support (PR #6668) then this somewhat
# fiddly implementation can probably be merged with recv() into a much simpler
# single _recv() private coro that either yields or returns, depending on
# an argument. The public API can stay the same.
will_irq = self.start_recv(None, True, rx_length)
return AsyncContinuousReceiver(self, will_irq, rx_packet)
async def _recv(self, will_irq, rx_packet):
# Second half of receiving is implemented here to share code with AsyncContinuousReceiver,
# until future refactor is possible (see comment immediately above this one.)
rx = True
while rx is True:
await self._wait(will_irq, 0, _RX_POLL_WITH_IRQ if will_irq else _RX_POLL_NO_IRQ)
rx = self.poll_recv(rx_packet)
if rx: # RxPacket instance
return rx
async def send(self, packet, tx_at_ms=None):
self._flags[1].clear()
self.prepare_send(packet)
timeout_ms = self.get_time_on_air_us(len(packet)) // 1000 + 50
if tx_at_ms is not None:
await asyncio.sleep_ms(max(0, time.ticks_diff(tx_at_ms, time.ticks_ms())))
if _DEBUG:
print("start_send")
will_irq = self.start_send()
tx = True
while tx is True:
await self._wait(will_irq, 1, timeout_ms)
tx = self.poll_send()
if _DEBUG:
print(f"poll_send returned tx={tx}")
# If we've already waited the estimated send time (plus 50ms) and the modem
# is not done, timeout and poll more rapidly from here on (unsure if
# this is necessary, outside of a serious bug, but doesn't hurt.)
timeout_ms = _TX_POLL_LATE_IRQ
return tx
async def _wait(self, will_irq, idx, timeout_ms):
# Common code path for blocking on an interrupt, if configured.
#
# idx is into _flags tuple and is 0 for rx and 1 for tx
#
# timeout_ms is the expected send time for sends, or a reasonable
# polling timeout for receives.
if _DEBUG:
print(f"wait will_irq={will_irq} timeout_ms={timeout_ms}")
if will_irq:
# In theory we don't need to ever timeout on the flag as the ISR will always
# fire, but this gives a workaround for possible race conditions or dropped interrupts.
try:
await asyncio.wait_for_ms(self._flags[idx].wait(), timeout_ms)
except asyncio.TimeoutError:
pass
else:
await asyncio.sleep_ms(timeout_ms)
if _DEBUG:
print(f"wait complete")
def _callback(self, _):
# IRQ callback from BaseModem._radio_isr. Hard IRQ context unless _DEBUG
# is on.
#
# Set both RX & TX flag. This isn't necessary for "real" interrupts, but may be necessary
# to wake both for the case of a "soft" interrupt triggered by sleep() or standby(), where
# both tasks need to unblock and return.
#
# The poll_recv() method is a no-op if _tx is set, so waking up both
# blocked tasks doesn't have much overhead (no second polling of modem
# IRQ flags, for example).
for f in self._flags:
f.set()
class AsyncContinuousReceiver:
# Stop-gap async iterator implementation until PEP525 support comes in, see
# longer comment on AsyncModem.recv_continuous() above.
def __init__(self, modem, will_irq, rx_packet):
self.modem = modem
self.will_irq = will_irq
self.rx_packet = rx_packet
def __aiter__(self):
return self
async def __anext__(self):
res = await self.modem._recv(self.will_irq, self.rx_packet)
if not res:
raise StopAsyncIteration
return res

Wyświetl plik

@ -0,0 +1,3 @@
metadata(version="0.1")
require("lora")
package("lora")

Wyświetl plik

@ -0,0 +1,877 @@
# MicroPython LoRa SX126x (SX1261, SX1262) driver
# MIT license; Copyright (c) 2023 Angus Gratton
#
# LoRa is a registered trademark or service mark of Semtech Corporation or its affiliates.
#
# In comments, abbreviation "DS" = Semtech SX1261/62 Datasheet Rev 2.1 (December 2021)
import struct
import time
from micropython import const
from machine import Pin
from .modem import BaseModem, ConfigError, RxPacket, _clamp, _flag
# Set _DEBUG to const(True) to print all SPI commands sent to the device, and all responses,
# plus a few additional pieces of information.
_DEBUG = const(False)
_REG_RXGAINCR = const(0x8AC) # Reset value 0x94
_REG_LSYNCRH = const(0x740)
_REG_LSYNCRL = const(0x741)
_CMD_CFG_DIO_IRQ = const(0x08) # args: IrqMask, Irq1Mask, Irq2Mask,. Irq3Mask
_CMD_CLR_ERRORS = const(0x07)
_CMD_CLR_IRQ_STATUS = const(0x02) # no args
_CMD_GET_ERROR = const(0x17)
_CMD_GET_IRQ_STATUS = const(0x12) # args: (r) Status, IrqStatus
_CMD_GET_RX_BUFFER_STATUS = const(0x13) # args: (r) Status, RxPayloadLength, RxBufferPointer
_CMD_GET_STATUS = const(0xC0) # args: (r) Status
_CMD_GET_PACKET_STATUS = const(0x14)
_CMD_READ_REGISTER = const(0x1D) # args: addr (2b), status, (r) Data0 ... DataN
_CMD_READ_BUFFER = const(0x1E) # args: Offset, (r) Status, Data0 ... DataN
_CMD_SET_BUFFER_BASE_ADDRESS = const(0x8F) # args: TxBaseAddr, RxBaseAddr
_CMD_SET_MODULATION_PARAMS = const(0x8B) # args (LoRa): Sf, Bw, Cr, Ldro
_CMD_SET_PACKET_PARAMS = const(
0x8C
) # args (LoRa): PbLength, HeaderType, PayloadLength, CrcType, InvertIQ
_CMD_SET_PACKET_TYPE = const(0x8A) # args: PktType
_CMD_SET_PA_CONFIG = const(0x95) # args: PaDutyCycle, HpMax, HpSel, 0x01
_CMD_SET_RF_FREQUENCY = const(0x86) # args: RfFreg
_CMD_SET_RX = const(0x82) # args: Timeout
_CMD_SET_SLEEP = const(0x84) # args: SleepCfg
_CMD_SET_STANDBY = const(0x80) # args: StandbyCfg
_CMD_SET_DIO3_AS_TCXO_CTRL = const(0x97) # args: Trim, Timeout (3b)
_CMD_SET_DIO2_AS_RF_SWITCH_CTRL = const(0x9D)
_CMD_SET_TX = const(0x83) # args: Timeout
_CMD_SET_TX_PARAMS = const(0x8E) # args: Power, RampTime
_CMD_WRITE_BUFFER = const(0x0E) # args: Offset, Data0 ... DataN
_CMD_WRITE_REGISTER = const(0x0D) # args: Addr, Data0 ... Data N
_CMD_CALIBRATE = const(0x89)
_CMD_CALIBRATE_IMAGE = const(0x98)
_STATUS_MODE_MASK = const(0x7 << 4)
_STATUS_MODE_SHIFT = const(4)
_STATUS_MODE_STANDBY_RC = const(0x2)
_STATUS_MODE_STANDBY_HSE32 = const(0x3)
_STATUS_MODE_FS = const(0x4)
_STATUS_MODE_RX = const(0x5)
_STATUS_MODE_TX = const(0x6)
_STATUS_CMD_MASK = const(0x6) # bits 1-3, bit 0 is reserved
_STATUS_CMD_SHIFT = const(1)
_STATUS_CMD_DATA_AVAIL = const(0x2)
_STATUS_CMD_TIMEOUT = const(0x3)
_STATUS_CMD_ERROR = const(0x4)
_STATUS_CMD_EXEC_FAIL = const(0x5)
_STATUS_CMD_TX_COMPLETE = const(0x6)
_CFG_SF_MIN = const(6) # inclusive
_CFG_SF_MAX = const(12) # inclusive
_IRQ_TX_DONE = const(1 << 0)
_IRQ_RX_DONE = const(1 << 1)
_IRQ_PREAMBLE_DETECTED = const(1 << 2)
_IRQ_SYNC_DETECTED = const(1 << 3)
_IRQ_HEADER_VALID = const(1 << 4)
_IRQ_HEADER_ERR = const(1 << 5)
_IRQ_CRC_ERR = const(1 << 6)
_IRQ_CAD_DONE = const(1 << 7)
_IRQ_CAD_DETECTED = const(1 << 8)
_IRQ_TIMEOUT = const(1 << 9)
# Register values
_REG_IQ_POLARITY_SETUP = const(0x0736)
_REG_RX_GAIN = const(0x08AC)
_REG_RTC_CTRL = const(0x0902) # DS 15.3 has a typo on this value! Confirmed from Semtech driver
_REG_EVT_CLR = const(0x0944)
_REG_EVT_CLR_MASK = const(0x02)
# IRQs the driver cares about when receiving
_IRQ_DRIVER_RX_MASK = const(_IRQ_RX_DONE | _IRQ_TIMEOUT | _IRQ_CRC_ERR | _IRQ_HEADER_ERR)
# Except when entering/waking from sleep, typical busy period <105us (ref RM0453 Table 33)
#
# However, if dio3_tcxo_start_time_us is set then can take a longer
# time to become valid, so a field is set on the modem object with the full timeout.
#
# In any case, timeouts here are to catch broken/bad hardware or massive driver
# bugs rather than commonplace issues.
#
_CMD_BUSY_TIMEOUT_BASE_US = const(200)
# Datasheet says 3.5ms needed to run a full Calibrate command (all blocks),
# however testing shows it can be as much as as 18ms.
_CALIBRATE_TYPICAL_TIME_US = const(3500)
_CALIBRATE_TIMEOUT_US = const(30000)
# Magic value used by SetRx command to indicate a continuous receive
_CONTINUOUS_TIMEOUT_VAL = const(0xFFFFFF)
class _SX126x(BaseModem):
# common IRQ masks used by the base class functions
_IRQ_RX_COMPLETE = _IRQ_RX_DONE | _IRQ_TIMEOUT
_IRQ_TX_COMPLETE = _IRQ_TX_DONE
# Common base class for SX1261, SX1262 and (pending) STM32WL55. These are all basically
# the same except for which PA ranges are supported
#
# Don't construct this directly, construct lora.SX1261, lora.SX1262, lora.AsyncSX1261
# or lora.AsyncSX1262
def __init__(
self,
spi,
cs,
busy,
dio1,
dio2_rf_sw,
dio3_tcxo_millivolts,
dio3_tcxo_start_time_us,
reset,
lora_cfg,
ant_sw,
):
super().__init__(ant_sw)
self._spi = spi
self._cs = cs
self._busy = busy
self._sleep = True # assume the radio is in sleep mode to start, will wake on _cmd
self._dio1 = dio1
busy.init(Pin.IN)
cs.init(Pin.OUT, value=1)
if dio1:
dio1.init(Pin.IN)
self._busy_timeout = _CMD_BUSY_TIMEOUT_BASE_US
self._buf = bytearray(9) # shared buffer for commands
# These settings are kept in the object (as can't read them back from the modem)
self._output_power = 14
self._bw = 125
assert self._bw_hz == 125000 # This field is set in base class, must match self._bw
# RampTime register value
# 0x02 is 40us, default value appears undocumented but this is the SX1276 default
self._ramp_val = 0x02
if reset:
# If the caller supplies a reset pin argument, reset the radio
reset.init(Pin.OUT, value=0)
time.sleep_ms(1)
reset(1)
time.sleep_ms(5)
else:
self.standby() # Otherwise, at least put the radio to a known state
status = self._get_status()
if (status[0] != _STATUS_MODE_STANDBY_RC and status[0] != _STATUS_MODE_STANDBY_HSE32) or (
status[1] > 1
):
# This check exists to determine that the SPI settings and modem
# selection are correct. Otherwise it's possible for the driver to
# run for quite some time before it detects an invalid response.
raise RuntimeError("Invalid initial status {}.".format(status))
if dio2_rf_sw:
self._cmd("BB", _CMD_SET_DIO2_AS_RF_SWITCH_CTRL, 1)
if dio3_tcxo_millivolts:
# Enable TCXO power via DIO3, if enabled
#
# timeout register is set in units of 15.625us each, use integer math
# to calculate and round up:
self._busy_timeout = (_CMD_BUSY_TIMEOUT_BASE_US + dio3_tcxo_start_time_us) * 2
timeout = (dio3_tcxo_start_time_us * 1000 + 15624) // 15625
if timeout < 0 or timeout > 1 << 24:
raise ValueError("{} out of range".format("dio3_tcxo_start_time_us"))
if dio3_tcxo_millivolts < 1600 or dio3_tcxo_millivolts > 3300:
raise ValueError("{} out of range".format("dio3_tcxo_millivolts"))
dv = dio3_tcxo_millivolts // 100 # 16 to 33
tcxo_trim_lookup = (
16,
17,
18,
22,
24,
27,
30,
33,
) # DS Table 13-35
while dv not in tcxo_trim_lookup:
dv -= 1
reg_tcxo_trim = tcxo_trim_lookup.index(dv)
self._cmd(">BI", _CMD_SET_DIO3_AS_TCXO_CTRL, (reg_tcxo_trim << 24) + timeout)
time.sleep_ms(15)
# As per DS 13.3.6 SetDIO3AsTCXOCtrl, should expect error
# value 0x20 "XOSC_START_ERR" to be flagged as XOSC has only just
# started now. So clear it.
self._clear_errors()
self._check_error()
# If DIO1 is set, mask in just the IRQs that the driver may need to be
# interrupted by. This is important because otherwise an unrelated IRQ
# can trigger the ISR and may not be reset by the driver, leaving DIO1 high.
#
# If DIO1 is not set, all IRQs can stay masked which is the power-on state.
if dio1:
# Note: we set both Irq mask and DIO1 mask to the same value, which is redundant
# (one could be 0xFFFF) but may save a few bytes of bytecode.
self._cmd(
">BHHHH",
_CMD_CFG_DIO_IRQ,
(_IRQ_RX_DONE | _IRQ_TX_DONE | _IRQ_TIMEOUT), # IRQ mask
(_IRQ_RX_DONE | _IRQ_TX_DONE | _IRQ_TIMEOUT), # DIO1 mask
0x0, # DIO2Mask, not used
0x0, # DIO3Mask, not used
)
dio1.irq(self._radio_isr, trigger=Pin.IRQ_RISING)
self._clear_irq()
self._cmd("BB", _CMD_SET_PACKET_TYPE, 1) # LoRa
if lora_cfg:
self.configure(lora_cfg)
def sleep(self, warm_start=True):
# Put the modem into sleep mode. Driver will wake the modem automatically the next
# time an operation starts, or call standby() to wake it manually.
#
# If the warm_start parameter is False (non-default) then the modem will
# lose all settings on wake. The only way to use this parameter value is
# to destroy this modem object after calling it, and then instantiate a new
# modem object on wake.
#
self._check_error() # check errors before going to sleep because we clear on wake
self.standby() # save some code size, this clears the driver's rx/tx state
self._cmd("BB", _CMD_SET_SLEEP, _flag(1 << 2, warm_start))
self._sleep = True
def _standby(self):
# Send the command for standby mode.
#
# **Don't call this function directly, call standby() instead.**
#
# (This private version doesn't update the driver's internal state.)
self._cmd("BB", _CMD_SET_STANDBY, 1) # STDBY_XOSC mode
self._clear_irq() # clear IRQs in case we just cancelled a send or receive
def is_idle(self):
# Returns True if the modem is idle (either in standby or in sleep).
#
# Note this function can return True in the case where the modem has temporarily gone to
# standby but there's a receive configured in software that will resume receiving the next
# time poll_recv() or poll_send() is called.
if self._sleep:
return True # getting status wakes from sleep
mode, _ = self._get_status()
return mode in (_STATUS_MODE_STANDBY_HSE32, _STATUS_MODE_STANDBY_RC)
def _wakeup(self):
# Wake the modem from sleep. This is called automatically the first
# time a modem command is sent after sleep() was called to put the modem to
# sleep.
#
# To manually wake the modem without initiating a new operation, call standby().
self._cs(0)
time.sleep_us(20)
self._cs(1)
self._sleep = False
self._clear_errors() # Clear "XOSC failed to start" which will reappear at this time
self._check_error() # raise an exception if any other error appears
def _decode_status(self, raw_status, check_errors=True):
# split the raw status, which often has reserved bits set, into the mode value
# and the command status value
mode = (raw_status & _STATUS_MODE_MASK) >> _STATUS_MODE_SHIFT
cmd = (raw_status & _STATUS_CMD_MASK) >> _STATUS_CMD_SHIFT
if check_errors and cmd in (_STATUS_CMD_EXEC_FAIL, _STATUS_CMD_ERROR):
raise RuntimeError("Status {},{} indicates command error".format(mode, cmd))
return (mode, cmd)
def _get_status(self):
# Issue the GetStatus command and return the decoded status of (mode value, command status)
res = self._cmd("B", _CMD_GET_STATUS, n_read=1)[0]
return self._decode_status(res)
def _check_error(self):
# Raise a RuntimeError if the radio has reported an error state.
#
# Return the decoded status, otherwise.
res = self._cmd("B", _CMD_GET_ERROR, n_read=3)
status = self._decode_status(res[0], False)
op_error = (res[1] << 8) + res[2]
if op_error != 0:
raise RuntimeError("Internal radio Status {} OpError {:#x}".format(status, op_error))
self._decode_status(res[0]) # raise an exception here if status shows an error
return status
def _clear_errors(self):
# Clear any errors flagged in the modem
self._cmd(">BH", _CMD_CLR_ERRORS, 0)
def _clear_irq(self, clear_bits=0xFFFF):
# Clear IRQs flagged in the modem
#
# By default, clears all IRQ bits. Otherwise, argument is the mask of bits to clear.
self._cmd(">BH", _CMD_CLR_IRQ_STATUS, clear_bits)
self._last_irq = None
def _set_tx_ant(self, tx_ant):
# Only STM32WL55 allows switching tx_ant from LP to HP
raise ConfigError("tx_ant")
def _symbol_offsets(self):
# Called from BaseModem.get_time_on_air_us().
#
# This function provides a way to implement the different SF5 and SF6 in SX126x,
# by returning two offsets: one for the overall number of symbols, and one for the
# number of bits used to calculate the symbol length of the payload.
return (2, -8) if self._sf in (5, 6) else (0, 0)
def configure(self, lora_cfg):
if self._rx is not False:
raise RuntimeError("Receiving")
if "preamble_len" in lora_cfg:
self._preamble_len = lora_cfg["preamble_len"]
self._invert_iq = (
lora_cfg.get("invert_iq_rx", self._invert_iq[0]),
lora_cfg.get("invert_iq_tx", self._invert_iq[1]),
self._invert_iq[2],
)
if "freq_khz" in lora_cfg:
self._rf_freq_hz = int(lora_cfg["freq_khz"] * 1000)
rffreq = (
self._rf_freq_hz << 25
) // 32_000_000 # RF-PLL frequency = 32e^6 * RFFreq / 2^25
if not rffreq:
raise ConfigError("freq_khz") # set to a value too low
self._cmd(">BI", _CMD_SET_RF_FREQUENCY, rffreq)
if "syncword" in lora_cfg:
syncword = lora_cfg["syncword"]
if syncword < 0x100:
# "Translation from SX127x to SX126x : 0xYZ -> 0xY4Z4 :
# if you do not set the two 4 you might lose sensitivity"
# see
# https://www.thethingsnetwork.org/forum/t/should-private-lorawan-networks-use-a-different-sync-word/34496/15
syncword = 0x0404 + ((syncword & 0x0F) << 4) + ((syncword & 0xF0) << 8)
self._cmd(">BBH", _CMD_WRITE_REGISTER, _REG_LSYNCRH, syncword)
if "output_power" in lora_cfg:
pa_config_args, self._output_power = self._get_pa_tx_params(lora_cfg["output_power"])
self._cmd("BBBBB", _CMD_SET_PA_CONFIG, *pa_config_args)
if "pa_ramp_us" in lora_cfg:
self._ramp_val = self._get_pa_ramp_val(
lora_cfg, [10, 20, 40, 80, 200, 800, 1700, 3400]
)
if "output_power" in lora_cfg or "pa_ramp_us" in lora_cfg:
# Only send the SetTxParams command if power level or PA ramp time have changed
self._cmd("BBB", _CMD_SET_TX_PARAMS, self._output_power, self._ramp_val)
if any(key in lora_cfg for key in ("sf", "bw", "coding_rate")):
if "sf" in lora_cfg:
self._sf = lora_cfg["sf"]
if self._sf < _CFG_SF_MIN or self._sf > _CFG_SF_MAX:
raise ConfigError("sf")
if "bw" in lora_cfg:
self._bw = lora_cfg["bw"]
if "coding_rate" in lora_cfg:
self._coding_rate = lora_cfg["coding_rate"]
if self._coding_rate < 4 or self._coding_rate > 8: # 4/4 through 4/8, linearly
raise ConfigError("coding_rate")
bw_val, self._bw_hz = {
"7.8": (0x00, 7800),
"10.4": (0x08, 10400),
"15.6": (0x01, 15600),
"20.8": (0x09, 20800),
"31.25": (0x02, 31250),
"41.7": (0x0A, 41700),
"62.5": (0x03, 62500),
"125": (0x04, 125000),
"250": (0x05, 250000),
"500": (0x06, 500000),
}[str(self._bw)]
self._cmd(
"BBBBB",
_CMD_SET_MODULATION_PARAMS,
self._sf,
bw_val,
self._coding_rate - 4, # 4/4=0, 4/5=1, etc
self._get_ldr_en(), # Note: BaseModem.get_n_symbols_x4() depends on this logic
)
if "rx_boost" in lora_cfg:
# See DS Table 9-3 "Rx Gain Configuration"
self._reg_write(_REG_RX_GAIN, 0x96 if lora_cfg["rx_boost"] else 0x94)
self._check_error()
def _invert_workaround(self, enable):
# Apply workaround for DS 15.4 Optimizing the Inverted IQ Operation
if self._invert_iq[2] != enable:
val = self._read_read(_REG_IQ_POLARITY_SETUP)
val = (val & ~4) | _flag(4, enable)
self._reg_write(_REG_IQ_POLARITY_SETUP, val)
self._invert_iq[2] = enable
def _get_irq(self):
# Get currently set IRQ bits.
irq_status = self._cmd("B", _CMD_GET_IRQ_STATUS, n_read=3)
status = self._decode_status(irq_status[0])
flags = (irq_status[1] << 8) + irq_status[2]
if _DEBUG:
print("Status {} flags {:#x}".format(status, flags))
return flags
def calibrate(self):
# Send the Calibrate command to the radio to calibrate RC oscillators, PLL and ADC.
#
# See DS 13.1.12 Calibrate Function
# calibParam 0xFE means to calibrate all blocks.
self._cmd("<BB", _CMD_CALIBRATE, 0xFE)
time.sleep_us(_CALIBRATE_TYPICAL_TIME_US)
# a falling edge of BUSY indicates calibration is done
self._wait_not_busy(_CALIBRATE_TIMEOUT_US)
def calibrate_image(self):
# Send the CalibrateImage command to the modem to improve reception in
# the currently configured frequency band.
#
# See DS 9.2.1 Image Calibration for Specified Frequency Bands
# and 13.1.13 CalibrateImage
mhz = self._rf_freq_hz // 1_000_000
if 430 <= mhz <= 440:
args = 0x6B6F
elif 470 <= mhz <= 510:
args = 0x7581
elif 779 <= mhz <= 787:
args = 0xC1C5
elif 863 <= mhz <= 870:
args = 0xD7DB
elif 902 <= mhz <= 928:
args = 0xE1E9
else:
# DS says "Contact your Semtech representative for the other optimal
# calibration settings outside of the given frequency bands"
raise ValueError()
self._cmd(">BH", _CMD_CALIBRATE_IMAGE, args)
# Can't find anythign in Datasheet about how long image calibration
# takes or exactly how it signals completion. Assuming it will be
# similar to _CMD_CALIBRATE.
self._wait_not_busy(_CALIBRATE_TIMEOUT_US)
def start_recv(self, timeout_ms=None, continuous=False, rx_length=0xFF):
# Start receiving.
#
# Part of common low-level modem API, see README.md for usage.
super().start_recv(timeout_ms, continuous, rx_length) # sets _rx
if self._tx:
# Send is in progress and has priority, _check_recv() will start recv
# once send finishes (caller needs to call poll_send() for this to happen.)
if _DEBUG:
print("Delaying receive until send completes")
return self._dio1
# Put the modem in a known state. It's possible a different
# receive was in progress, this prevent anything changing while
# we set up the new receive
self._standby() # calling private version to keep driver state as-is
# Allocate the full FIFO for RX
self._cmd("BBB", _CMD_SET_BUFFER_BASE_ADDRESS, 0xFF, 0x0)
self._cmd(
">BHBBBB",
_CMD_SET_PACKET_PARAMS,
self._preamble_len,
self._implicit_header,
rx_length, # PayloadLength, only used in implicit header mode
self._crc_en, # CRCType, only used in implicit header mode
self._invert_iq[0], # InvertIQ
)
self._invert_workaround(self._invert_iq[0])
if continuous:
timeout = _CONTINUOUS_TIMEOUT_VAL
elif timeout_ms is not None:
timeout = max(1, timeout_ms * 64) # units of 15.625us
else:
timeout = 0 # Single receive mode, no timeout
self._cmd(">BBH", _CMD_SET_RX, timeout >> 16, timeout)
return self._dio1
def poll_recv(self, rx_packet=None):
old_rx = self._rx
rx = super().poll_recv(rx_packet)
if rx is not True and old_rx is not False and isinstance(old_rx, int):
# Receiving has just stopped, and a timeout was previously set.
#
# Workaround for errata DS 15.3 "Implicit Header Mode Timeout Behaviour",
# which recommends to add the following after "ANY Rx with Timeout active sequence"
self._reg_write(_REG_RTC_CTRL, 0x00)
self._reg_write(_REG_EVT_CLR, self._reg_read(_REG_EVT_CLR) | _REG_EVT_CLR_MASK)
return rx
def _rx_flags_success(self, flags):
# Returns True if IRQ flags indicate successful receive.
# Specifically, from the bits in _IRQ_DRIVER_RX_MASK:
# - _IRQ_RX_DONE must be set
# - _IRQ_TIMEOUT must not be set
# - _IRQ_CRC_ERR must not be set
# - _IRQ_HEADER_ERR must not be set
#
# (Note: this is a function because the result for SX1276 depends on
# current config, but the result is constant here.)
return flags & _IRQ_DRIVER_RX_MASK == _IRQ_RX_DONE
def _read_packet(self, rx_packet, flags):
# Private function to read received packet (RxPacket object) from the
# modem, if there is one.
#
# Called from poll_recv() function, which has already checked the IRQ flags
# and verified a valid receive happened.
ticks_ms = self._get_last_irq()
res = self._cmd("B", _CMD_GET_RX_BUFFER_STATUS, n_read=3)
rx_payload_len = res[1]
rx_buffer_ptr = res[2] # should be 0
if rx_packet is None or len(rx_packet) != rx_payload_len:
rx_packet = RxPacket(rx_payload_len)
self._cmd("BB", _CMD_READ_BUFFER, rx_buffer_ptr, n_read=1, read_buf=rx_packet)
pkt_status = self._cmd("B", _CMD_GET_PACKET_STATUS, n_read=4)
rx_packet.ticks_ms = ticks_ms
rx_packet.snr = pkt_status[2] # SNR, units: dB *4
rx_packet.rssi = 0 - pkt_status[1] // 2 # RSSI, units: dBm
rx_packet.crc_error = (flags & _IRQ_CRC_ERR) != 0
return rx_packet
def prepare_send(self, packet):
# Prepare modem to start sending. Should be followed by a call to start_send()
#
# Part of common low-level modem API, see README.md for usage.
if len(packet) > 255:
raise ConfigError("packet too long")
# Put the modem in a known state. Any current receive is suspended at this point,
# but calling _check_recv() will resume it later.
self._standby() # calling private version to keep driver state as-is
self._check_error()
# Set the board antenna for correct TX mode
if self._ant_sw:
self._ant_sw.tx(self._tx_hp())
self._last_irq = None
self._cmd(
">BHBBBB",
_CMD_SET_PACKET_PARAMS,
self._preamble_len,
self._implicit_header,
len(packet),
self._crc_en,
self._invert_iq[1], # _invert_iq_tx
)
self._invert_workaround(self._invert_iq[1])
# Allocate the full FIFO for TX
self._cmd("BBB", _CMD_SET_BUFFER_BASE_ADDRESS, 0x0, 0xFF)
self._cmd("BB", _CMD_WRITE_BUFFER, 0x0, write_buf=packet)
# Workaround for DS 15.1 Modulation Quality with 500 kHZ LoRa Bandwidth
# ... apparently this needs to be done "*before each packet transmission*"
if self._bw_hz == 500_000:
self._reg_write(0x0889, self._reg_read(0x0889) & 0xFB)
else:
self._reg_write(0x0889, self._reg_read(0x0889) | 0x04)
def start_send(self):
# Actually start a send that was loaded by calling prepare_send().
#
# This is split into a separate function to allow more precise timing.
#
# The driver doesn't verify the caller has done the right thing here, the
# modem will no doubt do something weird if prepare_send() was not called!
#
# Part of common low-level modem API, see README.md for usage.
# Currently we don't pass any TX timeout argument to the modem1,
# which the datasheet ominously offers as "security" for the Host MCU if
# the send doesn't start for some reason.
self._cmd("BBBB", _CMD_SET_TX, 0x0, 0x0, 0x0)
if _DEBUG:
print("status {}".format(self._get_status()))
self._check_error()
self._tx = True
return self._dio1
def _wait_not_busy(self, timeout_us):
# Wait until the radio de-asserts the busy line
start = time.ticks_us()
ticks_diff = 0
while self._busy():
ticks_diff = time.ticks_diff(time.ticks_us(), start)
if ticks_diff > timeout_us:
raise RuntimeError("BUSY timeout")
time.sleep_us(1)
if _DEBUG and ticks_diff > 105:
# By default, debug log any busy time that takes longer than the
# datasheet-promised Typical 105us (this happens when starting the 32MHz oscillator,
# if it's turned on and off by the modem, and maybe other times.)
print(f"BUSY {ticks_diff}us")
def _cmd(self, fmt, *write_args, n_read=0, write_buf=None, read_buf=None):
# Execute an SX1262 command
# fmt - Format string suitable for use with struct.pack. First item should be 'B' and
# corresponds to the command opcode.
# write_args - Arguments suitable for struct.pack using fmt. First argument should be a
# command opcode byte.
#
# Optional arguments:
# write_buf - Extra buffer to write from (for FIFO writes). Mutually exclusive with n_read
# or read_buf.
# n_read - Number of result bytes to read back at end
# read_buf - Extra buffer to read into (for FIFO reads)
#
# Returns None if n_read==0, otherwise a memoryview of length n_read which points into a
# shared buffer (buffer will be clobbered on next call to _cmd!)
if self._sleep:
self._wakeup()
# Ensure "busy" from previously issued command has de-asserted. Usually this will
# have happened well before _cmd() is called again.
self._wait_not_busy(self._busy_timeout)
# Pack write_args into _buf and wrap a memoryview of the correct length around it
wrlen = struct.calcsize(fmt)
assert n_read + wrlen <= len(self._buf) # if this fails, make _buf bigger!
struct.pack_into(fmt, self._buf, 0, *write_args)
buf = memoryview(self._buf)[: (wrlen + n_read)]
if _DEBUG:
print(">>> {}".format(buf[:wrlen].hex()))
if write_buf:
print(">>> {}".format(write_buf.hex()))
self._cs(0)
self._spi.write_readinto(buf, buf)
if write_buf:
self._spi.write(write_buf) # Used by _CMD_WRITE_BUFFER only
if read_buf:
self._spi.readinto(read_buf, 0xFF) # Used by _CMD_READ_BUFFER only
self._cs(1)
if n_read > 0:
res = memoryview(buf)[wrlen : (wrlen + n_read)] # noqa: E203
if _DEBUG:
print("<<< {}".format(res.hex()))
return res
def _reg_read(self, addr):
return self._cmd("BBBB", _CMD_READ_REGISTER, addr >> 8, addr & 0xFF, n_read=1)[0]
def _reg_write(self, addr, val):
return self._cmd("BBBB", _CMD_WRITE_REGISTER, addr >> 8, addr & 0xFF, val & 0xFF)
class _SX1262(_SX126x):
# Don't construct this directly, construct lora.SX1262 or lora.AsyncSX1262
def __init__(
self,
spi,
cs,
busy,
dio1=None,
dio2_rf_sw=True,
dio3_tcxo_millivolts=None,
dio3_tcxo_start_time_us=1000,
reset=None,
lora_cfg=None,
ant_sw=None,
):
super().__init__(
spi,
cs,
busy,
dio1,
dio2_rf_sw,
dio3_tcxo_millivolts,
dio3_tcxo_start_time_us,
reset,
lora_cfg,
ant_sw,
)
# Apply workaround for DS 15.2 "Better Resistance of the SX1262 Tx to Antenna Mismatch
self._reg_write(0x8D8, self._reg_read(0x8D8) | 0x1E)
def _tx_hp(self):
# SX1262 has High Power only (deviceSel==0)
return True
def _get_pa_tx_params(self, output_power):
# Given an output power level in dB, return a 2-tuple:
# - First item is the 3 arguments for SetPaConfig command
# - Second item is the power level argument value for SetTxParams command.
#
# DS 13.1.14.1 "PA Optimal Settings" gives optimally efficient
# values for output power +22, +20, +17, +14 dBm and "these changes make
# the use of nominal power either sub-optimal or unachievable" (hence it
# recommends setting +22dBm nominal TX Power for all these).
#
# However the modem supports output power as low as -9dBm, and there's
# no explanation in the datasheet of how to best set other output power
# levels.
#
# Semtech's own driver (sx126x.c in LoRaMac-node) only ever executes
# SetPaConfig with the values shown in the datasheet for +22dBm, and
# then executes SetTxParams with power set to the nominal value in
# dBm.
#
# Try for best of both worlds here: If the caller requests an "Optimal"
# value, use the datasheet values. Otherwise set nominal power only as
# per Semtech's driver.
output_power = int(_clamp(output_power, -9, 22))
DEFAULT = (0x4, 0x7, 0x0, 0x1)
OPTIMAL = {
22: (DEFAULT, 22),
20: ((0x3, 0x5, 0x0, 0x1), 22),
17: ((0x2, 0x3, 0x0, 0x1), 22),
14: ((0x2, 0x2, 0x0, 0x1), 22),
}
if output_power in OPTIMAL:
# Datasheet optimal values
return OPTIMAL[output_power]
else:
# Nominal values, as per Semtech driver
return (DEFAULT, output_power & 0xFF)
class _SX1261(_SX126x):
# Don't construct this directly, construct lora.SX1261, or lora.AsyncSX1261
def __init__(
self,
spi,
cs,
busy,
dio1=None,
dio2_rf_sw=True,
dio3_tcxo_millivolts=None,
dio3_tcxo_start_time_us=1000,
reset=None,
lora_cfg=None,
ant_sw=None,
):
super().__init__(
spi,
cs,
busy,
dio1,
dio2_rf_sw,
dio3_tcxo_millivolts,
dio3_tcxo_start_time_us,
reset,
lora_cfg,
ant_sw,
)
def _tx_hp(self):
# SX1261 has Low Power only (deviceSel==1)
return False
def _get_pa_tx_params(self, output_power):
# Given an output power level in dB, return a 2-tuple:
# - First item is the 3 arguments for SetPaConfig command
# - Second item is the power level argument value for SetTxParams command.
#
# As noted above for SX1262, DS 13.1.14.1 "PA Optimal Settings"
# gives optimally efficient values for output power +15, +14, +10 dBm
# but nothing specific to the other power levels (down to -17dBm).
#
# Therefore do the same as for SX1262 to set optimal values if known, nominal otherwise.
output_power = _clamp(int(output_power), -17, 15)
DEFAULT = (0x4, 0x0, 0x1, 0x1)
OPTIMAL = {
15: ((0x06, 0x0, 0x1, 0x1), 14),
14: (DEFAULT, 14),
10: ((0x1, 0x0, 0x1, 0x1), 13),
}
if output_power == 15 and self._rf_freq_hz < 400_000_000:
# DS 13.1.14.1 has Note that PaDutyCycle is limited to 0x4 below 400MHz,
# so disallow the 15dBm optimal setting.
output_power = 14
if output_power in OPTIMAL:
# Datasheet optimal values
return OPTIMAL[output_power]
else:
# Nominal values, as per Semtech driver
return (DEFAULT, output_power & 0xFF)
# Define the actual modem classes that use the SyncModem & AsyncModem "mixin-like" classes
# to create sync and async variants.
try:
from .sync_modem import SyncModem
class SX1261(_SX1261, SyncModem):
pass
class SX1262(_SX1262, SyncModem):
pass
except ImportError:
pass
try:
from .async_modem import AsyncModem
class AsyncSX1261(_SX1261, AsyncModem):
pass
class AsyncSX1262(_SX1262, AsyncModem):
pass
except ImportError:
pass

Wyświetl plik

@ -0,0 +1,3 @@
metadata(version="0.1")
require("lora")
package("lora")

Wyświetl plik

@ -0,0 +1,886 @@
# MicroPython LoRa SX127x driver
# MIT license; Copyright (c) 2023 Angus Gratton
#
# LoRa is a registered trademark or service mark of Semtech Corporation or its affiliates.
#
# In comments, abbreviation "DS" = Semtech SX1276/77/78/79 Datasheet rev 7 (May 2020)
from micropython import const
from .modem import BaseModem, ConfigError, RxPacket, _clamp, _flag
from machine import Pin
import struct
import time
# Set _DEBUG to const(True) to print all register reads and writes, and current register values
# even when an update isn't needed. Plus a few additional pieces of information.
_DEBUG = const(False)
_WRITE_REG_BIT = const(1 << 7)
# Registers and fields as bytecode-zerocost constants
#
# Where possible names are direct from DS section 4.4
# (This means some names are slightly inconsistent, as per datasheet...)
_REG_FIFO = const(0x00)
_REG_OPMODE = const(0x01)
_OPMODE_LONGRANGEMODE_LORA = const(1 << 7)
_OPMODE_LONGRANGEMODE_FSK_OOK = const(0)
_OPMODE_MODE_MASK = const(0x7)
_OPMODE_MODE_SLEEP = const(0x0)
_OPMODE_MODE_STDBY = const(0x1)
_OPMODE_MODE_FSTX = const(0x2) # Frequency synthesis (TX)
_OPMODE_MODE_TX = const(0x3)
_OPMODE_MODE_FSRX = const(0x4) # Frequency synthesis (RX)
_OPMODE_MODE_RX_CONTINUOUS = const(0x5)
_OPMODE_MODE_RX_SINGLE = const(0x6)
_OPMODE_MODE_CAD = const(0x7) # Channel Activity Detection
_REG_FR_MSB = const(0x06)
_REG_FR_MID = const(0x07)
_REG_FR_LSB = const(0x08)
_REG_PA_CONFIG = const(0x09)
_PA_CONFIG_PASELECT_PA_BOOST_PIN = const(1 << 7)
_PA_CONFIG_PASELECT_RFO_PIN = const(0x0)
_PA_CONFIG_MAXPOWER_SHIFT = const(0x4)
_PA_CONFIG_MAXPOWER_MASK = const(0x7)
_PA_CONFIG_OUTPUTPOWER_SHIFT = const(0)
_PA_CONFIG_OUTPUTPOWER_MASK = const(0xF)
_REG_PA_RAMP = const(0x0A)
_PA_RAMP_MASK = const(0x0F)
_REG_LNA = const(0x0C)
_LNA_GAIN_MASK = const(0x7)
_LNA_GAIN_SHIFT = const(5)
_LNA_BOOST_HF_MASK = 0x3
_LNA_BOOST_HF_SHIFT = 0x0
_REG_FIFO_ADDR_PTR = const(0x0D)
_REG_FIFO_TX_BASE_ADDR = const(0x0E)
_REG_FIFO_RX_BASE_ADDR = const(0x0F)
_REG_FIFO_RX_CURRENT_ADDR = const(0x10)
_REG_IRQ_FLAGS_MASK = const(0x11)
_REG_IRQ_FLAGS = const(0x12)
# IRQ mask bits are the same as the IRQ flag bits
_IRQ_RX_TIMEOUT = const(1 << 7)
_IRQ_RX_DONE = const(1 << 6)
_IRQ_PAYLOAD_CRC_ERROR = const(1 << 5)
_IRQ_VALID_HEADER = const(1 << 4)
_IRQ_TX_DONE = const(1 << 3)
_IRQ_CAD_DONE = const(1 << 2)
_IRQ_FHSS_CHANGE_CHANNEL = const(1 << 1)
_IRQ_CAD_DETECTED = const(1 << 0)
_REG_RX_NB_BYTES = const(0x13)
_REG_RX_HEADER_CNT_VALUE_MSB = const(0x14)
_REG_RX_HEADER_CNT_VALUE_LSB = const(0x13)
_REG_RX_PACKET_CNT_VALUE_MSB = const(0x16)
_REG_RX_PACKET_CNT_VALUE_LSB = const(0x17)
_REG_MODEM_STAT = const(0x18)
_MODEM_STAT_RX_CODING_RATE_MASK = const(0xE)
_MODEM_STAT_RX_CODING_RATE_SHIFT = const(5)
_MODEM_STAT_MODEM_CLEAR = const(1 << 4)
_MODEM_STAT_HEADER_INFO_VALID = const(1 << 3)
_MODEM_STAT_RX_ONGOING = const(1 << 2)
_MODEM_STAT_SIGNAL_SYNC = const(1 << 1) # Signal synchronized
_MODEM_STAT_SIGNAL_DET = const(1 << 0) # Signal detected
_REG_PKT_SNR_VAL = const(0x19)
_REG_PKT_RSSI_VAL = const(0x1A)
_REG_RSSI_VAL = const(0x1B)
_REG_HOP_CHANNEL = const(0x1C)
_HOP_CHANNEL_PLL_TIMEOUT = const(1 << 7)
_HOP_CHANNEL_CRC_ON_PAYLOAD = const(1 << 6)
_HOP_CHANNEL_FHSS_PRESENT_CHANNEL_MASK = const(0x1F)
_REG_MODEM_CONFIG1 = const(0x1D)
_MODEM_CONFIG1_BW_MASK = const(0xF)
_MODEM_CONFIG1_BW_SHIFT = const(4)
_MODEM_CONFIG1_BW7_8 = const(0x0)
_MODEM_CONFIG1_BW10_4 = const(0x1)
_MODEM_CONFIG1_BW15_6 = const(0x2)
_MODEM_CONFIG1_BW20_8 = const(0x3)
_MODEM_CONFIG1_BW31_25 = const(0x4)
_MODEM_CONFIG1_BW41_7 = const(0x5)
_MODEM_CONFIG1_BW62_5 = const(0x6)
_MODEM_CONFIG1_BW125 = const(0x7)
_MODEM_CONFIG1_BW250 = const(0x8) # not supported in lower band (169MHz)
_MODEM_CONFIG1_BW500 = const(0x9) # not supported in lower band (169MHz)
_MODEM_CONFIG1_CODING_RATE_MASK = const(0x7)
_MODEM_CONFIG1_CODING_RATE_SHIFT = const(1)
_MODEM_CONFIG1_CODING_RATE_45 = const(0b001)
_MODEM_CONFIG1_CODING_RATE_46 = const(0b010)
_MODEM_CONFIG1_CODING_RATE_47 = const(0b011)
_MODEM_CONFIG1_CODING_RATE_48 = const(0b100)
_MODEM_CONFIG1_IMPLICIT_HEADER_MODE_ON = const(1 << 0)
_REG_MODEM_CONFIG2 = const(0x1E)
_MODEM_CONFIG2_SF_MASK = const(0xF) # Spreading Factor
_MODEM_CONFIG2_SF_SHIFT = const(4)
# SF values are integers 6-12 for SF6-SF12, so skipping constants for these
_MODEM_CONFIG2_SF_MIN = const(6) # inclusive
_MODEM_CONFIG2_SF_MAX = const(12) # inclusive
_MODEM_CONFIG2_TX_CONTINUOUS = const(1 << 3)
_MODEM_CONFIG2_RX_PAYLOAD_CRC_ON = const(1 << 2)
_MODEM_CONFIG2_SYMB_TIMEOUT_MSB_MASK = 0x3
_REG_SYMB_TIMEOUT_LSB = const(0x1F)
_REG_PREAMBLE_LEN_MSB = const(0x20)
_REG_PREAMBLE_LEN_LSB = const(0x21)
_REG_PAYLOAD_LEN = const(0x22) # Only for implicit header mode & TX
_REG_MAX_PAYLOAD_LEN = const(0x23)
_REG_HOP_PERIOD = const(0x24)
_REG_FIFO_TXBYTE_ADDR = const(0x25)
_REG_MODEM_CONFIG3 = const(0x26)
_MODEM_CONFIG3_AGC_ON = const(1 << 2)
_MODEM_CONFIG3_LOW_DATA_RATE_OPTIMIZE = const(1 << 3)
_REG_DETECT_OPTIMIZE = const(0x31)
_DETECT_OPTIMIZE_AUTOMATIC_IF_ON = const(
1 << 7
) # Bit should be cleared after reset, as per errata
_DETECT_OPTIMIZE_MASK = 0x7
_DETECT_OPTIMIZE_SF6 = const(0x05)
_DETECT_OPTIMIZE_OTHER = const(0x03)
# RegInvertIQ is not correctly documented in DS Rev 7 (May 2020).
#
# The correct behaviour for interoperability with other LoRa devices is as
# written here:
# https://github.com/eclipse/upm/blob/master/src/sx1276/sx1276.cxx#L1310
#
# Same as used in the Semtech mbed driver, here:
# https://github.com/ARMmbed/mbed-semtech-lora-rf-drivers/blob/master/SX1276/SX1276_LoRaRadio.cpp#L778
# https://github.com/ARMmbed/mbed-semtech-lora-rf-drivers/blob/master/SX1276/registers/sx1276Regs-LoRa.h#L443
#
# Specifically:
# - The TX bit in _REG_INVERT_IQ is opposite to what's documented in the datasheet
# (0x01 normal, 0x00 inverted)
# - The RX bit in _REG_INVERT_IQ is as documented in the datasheet (0x00 normal, 0x40 inverted)
# - When enabling LoRa mode, the default register value becomes 0x27 (normal RX & TX)
# rather than the documented power-on value of 0x26.
_REG_INVERT_IQ = const(0x33)
_INVERT_IQ_RX = const(1 << 6)
_INVERT_IQ_TX_OFF = const(1 << 0)
_REG_DETECTION_THRESHOLD = const(0x37)
_DETECTION_THRESHOLD_SF6 = const(0x0C)
_DETECTION_THRESHOLD_OTHER = const(0x0A) # SF7 to SF12
_REG_SYNC_WORD = const(0x39)
_REG_FSKOOK_IMAGE_CAL = const(0x3B) # NOTE: Only accessible in FSK/OOK mode
_IMAGE_CAL_START = const(1 << 6)
_IMAGE_CAL_RUNNING = const(1 << 5)
_IMAGE_CAL_AUTO = const(1 << 7)
_REG_INVERT_IQ2 = const(0x3B)
_INVERT_IQ2_ON = const(0x19)
_INVERT_IQ2_OFF = const(0x1D)
_REG_DIO_MAPPING1 = const(0x40)
_DIO0_MAPPING_MASK = const(0x3)
_DIO0_MAPPING_SHIFT = const(6)
_DIO1_MAPPING_MASK = const(0x3)
_DIO1_MAPPING_SHIFT = const(4)
_DIO2_MAPPING_MASK = const(0x3)
_DIO2_MAPPING_SHIFT = const(2)
_DIO3_MAPPING_MASK = const(0x3)
_DIO3_MAPPING_SHIFT = const(0)
_REG_DIO_MAPPING2 = const(0x41)
_DIO4_MAPPING_MASK = const(0x3)
_DIO4_MAPPING_SHIFT = const(6)
_DIO5_MAPPING_MASK = const(0x3)
_DIO5_MAPPING_SHIFT = const(4)
_REG_PA_DAC = const(0x4D)
_PA_DAC_DEFAULT_VALUE = const(0x84) # DS 3.4.3 High Power +20 dBm Operation
_PA_DAC_HIGH_POWER_20DBM = const(0x87)
_REG_VERSION = const(0x42)
# IRQs the driver masks in when receiving
_IRQ_DRIVER_RX_MASK = const(
_IRQ_RX_DONE | _IRQ_RX_TIMEOUT | _IRQ_VALID_HEADER | _IRQ_PAYLOAD_CRC_ERROR
)
class _SX127x(BaseModem):
# Don't instantiate this class directly, instantiate either lora.SX1276,
# lora.SX1277, lora.SX1278, lora.SX1279, or lora.AsyncSX1276,
# lora.AsyncSX1277, lora.AsyncSX1278, lora.AsyncSX1279 as applicable.
# common IRQ masks used by the base class functions
_IRQ_RX_COMPLETE = _IRQ_RX_DONE | _IRQ_RX_TIMEOUT
_IRQ_TX_COMPLETE = _IRQ_TX_DONE
def __init__(self, spi, cs, dio0=None, dio1=None, reset=None, lora_cfg=None, ant_sw=None):
super().__init__(ant_sw)
self._buf1 = bytearray(1) # shared small buffers
self._buf2 = bytearray(2)
self._spi = spi
self._cs = cs
self._dio0 = dio0
self._dio1 = dio1
cs.init(Pin.OUT, value=1)
if dio0:
dio0.init(Pin.IN)
dio0.irq(self._radio_isr, trigger=Pin.IRQ_RISING)
if dio1:
dio1.init(Pin.IN)
dio1.irq(self._radio_isr, trigger=Pin.IRQ_RISING)
# Configuration settings that need to be tracked by the driver
# Note: a number of these are set in the base class constructor
self._pa_boost = False
if reset:
# If the user supplies a reset pin argument, reset the radio
reset.init(Pin.OUT, value=0)
time.sleep_ms(1)
reset(1)
time.sleep_ms(5)
version = self._reg_read(_REG_VERSION)
if version != 0x12:
raise RuntimeError("Unexpected silicon version {}".format(version))
# wake the radio and enable LoRa mode if it's not already set
self._set_mode(_OPMODE_MODE_STDBY)
if lora_cfg:
self.configure(lora_cfg)
def configure(self, lora_cfg):
if self._rx is not False:
raise RuntimeError("Receiving")
# Set frequency
if "freq_khz" in lora_cfg:
# Assuming F(XOSC)=32MHz (datasheet both implies this value can be different, and
# specifies it shouldn't be different!)
self._rf_freq_hz = int(lora_cfg["freq_khz"] * 1000)
fr_val = self._rf_freq_hz * 16384 // 1000_000
buf = bytes([fr_val >> 16, (fr_val >> 8) & 0xFF, fr_val & 0xFF])
self._reg_write(_REG_FR_MSB, buf)
# Turn on/off automatic image re-calibration if temperature changes. May lead to dropped
# packets if enabled.
if "auto_image_cal" in lora_cfg:
self._set_mode(_OPMODE_MODE_STDBY, False) # Disable LoRa mode to access FSK/OOK
self._reg_update(
_REG_FSKOOK_IMAGE_CAL,
_IMAGE_CAL_AUTO,
_flag(_IMAGE_CAL_AUTO, lora_cfg["auto_image_cal"]),
)
self._set_mode(_OPMODE_MODE_STDBY) # Switch back to LoRa mode
# Note: Common pattern below is to generate a new register value and an update_mask,
# and then call self._reg_update(). self._reg_update() is a
# no-op if update_mask==0 (no bits to change).
# Update _REG_PA_CONFIG
pa_config = 0x0
update_mask = 0x0
# Ref DS 3.4.2 "RF Power Amplifiers"
if "tx_ant" in lora_cfg:
self._pa_boost = lora_cfg["tx_ant"].upper() == "PA_BOOST"
pa_boost_bit = (
_PA_CONFIG_PASELECT_PA_BOOST_PIN if self._pa_boost else _PA_CONFIG_PASELECT_RFO_PIN
)
pa_config |= pa_boost_bit
update_mask |= pa_boost_bit
if not self._pa_boost:
# When using RFO, _REG_PA_DAC can keep default value always
# (otherwise, it's set when output_power is set in next block)
self._reg_write(_REG_PA_DAC, _PA_DAC_DEFAULT_VALUE)
if "output_power" in lora_cfg:
# See DS 3.4.2 RF Power Amplifiers
dbm = int(lora_cfg["output_power"])
if self._pa_boost:
if dbm >= 20:
output_power = 0x15 # 17dBm setting
pa_dac = _PA_DAC_HIGH_POWER_20DBM
else:
dbm = _clamp(dbm, 2, 17) # +2 to +17dBm only
output_power = dbm - 2
pa_dac = _PA_DAC_DEFAULT_VALUE
self._reg_write(_REG_PA_DAC, pa_dac)
else:
# In RFO mode, Output Power is computed from two register fields
# - MaxPower and OutputPower.
#
# Do what the Semtech LoraMac-node driver does here, which is to
# set max_power at one extreme or the other (0 or 7) and then
# calculate the output_power setting based on this baseline.
dbm = _clamp(dbm, -4, 15)
if dbm > 0:
# MaxPower to maximum
pa_config |= _PA_CONFIG_MAXPOWER_MASK << _PA_CONFIG_MAXPOWER_SHIFT
# Pout (dBm) == 10.8dBm + 0.6*maxPower - (15 - register value)
# 10.8+0.6*7 == 15dBm, so pOut = register_value (0 to 15 dBm)
output_power = dbm
else:
# MaxPower field will be set to 0
# Pout (dBm) == 10.8dBm - (15 - OutputPower)
# OutputPower == Pout (dBm) + 4.2
output_power = dbm + 4 # round down to 4.0, to keep using integer math
pa_config |= output_power << _PA_CONFIG_OUTPUTPOWER_SHIFT
update_mask |= (
_PA_CONFIG_OUTPUTPOWER_MASK << _PA_CONFIG_OUTPUTPOWER_SHIFT
| _PA_CONFIG_MAXPOWER_MASK << _PA_CONFIG_MAXPOWER_SHIFT
)
self._reg_update(_REG_PA_CONFIG, update_mask, pa_config)
if "pa_ramp_us" in lora_cfg:
# other fields in this register are reserved to 0 or unused
self._reg_write(
_REG_PA_RAMP,
self._get_pa_ramp_val(
lora_cfg,
[10, 12, 15, 20, 25, 31, 40, 50, 62, 100, 125, 250, 500, 1000, 2000, 3400],
),
)
# If a hard reset happened then flags should be cleared already and mask should
# default to fully enabled, but let's be "belts and braces" sure
self._reg_write(_REG_IRQ_FLAGS, 0xFF)
self._reg_write(_REG_IRQ_FLAGS_MASK, 0) # do IRQ masking in software for now
# Update MODEM_CONFIG1
modem_config1 = 0x0
update_mask = 0x0
if "bw" in lora_cfg:
bw = str(lora_cfg["bw"])
bw_reg_val, self._bw_hz = {
"7.8": (_MODEM_CONFIG1_BW7_8, 7800),
"10.4": (_MODEM_CONFIG1_BW10_4, 10400),
"15.6": (_MODEM_CONFIG1_BW15_6, 15600),
"20.8": (_MODEM_CONFIG1_BW20_8, 20800),
"31.25": (_MODEM_CONFIG1_BW31_25, 31250),
"41.7": (_MODEM_CONFIG1_BW41_7, 41700),
"62.5": (_MODEM_CONFIG1_BW62_5, 62500),
"125": (_MODEM_CONFIG1_BW125, 125000),
"250": (_MODEM_CONFIG1_BW250, 250000),
"500": (_MODEM_CONFIG1_BW500, 500000),
}[bw]
modem_config1 |= bw_reg_val << _MODEM_CONFIG1_BW_SHIFT
update_mask |= _MODEM_CONFIG1_BW_MASK << _MODEM_CONFIG1_BW_SHIFT
if "freq_khz" in lora_cfg or "bw" in lora_cfg:
# Workaround for Errata Note 2.1 "Sensitivity Optimization with a 500 kHz bandwidth"
if self._bw_hz == 500000 and 862_000_000 <= self._rf_freq_hz <= 1020_000_000:
self._reg_write(0x36, 0x02)
self._reg_write(0x3A, 0x64)
elif self._bw_hz == 500000 and 410_000_000 <= self._rf_freq_hz <= 525_000_000:
self._reg_write(0x36, 0x02)
self._reg_write(0x3A, 0x7F)
else:
# "For all other combinations of bandiwdth/frequencies, register at address 0x36
# should be re-set to value 0x03 and the value at address 0x3a will be
# automatically selected by the chip"
self._reg_write(0x36, 0x03)
if "coding_rate" in lora_cfg:
self._coding_rate = int(lora_cfg["coding_rate"])
if self._coding_rate < 5 or self._coding_rate > 8:
raise ConfigError("coding_rate")
# _MODEM_CONFIG1_CODING_RATE_45 == value 5 == 1
modem_config1 |= (self._coding_rate - 4) << _MODEM_CONFIG1_CODING_RATE_SHIFT
update_mask |= _MODEM_CONFIG1_CODING_RATE_MASK << _MODEM_CONFIG1_CODING_RATE_SHIFT
self._reg_update(_REG_MODEM_CONFIG1, update_mask, modem_config1)
if "implicit_header" in lora_cfg:
self._implicit_header = lora_cfg["implicit_header"]
modem_config1 |= _flag(_MODEM_CONFIG1_IMPLICIT_HEADER_MODE_ON, self._implicit_header)
update_mask |= _MODEM_CONFIG1_IMPLICIT_HEADER_MODE_ON
# Update MODEM_CONFIG2, for any fields that changed
modem_config2 = 0
update_mask = 0
if "sf" in lora_cfg:
sf = self._sf = int(lora_cfg["sf"])
if sf < _MODEM_CONFIG2_SF_MIN or sf > _MODEM_CONFIG2_SF_MAX:
raise ConfigError("sf")
if sf == 6 and not self._implicit_header:
# DS 4.1.12 "Spreading Factor"
raise ConfigError("SF6 requires implicit_header mode")
# Update these registers when writing 'SF'
self._reg_write(
_REG_DETECTION_THRESHOLD,
_DETECTION_THRESHOLD_SF6 if sf == 6 else _DETECTION_THRESHOLD_OTHER,
)
# This field has a reserved non-zero field, so do a read-modify-write update
self._reg_update(
_REG_DETECT_OPTIMIZE,
_DETECT_OPTIMIZE_AUTOMATIC_IF_ON | _DETECT_OPTIMIZE_MASK,
_DETECT_OPTIMIZE_SF6 if sf == 6 else _DETECT_OPTIMIZE_OTHER,
)
modem_config2 |= sf << _MODEM_CONFIG2_SF_SHIFT
update_mask |= _MODEM_CONFIG2_SF_MASK << _MODEM_CONFIG2_SF_SHIFT
if "crc_en" in lora_cfg:
self._crc_en = lora_cfg["crc_en"]
# I had to double-check the datasheet about this point:
# 1. In implicit header mode, this bit is used on both RX & TX and
# should be set to get CRC generation on TX and/or checking on RX.
# 2. In explicit header mode, this bit is only used on TX (should CRC
# be added and CRC flag set in header) and ignored on RX (CRC flag
# read from header instead).
modem_config2 |= _flag(_MODEM_CONFIG2_RX_PAYLOAD_CRC_ON, self._crc_en)
update_mask |= _MODEM_CONFIG2_RX_PAYLOAD_CRC_ON
self._reg_update(_REG_MODEM_CONFIG2, update_mask, modem_config2)
# Update _REG_INVERT_IQ
#
# See comment about this register's undocumented weirdness at top of
# file above _REG_INVERT_IQ constant.
#
# Note also there is a second register invert_iq2 which may be set differently
# for transmit vs receive, see _set_invert_iq2() for that one.
invert_iq = 0x0
update_mask = 0x0
if "invert_iq_rx" in lora_cfg:
self._invert_iq[0] = lora_cfg["invert_iq_rx"]
invert_iq |= _flag(_INVERT_IQ_RX, lora_cfg["invert_iq_rx"])
update_mask |= _INVERT_IQ_RX
if "invert_iq_tx" in lora_cfg:
self._invert_iq[1] = lora_cfg["invert_iq_tx"]
invert_iq |= _flag(_INVERT_IQ_TX_OFF, not lora_cfg["invert_iq_tx"]) # Inverted
update_mask |= _INVERT_IQ_TX_OFF
self._reg_update(_REG_INVERT_IQ, update_mask, invert_iq)
if "preamble_len" in lora_cfg:
self._preamble_len = lora_cfg["preamble_len"]
self._reg_write(_REG_PREAMBLE_LEN_MSB, struct.pack(">H", self._preamble_len))
# Update MODEM_CONFIG3, for any fields that have changed
modem_config3 = 0
update_mask = 0
if "sf" in lora_cfg or "bw" in lora_cfg:
# Changing either SF or BW means the Low Data Rate Optimization may need to be changed
#
# note: BaseModem.get_n_symbols_x4() assumes this value is set automatically
# as follows.
modem_config3 |= _flag(_MODEM_CONFIG3_LOW_DATA_RATE_OPTIMIZE, self._get_ldr_en())
update_mask |= _MODEM_CONFIG3_LOW_DATA_RATE_OPTIMIZE
if "lna_gain" in lora_cfg:
lna_gain = lora_cfg["lna_gain"]
update_mask |= _MODEM_CONFIG3_AGC_ON
if lna_gain is None: # Setting 'None' means 'Auto'
modem_config3 |= _MODEM_CONFIG3_AGC_ON
else: # numeric register value
# Clear the _MODEM_CONFIG3_AGC_ON bit, and write the manual LNA gain level 1-6
# to the register
self._reg_update(
_REG_LNA, _LNA_GAIN_MASK << _LNA_GAIN_SHIFT, lna_gain << _LNA_GAIN_SHIFT
)
if "rx_boost" in lora_cfg:
self._reg_update(
_REG_LNA,
_LNA_BOOST_HF_MASK << _LNA_BOOST_HF_SHIFT,
_flag(0x3, lora_cfg["lna_boost_hf"]),
)
self._reg_update(_REG_MODEM_CONFIG3, update_mask, modem_config3)
def _reg_write(self, reg, value):
self._cs(0)
if isinstance(value, int):
self._buf2[0] = reg | _WRITE_REG_BIT
self._buf2[1] = value
self._spi.write(self._buf2)
if _DEBUG:
dbg = hex(value)
else: # value is a buffer
self._buf1[0] = reg | _WRITE_REG_BIT
self._spi.write(self._buf1)
self._spi.write(value)
if _DEBUG:
dbg = value.hex()
self._cs(1)
if _DEBUG:
print("W {:#x} ==> {}".format(reg, dbg))
self._reg_read(reg) # log the readback as well
def _reg_update(self, reg, update_mask, new_value):
# Update register address 'reg' with byte value new_value, as masked by
# bit mask update_mask. Bits not set in update_mask will be kept at
# their pre-existing values in the register.
#
# If update_mask is zero, this function is a no-op and returns None.
# If update_mask is not zero, this function updates 'reg' and returns
# the previous complete value of 'reg' as a result.
#
# Note: this function has no way of detecting a race condition if the
# modem updates any bits in 'reg' that are unset in update_mask, at the
# same time a read/modify/write is occurring. Any such changes are
# overwritten with the original values.
if not update_mask: # short-circuit if nothing to change
if _DEBUG:
# Log the current value if DEBUG is on
# (Note the compiler will optimize this out otherwise)
self._reg_read(reg)
return
old_value = self._reg_read(reg)
value = ((old_value & ~update_mask) & 0xFF) | (new_value & update_mask)
if old_value != value:
self._reg_write(reg, value)
return old_value
def _reg_read(self, reg):
# Read and return a single register value at address 'reg'
self._buf2[0] = reg
self._buf2[1] = 0xFF
self._cs(0)
self._spi.write_readinto(self._buf2, self._buf2)
self._cs(1)
if _DEBUG:
print("R {:#x} <== {:#x}".format(reg, self._buf2[1]))
return self._buf2[1]
def _reg_readinto(self, reg, buf):
# Read and return one or more register values starting at address 'reg',
# into buffer 'buf'.
self._cs(0)
self._spi.readinto(self._buf1, reg)
self._spi.readinto(buf)
if _DEBUG:
print("R {:#x} <== {}".format(reg, buf.hex()))
self._cs(1)
def _get_mode(self):
# Return the current 'Mode' field in RegOpMode
return self._reg_read(_REG_OPMODE) & _OPMODE_MODE_MASK
def _set_mode(self, mode, lora_en=True):
# Set the 'Mode' and 'LongRangeMode' fields in RegOpMode
# according to 'mode' and 'lora_en', respectively.
#
# If enabling or disabling LoRa mode, the radio is automatically
# switched into Sleep mode as required and then the requested mode is
# set (if not sleep mode).
#
# Returns the previous value of the RegOpMode register (unmasked).
mask = _OPMODE_LONGRANGEMODE_LORA | _OPMODE_MODE_MASK
lora_val = _flag(_OPMODE_LONGRANGEMODE_LORA, lora_en)
old_value = self._reg_read(_REG_OPMODE)
new_value = (old_value & ~mask) | lora_val | mode
if lora_val != (old_value & _OPMODE_LONGRANGEMODE_LORA):
# Need to switch into Sleep mode in order to change LongRangeMode flag
self._reg_write(_REG_OPMODE, _OPMODE_MODE_SLEEP | lora_val)
if new_value != old_value:
self._reg_write(_REG_OPMODE, new_value)
if _DEBUG:
print(
"Mode {} -> {} ({:#x})".format(
old_value & _OPMODE_MODE_MASK, mode, self._reg_read(_REG_OPMODE)
)
)
return old_value
def _set_invert_iq2(self, val):
# Set the InvertIQ2 register on/off as needed, unless it is already set to the correct
# level
if self._invert_iq[2] == val:
return # already set to the level we want
self._reg_write(_REG_INVERT_IQ2, _INVERT_IQ2_ON if val else _INVERT_IQ2_OFF)
self._invert_iq[2] = val
def _standby(self):
# Send the command for standby mode.
#
# **Don't call this function directly, call standby() instead.**
#
# (This private version doesn't update the driver's internal state.)
old_mode = self._set_mode(_OPMODE_MODE_STDBY) & _OPMODE_MODE_MASK
if old_mode not in (_OPMODE_MODE_STDBY, _OPMODE_MODE_SLEEP):
# If we just cancelled sending or receiving, clear any pending IRQs
self._reg_write(_REG_IRQ_FLAGS, 0xFF)
def sleep(self):
# Put the modem into sleep mode. Modem will wake automatically the next
# time host asks it for something, or call standby() to wake it manually.
self.standby() # save some code size, this clears driver state for us
self._set_mode(_OPMODE_MODE_SLEEP)
def is_idle(self):
# Returns True if the modem is idle (either in standby or in sleep).
#
# Note this function can return True in the case where the modem has temporarily gone to
# standby, but there's a receive configured in software that will resume receiving the
# next time poll_recv() or poll_send() is called.
return self._get_mode() in (_OPMODE_MODE_STDBY, _OPMODE_MODE_SLEEP)
def calibrate_image(self):
# Run the modem Image & RSSI calibration process to improve receive performance.
#
# calibration will be run in the HF or LF band automatically, depending on the
# current radio configuration.
#
# See DS 2.1.3.8 Image and RSSI Calibration. Idea to disable TX power
# comes from Semtech's sx1276 driver which does this.
pa_config = self._reg_update(_REG_PA_CONFIG, 0xFF, 0) # disable TX power
self._set_mode(_OPMODE_MODE_STDBY, False) # Switch to FSK/OOK mode to expose RegImageCal
self._reg_update(_REG_FSKOOK_IMAGE_CAL, _IMAGE_CAL_START, _IMAGE_CAL_START)
while self._reg_read(_REG_FSKOOK_IMAGE_CAL) & _IMAGE_CAL_RUNNING:
time.sleep_ms(1)
self._set_mode(_OPMODE_MODE_STDBY) # restore LoRA mode
self._reg_write(_REG_PA_CONFIG, pa_config) # restore previous TX power
def calibrate(self):
# Run a full calibration.
#
# For SX1276, this means just the image & RSSI calibration as no other runtime
# calibration is implemented in the modem.
self.calibrate_image()
def start_recv(self, timeout_ms=None, continuous=False, rx_length=0xFF):
# Start receiving.
#
# Part of common low-level modem API, see README.md for usage.
super().start_recv(timeout_ms, continuous, rx_length) # sets self._rx
# will_irq if DIO0 and DIO1 both hooked up, or DIO0 and no timeout
will_irq = self._dio0 and (self._dio1 or timeout_ms is None)
if self._tx:
# Send is in progress and has priority, _check_recv() will start receive
# once send finishes (caller needs to call poll_send() for this to happen.)
if _DEBUG:
print("Delaying receive until send completes")
return will_irq
# Put the modem in a known state. It's possible a different
# receive was in progress, this prevent anything changing while
# we set up the new receive
self._standby() # calling private version to keep driver state as-is
# Update the InvertIQ2 setting for RX
self._set_invert_iq2(self._invert_iq[0])
if self._implicit_header:
# Payload length only needs to be set in implicit header mode
self._reg_write(_REG_PAYLOAD_LEN, rx_length)
if self._dio0:
# Field value is 0, for DIO0 = RXDone
update_mask = _DIO0_MAPPING_MASK << _DIO0_MAPPING_SHIFT
if self._dio1:
# Field value also 0, for DIO1 = RXTimeout
update_mask |= _DIO1_MAPPING_MASK << _DIO1_MAPPING_SHIFT
self._reg_update(_REG_DIO_MAPPING1, update_mask, 0)
if not continuous:
# Unlike SX1262, SX1276 doesn't have a "single RX no timeout" mode. So we set the
# maximum hardware timeout and resume RX in software if needed.
if timeout_ms is None:
timeout_syms = 1023
else:
t_sym_us = self._get_t_sym_us()
timeout_syms = (timeout_ms * 1000 + t_sym_us - 1) // t_sym_us # round up
# if the timeout is too long for the modem, the host will
# automatically resume it in software. If the timeout is too
# short for the modem, round it silently up to the minimum
# timeout.
timeout_syms = _clamp(timeout_syms, 4, 1023)
self._reg_update(
_REG_MODEM_CONFIG2,
_MODEM_CONFIG2_SYMB_TIMEOUT_MSB_MASK,
timeout_syms >> 8,
)
self._reg_write(_REG_SYMB_TIMEOUT_LSB, timeout_syms & 0xFF)
# Allocate the full FIFO for RX
self._reg_write(_REG_FIFO_ADDR_PTR, 0)
self._reg_write(_REG_FIFO_RX_BASE_ADDR, 0)
self._set_mode(_OPMODE_MODE_RX_CONTINUOUS if continuous else _OPMODE_MODE_RX_SINGLE)
return will_irq
def _rx_flags_success(self, flags):
# Returns True if IRQ flags indicate successful receive.
# Specifically, from the bits in _IRQ_DRIVER_RX_MASK:
# - _IRQ_RX_DONE must be set
# - _IRQ_RX_TIMEOUT must not be set
# - _IRQ_PAYLOAD_CRC_ERROR must not be set
# - _IRQ_VALID_HEADER must be set if we're using explicit packet mode, ignored otherwise
return flags & _IRQ_DRIVER_RX_MASK == _IRQ_RX_DONE | _flag(
_IRQ_VALID_HEADER, not self._implicit_header
)
def _get_irq(self):
return self._reg_read(_REG_IRQ_FLAGS)
def _clear_irq(self, to_clear=0xFF):
return self._reg_write(_REG_IRQ_FLAGS, to_clear)
def _read_packet(self, rx_packet, flags):
# Private function to read received packet (RxPacket object) from the
# modem, if there is one.
#
# Called from poll_recv() function, which has already checked the IRQ flags
# and verified a valid receive happened.
ticks_ms = self._get_last_irq() # IRQ timestamp for the receive
rx_payload_len = self._reg_read(_REG_RX_NB_BYTES)
if rx_packet is None or len(rx_packet) != rx_payload_len:
rx_packet = RxPacket(rx_payload_len)
self._reg_readinto(_REG_FIFO, rx_packet)
rx_packet.ticks_ms = ticks_ms
# units: dB*4
rx_packet.snr = self._reg_read(_REG_PKT_SNR_VAL)
if rx_packet.snr & 0x80: # Signed 8-bit integer
# (avoiding using struct here to skip a heap allocation)
rx_packet.snr -= 0x100
# units: dBm
rx_packet.rssi = self._reg_read(_REG_PKT_RSSI_VAL) - (157 if self._pa_boost else 164)
rx_packet.crc_error = flags & _IRQ_PAYLOAD_CRC_ERROR != 0
return rx_packet
def prepare_send(self, packet):
# Prepare modem to start sending. Should be followed by a call to start_send()
#
# Part of common low-level modem API, see README.md for usage.
if len(packet) > 255:
raise ValueError("packet too long")
# Put the modem in a known state. Any current receive is suspended at this point,
# but calling _check_recv() will resume it later.
self._standby() # calling private version to keep driver state as-is
if self._ant_sw:
self._ant_sw.tx(self._pa_boost)
self._last_irq = None
if self._dio0:
self._reg_update(
_REG_DIO_MAPPING1,
_DIO0_MAPPING_MASK << _DIO0_MAPPING_SHIFT,
1 << _DIO0_MAPPING_SHIFT,
) # DIO0 = TXDone
# Update the InvertIQ2 setting for TX
self._set_invert_iq2(self._invert_iq[1])
# Allocate the full FIFO for TX
self._reg_write(_REG_FIFO_ADDR_PTR, 0)
self._reg_write(_REG_FIFO_TX_BASE_ADDR, 0)
self._reg_write(_REG_PAYLOAD_LEN, len(packet))
self._reg_write(_REG_FIFO, packet)
# clear the TX Done flag in case a previous call left it set
# (won't happen unless poll_send() was not called)
self._reg_write(_REG_IRQ_FLAGS, _IRQ_TX_DONE)
def start_send(self):
# Actually start a send that was loaded by calling prepare_send().
#
# This is split into a separate function to allow more precise timing.
#
# The driver doesn't verify the caller has done the right thing here, the
# modem will no doubt do something weird if prepare_send() was not called!
#
# Part of common low-level modem API, see README.md for usage.
self._set_mode(_OPMODE_MODE_TX)
self._tx = True
return self._dio0 is not None # will_irq if dio0 is set
def _irq_flag_tx_done(self):
return _IRQ_TX_DONE
# Define the actual modem classes that use the SyncModem & AsyncModem "mixin-like" classes
# to create sync and async variants.
try:
from .sync_modem import SyncModem
class SX1276(_SX127x, SyncModem):
pass
# Implementation note: Currently the classes SX1276, SX1277, SX1278 and
# SX1279 are actually all SX1276. Perhaps in the future some subclasses with
# software enforced limits can be added to this driver, but the differences
# appear very minor:
#
# - SX1276 seems like "baseline" with max freq.
# - SX1277 supports max SF level of 9.
# - SX1278 supports max freq 525MHz, therefore has no RFO_HF and RFI_HF pins.
# - SX1279 supports max freq 960MHz.
#
# There also appears to be no difference in silicon interface or register values to determine
# which model is connected.
SX1277 = SX1278 = SX1279 = SX1276
except ImportError:
pass
try:
from .async_modem import AsyncModem
class AsyncSX1276(_SX127x, AsyncModem):
pass
# See comment above about currently identical implementations
AsyncSX1277 = AsyncSX1278 = AsyncSX1279 = AsyncSX1276
except ImportError:
pass

Wyświetl plik

@ -0,0 +1,3 @@
metadata(version="0.1")
require("lora")
package("lora")

Wyświetl plik

@ -0,0 +1,86 @@
# MicroPython LoRa synchronous modem driver
# MIT license; Copyright (c) 2023 Angus Gratton
#
# LoRa is a registered trademark or service mark of Semtech Corporation or its affiliates.
import machine
import time
class SyncModem:
# Mixin-like base class that provides synchronous modem send and recv
# functions
#
#
# Don't instantiate this class directly, instantiate one of the 'AsyncXYZ'
# modem classes defined in the lora module.
#
# These are intended for simple applications. They block the caller until
# the modem operation is complete, and don't support interleaving send
# and receive.
def _after_init(self):
pass # Needed for AsyncModem but not SyncModem
def send(self, packet, tx_at_ms=None):
# Send the given packet (byte sequence),
# and return once transmission of the packet is complete.
#
# Returns a timestamp (result of time.ticks_ms()) when the packet
# finished sending.
self.prepare_send(packet)
# If the caller specified a timestamp to start transmission at, wait until
# that time before triggering the send
if tx_at_ms is not None:
time.sleep_ms(max(0, time.ticks_diff(tx_at_ms, time.ticks_ms())))
will_irq = self.start_send() # ... and go!
# sleep for the expected send time before checking if send has ended
time.sleep_ms(self.get_time_on_air_us(len(packet)) // 1000)
tx = True
while tx is True:
tx = self.poll_send()
self._sync_wait(will_irq)
return tx
def recv(self, timeout_ms=None, rx_length=0xFF, rx_packet=None):
# Attempt to a receive a single LoRa packet, timeout after timeout_ms milliseconds
# or wait indefinitely if no timeout is supplied (default).
#
# Returns an instance of RxPacket or None if the radio timed out while receiving.
#
# Optional rx_length argument is only used if lora_cfg["implict_header"] == True
# (not the default) and holds the length of the payload to receive.
#
# Optional rx_packet argument can be an existing instance of RxPacket
# which will be reused to save allocations, but only if the received packet
# is the same length as the rx_packet packet. If the length is different, a
# new RxPacket instance is allocated and returned.
will_irq = self.start_recv(timeout_ms, False, rx_length)
rx = True
while rx is True:
self._sync_wait(will_irq)
rx = self.poll_recv(rx_packet)
return rx or None
def _sync_wait(self, will_irq):
# For synchronous usage, block until an interrupt occurs or we time out
if will_irq:
for n in range(100):
machine.idle()
# machine.idle() wakes up very often, so don't actually return
# unless _radio_isr ran already. The outer for loop is so the
# modem is still polled occasionally to
# avoid the possibility an IRQ was lost somewhere.
#
# None of this is very efficient, power users should either use
# async or call the low-level API manually with better
# port-specific sleep configurations, in order to get the best
# efficiency.
if self.irq_triggered():
break
else:
time.sleep_ms(1)

Wyświetl plik

@ -0,0 +1,3 @@
metadata(version="0.1")
require("lora")
package("lora")

Wyświetl plik

@ -0,0 +1,29 @@
# MicroPython lora module
# MIT license; Copyright (c) 2023 Angus Gratton
from .modem import RxPacket # noqa: F401
ok = False # Flag if at least one modem driver package is installed
# Various lora "sub-packages"
try:
from .sx126x import * # noqa: F401
ok = True
except ImportError as e:
if "no module named 'lora." not in str(e):
raise
try:
from .sx127x import * # noqa: F401
ok = True
except ImportError as e:
if "no module named 'lora." not in str(e):
raise
if not ok:
raise ImportError(
"Incomplete lora installation. Need at least one of lora-sync, lora-async and one of lora-sx126x, lora-sx127x"
)

Wyświetl plik

@ -0,0 +1,483 @@
# MicroPython LoRa modem driver base class
# MIT license; Copyright (c) 2023 Angus Gratton
#
# LoRa is a registered trademark or service mark of Semtech Corporation or its affiliates.
import time
from micropython import const, schedule
# Set to True to get some additional printed debug output.
_DEBUG = const(False)
def _clamp(v, vmin, vmax):
# Small utility function to clamp a value 'v' between 'vmin' and 'vmax', inclusive.
return min(max(vmin, v), vmax)
def _flag(value, condition):
# Small utility function for returning a bit 'value' or not, based on a
# boolean condition. Can help make expressions to build register values more
# readable.
#
# Note that for value==1, can also rely on int(bool(x)) with one or both
# conversions being implicit, as int(True)==1 and int(False)==0
#
# There is also (condition and value) but this is (IMO) confusing to read.
return value if condition else 0
class ConfigError(ValueError):
# Raise if there is an error in lora_cfg, saves some duplicated strings
def __init__(self, field):
super().__init__("Invalid lora_cfg {}".format(field))
class BaseModem:
def __init__(self, ant_sw):
self._ant_sw = ant_sw
self._irq_callback = None
# Common configuration settings that need to be tracked by all modem drivers
# (Note that subclasses may set these to other values in their constructors, to match
# the power-on-reset configuration of a particular modem.)
#
self._rf_freq_hz = 0 # Needs to be set via configure()
self._sf = 7 # Spreading factor
self._bw_hz = 125000 # Reset value
self._coding_rate = 5
self._crc_en = True # use packet CRCs
self._implicit_header = False # implict vs explicit header mode
self._preamble_len = 12
self._coding_rate = 5
# CRC error counter
self.crc_errors = 0
self.rx_crc_error = False
# Current state of the modem
# _rx holds radio recv state:
#
# - False if the radio is not receiving
# - True if the radio is continuously receiving, or performing a single receive with
# no timeout.
# - An int if there is a timeout set, in which case it is the is the receive deadline
# (as a time.ticks_ms() timestamp).
#
# Note that self._rx can be not-False even when the radio hardware is not actually
# receiving, if self._tx is True (send always pauses recv.)
self._rx = False
# _rx_continuous is True if the modem is in continuous receive mode
# (this value is only valid when self._rx is also True).
self._rx_continuous = False
# This argument is stored from the parameter of the same name, as set in
# the last call to start_recv()
self._rx_length = None
# _tx holds radio send state and is simpler, True means sending and
# False means not sending.
self._tx = False
# timestamp (as time.ticks_ms() result) of last IRQ event
self._last_irq = None
# values are:
# - lora_cfg["invert_iq_rx"]
# - lora_cfg["invert_iq_tx"]
# - Current modem Invert setting
self._invert_iq = [False, False, False]
# This hook exists to allow the SyncModem & AsyncModem "mixin-like"
# classes to have some of their own state, without needing to manage the
# fuss of multiple constructor paths.
try:
self._after_init()
except AttributeError:
# If this exception happens here then one of the modem classes without a SyncModem or AsyncModem "mixin-like" class
# has been instantiated.
raise NotImplementedError(
"Don't instantiate this class directly, "
"instantiate a class from the 'lora' package"
)
def standby(self):
# Put the modem into standby. Can be used to cancel a continuous recv,
# or cancel a send before it completes.
#
# Calls the private function which actually sets the mode to standby, and then
# clears all the driver's state flags.
#
# Note this is also called before going to sleep(), to save on duplicated code.
self._standby()
self._rx = False
self._tx = False
self._last_irq = None
if self._ant_sw:
self._ant_sw.idle()
self._radio_isr(None) # "soft ISR"
def _get_t_sym_us(self):
# Return length of a symbol in microseconds
return 1000_000 * (1 << self._sf) // self._bw_hz
def _get_ldr_en(self):
# Return true if Low Data Rate should be enabled
#
# The calculation in get_n_symbols_x4() relies on this being the same logic applied
# in the modem configuration routines.
return self._get_t_sym_us() >= 16000
def _get_pa_ramp_val(self, lora_cfg, supported):
# Return the PA ramp register index from the list of supported PA ramp
# values. If the requested ramp time is supported by the modem, round up
# to the next supported value.
#
# 'supported' is the list of supported ramp times, must be sorted
# already.
us = int(lora_cfg["pa_ramp_us"])
# Find the index of the lowest supported ramp time that is longer or the
# same value as 'us'
for i, v in enumerate(supported):
if v >= us:
return i
# The request ramp time is longer than all this modem's supported ramp times
raise ConfigError("pa_ramp_us")
def _symbol_offsets(self):
# Called from get_time_on_air_us().
#
# This function provides a way to implement the different SF5 and SF6 in SX126x,
# by returning two offsets: one for the overall number of symbols, and one for the
# number of bits used to calculate the symbol length of the payload.
return (0, 0)
def get_n_symbols_x4(self, payload_len):
# Get the number of symbols in a packet (Time-on-Air) for the current
# configured modem settings and the provided payload length in bytes.
#
# Result is in units of "symbols times 4" as there is a fractional term
# in the equation, and we want to limit ourselves to integer arithmetic.
#
# References are:
# - SX1261/2 DS 6.1.4 "LoRa Time-on-Air"
# - SX1276 DS 4.1.1 "Time on air"
#
# Note the two datasheets give the same information in different
# ways. SX1261/62 DS is (IMO) clearer, so this function is based on that
# formula. The result is equivalent to the datasheet value "Nsymbol",
# times 4.
#
# Note also there are unit tests for this function in tests/test_time_on_air.py,
# and that it's been optimised a bit for code size (with impact on readability)
# Account for a minor difference between SX126x and SX127x: they have
# incompatible SF 5 & 6 modes.
#
# In SX126x when using SF5 or SF6, we apply an offset of +2 symbols to
# the overall preamble symbol count (s_o), and an offset of -8 to the
# payload bit length (b_o).
s_o, b_o = self._symbol_offsets()
# calculate the bit length of the payload
#
# This is the part inside the max(...,0) in the datasheet
bits = (
# payload_bytes
8 * payload_len
# N_bit_crc
+ (16 if self._crc_en else 0)
# (4 * SF)
- (4 * self._sf)
# +8 for most modes, except SF5/6 on SX126x where b_o == -8 so these two cancel out
+ 8
+ b_o
# N_symbol_header
+ (0 if self._implicit_header else 20)
)
bits = max(bits, 0)
# "Bits per symbol" denominator is either (4 * SF) or (4 * (SF -2))
# depending on Low Data Rate Optimization
bps = (self._sf - (2 * self._get_ldr_en())) * 4
return (
# Fixed preamble portion (4.25), times 4
17
# Remainder of equation is an integer number of symbols, times 4
+ 4
* (
# configured preamble length
self._preamble_len
+
# optional extra preamble symbols (4.25+2=6.25 for SX1262 SF5,SF6)
s_o
+
# 8 symbol constant overhead
8
+
# Payload symbol length
# (this is the term "ceil(bits / 4 * SF) * (CR + 4)" in the datasheet
((bits + bps - 1) // bps) * self._coding_rate
)
)
def get_time_on_air_us(self, payload_len):
# Return the "Time on Air" in microseconds for a particular
# payload length and the current configured modem settings.
return self._get_t_sym_us() * self.get_n_symbols_x4(payload_len) // 4
# Modem ISR routines
#
# ISR implementation is relatively simple, just exists to signal an optional
# callback, record a timestamp, and wake up the hardware if
# needed. ppplication code is expected to call poll_send() or
# poll_recv() as applicable in order to confirm the modem state.
#
# This is a MP hard irq in some configurations, meaning no memory allocation is possible.
#
# 'pin' may also be None if this is a "soft" IRQ triggered after a receive
# timed out during a send (meaning no receive IRQ will fire, but the
# receiver should wake up and move on anyhow.)
def _radio_isr(self, pin):
self._last_irq = time.ticks_ms()
if self._irq_callback:
self._irq_callback(pin)
if _DEBUG:
# Note: this may cause a MemoryError and fail if _DEBUG is enabled in this base class
# but disabled in the subclass, meaning this is a hard irq handler
try:
print("_radio_isr pin={}".format(pin))
except MemoryError:
pass
def irq_triggered(self):
# Returns True if the ISR has executed since the last time a send or a receive
# started
return self._last_irq is not None
def set_irq_callback(self, callback):
# Set a function to be called from the radio ISR
#
# This is used by the AsyncModem implementation, but can be called in
# other circumstances to implement custom ISR logic.
#
# Note that callback may be called in hard ISR context, meaning no
# memory allocation is possible.
self._irq_callback = callback
def _get_last_irq(self):
# Return the _last_irq timestamp if set by an ISR, or the
# current time.time_ms() timestamp otherwise.
if self._last_irq is None:
return time.ticks_ms()
return self._last_irq
# Common parts of receive API
def start_recv(self, timeout_ms=None, continuous=False, rx_length=0xFF):
# Start receiving.
#
# Part of common low-level modem API, see README.md for usage.
if continuous and timeout_ms is not None:
raise ValueError() # these two options are mutually exclusive
if timeout_ms is not None:
self._rx = time.ticks_add(time.ticks_ms(), timeout_ms)
else:
self._rx = True
self._rx_continuous = continuous
self._rx_length = rx_length
if self._ant_sw and not self._tx:
# this is guarded on 'not self._tx' as the subclass will not immediately
# start receiving if a send is in progress.
self._ant_sw.rx()
def poll_recv(self, rx_packet=None):
# Should be called while a receive is in progress:
#
# Part of common low-level modem API, see README.md for usage.
#
# This function may alter the state of the modem - it will clear
# RX interrupts, it may read out a packet from the FIFO, and it
# may resume receiving if the modem has gone to standby but receive
# should resume.
if self._rx is False:
# Not actually receiving...
return False
if self._tx:
# Actually sending, this has to complete before we
# resume receiving, but we'll indicate that we are still receiving.
#
# (It's not harmful to fall through here and check flags anyhow, but
# it is a little wasteful if an interrupt has just triggered
# poll_send() as well.)
return True
packet = None
flags = self._get_irq()
if _DEBUG and flags:
print("RX flags {:#x}".format(flags))
if flags & self._IRQ_RX_COMPLETE:
# There is a small potential for race conditions here in continuous
# RX mode. If packets are received rapidly and the call to this
# function delayed, then a ValidHeader interrupt (for example) might
# have already set for a second packet which is being received now,
# and clearing it will mark the second packet as invalid.
#
# However it's necessary in continuous mode as interrupt flags don't
# self-clear in the modem otherwise (for example, if a CRC error IRQ
# bit sets then it stays set on the next packet, even if that packet
# has a valid CRC.)
self._clear_irq(flags)
ok = self._rx_flags_success(flags)
if not ok:
# If a non-valid receive happened, increment the CRC error counter
self.crc_errors += 1
if ok or self.rx_crc_error:
# Successfully received a valid packet (or configured to return all packets)
packet = self._read_packet(rx_packet, flags)
if not self._rx_continuous:
# Done receiving now
self._end_recv()
# _check_recv() will return True if a receive is ongoing and hasn't timed out,
# and also manages resuming any modem receive if needed
#
# We need to always call check_recv(), but if we received a packet then this is what
# we should return to the caller.
res = self._check_recv()
return packet or res
def _end_recv(self):
# Utility function to clear the receive state
self._rx = False
if self._ant_sw:
self._ant_sw.idle()
def _check_recv(self):
# Internal function to automatically call start_recv()
# again if a receive has been interrupted and the host
# needs to start it again.
#
# Return True if modem is still receiving (or sending, but will
# resume receiving after send finishes).
if not self._rx:
return False # Not receiving, nothing to do
if not self.is_idle():
return True # Radio is already sending or receiving
rx = self._rx
timeout_ms = None
if isinstance(rx, int): # timeout is set
timeout_ms = time.ticks_diff(rx, time.ticks_ms())
if timeout_ms <= 0:
# Timed out in software, nothing to resume
self._end_recv()
if _DEBUG:
print("Timed out in software timeout_ms={}".format(timeout_ms))
schedule(
self._radio_isr, None
) # "soft irq" to unblock anything waiting on the interrupt event
return False
if _DEBUG:
print(
"Resuming receive timeout_ms={} continuous={} rx_length={}".format(
timeout_ms, self._rx_continuous, self._rx_length
)
)
self.start_recv(timeout_ms, self._rx_continuous, self._rx_length)
# restore the previous version of _rx so ticks_ms deadline can't
# slowly creep forward each time this happens
self._rx = rx
return True
# Common parts of send API
def poll_send(self):
# Check the ongoing send state.
#
# Returns one of:
#
# - True if a send is ongoing and the caller
# should call again.
# - False if no send is ongoing.
# - An int value exactly one time per transmission, the first time
# poll_send() is called after a send ends. In this case it
# is the time.ticks_ms() timestamp of the time that the send completed.
#
# Note this function only returns an int value one time (the first time it
# is called after send completes).
#
# Part of common low-level modem API, see README.md for usage.
if not self._tx:
return False
ticks_ms = self._get_last_irq()
if not (self._get_irq() & self._IRQ_TX_COMPLETE):
# Not done. If the host and modem get out
# of sync here, or the caller doesn't follow the sequence of
# send operations exactly, then can end up in a situation here
# where the modem has stopped sending and has gone to Standby,
# so _IRQ_TX_DONE is never set.
#
# For now, leaving this for the caller to do correctly. But if it becomes an issue then
# we can call _get_mode() here as well and check the modem is still in a TX mode.
return True
self._clear_irq()
self._tx = False
if self._ant_sw:
self._ant_sw.idle()
# The modem just finished sending, so start receiving again if needed
self._check_recv()
return ticks_ms
class RxPacket(bytearray):
# A class to hold a packet received from a LoRa modem.
#
# The base class is bytearray, which represents the packet payload,
# allowing RxPacket objects to be passed anywhere that bytearrays are
# accepted.
#
# Some additional properties are set on the object to store metadata about
# the received packet.
def __init__(self, payload, ticks_ms=None, snr=None, rssi=None, valid_crc=True):
super().__init__(payload)
self.ticks_ms = ticks_ms
self.snr = snr
self.rssi = rssi
self.valid_crc = valid_crc
def __repr__(self):
return "{}({}, {}, {}, {}, {})".format(
"RxPacket",
repr(
bytes(self)
), # This is a bit wasteful, but gets us b'XYZ' rather than "bytearray(b'XYZ')"
self.ticks_ms,
self.snr,
self.rssi,
self.valid_crc,
)

Wyświetl plik

@ -0,0 +1,2 @@
metadata(version="0.1")
package("lora")

Wyświetl plik

@ -0,0 +1,310 @@
# MicroPython LoRa modem driver time on air tests
# MIT license; Copyright (c) 2023 Angus Gratton
#
# LoRa is a registered trademark or service mark of Semtech Corporation or its affiliates.
#
# ## What is this?
#
# Host tests for the BaseModem.get_time_on_air_us() function. Theses against
# dummy test values produced by the Semtech "SX1261 LoRa Calculator" software,
# as downloaded from
# https://lora-developers.semtech.com/documentation/product-documents/
#
# The app notes for SX1276 (AN1200.3) suggest a similar calculator exists for that
# modem, but it doesn't appear to be available for download any more. I couldn't find
# an accurate calculator for SX1276, so manually calculated the SF5 & SF6 test cases below
# (other values should be the same as SX1262).
#
# ## Instructions
#
# These tests are intended to be run on a host PC via micropython unix port:
#
# cd /path/to/micropython-lib/micropython/lora
# micropython -m tests.test_time_on_air
#
# Note: Using the working directory shown above is easiest way to ensure 'lora' files are imported.
#
from lora import SX1262, SX1276
# Allow time calculations to deviate by up to this much as a ratio
# of the expected value (due to floating point, etc.)
TIME_ERROR_RATIO = 0.00001 # 0.001%
def main():
sx1262 = SX1262(spi=DummySPI(), cs=DummyPin(), busy=DummyPin())
sx1276 = SX1276(spi=DummySPI(0x12), cs=DummyPin())
# Test case format is based on the layout of the Semtech Calculator UI:
#
# (modem_instance,
# (modem settings),
# [
# ((packet config), (output values)),
# ...
# ],
# ),
#
# where each set of modem settings maps to zero or more packet config / output pairs
#
# - modem instance is sx1262 or sx1276 (SF5 & SF6 are different between these modems)
# - (modem settings) is (sf, bw (in khz), coding_rate, low_datarate_optimize)
# - (packet config) is (preamble_len, payload_len, explicit_header, crc_en)
# - (output values) is (total_symbols_excl, symbol_time in ms, time_on_air in ms)
#
# NOTE: total_symbols_excl is the value shown in the calculator output,
# which doesn't include 8 symbols of constant overhead between preamble and
# header+payload+crc. I think this is a bug in the Semtech calculator(!).
# These 8 symbols are included when the calculator derives the total time on
# air.
#
# NOTE ALSO: The "symbol_time" only depends on the modem settings so is
# repeated each group of test cases, and the "time_on_air" is the previous
# two output values multiplied (after accounting for the 8 symbols noted
# above). This repetition is deliberate to make the cases easier to read
# line-by-line when comparing to the calculator window.
CASES = [
(
sx1262,
(12, 500, 5, False), # Calculator defaults when launching calculator
[
((8, 1, True, True), (17.25, 8.192, 206.848)), # Calculator defaults
((12, 64, True, True), (71.25, 8.192, 649.216)),
((8, 1, True, False), (12.25, 8.192, 165.888)),
((8, 192, True, True), (172.25, 8.192, 1476.608)),
((12, 16, False, False), (26.25, 8.192, 280.576)),
],
),
(
sx1262,
(8, 125, 6, False),
[
((8, 1, True, True), (18.25, 2.048, 53.760)),
((8, 2, True, True), (18.25, 2.048, 53.760)),
((8, 2, True, False), (18.25, 2.048, 53.760)),
((8, 3, True, True), (24.25, 2.048, 66.048)),
((8, 3, True, False), (18.25, 2.048, 53.760)),
((8, 4, True, True), (24.25, 2.048, 66.048)),
((8, 4, True, False), (18.25, 2.048, 53.760)),
((8, 5, True, True), (24.25, 2.048, 66.048)),
((8, 5, True, False), (24.25, 2.048, 66.048)),
((8, 253, True, True), (396.25, 2.048, 827.904)),
((8, 253, True, False), (396.25, 2.048, 827.904)),
((12, 5, False, True), (22.25, 2.048, 61.952)),
((12, 5, False, False), (22.25, 2.048, 61.952)),
((12, 10, False, True), (34.25, 2.048, 86.528)),
((12, 253, False, True), (394.25, 2.048, 823.808)),
],
),
# quick check that sx1276 is the same as sx1262 for SF>6
(
sx1276,
(8, 125, 6, False),
[
((8, 1, True, True), (18.25, 2.048, 53.760)),
((8, 2, True, True), (18.25, 2.048, 53.760)),
((12, 5, False, True), (22.25, 2.048, 61.952)),
((12, 5, False, False), (22.25, 2.048, 61.952)),
],
),
# SF5 on SX1262
(
sx1262,
(5, 500, 5, False),
[
(
(2, 1, True, False),
(13.25, 0.064, 1.360),
), # Shortest possible LoRa packet?
((2, 1, True, True), (18.25, 0.064, 1.680)),
((12, 1, False, False), (18.25, 0.064, 1.680)),
((12, 253, False, True), (523.25, 0.064, 34.000)),
],
),
(
sx1262,
(5, 125, 8, False),
[
((12, 253, False, True), (826.25, 0.256, 213.568)),
],
),
# SF5 on SX1276
#
# Note: SF5 & SF6 settings are different between SX1262 & SX1276.
#
# There's no Semtech official calculator available for SX1276, so the
# symbol length is calculated by copying the formula from the datasheet
# "Time on air" section. Symbol time is the same as SX1262. Then the
# time on air is manually calculated by multiplying the two together.
#
# see the functions sx1276_num_payload and sx1276_num_symbols at end of this module
# for the actual functions used.
(
sx1276,
(5, 500, 5, False),
[
(
(2, 1, True, False),
(19.25 - 8, 0.064, 1.232),
), # Shortest possible LoRa packet?
((2, 1, True, True), (24.25 - 8, 0.064, 1.552)),
((12, 1, False, False), (24.25 - 8, 0.064, 1.552)),
((12, 253, False, True), (534.25 - 8, 0.064, 34.192)),
],
),
(
sx1276,
(5, 125, 8, False),
[
((12, 253, False, True), (840.25 - 8, 0.256, 215.104)),
],
),
(
sx1262,
(12, 7.81, 8, True), # Slowest possible
[
((128, 253, True, True), (540.25, 524.456, 287532.907)),
((1000, 253, True, True), (1412.25, 524.456, 744858.387)),
],
),
(
sx1262,
(11, 10.42, 7, True),
[
((25, 16, True, True), (57.25, 196.545, 12824.568)),
((25, 16, False, False), (50.25, 196.545, 11448.752)),
],
),
]
tests = 0
failures = set()
for modem, modem_settings, packets in CASES:
(sf, bw_khz, coding_rate, low_datarate_optimize) = modem_settings
print(
f"Modem config sf={sf} bw={bw_khz}kHz coding_rate=4/{coding_rate} "
+ f"low_datarate_optimize={low_datarate_optimize}"
)
# We don't call configure() as the Dummy interfaces won't handle it,
# just update the BaseModem fields directly
modem._sf = sf
modem._bw_hz = int(bw_khz * 1000)
modem._coding_rate = coding_rate
# Low datarate optimize on/off is auto-configured in the current driver,
# check the automatic selection matches the test case from the
# calculator
if modem._get_ldr_en() != low_datarate_optimize:
print(
f" -- ERROR: Test case has low_datarate_optimize={low_datarate_optimize} "
+ f"but modem selects {modem._get_ldr_en()}"
)
failures += 1
continue # results will not match so don't run any of the packet test cases
for packet_config, expected_outputs in packets:
preamble_len, payload_len, explicit_header, crc_en = packet_config
print(
f" -- preamble_len={preamble_len} payload_len={payload_len} "
+ f"explicit_header={explicit_header} crc_en={crc_en}"
)
modem._preamble_len = preamble_len
modem._implicit_header = not explicit_header # opposite logic to calculator
modem._crc_en = crc_en
# Now calculate the symbol length and times and compare with the expected valuesd
(
expected_symbols,
expected_symbol_time,
expected_time_on_air,
) = expected_outputs
print(f" ---- calculator shows total length {expected_symbols}")
expected_symbols += 8 # Account for the calculator bug mentioned in the comment above
n_symbols = modem.get_n_symbols_x4(payload_len) / 4.0
symbol_time_us = modem._get_t_sym_us()
time_on_air_us = modem.get_time_on_air_us(payload_len)
tests += 1
if n_symbols == expected_symbols:
print(f" ---- symbols {n_symbols}")
else:
print(f" ---- SYMBOL COUNT ERROR expected {expected_symbols} got {n_symbols}")
failures.add((modem, modem_settings, packet_config))
max_error = expected_symbol_time * 1000 * TIME_ERROR_RATIO
if abs(int(expected_symbol_time * 1000) - symbol_time_us) <= max_error:
print(f" ---- symbol time {expected_symbol_time}ms")
else:
print(
f" ---- SYMBOL TIME ERROR expected {expected_symbol_time}ms "
+ f"got {symbol_time_us}us"
)
failures.add((modem, modem_settings, packet_config))
max_error = expected_time_on_air * 1000 * TIME_ERROR_RATIO
if abs(int(expected_time_on_air * 1000) - time_on_air_us) <= max_error:
print(f" ---- time on air {expected_time_on_air}ms")
else:
print(
f" ---- TIME ON AIR ERROR expected {expected_time_on_air}ms "
+ f"got {time_on_air_us}us"
)
failures.add((modem, modem_settings, packet_config))
print("************************")
print(f"\n{len(failures)}/{tests} tests failed")
if failures:
print("FAILURES:")
for f in failures:
print(f)
raise SystemExit(1)
print("SUCCESS")
class DummySPI:
# Dummy SPI Interface allows us to use normal constructors
#
# Reading will always return the 'always_read' value
def __init__(self, always_read=0x00):
self.always_read = always_read
def write_readinto(self, _wrbuf, rdbuf):
for i in range(len(rdbuf)):
rdbuf[i] = self.always_read
class DummyPin:
# Dummy Pin interface allows us to use normal constructors
def __init__(self):
pass
def __call__(self, _=None):
pass
# Copies of the functions used to calculate SX1276 SF5, SF6 test case symbol counts.
# (see comments above).
#
# These are written as closely to the SX1276 datasheet "Time on air" section as
# possible, quite different from the BaseModem implementation.
def sx1276_n_payload(pl, sf, ih, de, cr, crc):
import math
ceil_arg = 8 * pl - 4 * sf + 28 + 16 * crc - 20 * ih
ceil_arg /= 4 * (sf - 2 * de)
return 8 + max(math.ceil(ceil_arg) * (cr + 4), 0)
def sx1276_n_syms(pl, sf, ih, de, cr, crc, n_preamble):
return sx1276_n_payload(pl, sf, ih, de, cr, crc) + n_preamble + 4.25
if __name__ == "__main__":
main()