diff --git a/auto_rx/README.md b/auto_rx/README.md index a25ee2b..2481e00 100644 --- a/auto_rx/README.md +++ b/auto_rx/README.md @@ -22,6 +22,9 @@ Dependencies * Needs the following python packages (get them with `pip install `) * numpy * crcmod +* Also needs: + * rtl-sdr packages (i.e. rtl_fm, rtl_power) + * sox Usage ----- diff --git a/auto_rx/async_file_reader.py b/auto_rx/async_file_reader.py new file mode 100644 index 0000000..271f630 --- /dev/null +++ b/auto_rx/async_file_reader.py @@ -0,0 +1,68 @@ +""" +AsynchronousFileReader +====================== +Simple thread based asynchronous file reader for Python. +see https://github.com/soxofaan/asynchronousfilereader +MIT License +Copyright (c) 2014 Stefaan Lippens +""" + +__version__ = '0.2.1' + +import threading +try: + # Python 2 + from Queue import Queue +except ImportError: + # Python 3 + from queue import Queue + + +class AsynchronousFileReader(threading.Thread): + """ + Helper class to implement asynchronous reading of a file + in a separate thread. Pushes read lines on a queue to + be consumed in another thread. + """ + + def __init__(self, fd, queue=None, autostart=True): + self._fd = fd + if queue is None: + queue = Queue() + self.queue = queue + self.running = True + + threading.Thread.__init__(self) + + if autostart: + self.start() + + def run(self): + """ + The body of the tread: read lines and put them on the queue. + """ + while self.running: + line = self._fd.readline() + if not line: + break + self.queue.put(line) + + def eof(self): + """ + Check whether there is no more content to expect. + """ + return not self.is_alive() and self.queue.empty() + + def stop(self): + """ + Stop the running thread. + """ + self.running = False + + + def readlines(self): + """ + Get currently available lines. + """ + while not self.queue.empty(): + yield self.queue.get() \ No newline at end of file diff --git a/auto_rx/auto_rx.py b/auto_rx/auto_rx.py index 31bc589..1896740 100644 --- a/auto_rx/auto_rx.py +++ b/auto_rx/auto_rx.py @@ -4,14 +4,16 @@ # # 2017-04 Mark Jessop # -# The following binaries will need to be built and copied to this directory: -# rs92/rs92gps -# scan/rs_detect +# Use the build.sh script in this directory to build the required binaries and move them to this directory. # # The following other packages are needed: # rtl-sdr (for the rtl_power and rtl_fm utilities) # sox # +# The following Python packages are needed: +# - numpy +# - crcmod +# # Instructions: # Modify config parameters below as required. Take note of the APRS_USER and APRS_PASS values. # Run with: python auto_rx.py @@ -19,16 +21,9 @@ # # # TODO: -# [ ] Better handling of errors from the decoder sub-process. -# [x] Handle no lat/long better. [Decoder won't output data at all if CRC fails.] -# [-] Option to filter by DOP data -# [x] Automatic downloading of ephemeris data, instead of almanac. +# [ ] Fix user gain setting issues. (gain='automatic' = no decode?!) # [ ] Better peak signal detection. (Maybe convolve known spectral masks over power data?) # [ ] Habitat upload. -# [x] Move configuration parameters to a separate file. -# [x] Allow use of custom object name instead of sonde ID. -# [x] Build file. -# [x] RS41 support. # [ ] Use FSK demod from codec2-dev ? @@ -50,6 +45,7 @@ from StringIO import StringIO from findpeaks import * from config_reader import * from gps_grabber import * +from async_file_reader import AsynchronousFileReader # Internet Push Globals @@ -59,12 +55,15 @@ HABITAT_OUTPUT_ENABLED = False INTERNET_PUSH_RUNNING = True internet_push_queue = Queue.Queue(1) +# Latest sonde data. Used on exiting. +latest_sonde_data = None -def run_rtl_power(start, stop, step, filename="log_power.csv", dwell = 20): +def run_rtl_power(start, stop, step, filename="log_power.csv", dwell = 20, ppm = 0, gain = 'automatic'): """ Run rtl_power, with a timeout""" # rtl_power -f 400400000:403500000:800 -i20 -1 log_power.csv - rtl_power_cmd = "timeout %d rtl_power -f %d:%d:%d -i %d -1 %s" % (dwell+10, start, stop, step, dwell, filename) + + rtl_power_cmd = "timeout %d rtl_power -f %d:%d:%d -i %d -1 -p %d %s" % (dwell+10, start, stop, step, dwell, int(ppm), filename) logging.info("Running frequency scan.") ret_code = os.system(rtl_power_cmd) if ret_code == 1: @@ -119,9 +118,10 @@ def quantize_freq(freq_list, quantize=5000): """ Quantise a list of frequencies to steps of Hz """ return np.round(freq_list/quantize)*quantize -def detect_sonde(frequency, ppm=0, gain=0): +def detect_sonde(frequency, ppm=0, gain='automatic'): """ Receive some FM and attempt to detect the presence of a radiosonde. """ - rx_test_command = "timeout 10s rtl_fm -p %d -M fm -s 15k -f %d 2>/dev/null |" % (ppm, frequency) + + rx_test_command = "timeout 10s rtl_fm -p %d -M fm -s 15k -f %d 2>/dev/null |" % (int(ppm), frequency) rx_test_command += "sox -t raw -r 15k -e s -b 16 -c 1 - -r 48000 -t wav - highpass 20 2>/dev/null |" rx_test_command += "./rs_detect -z -t 8 2>/dev/null" @@ -140,6 +140,69 @@ def detect_sonde(frequency, ppm=0, gain=0): return None +def sonde_search(config, attempts = 5): + """ Perform a frequency scan across the defined range, and test each frequency for a radiosonde's presence. """ + search_attempts = attempts + + sonde_freq = None + sonde_type = None + + while search_attempts > 0: + # Scan Band + run_rtl_power(config['min_freq']*1e6, config['max_freq']*1e6, config['search_step'], ppm=config['rtlsdr_ppm'], gain=config['rtlsdr_gain']) + + # Read in result + try: + (freq, power, step) = read_rtl_power('log_power.csv') + except Exception as e: + traceback.print_exc() + logging.debug("Failed to read log_power.csv. Attempting to run rtl_power again.") + search_attempts -= 1 + time.sleep(10) + continue + + # Rough approximation of the noise floor of the received power spectrum. + power_nf = np.mean(power) + + # Detect peaks. + peak_indices = detect_peaks(power, mph=(power_nf+config['min_snr']), mpd=(config['min_distance']/step), show = False) + + if len(peak_indices) == 0: + logging.info("No peaks found on this pass.") + search_attempts -= 1 + time.sleep(10) + continue + + # Sort peaks by power. + peak_powers = power[peak_indices] + peak_freqs = freq[peak_indices] + peak_frequencies = peak_freqs[np.argsort(peak_powers)][::-1] + + # Quantize to nearest x kHz + peak_frequencies = quantize_freq(peak_frequencies, config['quantization']) + logging.info("Peaks found at (MHz): %s" % str(peak_frequencies/1e6)) + + # Run rs_detect on each peak frequency, to determine if there is a sonde there. + for freq in peak_frequencies: + detected = detect_sonde(freq, ppm=config['rtlsdr_ppm'], gain=config['rtlsdr_gain']) + if detected != None: + sonde_freq = freq + sonde_type = detected + break + + if sonde_type != None: + # Found a sonde! Break out of the while loop and attempt to decode it. + return (sonde_freq, sonde_type) + else: + # No sondes found :-( Wait and try again. + search_attempts -= 1 + logging.warning("Search attempt failed, %d attempts remaining. Waiting %d seconds." % (search_attempts, config['search_delay'])) + time.sleep(config['search_delay']) + + # If we get here, we have exhausted our search attempts. + logging.error("No sondes detected.") + return (None, None) + def process_rs_line(line): """ Process a line of output from the rs92gps decoder, converting it to a dict """ # Sample output: @@ -177,8 +240,9 @@ def process_rs_line(line): traceback.print_exc() return None -def decode_rs92(frequency, ppm=0, rx_queue=None, almanac=None, ephemeris=None): +def decode_rs92(frequency, ppm=0, gain='automatic', rx_queue=None, almanac=None, ephemeris=None, timeout=120): """ Decode a RS92 sonde """ + global latest_sonde_data # Before we get started, do we need to download GPS data? if ephemeris == None: @@ -187,7 +251,7 @@ def decode_rs92(frequency, ppm=0, rx_queue=None, almanac=None, ephemeris=None): ephemeris = get_ephemeris(destination="ephemeris.dat") # If ephemeris is still None, then we failed to download the ephemeris data. - # Try and grab the almanac data instead. + # Try and grab the almanac data instead if ephemeris == None: logging.error("Could not obtain ephemeris data, trying to download an almanac.") almanac = get_almanac(destination="almanac.txt") @@ -196,8 +260,7 @@ def decode_rs92(frequency, ppm=0, rx_queue=None, almanac=None, ephemeris=None): logging.critical("Could not obtain GPS ephemeris or almanac data.") return False - - decode_cmd = "rtl_fm -p %d -M fm -s 12k -f %d 2>/dev/null |" % (ppm, frequency) + decode_cmd = "rtl_fm -p %d -M fm -s 12k -f %d 2>/dev/null |" % (int(ppm), frequency) decode_cmd += "sox -t raw -r 12k -e s -b 16 -c 1 - -r 48000 -b 8 -t wav - lowpass 2500 highpass 20 2>/dev/null |" # Note: I've got the check-CRC option hardcoded in here as always on. @@ -208,36 +271,54 @@ def decode_rs92(frequency, ppm=0, rx_queue=None, almanac=None, ephemeris=None): elif almanac != None: decode_cmd += "./rs92mod --crc --csv -a %s" % almanac - rx_start_time = time.time() + rx_last_line = time.time() - rx = subprocess.Popen(decode_cmd, shell=True, stdin=None, stdout=subprocess.PIPE, preexec_fn=os.setsid) + # Receiver subprocess. Discard stderr, and feed stdout into an asynchronous read class. + rx = subprocess.Popen(decode_cmd, shell=True, stdin=None, stdout=subprocess.PIPE, preexec_fn=os.setsid) + rx_stdout = AsynchronousFileReader(rx.stdout, autostart=True) - while True: - try: - line = rx.stdout.readline() + while not rx_stdout.eof(): + for line in rx_stdout.readlines(): if (line != None) and (line != ""): - data = process_rs_line(line) + try: + data = process_rs_line(line) + # Reset timeout counter. + rx_last_line = time.time() - if data != None: - # Add in a few fields that don't come from the sonde telemetry. - data['freq'] = "%.3f MHz" % (frequency/1e6) - data['type'] = "RS92" + if data != None: + # Add in a few fields that don't come from the sonde telemetry. + data['freq'] = "%.3f MHz" % (frequency/1e6) + data['type'] = "RS92" - if rx_queue != None: - try: - rx_queue.put_nowait(data) - except: - pass - except: - traceback.print_exc() - logging.error("Could not read from rxer stdout?") - os.killpg(os.getpgid(rx.pid), signal.SIGTERM) - return + latest_sonde_data = data + + if rx_queue != None: + try: + rx_queue.put_nowait(data) + except: + pass + except: + traceback.print_exc() + logging.error("Error parsing line: %s" % line) + + # Check timeout counter. + if time.time() > (rx_last_line+timeout): + logging.error("RX Timed out.") + break + # Sleep for a short time. + time.sleep(0.1) + + logging.error("Closing RX Thread.") + os.killpg(os.getpgid(rx.pid), signal.SIGTERM) + rx_stdout.stop() + rx_stdout.join() + return -def decode_rs41(frequency, ppm=0, rx_queue=None): +def decode_rs41(frequency, ppm=0, gain='automatic', rx_queue=None, timeout=120): """ Decode a RS41 sonde """ - decode_cmd = "rtl_fm -p %d -M fm -s 12k -f %d 2>/dev/null |" % (ppm, frequency) + global latest_sonde_data + decode_cmd = "rtl_fm -p %d -M fm -s 12k -f %d 2>/dev/null |" % (int(ppm), frequency) decode_cmd += "sox -t raw -r 12k -e s -b 16 -c 1 - -r 48000 -b 8 -t wav - lowpass 2600 2>/dev/null |" # Note: I've got the check-CRC option hardcoded in here as always on. @@ -245,31 +326,48 @@ def decode_rs41(frequency, ppm=0, rx_queue=None): decode_cmd += "./rs41mod --crc --csv" - rx_start_time = time.time() + rx_last_line = time.time() - rx = subprocess.Popen(decode_cmd, shell=True, stdin=None, stdout=subprocess.PIPE, preexec_fn=os.setsid) + # Receiver subprocess. Discard stderr, and feed stdout into an asynchronous read class. + rx = subprocess.Popen(decode_cmd, shell=True, stdin=None, stdout=subprocess.PIPE, preexec_fn=os.setsid) + rx_stdout = AsynchronousFileReader(rx.stdout, autostart=True) - while True: - try: - line = rx.stdout.readline() + while not rx_stdout.eof(): + for line in rx_stdout.readlines(): if (line != None) and (line != ""): - data = process_rs_line(line) + try: + data = process_rs_line(line) + # Reset timeout counter. + rx_last_line = time.time() - if data != None: - # Add in a few fields that don't come from the sonde telemetry. - data['freq'] = "%.3f MHz" % (frequency/1e6) - data['type'] = "RS41" + if data != None: + # Add in a few fields that don't come from the sonde telemetry. + data['freq'] = "%.3f MHz" % (frequency/1e6) + data['type'] = "RS41" - if rx_queue != None: - try: - rx_queue.put_nowait(data) - except: - pass - except: - traceback.print_exc() - logging.error("Could not read from rxer stdout?") - os.killpg(os.getpgid(rx.pid), signal.SIGTERM) - return + latest_sonde_data = data + + if rx_queue != None: + try: + rx_queue.put_nowait(data) + except: + pass + except: + traceback.print_exc() + logging.error("Error parsing line: %s" % line) + + # Check timeout counter. + if time.time() > (rx_last_line+timeout): + logging.error("RX Timed out.") + break + # Sleep for a short time. + time.sleep(0.1) + + logging.error("Closing RX Thread.") + os.killpg(os.getpgid(rx.pid), signal.SIGTERM) + rx_stdout.stop() + rx_stdout.join() + return def internet_push_thread(station_config): """ Push a frame of sonde data into various internet services (APRS-IS, Habitat) """ @@ -314,91 +412,76 @@ if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument("-c" ,"--config", default="station.cfg", help="Receive Station Configuration File") parser.add_argument("-f", "--frequency", default=0.0, help="Sonde Frequency (MHz) (bypass scan step).") + parser.add_argument("-t", "--timeout", default=180, help="Stop receiving after X minutes.") args = parser.parse_args() # Attempt to read in configuration file. Use default config if reading fails. config = read_auto_rx_config(args.config) - # Pull some variables out of the config. - SEARCH_ATTEMPTS = config['search_attempts'] + # Clean up gain value. + if config['rtlsdr_gain'] == '0' or config['rtlsdr_gain'] == 0: + config['rtlsdr_gain'] = 'automatic' + elif type(config['rtlsdr_gain']) == int: + config['rtlsdr_gain'] = str(config['rtlsdr_gain']) + else: + config['rtlsdr_gain'] = 'automatic' + timeout_time = time.time() + int(args.timeout)*60 - # - # STEP 1: Search for a sonde. - # + # Internet push thread object. + push_thread = None - # Search variables. - sonde_freq = 0.0 + # Sonde Frequency & Type variables. + sonde_freq = None sonde_type = None - while SEARCH_ATTEMPTS>0: - # Scan Band - run_rtl_power(config['min_freq']*1e6, config['max_freq']*1e6, config['search_step']) - # Read in result - try: - (freq, power, step) = read_rtl_power('log_power.csv') - except Exception as e: - logging.debug("Failed to read log_power.csv. Attempting to run rtl_power again.") - SEARCH_ATTEMPTS -= 1 - time.sleep(10) + # Main scan & track loop. We keep on doing this until we timeout (i.e. after we expect the sonde to have landed) + while time.time() < timeout_time: + + # Attempt to detect a sonde on a supplied frequency. + if args.frequency != 0.0: + sonde_type = detect_sonde(int(float(args.frequency)*1e6), ppm=config['rtlsdr_ppm'], gain=config['rtlsdr_gain']) + if sonde_type != None: + sonde_freq = sonde_type + # If nothing is detected, or we haven't been supplied a frequency, perform a scan. + if sonde_type == None: + (sonde_freq, sonde_type) = sonde_search(config, config['search_attempts']) + + # If we *still* haven't detected a sonde... just keep on trying, until we hit our timeout. + if sonde_type == None: continue - # Rough approximation of the noise floor of the received power spectrum. - power_nf = np.mean(power) + logging.info("Starting decoding of %s on %.3f MHz" % (sonde_type, sonde_freq/1e6)) - # Detect peaks. - peak_indices = detect_peaks(power, mph=(power_nf+config['min_snr']), mpd=(config['min_distance']/step), show = False) + # Start a thread to push data to the web, if it isn't started already. + if push_thread == None: + push_thread = Thread(target=internet_push_thread, kwargs={'station_config':config}) + push_thread.start() - if len(peak_indices) == 0: - logging.info("No peaks found on this pass.") - SEARCH_ATTEMPTS -= 1 - time.sleep(10) - continue - - # Sort peaks by power. - peak_powers = power[peak_indices] - peak_freqs = freq[peak_indices] - peak_frequencies = peak_freqs[np.argsort(peak_powers)][::-1] - - # Quantize to nearest x kHz - peak_frequencies = quantize_freq(peak_frequencies, config['quantization']) - logging.info("Peaks found at (MHz): %s" % str(peak_frequencies/1e6)) - - # Run rs_detect on each peak frequency, to determine if there is a sonde there. - for freq in peak_frequencies: - detected = detect_sonde(freq, ppm=config['rtlsdr_ppm']) - if detected != None: - sonde_freq = freq - sonde_type = detected - break - - if sonde_type != None: - # Found a sonde! Break out of the while loop and attempt to decode it. - break + # Start decoding the sonde! + if sonde_type == "RS92": + decode_rs92(sonde_freq, ppm=config['rtlsdr_ppm'], gain=config['rtlsdr_gain'], rx_queue=internet_push_queue, timeout=config['rx_timeout']) + elif sonde_type == "RS41": + decode_rs41(sonde_freq, ppm=config['rtlsdr_ppm'], gain=config['rtlsdr_gain'], rx_queue=internet_push_queue, timeout=config['rx_timeout']) else: - # No sondes found :-( Wait and try again. - SEARCH_ATTEMPTS -= 1 - logging.warning("Search attempt failed, %d attempts remaining. Waiting %d seconds." % (SEARCH_ATTEMPTS, config['search_delay'])) - time.sleep(config['search_delay']) + pass - if SEARCH_ATTEMPTS == 0: - logging.error("No sondes detcted, exiting.") - sys.exit(0) + # Receiver has timed out. Reset sonde type and frequency variables and loop. + logging.error("Receiver timed out. Re-starting scan.") + sonde_type = None + sonde_freq = None - logging.info("Starting decoding of %s on %.3f MHz" % (sonde_type, sonde_freq/1e6)) + logging.info("Exceeded maximum receive time. Exiting.") - # Start a thread to push data to the web. - t = Thread(target=internet_push_thread, kwargs={'station_config':config}) - t.start() + # Write last known sonde position to file. + if latest_sonde_data != None: + last_position_str = "%s,%s,%s,%s,%s,%.5f,%.5f,%.1f" % (data['date'], data['time'], data['type'], data['id'], data['freq'], data['lat'], data['lon'], data['alt']) + logging.info("Last Position: %s" % (last_position_str)) - # Start decoding the sonde! - if sonde_type == "RS92": - decode_rs92(sonde_freq, ppm=config['rtlsdr_ppm'], rx_queue=internet_push_queue) - elif sonde_type == "RS41": - decode_rs41(sonde_freq, ppm=config['rtlsdr_ppm'], rx_queue=internet_push_queue) - else: - pass + f = open("last_positions.txt", 'a') + f.write(last_position_str + "\n") + f.close() # Stop the APRS output thread. INTERNET_PUSH_RUNNING = False diff --git a/auto_rx/auto_rx.sh b/auto_rx/auto_rx.sh index 61eb8c0..a6dfcbc 100755 --- a/auto_rx/auto_rx.sh +++ b/auto_rx/auto_rx.sh @@ -14,4 +14,4 @@ cd ~/RS/auto_rx/ rm log_power.csv # Start auto_rx process with a 3 hour timeout. -timeout 10800 python auto_rx.py 2>error.log +timeout 14400 python auto_rx.py 2>error.log diff --git a/auto_rx/config_reader.py b/auto_rx/config_reader.py index f4bc42d..6bbb5be 100644 --- a/auto_rx/config_reader.py +++ b/auto_rx/config_reader.py @@ -21,6 +21,7 @@ def read_auto_rx_config(filename): 'min_snr' : 10, 'min_distance' : 1000, 'quantization' : 10000, + 'rx_timeout' : 120, 'upload_rate' : 30, 'enable_aprs' : False, 'enable_habitat': False, @@ -48,6 +49,7 @@ def read_auto_rx_config(filename): auto_rx_config['min_snr'] = config.getfloat('search_params', 'min_snr') auto_rx_config['min_distance'] = config.getfloat('search_params', 'min_distance') auto_rx_config['quantization'] = config.getint('search_params', 'quantization') + auto_rx_config['rx_timeout'] = config.getint('search_params', 'rx_timeout') auto_rx_config['upload_rate'] = config.getint('upload', 'upload_rate') auto_rx_config['enable_aprs'] = config.getboolean('upload', 'enable_aprs') auto_rx_config['enable_habitat'] = config.getboolean('upload', 'enable_habitat') diff --git a/auto_rx/station.cfg.example b/auto_rx/station.cfg.example index 129b50f..f24585d 100644 --- a/auto_rx/station.cfg.example +++ b/auto_rx/station.cfg.example @@ -13,7 +13,7 @@ rtlsdr_gain = 0 # Radiosonde Search Parameters [search_params] -search_attempts = 5 +search_attempts = 10 search_delay = 120 # Minimum and maximum search frequencies, in MHz @@ -27,6 +27,8 @@ min_snr = 10 min_distance = 1000 # Quantize search results to x Hz steps. Useful as most sondes are on 10 kHz frequency steps. quantization = 10000 +# Timeout and re-scan after X seconds of no data. +rx_timeout = 120 # Internet upload settings.