diff --git a/test_images/compress_test_images.py b/test_images/compress_test_images.py index 4a17d3a..95b5003 100644 --- a/test_images/compress_test_images.py +++ b/test_images/compress_test_images.py @@ -17,7 +17,7 @@ import os, glob # images should be named 1.jpg, 2.jpg, etc. -image_numbers = xrange(1,14) +image_numbers = range(1,14) new_sizes = ["800x608","640x480","320x240"] callsign = "VK5QI" diff --git a/tx/PacketTX.py b/tx/PacketTX.py index 250e4a7..3257b9b 100644 --- a/tx/PacketTX.py +++ b/tx/PacketTX.py @@ -16,7 +16,6 @@ import serial -import Queue import sys import os import datetime @@ -29,6 +28,7 @@ from time import sleep from threading import Thread import numpy as np from ldpc_encoder import * +from queue import Queue class PacketTX(object): """ Packet Transmitter Class @@ -56,15 +56,15 @@ class PacketTX(object): """ # Transmit Queues. - ssdv_queue = Queue.Queue(4096) # Up to 1MB of 256 byte packets - telemetry_queue = Queue.Queue(256) # Keep this queue small. It's up to the user not to over-use this queue. + ssdv_queue = Queue(4096) # Up to 1MB of 256 byte packets + telemetry_queue = Queue(256) # Keep this queue small. It's up to the user not to over-use this queue. # Framing parameters - unique_word = "\xab\xcd\xef\x01" - preamble = "\x55"*16 + unique_word = b"\xab\xcd\xef\x01" + preamble = b"\x55"*16 # Idle sequence, transmitted if there is nothing in the transmit queues. - idle_sequence = "\x56"*256 + idle_sequence = b"\x56"*256 # Transmit thread active flag. transmit_active = False @@ -94,7 +94,7 @@ class PacketTX(object): self.payload_length = payload_length - self.callsign = callsign + self.callsign = callsign.encode('ascii') self.fec = fec self.crc16 = crcmod.predefined.mkCrcFun('crc-ccitt-false') @@ -130,25 +130,25 @@ class PacketTX(object): packet = packet[:self.payload_length] if len(packet) < self.payload_length: - packet = packet + "\x55"*(self.payload_length - len(packet)) + packet = packet + b"\x55"*(self.payload_length - len(packet)) crc = struct.pack(" 252: message = message[:252] - packet = "\x00" + struct.pack(">BH",len(message),self.text_message_count) + message + packet = b"\x00" + struct.pack(">BH",len(message),self.text_message_count) + message.encode('ascii') self.queue_telemetry_packet(packet, repeats=repeats) log_string = "TXing Text Message #%d: %s" % (self.text_message_count,message) @@ -419,13 +419,13 @@ class PacketTX(object): _id = int(id) % 256 # Convert the provided data to a string - _data = str(bytearray(data)) + _data = bytes(bytearray(data)) # Clip to 254 bytes. if len(_data) > 254: _data = _data[:254] - _packet = "\x03" + struct.pack(">B",_id) + _data + _packet = b"\x03" + struct.pack(">B",_id) + _data self.queue_telemetry_packet(_packet, repeats=repeats) @@ -438,7 +438,7 @@ class PacketTX(object): def handle_udp_packet(self, packet): ''' Process a received UDP packet ''' try: - packet_dict = json.loads(packet) + packet_dict = json.loads(packet.decode()) if packet_dict['type'] == 'WENET_TX_TEXT': # Transmit an arbitrary text packet. diff --git a/tx/SX127x/LoRa.py b/tx/SX127x/LoRa.py new file mode 100755 index 0000000..7d1e3ee --- /dev/null +++ b/tx/SX127x/LoRa.py @@ -0,0 +1,939 @@ +""" Defines the SX127x class and a few utility functions. """ + +# Copyright 2015 Mayer Analytics Ltd. +# +# This file is part of pySX127x. +# +# pySX127x is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public +# License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later +# version. +# +# pySX127x is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied +# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +# details. +# +# You can be released from the requirements of the license by obtaining a commercial license. Such a license is +# mandatory as soon as you develop commercial activities involving pySX127x without disclosing the source code of your +# own applications, or shipping pySX127x with a closed source product. +# +# You should have received a copy of the GNU General Public License along with pySX127. If not, see +# . + + +import sys, struct +from .constants import * + + +################################################## Some utility functions ############################################## + +def set_bit(value, index, new_bit): + """ Set the index'th bit of value to new_bit, and return the new value. + :param value: The integer to set the new_bit in + :type value: int + :param index: 0-based index + :param new_bit: New value the bit shall have (0 or 1) + :return: Changed value + :rtype: int + """ + mask = 1 << index + value &= ~mask + if new_bit: + value |= mask + return value + + +def getter(register_address): + """ The getter decorator reads the register content and calls the decorated function to do + post-processing. + :param register_address: Register address + :return: Register value + :rtype: int + """ + def decorator(func): + def wrapper(self): + return func(self, self.spi.xfer([register_address, 0])[1]) + return wrapper + return decorator + + +def setter(register_address): + """ The setter decorator calls the decorated function for pre-processing and + then writes the result to the register + :param register_address: Register address + :return: New register value + :rtype: int + """ + def decorator(func): + def wrapper(self, val): + return self.spi.xfer([register_address | 0x80, func(self, val)])[1] + return wrapper + return decorator + + +############################################### Definition of the LoRa class ########################################### + +class LoRa(object): + + BOARD = None + spi = None + mode = None # the mode is backed up here + backup_registers = [] + verbose = True + dio_mapping = [None] * 6 # store the dio mapping here + current_freq = 434.500 + + def __init__(self, hw_interface, verbose=True): + self.BOARD = hw_interface + self.spi = self.BOARD.SpiDev() + self.verbose = verbose + # set the callbacks for DIO0..5 IRQs. + self.BOARD.add_events(self._dio0, self._dio1, self._dio2, self._dio3, self._dio4, self._dio5) + # set mode to sleep and read all registers + self.set_mode(MODE.SLEEP) + self.backup_registers = self.get_all_registers() + # more setup work: + # self.rx_chain_calibration(434.) # TODO: Find out if we need this. + # the FSK registers are set up exactly as modtronix do it: + lookup_fsk = [ + #[REG.FSK.LNA , 0x23], + #[REG.FSK.RX_CONFIG , 0x1E], + #[REG.FSK.RSSI_CONFIG , 0xD2], + #[REG.FSK.PREAMBLE_DETECT, 0xAA], + #[REG.FSK.OSC , 0x07], + #[REG.FSK.SYNC_CONFIG , 0x12], + #[REG.FSK.SYNC_VALUE_1 , 0xC1], + #[REG.FSK.SYNC_VALUE_2 , 0x94], + #[REG.FSK.SYNC_VALUE_3 , 0xC1], + #[REG.FSK.PACKET_CONFIG_1, 0xD8], + #[REG.FSK.FIFO_THRESH , 0x8F], + #[REG.FSK.IMAGE_CAL , 0x02], + #[REG.FSK.DIO_MAPPING_1 , 0x00], + #[REG.FSK.DIO_MAPPING_2 , 0x30] + ] + #self.set_mode(MODE.FSK_STDBY) + #for register_address, value in lookup_fsk: + # self.set_register(register_address, value) + #self.set_mode(MODE.SLEEP) + # set the dio_ mapping by calling the two get_dio_mapping_* functions + self.get_dio_mapping_1() + self.get_dio_mapping_2() + + + # Overridable functions: + + def on_rx_done(self): + pass + + def on_tx_done(self): + pass + + def on_cad_done(self): + pass + + def on_rx_timeout(self): + pass + + def on_valid_header(self): + pass + + def on_payload_crc_error(self): + pass + + def on_fhss_change_channel(self): + pass + + # Internal callbacks for add_events() + + def _dio0(self, channel): + # DIO0 00: RxDone + # DIO0 01: TxDone + # DIO0 10: CadDone + if self.dio_mapping[0] == 0: + self.on_rx_done() + elif self.dio_mapping[0] == 1: + self.on_tx_done() + elif self.dio_mapping[0] == 2: + self.on_cad_done() + else: + raise RuntimeError("unknown dio0mapping!") + + def _dio1(self, channel): + # DIO1 00: RxTimeout + # DIO1 01: FhssChangeChannel + # DIO1 10: CadDetected + if self.dio_mapping[1] == 0: + self.on_rx_timeout() + elif self.dio_mapping[1] == 1: + self.on_fhss_change_channel() + elif self.dio_mapping[1] == 2: + self.on_CadDetected() + else: + raise RuntimeError("unknown dio1mapping!") + + def _dio2(self, channel): + # DIO2 00: FhssChangeChannel + # DIO2 01: FhssChangeChannel + # DIO2 10: FhssChangeChannel + self.on_fhss_change_channel() + + def _dio3(self, channel): + # DIO3 00: CadDone + # DIO3 01: ValidHeader + # DIO3 10: PayloadCrcError + if self.dio_mapping[3] == 0: + self.on_cad_done() + elif self.dio_mapping[3] == 1: + self.on_valid_header() + elif self.dio_mapping[3] == 2: + self.on_payload_crc_error() + else: + raise RuntimeError("unknown dio3 mapping!") + + def _dio4(self, channel): + raise RuntimeError("DIO4 is not used") + + def _dio5(self, channel): + raise RuntimeError("DIO5 is not used") + + # All the set/get/read/write functions + + def get_mode(self): + """ Get the mode + :return: New mode + """ + self.mode = self.spi.xfer([REG.LORA.OP_MODE, 0])[1] + return self.mode + + def set_mode(self, mode): + """ Set the mode + :param mode: Set the mode. Use constants.MODE class + :return: New mode + """ + # the mode is backed up in self.mode + if mode == self.mode: + return mode + if self.verbose: + sys.stderr.write("Mode <- %s\n" % MODE.lookup[mode]) + self.mode = mode + return self.spi.xfer([REG.LORA.OP_MODE | 0x80, mode])[1] + + def write_payload(self, payload): + """ Get FIFO ready for TX: Set FifoAddrPtr to FifoTxBaseAddr. The transceiver is put into STDBY mode. + :param payload: Payload to write (list) + :return: Written payload + """ + self.set_mode(MODE.STDBY) + base_addr = self.get_fifo_tx_base_addr() + self.set_fifo_addr_ptr(base_addr) + return self.spi.xfer([REG.LORA.FIFO | 0x80] + payload)[1:] + + def reset_ptr_rx(self): + """ Get FIFO ready for RX: Set FifoAddrPtr to FifoRxBaseAddr. The transceiver is put into STDBY mode. """ + self.set_mode(MODE.STDBY) + base_addr = self.get_fifo_rx_base_addr() + self.set_fifo_addr_ptr(base_addr) + + def rx_is_good(self): + """ Check the IRQ flags for RX errors + :return: True if no errors + :rtype: bool + """ + flags = self.get_irq_flags() + return not any([flags[s] for s in ['valid_header', 'crc_error', 'rx_done', 'rx_timeout']]) + + def read_payload(self , nocheck = False): + """ Read the payload from FIFO + :param nocheck: If True then check rx_is_good() + :return: Payload + :rtype: list[int] + """ + if not nocheck and not self.rx_is_good(): + return None + rx_nb_bytes = self.get_rx_nb_bytes() + fifo_rx_current_addr = self.get_fifo_rx_current_addr() + self.set_fifo_addr_ptr(fifo_rx_current_addr) + payload = self.spi.xfer([REG.LORA.FIFO] + [0] * rx_nb_bytes)[1:] + return payload + + def get_freq(self): + """ Get the frequency (MHz) + :return: Frequency in MHz + :rtype: float + """ + msb, mid, lsb = self.spi.xfer([REG.LORA.FR_MSB, 0, 0, 0])[1:] + f = lsb + 256*(mid + 256*msb) + return f / 16384. + + def set_freq(self, f): + """ Set the frequency (MHz) + :param f: Frequency in MHz + "type f: float + :return: New register settings (3 bytes [msb, mid, lsb]) + :rtype: list[int] + """ + assert self.mode == MODE.SLEEP or self.mode == MODE.STDBY or self.mode == MODE.FSK_STDBY + i = int(f * 16384.) # choose floor + msb = i // 65536 + i -= msb * 65536 + mid = i // 256 + i -= mid * 256 + lsb = i + self.current_freq = f + return self.spi.xfer([REG.LORA.FR_MSB | 0x80, msb, mid, lsb]) + + def get_pa_config(self, convert_dBm=False): + v = self.spi.xfer([REG.LORA.PA_CONFIG, 0])[1] + pa_select = v >> 7 + max_power = v >> 4 & 0b111 + output_power = v & 0b1111 + if convert_dBm: + max_power = max_power * .6 + 10.8 + output_power = max_power - (15 - output_power) + return dict( + pa_select = pa_select, + max_power = max_power, + output_power = output_power + ) + + def set_pa_config(self, pa_select=None, max_power=None, output_power=None): + """ Configure the PA + :param pa_select: Selects PA output pin, 0->RFO, 1->PA_BOOST + :param max_power: Select max output power Pmax=10.8+0.6*MaxPower + :param output_power: Output power Pout=Pmax-(15-OutputPower) if PaSelect = 0, + Pout=17-(15-OutputPower) if PaSelect = 1 (PA_BOOST pin) + :return: new register value + """ + loc = locals() + current = self.get_pa_config() + loc = {s: current[s] if loc[s] is None else loc[s] for s in loc} + val = (loc['pa_select'] << 7) | (loc['max_power'] << 4) | (loc['output_power']) + return self.spi.xfer([REG.LORA.PA_CONFIG | 0x80, val])[1] + + @getter(REG.LORA.PA_RAMP) + def get_pa_ramp(self, val): + return val & 0b1111 + + @setter(REG.LORA.PA_RAMP) + def set_pa_ramp(self, val): + return val & 0b1111 + + def get_ocp(self, convert_mA=False): + v = self.spi.xfer([REG.LORA.OCP, 0])[1] + ocp_on = v >> 5 & 0x01 + ocp_trim = v & 0b11111 + if convert_mA: + if ocp_trim <= 15: + ocp_trim = 45. + 5. * ocp_trim + elif ocp_trim <= 27: + ocp_trim = -30. + 10. * ocp_trim + else: + assert ocp_trim <= 27 + return dict( + ocp_on = ocp_on, + ocp_trim = ocp_trim + ) + + def set_ocp_trim(self, I_mA): + assert(I_mA >= 45 and I_mA <= 240) + ocp_on = self.spi.xfer([REG.LORA.OCP, 0])[1] >> 5 & 0x01 + if I_mA <= 120: + v = int(round((I_mA-45.)/5.)) + else: + v = int(round((I_mA+30.)/10.)) + v = set_bit(v, 5, ocp_on) + return self.spi.xfer([REG.LORA.OCP | 0x80, v])[1] + + def get_lna(self): + v = self.spi.xfer([REG.LORA.LNA, 0])[1] + return dict( + lna_gain = v >> 5, + lna_boost_lf = v >> 3 & 0b11, + lna_boost_hf = v & 0b11 + ) + + def set_lna(self, lna_gain=None, lna_boost_lf=None, lna_boost_hf=None): + assert lna_boost_hf is None or lna_boost_hf == 0b00 or lna_boost_hf == 0b11 + self.set_mode(MODE.STDBY) + if lna_gain is not None: + # Apparently agc_auto_on must be 0 in order to set lna_gain + self.set_agc_auto_on(lna_gain == GAIN.NOT_USED) + loc = locals() + current = self.get_lna() + loc = {s: current[s] if loc[s] is None else loc[s] for s in loc} + val = (loc['lna_gain'] << 5) | (loc['lna_boost_lf'] << 3) | (loc['lna_boost_hf']) + retval = self.spi.xfer([REG.LORA.LNA | 0x80, val])[1] + if lna_gain is not None: + # agc_auto_on must track lna_gain: GAIN=NOT_USED -> agc_auto=ON, otherwise =OFF + self.set_agc_auto_on(lna_gain == GAIN.NOT_USED) + return retval + + def set_lna_gain(self, lna_gain): + self.set_lna(lna_gain=lna_gain) + + def get_fifo_addr_ptr(self): + return self.spi.xfer([REG.LORA.FIFO_ADDR_PTR, 0])[1] + + def set_fifo_addr_ptr(self, ptr): + return self.spi.xfer([REG.LORA.FIFO_ADDR_PTR | 0x80, ptr])[1] + + def get_fifo_tx_base_addr(self): + return self.spi.xfer([REG.LORA.FIFO_TX_BASE_ADDR, 0])[1] + + def set_fifo_tx_base_addr(self, ptr): + return self.spi.xfer([REG.LORA.FIFO_TX_BASE_ADDR | 0x80, ptr])[1] + + def get_fifo_rx_base_addr(self): + return self.spi.xfer([REG.LORA.FIFO_RX_BASE_ADDR, 0])[1] + + def set_fifo_rx_base_addr(self, ptr): + return self.spi.xfer([REG.LORA.FIFO_RX_BASE_ADDR | 0x80, ptr])[1] + + def get_fifo_rx_current_addr(self): + return self.spi.xfer([REG.LORA.FIFO_RX_CURR_ADDR, 0])[1] + + def get_fifo_rx_byte_addr(self): + return self.spi.xfer([REG.LORA.FIFO_RX_BYTE_ADDR, 0])[1] + + def get_irq_flags_mask(self): + v = self.spi.xfer([REG.LORA.IRQ_FLAGS_MASK, 0])[1] + return dict( + rx_timeout = v >> 7 & 0x01, + rx_done = v >> 6 & 0x01, + crc_error = v >> 5 & 0x01, + valid_header = v >> 4 & 0x01, + tx_done = v >> 3 & 0x01, + cad_done = v >> 2 & 0x01, + fhss_change_ch = v >> 1 & 0x01, + cad_detected = v >> 0 & 0x01, + ) + + def set_irq_flags_mask(self, + rx_timeout=None, rx_done=None, crc_error=None, valid_header=None, tx_done=None, + cad_done=None, fhss_change_ch=None, cad_detected=None): + loc = locals() + v = self.spi.xfer([REG.LORA.IRQ_FLAGS_MASK, 0])[1] + for i, s in enumerate(['cad_detected', 'fhss_change_ch', 'cad_done', 'tx_done', 'valid_header', + 'crc_error', 'rx_done', 'rx_timeout']): + this_bit = locals()[s] + if this_bit is not None: + v = set_bit(v, i, this_bit) + return self.spi.xfer([REG.LORA.IRQ_FLAGS_MASK | 0x80, v])[1] + + def get_irq_flags(self): + v = self.spi.xfer([REG.LORA.IRQ_FLAGS, 0])[1] + return dict( + rx_timeout = v >> 7 & 0x01, + rx_done = v >> 6 & 0x01, + crc_error = v >> 5 & 0x01, + valid_header = v >> 4 & 0x01, + tx_done = v >> 3 & 0x01, + cad_done = v >> 2 & 0x01, + fhss_change_ch = v >> 1 & 0x01, + cad_detected = v >> 0 & 0x01, + ) + + def set_irq_flags(self, + rx_timeout=None, rx_done=None, crc_error=None, valid_header=None, tx_done=None, + cad_done=None, fhss_change_ch=None, cad_detected=None): + v = self.spi.xfer([REG.LORA.IRQ_FLAGS, 0])[1] + for i, s in enumerate(['cad_detected', 'fhss_change_ch', 'cad_done', 'tx_done', 'valid_header', + 'crc_error', 'rx_done', 'rx_timeout']): + this_bit = locals()[s] + if this_bit is not None: + v = set_bit(v, i, this_bit) + return self.spi.xfer([REG.LORA.IRQ_FLAGS | 0x80, v])[1] + + def clear_irq_flags(self): + v = self.spi.xfer([REG.LORA.IRQ_FLAGS | 0x80, 0])[1] + return v + + def get_rx_nb_bytes(self): + return self.spi.xfer([REG.LORA.RX_NB_BYTES, 0])[1] + + def get_rx_header_cnt(self): + msb, lsb = self.spi.xfer([REG.LORA.RX_HEADER_CNT_MSB, 0, 0])[1:] + return lsb + 256 * msb + + def get_rx_packet_cnt(self): + msb, lsb = self.spi.xfer([REG.LORA.RX_PACKET_CNT_MSB, 0, 0])[1:] + return lsb + 256 * msb + + def get_modem_status(self): + status = self.spi.xfer([REG.LORA.MODEM_STAT, 0])[1] + return dict( + rx_coding_rate = status >> 5 & 0x03, + modem_clear = status >> 4 & 0x01, + header_info_valid = status >> 3 & 0x01, + rx_ongoing = status >> 2 & 0x01, + signal_sync = status >> 1 & 0x01, + signal_detected = status >> 0 & 0x01 + ) + + def get_pkt_snr_value(self): + v = self.spi.xfer([REG.LORA.PKT_SNR_VALUE, 0])[1] + return struct.unpack('b',str(bytearray([v])))[0]/4. + + def get_pkt_rssi_value(self): + v = self.spi.xfer([REG.LORA.PKT_RSSI_VALUE, 0])[1] + if self.current_freq>525.0: + return v - 157 + else: + return v - 164 + + def get_rssi_value(self): + v = self.spi.xfer([REG.LORA.RSSI_VALUE, 0])[1] + if self.current_freq>525.0: + return v - 157 + else: + return v - 164 + + def get_hop_channel(self): + v = self.spi.xfer([REG.LORA.HOP_CHANNEL, 0])[1] + return dict( + pll_timeout = v >> 7, + crc_on_payload = v >> 6 & 0x01, + fhss_present_channel = v >> 5 & 0b111111 + ) + + def get_modem_config_1(self): + val = self.spi.xfer([REG.LORA.MODEM_CONFIG_1, 0])[1] + return dict( + bw = val >> 4 & 0x0F, + coding_rate = val >> 1 & 0x07, + implicit_header_mode = val & 0x01 + ) + + def set_modem_config_1(self, bw=None, coding_rate=None, implicit_header_mode=None): + loc = locals() + current = self.get_modem_config_1() + loc = {s: current[s] if loc[s] is None else loc[s] for s in loc} + val = loc['implicit_header_mode'] | (loc['coding_rate'] << 1) | (loc['bw'] << 4) + return self.spi.xfer([REG.LORA.MODEM_CONFIG_1 | 0x80, val])[1] + + def set_bw(self, bw): + """ Set the bandwidth 0=7.8kHz ... 9=500kHz + :param bw: A number 0,2,3,...,9 + :return: + """ + self.set_modem_config_1(bw=bw) + + def set_coding_rate(self, coding_rate): + """ Set the coding rate 4/5, 4/6, 4/7, 4/8 + :param coding_rate: A number 1,2,3,4 + :return: New register value + """ + self.set_modem_config_1(coding_rate=coding_rate) + + def set_implicit_header_mode(self, implicit_header_mode): + self.set_modem_config_1(implicit_header_mode=implicit_header_mode) + + def get_modem_config_2(self, include_symb_timout_lsb=False): + val = self.spi.xfer([REG.LORA.MODEM_CONFIG_2, 0])[1] + d = dict( + spreading_factor = val >> 4 & 0x0F, + tx_cont_mode = val >> 3 & 0x01, + rx_crc = val >> 2 & 0x01, + ) + if include_symb_timout_lsb: + d['symb_timout_lsb'] = val & 0x03 + return d + + def set_modem_config_2(self, spreading_factor=None, tx_cont_mode=None, rx_crc=None): + loc = locals() + # RegModemConfig2 contains the SymbTimout MSB bits. We tack the back on when writing this register. + current = self.get_modem_config_2(include_symb_timout_lsb=True) + loc = {s: current[s] if loc[s] is None else loc[s] for s in loc} + val = (loc['spreading_factor'] << 4) | (loc['tx_cont_mode'] << 3) | (loc['rx_crc'] << 2) | current['symb_timout_lsb'] + return self.spi.xfer([REG.LORA.MODEM_CONFIG_2 | 0x80, val])[1] + + def set_spreading_factor(self, spreading_factor): + self.set_modem_config_2(spreading_factor=spreading_factor) + + def set_rx_crc(self, rx_crc): + self.set_modem_config_2(rx_crc=rx_crc) + + def get_modem_config_3(self): + val = self.spi.xfer([REG.LORA.MODEM_CONFIG_3, 0])[1] + return dict( + low_data_rate_optim = val >> 3 & 0x01, + agc_auto_on = val >> 2 & 0x01 + ) + + def set_modem_config_3(self, low_data_rate_optim=None, agc_auto_on=None): + loc = locals() + current = self.get_modem_config_3() + loc = {s: current[s] if loc[s] is None else loc[s] for s in loc} + val = (loc['low_data_rate_optim'] << 3) | (loc['agc_auto_on'] << 2) + return self.spi.xfer([REG.LORA.MODEM_CONFIG_3 | 0x80, val])[1] + + @setter(REG.LORA.INVERT_IQ) + def set_invert_iq(self, invert): + """ Invert the LoRa I and Q signals + :param invert: 0: normal mode, 1: I and Q inverted + :return: New value of register + """ + return 0x27 | (invert & 0x01) << 6 + + @getter(REG.LORA.INVERT_IQ) + def get_invert_iq(self, val): + """ Get the invert the I and Q setting + :return: 0: normal mode, 1: I and Q inverted + """ + return (val >> 6) & 0x01 + + def get_agc_auto_on(self): + return self.get_modem_config_3()['agc_auto_on'] + + def set_agc_auto_on(self, agc_auto_on): + self.set_modem_config_3(agc_auto_on=agc_auto_on) + + def get_low_data_rate_optim(self): + return self.set_modem_config_3()['low_data_rate_optim'] + + def set_low_data_rate_optim(self, low_data_rate_optim): + self.set_modem_config_3(low_data_rate_optim=low_data_rate_optim) + + def get_symb_timeout(self): + SYMB_TIMEOUT_MSB = REG.LORA.MODEM_CONFIG_2 + msb, lsb = self.spi.xfer([SYMB_TIMEOUT_MSB, 0, 0])[1:] # the MSB bits are stored in REG.LORA.MODEM_CONFIG_2 + msb = msb & 0b11 + return lsb + 256 * msb + + def set_symb_timeout(self, timeout): + bkup_reg_modem_config_2 = self.spi.xfer([REG.LORA.MODEM_CONFIG_2, 0])[1] + msb = timeout >> 8 & 0b11 # bits 8-9 + lsb = timeout - 256 * msb # bits 0-7 + reg_modem_config_2 = bkup_reg_modem_config_2 & 0xFC | msb # bits 2-7 of bkup_reg_modem_config_2 ORed with the two msb bits + old_msb = self.spi.xfer([REG.LORA.MODEM_CONFIG_2 | 0x80, reg_modem_config_2])[1] & 0x03 + old_lsb = self.spi.xfer([REG.LORA.SYMB_TIMEOUT_LSB | 0x80, lsb])[1] + return old_lsb + 256 * old_msb + + def get_preamble(self): + msb, lsb = self.spi.xfer([REG.LORA.PREAMBLE_MSB, 0, 0])[1:] + return lsb + 256 * msb + + def set_preamble(self, preamble): + msb = preamble >> 8 + lsb = preamble - msb * 256 + old_msb, old_lsb = self.spi.xfer([REG.LORA.PREAMBLE_MSB | 0x80, msb, lsb])[1:] + return old_lsb + 256 * old_msb + + @getter(REG.LORA.PAYLOAD_LENGTH) + def get_payload_length(self, val): + return val + + @setter(REG.LORA.PAYLOAD_LENGTH) + def set_payload_length(self, payload_length): + return payload_length + + @getter(REG.LORA.MAX_PAYLOAD_LENGTH) + def get_max_payload_length(self, val): + return val + + @setter(REG.LORA.MAX_PAYLOAD_LENGTH) + def set_max_payload_length(self, max_payload_length): + return max_payload_length + + @getter(REG.LORA.HOP_PERIOD) + def get_hop_period(self, val): + return val + + @setter(REG.LORA.HOP_PERIOD) + def set_hop_period(self, hop_period): + return hop_period + + def get_fei(self): + msb, mid, lsb = self.spi.xfer([REG.LORA.FEI_MSB, 0, 0, 0])[1:] + msb &= 0x0F + freq_error = lsb + 256 * (mid + 256 * msb) + return freq_error + + @getter(REG.LORA.DETECT_OPTIMIZE) + def get_detect_optimize(self, val): + """ Get LoRa detection optimize setting + :return: detection optimize setting 0x03: SF7-12, 0x05: SF6 + + """ + return val & 0b111 + + @setter(REG.LORA.DETECT_OPTIMIZE) + def set_detect_optimize(self, detect_optimize): + """ Set LoRa detection optimize + :param detect_optimize 0x03: SF7-12, 0x05: SF6 + :return: New register value + """ + assert detect_optimize == 0x03 or detect_optimize == 0x05 + return detect_optimize & 0b111 + + @getter(REG.LORA.DETECTION_THRESH) + def get_detection_threshold(self, val): + """ Get LoRa detection threshold setting + :return: detection threshold 0x0A: SF7-12, 0x0C: SF6 + + """ + return val + + @setter(REG.LORA.DETECTION_THRESH) + def set_detection_threshold(self, detect_threshold): + """ Set LoRa detection optimize + :param detect_threshold 0x0A: SF7-12, 0x0C: SF6 + :return: New register value + """ + assert detect_threshold == 0x0A or detect_threshold == 0x0C + return detect_threshold + + @getter(REG.LORA.SYNC_WORD) + def get_sync_word(self, sync_word): + return sync_word + + @setter(REG.LORA.SYNC_WORD) + def set_sync_word(self, sync_word): + return sync_word + + @getter(REG.LORA.DIO_MAPPING_1) + def get_dio_mapping_1(self, mapping): + """ Get mapping of pins DIO0 to DIO3. Object variable dio_mapping will be set. + :param mapping: Register value + :type mapping: int + :return: Value of the mapping list + :rtype: list[int] + """ + self.dio_mapping = [mapping>>6 & 0x03, mapping>>4 & 0x03, mapping>>2 & 0x03, mapping>>0 & 0x03] \ + + self.dio_mapping[4:6] + return self.dio_mapping + + @setter(REG.LORA.DIO_MAPPING_1) + def set_dio_mapping_1(self, mapping): + """ Set mapping of pins DIO0 to DIO3. Object variable dio_mapping will be set. + :param mapping: Register value + :type mapping: int + :return: New value of the register + :rtype: int + """ + self.dio_mapping = [mapping>>6 & 0x03, mapping>>4 & 0x03, mapping>>2 & 0x03, mapping>>0 & 0x03] \ + + self.dio_mapping[4:6] + return mapping + + @getter(REG.LORA.DIO_MAPPING_2) + def get_dio_mapping_2(self, mapping): + """ Get mapping of pins DIO4 to DIO5. Object variable dio_mapping will be set. + :param mapping: Register value + :type mapping: int + :return: Value of the mapping list + :rtype: list[int] + """ + self.dio_mapping = self.dio_mapping[0:4] + [mapping>>6 & 0x03, mapping>>4 & 0x03] + return self.dio_mapping + + @setter(REG.LORA.DIO_MAPPING_2) + def set_dio_mapping_2(self, mapping): + """ Set mapping of pins DIO4 to DIO5. Object variable dio_mapping will be set. + :param mapping: Register value + :type mapping: int + :return: New value of the register + :rtype: int + """ + assert mapping & 0b00001110 == 0 + self.dio_mapping = self.dio_mapping[0:4] + [mapping>>6 & 0x03, mapping>>4 & 0x03] + return mapping + + def get_dio_mapping(self): + """ Utility function that returns the list of current DIO mappings. Object variable dio_mapping will be set. + :return: List of current DIO mappings + :rtype: list[int] + """ + self.get_dio_mapping_1() + return self.get_dio_mapping_2() + + def set_dio_mapping(self, mapping): + """ Utility function that returns the list of current DIO mappings. Object variable dio_mapping will be set. + :param mapping: DIO mapping list + :type mapping: list[int] + :return: New DIO mapping list + :rtype: list[int] + """ + mapping_1 = (mapping[0] & 0x03) << 6 | (mapping[1] & 0x03) << 4 | (mapping[2] & 0x3) << 2 | mapping[3] & 0x3 + mapping_2 = (mapping[4] & 0x03) << 6 | (mapping[5] & 0x03) << 4 + self.set_dio_mapping_1(mapping_1) + return self.set_dio_mapping_2(mapping_2) + + @getter(REG.LORA.VERSION) + def get_version(self, version): + """ Version code of the chip. + Bits 7-4 give the full revision number; bits 3-0 give the metal mask revision number. + :return: Version code + :rtype: int + """ + return version + + @getter(REG.LORA.TCXO) + def get_tcxo(self, tcxo): + """ Get TCXO or XTAL input setting + 0 -> "XTAL": Crystal Oscillator with external Crystal + 1 -> "TCXO": External clipped sine TCXO AC-connected to XTA pin + :param tcxo: 1=TCXO or 0=XTAL input setting + :return: TCXO or XTAL input setting + :type: int (0 or 1) + """ + return tcxo & 0b00010000 + + @setter(REG.LORA.TCXO) + def set_tcxo(self, tcxo): + """ Make TCXO or XTAL input setting. + 0 -> "XTAL": Crystal Oscillator with external Crystal + 1 -> "TCXO": External clipped sine TCXO AC-connected to XTA pin + :param tcxo: 1=TCXO or 0=XTAL input setting + :return: new TCXO or XTAL input setting + """ + return (tcxo >= 1) << 4 | 0x09 # bits 0-3 must be 0b1001 + + @getter(REG.LORA.PA_DAC) + def get_pa_dac(self, pa_dac): + """ Enables the +20dBm option on PA_BOOST pin + False -> Default value + True -> +20dBm on PA_BOOST when OutputPower=1111 + :return: True/False if +20dBm option on PA_BOOST on/off + :rtype: bool + """ + pa_dac &= 0x07 # only bits 0-2 + if pa_dac == 0x04: + return False + elif pa_dac == 0x07: + return True + else: + raise RuntimeError("Bad PA_DAC value %s" % hex(pa_dac)) + + @setter(REG.LORA.PA_DAC) + def set_pa_dac(self, pa_dac): + """ Enables the +20dBm option on PA_BOOST pin + False -> Default value + True -> +20dBm on PA_BOOST when OutputPower=1111 + :param pa_dac: 1/0 if +20dBm option on PA_BOOST on/off + :return: New pa_dac register value + :rtype: int + """ + return 0x87 if pa_dac else 0x84 + + def rx_chain_calibration(self, freq=868.): + """ Run the image calibration (see Semtech documentation section 4.2.3.8) + :param freq: Frequency for the HF calibration + :return: None + """ + # backup some registers + op_mode_bkup = self.get_mode() + pa_config_bkup = self.get_register(REG.LORA.PA_CONFIG) + freq_bkup = self.get_freq() + # for image calibration device must be in FSK standby mode + self.set_mode(MODE.FSK_STDBY) + # cut the PA + self.set_register(REG.LORA.PA_CONFIG, 0x00) + # calibration for the LF band + image_cal = (self.get_register(REG.FSK.IMAGE_CAL) & 0xBF) | 0x40 + self.set_register(REG.FSK.IMAGE_CAL, image_cal) + while (self.get_register(REG.FSK.IMAGE_CAL) & 0x20) == 0x20: + pass + # Set a Frequency in HF band + self.set_freq(freq) + # calibration for the HF band + image_cal = (self.get_register(REG.FSK.IMAGE_CAL) & 0xBF) | 0x40 + self.set_register(REG.FSK.IMAGE_CAL, image_cal) + while (self.get_register(REG.FSK.IMAGE_CAL) & 0x20) == 0x20: + pass + # put back the saved parameters + self.set_mode(op_mode_bkup) + self.set_register(REG.LORA.PA_CONFIG, pa_config_bkup) + self.set_freq(freq_bkup) + + def dump_registers(self): + """ Returns a list of [reg_addr, reg_name, reg_value] tuples. Chip is put into mode SLEEP. + :return: List of [reg_addr, reg_name, reg_value] tuples + :rtype: list[tuple] + """ + self.set_mode(MODE.SLEEP) + values = self.get_all_registers() + skip_set = set([REG.LORA.FIFO]) + result_list = [] + for i, s in REG.LORA.lookup.iteritems(): + if i in skip_set: + continue + v = values[i] + result_list.append((i, s, v)) + return result_list + + def get_register(self, register_address): + return self.spi.xfer([register_address & 0x7F, 0])[1] + + def set_register(self, register_address, val): + return self.spi.xfer([register_address | 0x80, val])[1] + + def get_all_registers(self): + # read all registers + reg = [0] + self.spi.xfer([1]+[0]*0x3E)[1:] + self.mode = reg[1] + return reg + + def __del__(self): + self.set_mode(MODE.SLEEP) + if self.verbose: + sys.stderr.write("MODE=SLEEP\n") + + def __str__(self): + # don't use __str__ while in any mode other that SLEEP or STDBY + assert(self.mode == MODE.SLEEP or self.mode == MODE.STDBY) + + onoff = lambda i: 'ON' if i else 'OFF' + f = self.get_freq() + cfg1 = self.get_modem_config_1() + cfg2 = self.get_modem_config_2() + cfg3 = self.get_modem_config_3() + pa_config = self.get_pa_config(convert_dBm=True) + ocp = self.get_ocp(convert_mA=True) + lna = self.get_lna() + s = "SX127x LoRa registers:\n" + s += " mode %s\n" % MODE.lookup[self.get_mode()] + s += " freq %f MHz\n" % f + s += " coding_rate %s\n" % CODING_RATE.lookup[cfg1['coding_rate']] + s += " bw %s\n" % BW.lookup[cfg1['bw']] + s += " spreading_factor %s chips/symb\n" % (1 << cfg2['spreading_factor']) + s += " implicit_hdr_mode %s\n" % onoff(cfg1['implicit_header_mode']) + s += " rx_payload_crc %s\n" % onoff(cfg2['rx_crc']) + s += " tx_cont_mode %s\n" % onoff(cfg2['tx_cont_mode']) + s += " preamble %d\n" % self.get_preamble() + s += " low_data_rate_opti %s\n" % onoff(cfg3['low_data_rate_optim']) + s += " agc_auto_on %s\n" % onoff(cfg3['agc_auto_on']) + s += " symb_timeout %s\n" % self.get_symb_timeout() + s += " freq_hop_period %s\n" % self.get_hop_period() + s += " hop_channel %s\n" % self.get_hop_channel() + s += " payload_length %s\n" % self.get_payload_length() + s += " max_payload_length %s\n" % self.get_max_payload_length() + s += " irq_flags_mask %s\n" % self.get_irq_flags_mask() + s += " irq_flags %s\n" % self.get_irq_flags() + s += " rx_nb_byte %d\n" % self.get_rx_nb_bytes() + s += " rx_header_cnt %d\n" % self.get_rx_header_cnt() + s += " rx_packet_cnt %d\n" % self.get_rx_packet_cnt() + s += " pkt_snr_value %f\n" % self.get_pkt_snr_value() + s += " pkt_rssi_value %d\n" % self.get_pkt_rssi_value() + s += " rssi_value %d\n" % self.get_rssi_value() + s += " fei %d\n" % self.get_fei() + s += " pa_select %s\n" % PA_SELECT.lookup[pa_config['pa_select']] + s += " max_power %f dBm\n" % pa_config['max_power'] + s += " output_power %f dBm\n" % pa_config['output_power'] + s += " ocp %s\n" % onoff(ocp['ocp_on']) + s += " ocp_trim %f mA\n" % ocp['ocp_trim'] + s += " lna_gain %s\n" % GAIN.lookup[lna['lna_gain']] + s += " lna_boost_lf %s\n" % bin(lna['lna_boost_lf']) + s += " lna_boost_hf %s\n" % bin(lna['lna_boost_hf']) + s += " detect_optimize %#02x\n" % self.get_detect_optimize() + s += " detection_thresh %#02x\n" % self.get_detection_threshold() + s += " sync_word %#02x\n" % self.get_sync_word() + s += " dio_mapping 0..5 %s\n" % self.get_dio_mapping() + s += " tcxo %s\n" % ['XTAL', 'TCXO'][self.get_tcxo()] + s += " pa_dac %s\n" % ['default', 'PA_BOOST'][self.get_pa_dac()] + s += " fifo_addr_ptr %#02x\n" % self.get_fifo_addr_ptr() + s += " fifo_tx_base_addr %#02x\n" % self.get_fifo_tx_base_addr() + s += " fifo_rx_base_addr %#02x\n" % self.get_fifo_rx_base_addr() + s += " fifo_rx_curr_addr %#02x\n" % self.get_fifo_rx_current_addr() + s += " fifo_rx_byte_addr %#02x\n" % self.get_fifo_rx_byte_addr() + s += " status %s\n" % self.get_modem_status() + s += " version %#02x\n" % self.get_version() + return s diff --git a/tx/SX127x/LoRaArgumentParser.py b/tx/SX127x/LoRaArgumentParser.py new file mode 100755 index 0000000..0e2451e --- /dev/null +++ b/tx/SX127x/LoRaArgumentParser.py @@ -0,0 +1,75 @@ +""" Defines LoRaArgumentParser which extends argparse.ArgumentParser with standard config parameters for the SX127x. """ + +# Copyright 2015 Mayer Analytics Ltd. +# +# This file is part of pySX127x. +# +# pySX127x is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public +# License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later +# version. +# +# pySX127x is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied +# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +# details. +# +# You can be released from the requirements of the license by obtaining a commercial license. Such a license is +# mandatory as soon as you develop commercial activities involving pySX127x without disclosing the source code of your +# own applications, or shipping pySX127x with a closed source product. +# +# You should have received a copy of the GNU General Public License along with pySX127. If not, see +# . + + +import argparse + + +class LoRaArgumentParser(argparse.ArgumentParser): + """ This class extends argparse.ArgumentParser. + Some commonly used LoRa config parameters are defined + * ocp + * spreading factor + * frequency + * bandwidth + * preamble + Call the parse_args with an additional parameter referencing a LoRa object. The args will be used to configure + the LoRa. + """ + + bw_lookup = dict(BW7_8=0, BW10_4=1, BW15_6=2, BW20_8=3, BW31_25=4, BW41_7=5, BW62_5=6, BW125=7, BW250=8, BW500=9) + cr_lookup = dict(CR4_5=1, CR4_6=2,CR4_7=3,CR4_8=4) + + def __init__(self, description): + argparse.ArgumentParser.__init__(self, description=description) + self.add_argument('--ocp', '-c', dest='ocp', default=100, action="store", type=float, + help="Over current protection in mA (45 .. 240 mA)") + self.add_argument('--sf', '-s', dest='sf', default=7, action="store", type=int, + help="Spreading factor (6...12). Default is 7.") + self.add_argument('--freq', '-f', dest='freq', default=869., action="store", type=float, + help="Frequency") + self.add_argument('--bw', '-b', dest='bw', default='BW125', action="store", type=str, + help="Bandwidth (one of BW7_8 BW10_4 BW15_6 BW20_8 BW31_25 BW41_7 BW62_5 BW125 BW250 BW500).\nDefault is BW125.") + self.add_argument('--cr', '-r', dest='coding_rate', default='CR4_5', action="store", type=str, + help="Coding rate (one of CR4_5 CR4_6 CR4_7 CR4_8).\nDefault is CR4_5.") + self.add_argument('--preamble', '-p', dest='preamble', default=8, action="store", type=int, + help="Preamble length. Default is 8.") + + def parse_args(self, lora): + """ Parse the args, perform some sanity checks and configure the LoRa accordingly. + :param lora: Reference to LoRa object + :return: args + """ + args = argparse.ArgumentParser.parse_args(self) + args.bw = self.bw_lookup.get(args.bw, None) + args.coding_rate = self.cr_lookup.get(args.coding_rate, None) + # some sanity checks + assert(args.bw is not None) + assert(args.coding_rate is not None) + assert(args.sf >=6 and args.sf <= 12) + # set the LoRa object + lora.set_freq(args.freq) + lora.set_preamble(args.preamble) + lora.set_spreading_factor(args.sf) + lora.set_bw(args.bw) + lora.set_coding_rate(args.coding_rate) + lora.set_ocp_trim(args.ocp) + return args diff --git a/tx/SX127x/__init__.py b/tx/SX127x/__init__.py new file mode 100755 index 0000000..78c5d36 --- /dev/null +++ b/tx/SX127x/__init__.py @@ -0,0 +1 @@ +__all__ = ['SX127x'] diff --git a/tx/SX127x/board_config.py b/tx/SX127x/board_config.py new file mode 100755 index 0000000..7d15a49 --- /dev/null +++ b/tx/SX127x/board_config.py @@ -0,0 +1,124 @@ +""" Defines the BOARD class that contains the board pin mappings. """ + +# Copyright 2015 Mayer Analytics Ltd. +# +# This file is part of pySX127x. +# +# pySX127x is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public +# License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later +# version. +# +# pySX127x is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied +# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +# details. +# +# You can be released from the requirements of the license by obtaining a commercial license. Such a license is +# mandatory as soon as you develop commercial activities involving pySX127x without disclosing the source code of your +# own applications, or shipping pySX127x with a closed source product. +# +# You should have received a copy of the GNU General Public License along with pySX127. If not, see +# . + + +import RPi.GPIO as GPIO +import spidev + +import time + + +class BOARD: + """ Board initialisation/teardown and pin configuration is kept here. + This is the Raspberry Pi board with one LED and a modtronix inAir9B + """ + # Note that the BCOM numbering for the GPIOs is used. + DIO0 = 22 # RaspPi GPIO 21 + DIO1 = 23 # RaspPi GPIO 22 + DIO2 = 24 # RaspPi GPIO 23 + DIO3 = 25 # RaspPi GPIO 24 + LED = 18 # RaspPi GPIO 18 connects to the LED on the proto shield + SWITCH = 4 # RaspPi GPIO 4 connects to a switch + + # The spi object is kept here + spi = None + + @staticmethod + def setup(): + """ Configure the Raspberry GPIOs + :rtype : None + """ + GPIO.setmode(GPIO.BCM) + # LED + GPIO.setup(BOARD.LED, GPIO.OUT) + GPIO.output(BOARD.LED, 0) + # switch + GPIO.setup(BOARD.SWITCH, GPIO.IN, pull_up_down=GPIO.PUD_DOWN) + # DIOx + for gpio_pin in [BOARD.DIO0, BOARD.DIO1, BOARD.DIO2, BOARD.DIO3]: + GPIO.setup(gpio_pin, GPIO.IN, pull_up_down=GPIO.PUD_DOWN) + # blink 2 times to signal the board is set up + BOARD.blink(.1, 2) + + @staticmethod + def teardown(): + """ Cleanup GPIO and SpiDev """ + GPIO.cleanup() + BOARD.spi.close() + + @staticmethod + def SpiDev(): + """ Init and return the SpiDev object + :return: SpiDev object + :rtype: SpiDev + """ + BOARD.spi = spidev.SpiDev() + BOARD.spi.open(0, 0) + return BOARD.spi + + @staticmethod + def add_event_detect(dio_number, callback): + """ Wraps around the GPIO.add_event_detect function + :param dio_number: DIO pin 0...5 + :param callback: The function to call when the DIO triggers an IRQ. + :return: None + """ + GPIO.add_event_detect(dio_number, GPIO.RISING, callback=callback) + + @staticmethod + def add_events(cb_dio0, cb_dio1, cb_dio2, cb_dio3, cb_dio4, cb_dio5, switch_cb=None): + BOARD.add_event_detect(BOARD.DIO0, callback=cb_dio0) + BOARD.add_event_detect(BOARD.DIO1, callback=cb_dio1) + BOARD.add_event_detect(BOARD.DIO2, callback=cb_dio2) + BOARD.add_event_detect(BOARD.DIO3, callback=cb_dio3) + # the modtronix inAir9B does not expose DIO4 and DIO5 + if switch_cb is not None: + GPIO.add_event_detect(BOARD.SWITCH, GPIO.RISING, callback=switch_cb, bouncetime=300) + + @staticmethod + def led_on(value=1): + """ Switch the proto shields LED + :param value: 0/1 for off/on. Default is 1. + :return: value + :rtype : int + """ + GPIO.output(BOARD.LED, value) + return value + + @staticmethod + def led_off(): + """ Switch LED off + :return: 0 + """ + GPIO.output(BOARD.LED, 0) + return 0 + + @staticmethod + def blink(time_sec, n_blink): + if n_blink == 0: + return + BOARD.led_on() + for i in range(n_blink): + time.sleep(time_sec) + BOARD.led_off() + time.sleep(time_sec) + BOARD.led_on() + BOARD.led_off() diff --git a/tx/SX127x/constants.py b/tx/SX127x/constants.py new file mode 100755 index 0000000..1368fed --- /dev/null +++ b/tx/SX127x/constants.py @@ -0,0 +1,190 @@ +""" Defines constants (modes, bandwidths, registers, etc.) needed by SX127x. """ + + +# Copyright 2015 Mayer Analytics Ltd. +# +# This file is part of pySX127x. +# +# pySX127x is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public +# License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later +# version. +# +# pySX127x is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied +# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +# details. +# +# You can be released from the requirements of the license by obtaining a commercial license. Such a license is +# mandatory as soon as you develop commercial activities involving pySX127x without disclosing the source code of your +# own applications, or shipping pySX127x with a closed source product. +# +# You should have received a copy of the GNU General Public License along with pySX127. If not, see +# . + + +def add_lookup(cls): + """ A decorator that adds a lookup dictionary to the class. + The lookup dictionary maps the codes back to the names. This is used for pretty-printing. """ + varnames = filter(str.isupper, cls.__dict__.keys()) + lookup = dict(map(lambda varname: (cls.__dict__.get(varname, None), varname), varnames)) + setattr(cls, 'lookup', lookup) + return cls + + +@add_lookup +class MODE: + SLEEP = 0x80 + STDBY = 0x81 + FSTX = 0x82 + TX = 0x83 + FSRX = 0x84 + RXCONT = 0x85 + RXSINGLE = 0x86 + CAD = 0x87 + FSK_STDBY= 0x01 # needed for calibration + + +@add_lookup +class BW: + BW7_8 = 0 + BW10_4 = 1 + BW15_6 = 2 + BW20_8 = 3 + BW31_25 = 4 + BW41_7 = 5 + BW62_5 = 6 + BW125 = 7 + BW250 = 8 + BW500 = 9 + + +@add_lookup +class CODING_RATE: + CR4_5 = 1 + CR4_6 = 2 + CR4_7 = 3 + CR4_8 = 4 + + +@add_lookup +class GAIN: + NOT_USED = 0b000 + G1 = 0b001 + G2 = 0b010 + G3 = 0b011 + G4 = 0b100 + G5 = 0b101 + G6 = 0b110 + + +@add_lookup +class PA_SELECT: + RFO = 0 + PA_BOOST = 1 + + +@add_lookup +class PA_RAMP: + RAMP_3_4_ms = 0 + RAMP_2_ms = 1 + RAMP_1_ms = 2 + RAMP_500_us = 3 + RAMP_250_us = 4 + RAMP_125_us = 5 + RAMP_100_us = 6 + RAMP_62_us = 7 + RAMP_50_us = 8 + RAMP_40_us = 9 + RAMP_31_us = 10 + RAMP_25_us = 11 + RAMP_20_us = 12 + RAMP_15_us = 13 + RAMP_12_us = 14 + RAMP_10_us = 15 + + +class MASK: + class IRQ_FLAGS: + RxTimeout = 7 + RxDone = 6 + PayloadCrcError = 5 + ValidHeader = 4 + TxDone = 3 + CadDone = 2 + FhssChangeChannel = 1 + CadDetected = 0 + + +class REG: + + @add_lookup + class LORA: + FIFO = 0x00 + OP_MODE = 0x01 + FR_MSB = 0x06 + FR_MID = 0x07 + FR_LSB = 0x08 + PA_CONFIG = 0x09 + PA_RAMP = 0x0A + OCP = 0x0B + LNA = 0x0C + FIFO_ADDR_PTR = 0x0D + FIFO_TX_BASE_ADDR = 0x0E + FIFO_RX_BASE_ADDR = 0x0F + FIFO_RX_CURR_ADDR = 0x10 + IRQ_FLAGS_MASK = 0x11 + IRQ_FLAGS = 0x12 + RX_NB_BYTES = 0x13 + RX_HEADER_CNT_MSB = 0x14 + RX_PACKET_CNT_MSB = 0x16 + MODEM_STAT = 0x18 + PKT_SNR_VALUE = 0x19 + PKT_RSSI_VALUE = 0x1A + RSSI_VALUE = 0x1B + HOP_CHANNEL = 0x1C + MODEM_CONFIG_1 = 0x1D + MODEM_CONFIG_2 = 0x1E + SYMB_TIMEOUT_LSB = 0x1F + PREAMBLE_MSB = 0x20 + PAYLOAD_LENGTH = 0x22 + MAX_PAYLOAD_LENGTH = 0x22 + HOP_PERIOD = 0x24 + FIFO_RX_BYTE_ADDR = 0x25 + MODEM_CONFIG_3 = 0x26 + PPM_CORRECTION = 0x27 + FEI_MSB = 0x28 + DETECT_OPTIMIZE = 0X31 + INVERT_IQ = 0x33 + DETECTION_THRESH = 0X37 + SYNC_WORD = 0X39 + DIO_MAPPING_1 = 0x40 + DIO_MAPPING_2 = 0x41 + VERSION = 0x42 + TCXO = 0x4B + PA_DAC = 0x4D + AGC_REF = 0x61 + AGC_THRESH_1 = 0x62 + AGC_THRESH_2 = 0x63 + AGC_THRESH_3 = 0x64 + PLL = 0x70 + + @add_lookup + class FSK: + LNA = 0x0C + RX_CONFIG = 0x0D + RSSI_CONFIG = 0x0E + PREAMBLE_DETECT = 0x1F + OSC = 0x24 + SYNC_CONFIG = 0x27 + SYNC_VALUE_1 = 0x28 + SYNC_VALUE_2 = 0x29 + SYNC_VALUE_3 = 0x2A + SYNC_VALUE_4 = 0x2B + SYNC_VALUE_5 = 0x2C + SYNC_VALUE_6 = 0x2D + SYNC_VALUE_7 = 0x2E + SYNC_VALUE_8 = 0x2F + PACKET_CONFIG_1 = 0x30 + FIFO_THRESH = 0x35 + IMAGE_CAL = 0x3B + DIO_MAPPING_1 = 0x40 + DIO_MAPPING_2 = 0x41 diff --git a/tx/SX127x/hardware_piloragateway.py b/tx/SX127x/hardware_piloragateway.py new file mode 100755 index 0000000..4e8d6a8 --- /dev/null +++ b/tx/SX127x/hardware_piloragateway.py @@ -0,0 +1,121 @@ +""" Defines the BOARD class that contains the board pin mappings. """ + +# Copyright 2015 Mark Jessop +# +# This file is part of pySX127x. +# +# pySX127x is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public +# License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later +# version. +# +# pySX127x is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied +# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +# details. +# +# You can be released from the requirements of the license by obtaining a commercial license. Such a license is +# mandatory as soon as you develop commercial activities involving pySX127x without disclosing the source code of your +# own applications, or shipping pySX127x with a closed source product. +# +# You should have received a copy of the GNU General Public License along with pySX127x. If not, see +# . + + +import RPi.GPIO as GPIO +import spidev +import time + + +class HardwareInterface(object): + """ Board initialisation/teardown and pin configuration is kept here. + This is the HabSupplies PiLoraGateway v2.4 Shield. + Schematic for this board is here: https://github.com/PiInTheSky/pits-hardware/blob/master/PiLoraGatewayV2.4.pdf + Only the DIO0 and DIO5 pins are wired up + """ + # Note that the BCOM numbering for the GPIOs is used. + + # The spi object is kept here + spi_device = 0 + spi = None + spi_speed = 1000000 + + def __init__(self, device=0): + """ Configure the Raspberry GPIOs + :rtype : None + """ + self.spi_device = device + GPIO.setmode(GPIO.BCM) + + if device == 0: + self.LED = 5 + self.DIO0 = 25 + self.DIO5 = 24 + else: + self.LED = 21 + self.DIO0 = 16 + self.DIO5 = 12 + + GPIO.setup(self.LED, GPIO.OUT) + GPIO.output(self.LED, 0) + # DIOx + for gpio_pin in [self.DIO0, self.DIO5]: + GPIO.setup(gpio_pin, GPIO.IN, pull_up_down=GPIO.PUD_DOWN) + # blink 2 times to signal the board is set up + self.blink(.1, 2) + + def teardown(self): + """ Cleanup GPIO and SpiDev """ + GPIO.cleanup() + self.spi.close() + + def SpiDev(self): + """ Init and return the SpiDev object + :return: SpiDev object + :rtype: SpiDev + """ + self.spi = spidev.SpiDev() + self.spi.open(0, self.spi_device) + self.spi.max_speed_hz = self.spi_speed + return self.spi + + def add_event_detect(self,dio_number, callback): + """ Wraps around the GPIO.add_event_detect function + :param dio_number: DIO pin 0...5 + :param callback: The function to call when the DIO triggers an IRQ. + :return: None + """ + GPIO.add_event_detect(dio_number, GPIO.RISING, callback=callback) + + def add_events(self,cb_dio0, cb_dio1, cb_dio2, cb_dio3, cb_dio4, cb_dio5, switch_cb=None): + return + #self.add_event_detect(self.DIO0, callback=cb_dio0) + #self.add_event_detect(self.DIO5, callback=cb_dio5) + + def led_on(self,value=1): + """ Switch the proto shields LED + :param value: 0/1 for off/on. Default is 1. + :return: value + :rtype : int + """ + GPIO.output(self.LED, value) + return value + + def led_off(self): + """ Switch LED off + :return: 0 + """ + GPIO.output(self.LED, 0) + return 0 + + def blink(self,time_sec, n_blink): + if n_blink == 0: + return + self.led_on() + for i in range(n_blink): + time.sleep(time_sec) + self.led_off() + time.sleep(time_sec) + self.led_on() + self.led_off() + + def read_gpio(self): + return (GPIO.input(self.DIO0),GPIO.input(self.DIO5)) diff --git a/tx/SX127x/hardware_spibridge.py b/tx/SX127x/hardware_spibridge.py new file mode 100644 index 0000000..7e36c1e --- /dev/null +++ b/tx/SX127x/hardware_spibridge.py @@ -0,0 +1,95 @@ +""" Arduino SPI Bridge Hardware backend. """ + +# Copyright 2015 Mark Jessop +# +# This file is part of pySX127x. +# +# pySX127x is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public +# License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later +# version. +# +# pySX127x is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied +# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +# details. +# +# You can be released from the requirements of the license by obtaining a commercial license. Such a license is +# mandatory as soon as you develop commercial activities involving pySX127x without disclosing the source code of your +# own applications, or shipping pySX127x with a closed source product. +# +# You should have received a copy of the GNU General Public License along with pySX127x. If not, see +# . + + +import time,thread +from spibridge import SPIBridge + + +class HardwareInterface(object): + """ Special HardwareInterface object for the Arduino SPI Bridge code. + This is different in that we have to poll for interrupt flags. + Board initialisation/teardown and pin configuration is kept here. + Only the DIO0 and DIO5 pins are wired up on these Arduino shields + """ + # The object is kept here + spi = None + + def __init__(self,port="/dev/ttyUSB0",baud=57600): + """ Configure the Raspberry GPIOs + :rtype : None + """ + self.spi = SPIBridge(port,baud) + + # blink 2 times to signal the board is set up + self.blink(.1, 2) + + def teardown(self): + """ Cleanup Serial Instance """ + self.spi.close() + + def SpiDev(self): + """ Init and return the SpiDev object + :return: SpiDev object + :rtype: SpiDev + """ + return self.spi + + def add_event_detect(self,dio_number, callback): + """ Wraps around the GPIO.add_event_detect function + :param dio_number: DIO pin 0...5 + :param callback: The function to call when the DIO triggers an IRQ. + :return: None + """ + pass + + def add_events(self,cb_dio0, cb_dio1, cb_dio2, cb_dio3, cb_dio4, cb_dio5, switch_cb=None): + pass + + def led_on(self,value=1): + """ Switch the proto shields LED + :param value: 0/1 for off/on. Default is 1. + :return: value + :rtype : int + """ + self.spi.set_led(1) + return value + + def led_off(self): + """ Switch LED off + :return: 0 + """ + self.spi.set_led(0) + return 0 + + def blink(self,time_sec, n_blink): + if n_blink == 0: + return + self.led_on() + for i in range(n_blink): + time.sleep(time_sec) + self.led_off() + time.sleep(time_sec) + self.led_on() + self.led_off() + + def read_gpio(self): + return self.spi.read_gpio() \ No newline at end of file diff --git a/tx/SX127x/spibridge.py b/tx/SX127x/spibridge.py new file mode 100644 index 0000000..4387b7a --- /dev/null +++ b/tx/SX127x/spibridge.py @@ -0,0 +1,186 @@ +""" Arduino SPI Bridge Object """ + +# Copyright 2015 Mark Jessop +# +# This file is part of pySX127x. +# +# pySX127x is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public +# License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later +# version. +# +# pySX127x is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied +# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +# details. +# +# You can be released from the requirements of the license by obtaining a commercial license. Such a license is +# mandatory as soon as you develop commercial activities involving pySX127x without disclosing the source code of your +# own applications, or shipping pySX127x with a closed source product. +# +# You should have received a copy of the GNU General Public License along with pySX127x. If not, see +# . + +import serial, time, struct + +class SPIBridge(object): + + ser = None + + # Binary protocol stuff. + sync = 0xABCD + OPCODE_VERSION = 0x00 + OPCODE_SPI_TXFR = 0x01 + OPCODE_LED = 0x02 + OPCODE_READ_GPIO = 0x03 + + def __init__(self, serialport="/dev/ttyUSB0",serialbaud=57600): + self.ser = serial.Serial(serialport, serialbaud, timeout=1) + print "Waiting for Arduino to boot..." + time.sleep(2) + if "SPIBridge" in self.read_version(): + print "Connected OK!" + else: + print "Could not connect to SPI Bridge." + + + def close(self): + self.ser.close() + + # CRC16 Functions + crc16tab = [ + 0x0000,0x1021,0x2042,0x3063,0x4084,0x50a5,0x60c6,0x70e7, + 0x8108,0x9129,0xa14a,0xb16b,0xc18c,0xd1ad,0xe1ce,0xf1ef, + 0x1231,0x0210,0x3273,0x2252,0x52b5,0x4294,0x72f7,0x62d6, + 0x9339,0x8318,0xb37b,0xa35a,0xd3bd,0xc39c,0xf3ff,0xe3de, + 0x2462,0x3443,0x0420,0x1401,0x64e6,0x74c7,0x44a4,0x5485, + 0xa56a,0xb54b,0x8528,0x9509,0xe5ee,0xf5cf,0xc5ac,0xd58d, + 0x3653,0x2672,0x1611,0x0630,0x76d7,0x66f6,0x5695,0x46b4, + 0xb75b,0xa77a,0x9719,0x8738,0xf7df,0xe7fe,0xd79d,0xc7bc, + 0x48c4,0x58e5,0x6886,0x78a7,0x0840,0x1861,0x2802,0x3823, + 0xc9cc,0xd9ed,0xe98e,0xf9af,0x8948,0x9969,0xa90a,0xb92b, + 0x5af5,0x4ad4,0x7ab7,0x6a96,0x1a71,0x0a50,0x3a33,0x2a12, + 0xdbfd,0xcbdc,0xfbbf,0xeb9e,0x9b79,0x8b58,0xbb3b,0xab1a, + 0x6ca6,0x7c87,0x4ce4,0x5cc5,0x2c22,0x3c03,0x0c60,0x1c41, + 0xedae,0xfd8f,0xcdec,0xddcd,0xad2a,0xbd0b,0x8d68,0x9d49, + 0x7e97,0x6eb6,0x5ed5,0x4ef4,0x3e13,0x2e32,0x1e51,0x0e70, + 0xff9f,0xefbe,0xdfdd,0xcffc,0xbf1b,0xaf3a,0x9f59,0x8f78, + 0x9188,0x81a9,0xb1ca,0xa1eb,0xd10c,0xc12d,0xf14e,0xe16f, + 0x1080,0x00a1,0x30c2,0x20e3,0x5004,0x4025,0x7046,0x6067, + 0x83b9,0x9398,0xa3fb,0xb3da,0xc33d,0xd31c,0xe37f,0xf35e, + 0x02b1,0x1290,0x22f3,0x32d2,0x4235,0x5214,0x6277,0x7256, + 0xb5ea,0xa5cb,0x95a8,0x8589,0xf56e,0xe54f,0xd52c,0xc50d, + 0x34e2,0x24c3,0x14a0,0x0481,0x7466,0x6447,0x5424,0x4405, + 0xa7db,0xb7fa,0x8799,0x97b8,0xe75f,0xf77e,0xc71d,0xd73c, + 0x26d3,0x36f2,0x0691,0x16b0,0x6657,0x7676,0x4615,0x5634, + 0xd94c,0xc96d,0xf90e,0xe92f,0x99c8,0x89e9,0xb98a,0xa9ab, + 0x5844,0x4865,0x7806,0x6827,0x18c0,0x08e1,0x3882,0x28a3, + 0xcb7d,0xdb5c,0xeb3f,0xfb1e,0x8bf9,0x9bd8,0xabbb,0xbb9a, + 0x4a75,0x5a54,0x6a37,0x7a16,0x0af1,0x1ad0,0x2ab3,0x3a92, + 0xfd2e,0xed0f,0xdd6c,0xcd4d,0xbdaa,0xad8b,0x9de8,0x8dc9, + 0x7c26,0x6c07,0x5c64,0x4c45,0x3ca2,0x2c83,0x1ce0,0x0cc1, + 0xef1f,0xff3e,0xcf5d,0xdf7c,0xaf9b,0xbfba,0x8fd9,0x9ff8, + 0x6e17,0x7e36,0x4e55,0x5e74,0x2e93,0x3eb2,0x0ed1,0x1ef0 + ] + + def crc16_floating(self,next_byte, seed): + return ((seed << 8) ^ self.crc16tab[(seed >> 8) ^ (ord(next_byte) & 0x00FF)])\ + & 0xFFFF + + def crc16_buff(self,buff): + crc = 0xFFFF + for ch in buff: + crc = self.crc16_floating(ch, crc) + return crc + + def read_version(self): + temp = struct.pack('>BH', self.OPCODE_VERSION, 0x0000) + crc = self.crc16_buff(temp) + tx_data = struct.pack('>H3sH', self.sync, temp, crc) + + if(self.ser.inWaiting()>0): + rx_data = self.ser.read(ser.inWaiting()) + #print "Data in RX Buffer:" + ':'.join(x.encode('hex') for x in rx_data) + # Send! + self.ser.write(tx_data) + rx_data = self.ser.read(24) + + if(ord(rx_data[2])==self.OPCODE_VERSION): + return str(ord(rx_data[5])) + "." + str(ord(rx_data[6])) + "." + str(ord(rx_data[7])) + " " + rx_data[8:-2] + else: + return "Unknown" + + def xfer(self,data): + if(len(data)>1024): + data = data[:1024] + + temp = struct.pack('>BH', self.OPCODE_SPI_TXFR, len(data)) + str(bytearray(data)) + crc = self.crc16_buff(temp) + tx_data = struct.pack('>H', self.sync) + temp + struct.pack('>H', crc) + #print "Data in TX Buffer:" + ':'.join(x.encode('hex') for x in tx_data) + if(self.ser.inWaiting()>0): + rx_data = self.ser.read(ser.inWaiting()) + #print "Data in RX Buffer:" + ':'.join(x.encode('hex') for x in rx_data) + + self.ser.write(tx_data) + #time.sleep(1) + #print str(self.ser.inWaiting()) + " waiting." + rx_data = self.ser.read(len(tx_data)) + #print "SPI RX Data:" + ':'.join(x.encode('hex') for x in rx_data) + #print rx_data + + calc_crc = self.crc16_buff(rx_data[2:-2]) + if(struct.pack('>H', calc_crc) == rx_data[-2:]): + #print "Checksum Match" + pass + else: + #print "Checksum Fail" + pass + if(ord(rx_data[2]) == self.OPCODE_SPI_TXFR): + return list(bytearray(rx_data[5:-2])) + else: + print "No Opcode Found!" + return [0] + + def set_led(self,value=1): + opcode = self.OPCODE_LED + payload_length = 0x0001 + payload = 0x00 + if(value==1): + payload = 0x01 + + temp = struct.pack('>BHB',opcode,payload_length,payload) + + crc = self.crc16_buff(temp) + + tx_data = struct.pack('>H4sH', self.sync, temp, crc) + if(self.ser.inWaiting()>0): + rx_data = self.ser.read(ser.inWaiting()) + #print "Data in RX Buffer:" + ':'.join(x.encode('hex') for x in rx_data) + self.ser.write(tx_data) + #print "Data in TX Buffer:" + ':'.join(x.encode('hex') for x in tx_data) + rx_data = self.ser.read(len(tx_data)) + if(rx_data == tx_data): + return value + + def read_gpio(self): + temp = struct.pack('>BH', self.OPCODE_READ_GPIO, 0x0000) + crc = self.crc16_buff(temp) + tx_data = struct.pack('>H3sH', self.sync, temp, crc) + + if(self.ser.inWaiting()>0): + rx_data = self.ser.read(self.ser.inWaiting()) + #print "Data in RX Buffer:" + ':'.join(x.encode('hex') for x in rx_data) + # Send! + self.ser.write(tx_data) + #print "Data in TX Buffer:" + ':'.join(x.encode('hex') for x in tx_data) + rx_data = self.ser.read(9) + #print "Data in RX Buffer:" + ':'.join(x.encode('hex') for x in rx_data) + + if(ord(rx_data[2])==self.OPCODE_READ_GPIO): + return (ord(rx_data[5]),ord(rx_data[6])) + else: + return (-1,-1) + +if __name__ == "__main__": + spi = SPIBridge() + print spi.read_version() + spi.close() \ No newline at end of file diff --git a/tx/WenetPiCam.py b/tx/WenetPiCam.py index 0c8a45b..c64b9f6 100644 --- a/tx/WenetPiCam.py +++ b/tx/WenetPiCam.py @@ -274,7 +274,7 @@ class WenetPiCam(object): return # Inform ground station we are about to send an image. - self.debug_message("Transmitting %d PiCam SSDV Packets." % (file_size/256)) + self.debug_message("Transmitting %d PiCam SSDV Packets." % (file_size//256)) # Push SSDV file into transmit queue. tx.queue_image_file(ssdv_filename) diff --git a/tx/ldpc_encoder.py b/tx/ldpc_encoder.py index 4719afe..08618c7 100644 --- a/tx/ldpc_encoder.py +++ b/tx/ldpc_encoder.py @@ -10,6 +10,7 @@ # Released under GNU GPL v3 or later # +import codecs import ctypes from numpy.ctypeslib import ndpointer import numpy as np @@ -38,21 +39,22 @@ except OSError as e: # Accepts a 258 byte string as input, returns the LDPC parity bits. # -def ldpc_encode_string(payload, Nibits = 2064, Npbits = 516): +def ldpc_encode(payload, Nibits = 2064, Npbits = 516): if len(payload) != 258: raise TypeError("Payload MUST be 258 bytes in length! (2064 bit codeword)") # Get input data into the right form (list of 0s and 1s) - ibits = np.unpackbits(np.fromstring(payload,dtype=np.uint8)).astype(np.uint8) + ibits = np.unpackbits(np.frombuffer(payload,dtype=np.uint8)).astype(np.uint8) pbits = np.zeros(Npbits).astype(np.uint8) _ldpc_enc.encode(ibits, pbits) - return np.packbits(np.array(list(pbits)).astype(np.uint8)).tostring() + return np.packbits(np.array(list(pbits)).astype(np.uint8)).tobytes() # -# Interleaver functions +# Interleaver functions - Note that these are not used in Wenet right now. +# These also may not work under Python 3 # # These variables need to be synchronised with those in ldpc_enc.c, until i figure out a better way @@ -113,26 +115,26 @@ def interleave_test(): def generate_dummy_packet(): payload = np.arange(0,256,1) - payload = np.append(payload,[0,0]).astype(np.uint8).tostring() # Add on dummy checksum, for a total of 258 bytes. + payload = np.append(payload,[0,0]).astype(np.uint8).tobytes() # Add on dummy checksum, for a total of 258 bytes. return payload def main(): # Generate a dummy test packet, and convert it to an array of 0 and 1. payload = generate_dummy_packet() - print("Input (hex): %s" % ("".join("{:02x}".format(ord(c)) for c in payload))) + print("Input (hex):" + codecs.encode(payload,'hex').decode()) # Now run ldpc_encode over it X times. parity = "" start = time.time() - for x in xrange(1000): + for x in range(1000): #print(x) parity = ldpc_encode(payload) stop = time.time() print("time delta: %.3f" % (stop-start)) - print("LDPC Parity Bits (hex): %s" % ("".join("{:02x}".format(ord(c)) for c in parity))) + print("LDPC Parity Bits (hex):" + codecs.encode(parity,'hex').decode()) print("Done!") # Some basic test code. diff --git a/tx/tx_known_sequence.py b/tx/tx_known_sequence.py index 3b0d1e5..682a2b3 100644 --- a/tx/tx_known_sequence.py +++ b/tx/tx_known_sequence.py @@ -10,7 +10,7 @@ import PacketTX, sys, os, time import numpy as np -payload = np.arange(0,256,1).astype(np.uint8).tostring() # 0->255 +payload = np.arange(0,256,1).astype(np.uint8).tobytes() # 0->255 debug_output = False # If True, packet bits are saved to debug.bin as one char per bit. diff --git a/tx/tx_picam_gps.py b/tx/tx_picam_gps.py index ebc455a..71742b8 100644 --- a/tx/tx_picam_gps.py +++ b/tx/tx_picam_gps.py @@ -151,8 +151,8 @@ def post_process_image(filename): # Finally, initialise the PiCam capture object. -picam = WenetPiCam.WenetPiCam(src_resolution=(1920,1088), - tx_resolution=(1920,1088), +picam = WenetPiCam.WenetPiCam(src_resolution=(3280,2464), + tx_resolution=(1488,1120), callsign=callsign, num_images=5, debug_ptr=tx.transmit_text_message, diff --git a/tx/tx_test_images.py b/tx/tx_test_images.py index 3032fb3..ab3334d 100644 --- a/tx/tx_test_images.py +++ b/tx/tx_test_images.py @@ -11,7 +11,7 @@ import PacketTX, sys, os, argparse # Set to whatever resolution you want to test. file_path = "../test_images/%d_raw.bin" # _raw, _800x608, _640x480, _320x240 -image_numbers = xrange(1,14) +image_numbers = range(1,14) debug_output = False # If True, packet bits are saved to debug.bin as one char per bit. @@ -22,11 +22,11 @@ def transmit_file(filename, tx_object): print("File size not a multiple of 256 bytes!") return - print("Transmitting %d Packets." % (file_size/256)) + print("Transmitting %d Packets." % (file_size//256)) f = open(filename,'rb') - for x in range(file_size/256): + for x in range(file_size//256): data = f.read(256) tx_object.tx_packet(data) diff --git a/tx/ublox.py b/tx/ublox.py index 87b0af2..f9a946b 100644 --- a/tx/ublox.py +++ b/tx/ublox.py @@ -12,22 +12,27 @@ Added UBloxGPS abstraction layer class for use with Wenet TX system. import struct import datetime from threading import Thread -import time, os, json, calendar, math +import time, os, json, calendar, math, traceback, socket, argparse # protocol constants PREAMBLE1 = 0xb5 PREAMBLE2 = 0x62 # message classes -CLASS_NAV = 0x01 -CLASS_RXM = 0x02 -CLASS_INF = 0x04 -CLASS_ACK = 0x05 -CLASS_CFG = 0x06 -CLASS_MON = 0x0A -CLASS_AID = 0x0B -CLASS_TIM = 0x0D -CLASS_ESF = 0x10 +CLASS_NAV = 0x01 # Navigation +CLASS_RXM = 0x02 # Receiver Manager +CLASS_INF = 0x04 # Inforation +CLASS_ACK = 0x05 # Message ACKs +CLASS_CFG = 0x06 # Configuration +CLASS_UPD = 0x09 # Firmware Updates +CLASS_MON = 0x0A # Monitoring +CLASS_AID = 0x0B # AssistNow Aiding +CLASS_TIM = 0x0D # Timing +CLASS_ESF = 0x10 # External Sensor Fusion +CLASS_MGA = 0x13 # Multiple-GNSS Assistance +CLASS_LOG = 0x21 # Logging +CLASS_SEC = 0x27 # Security +CLASS_HNR = 0x28 # High-Rate Navigation # ACK messages MSG_ACK_NACK = 0x00 @@ -52,6 +57,7 @@ MSG_NAV_DOP = 0x04 MSG_NAV_EKFSTATUS = 0x40 MSG_NAV_SBAS = 0x32 MSG_NAV_SOL = 0x06 +MSG_NAV_PVT = 0x07 # RXM messages MSG_RXM_RAW = 0x10 @@ -102,10 +108,18 @@ MSG_CFG_TMODE = 0x1D MSG_CFG_TPS = 0x31 MSG_CFG_TP = 0x07 MSG_CFG_GNSS = 0x3E +MSG_CFG_ESFALG = 0x56 +MSG_CFG_ESFA = 0x4C +MSG_CFG_ESFG = 0x4D +MSG_CFG_ESFWT = 0x82 +MSG_CFG_HNR = 0x5C # ESF messages MSG_ESF_MEAS = 0x02 MSG_ESF_STATUS = 0x10 +MSG_ESF_ALG = 0x14 +MSG_ESF_INS = 0x15 +MSG_ESF_RAW = 0x03 # INF messages MSG_INF_DEBUG = 0x04 @@ -131,6 +145,11 @@ MSG_TIM_TM2 = 0x03 MSG_TIM_SVIN = 0x04 MSG_TIM_VRFY = 0x06 +# HNR Messages +MSG_HNR_ATT = 0x01 +MSG_HNR_INS = 0x02 +MSG_HNR_PVT = 0x00 + # port IDs PORT_DDC =0 PORT_SERIAL1=1 @@ -204,7 +223,7 @@ class UBloxDescriptor: self.fields2 = fields2 def unpack(self, msg): - '''unpack a UBloxMessage, creating the .fields and ._recs attributes in msg''' + '''unpack a UBloxMessage, creating the .fields and ._recs attributes in msg''' msg._fields = {} # unpack main message blocks. A comm @@ -261,7 +280,7 @@ class UBloxDescriptor: msg._unpacked = True def pack(self, msg, msg_class=None, msg_id=None): - '''pack a UBloxMessage from the .fields and ._recs attributes in msg''' + '''pack a UBloxMessage from the .fields and ._recs attributes in msg''' f1 = [] if msg_class is None: msg_class = msg.msg_class() @@ -302,7 +321,7 @@ class UBloxDescriptor: msg._buf += struct.pack(' 0 and ord(self._buf[0]) != PREAMBLE1: + '''check if the message is valid so far''' + if len(self._buf) > 0 and self._buf[0] != PREAMBLE1: return False - if len(self._buf) > 1 and ord(self._buf[1]) != PREAMBLE2: + if len(self._buf) > 1 and self._buf[1] != PREAMBLE2: self.debug(1, "bad pre2") return False if self.needed_bytes() == 0 and not self.valid(): @@ -611,28 +639,27 @@ class UBloxMessage: return True def add(self, bytes): - '''add some bytes to a message''' + '''add some bytes to a message''' self._buf += bytes while not self.valid_so_far() and len(self._buf) > 0: - '''handle corrupted streams''' self._buf = self._buf[1:] if self.needed_bytes() < 0: - self._buf = "" + self._buf = b"" def checksum(self, data=None): - '''return a checksum tuple for a message''' + '''return a checksum tuple for a message''' if data is None: data = self._buf[2:-2] cs = 0 ck_a = 0 ck_b = 0 for i in data: - ck_a = (ck_a + ord(i)) & 0xFF + ck_a = (ck_a + i) & 0xFF ck_b = (ck_b + ck_a) & 0xFF return (ck_a, ck_b) def valid_checksum(self): - '''check if the checksum is OK''' + '''check if the checksum is OK''' (ck_a, ck_b) = self.checksum() d = self._buf[2:-2] (ck_a2, ck_b2) = struct.unpack('= 8 and self.needed_bytes() == 0 and self.valid_checksum() @@ -685,9 +712,9 @@ class UBlox: self.preferred_dgps_timeout = None def close(self): - '''close the device''' + '''close the device''' self.dev.close() - self.dev = None + self.dev = None def set_debug(self, debug_level): '''set debug level''' @@ -699,7 +726,7 @@ class UBlox: print(msg) def set_logfile(self, logfile, append=False): - '''setup logging to a file''' + '''setup logging to a file''' if self.log is not None: self.log.close() self.log = None @@ -758,10 +785,10 @@ class UBlox: def send_nmea(self, msg): if not self.read_only: s = msg + "*%02X" % self.nmea_checksum(msg) - self.write(s) + self.write(s.encode('ascii')) def set_binary(self): - '''put a UBlox into binary mode using a NMEA string''' + '''put a UBlox into binary mode using a NMEA string''' if not self.read_only: print("try set binary at %u" % self.baudrate) self.send_nmea("$PUBX,41,0,0007,0001,%u,0" % self.baudrate) @@ -808,7 +835,7 @@ class UBlox: def receive_message(self, ignore_eof=False): - '''blocking receive of one ublox message''' + '''blocking receive of one ublox message''' msg = UBloxMessage() while True: n = msg.needed_bytes() @@ -827,7 +854,7 @@ class UBlox: return msg def receive_message_noerror(self, ignore_eof=False): - '''blocking receive of one ublox message, ignoring errors''' + '''blocking receive of one ublox message, ignoring errors''' try: return self.receive_message(ignore_eof=ignore_eof) except UBloxError as e: @@ -840,7 +867,7 @@ class UBlox: return None def send(self, msg): - '''send a preformatted ublox message''' + '''send a preformatted ublox message''' if not msg.valid(): self.debug(1, "invalid send") return @@ -848,7 +875,7 @@ class UBlox: self.write(msg._buf) def send_message(self, msg_class, msg_id, payload): - '''send a ublox message with class, id and payload''' + '''send a ublox message with class, id and payload''' msg = UBloxMessage() msg._buf = struct.pack('