Decoder thread now uses async io, so it can timeout correctly.

pull/13/head
Mark Jessop 2017-05-07 07:42:46 +08:00
rodzic e6f1146b1d
commit bec5264555
6 zmienionych plików z 286 dodań i 128 usunięć

Wyświetl plik

@ -22,6 +22,9 @@ Dependencies
* Needs the following python packages (get them with `pip install <package>`)
* numpy
* crcmod
* Also needs:
* rtl-sdr packages (i.e. rtl_fm, rtl_power)
* sox
Usage
-----

Wyświetl plik

@ -0,0 +1,68 @@
"""
AsynchronousFileReader
======================
Simple thread based asynchronous file reader for Python.
see https://github.com/soxofaan/asynchronousfilereader
MIT License
Copyright (c) 2014 Stefaan Lippens
"""
__version__ = '0.2.1'
import threading
try:
# Python 2
from Queue import Queue
except ImportError:
# Python 3
from queue import Queue
class AsynchronousFileReader(threading.Thread):
"""
Helper class to implement asynchronous reading of a file
in a separate thread. Pushes read lines on a queue to
be consumed in another thread.
"""
def __init__(self, fd, queue=None, autostart=True):
self._fd = fd
if queue is None:
queue = Queue()
self.queue = queue
self.running = True
threading.Thread.__init__(self)
if autostart:
self.start()
def run(self):
"""
The body of the tread: read lines and put them on the queue.
"""
while self.running:
line = self._fd.readline()
if not line:
break
self.queue.put(line)
def eof(self):
"""
Check whether there is no more content to expect.
"""
return not self.is_alive() and self.queue.empty()
def stop(self):
"""
Stop the running thread.
"""
self.running = False
def readlines(self):
"""
Get currently available lines.
"""
while not self.queue.empty():
yield self.queue.get()

Wyświetl plik

@ -4,14 +4,16 @@
#
# 2017-04 Mark Jessop <vk5qi@rfhead.net>
#
# The following binaries will need to be built and copied to this directory:
# rs92/rs92gps
# scan/rs_detect
# Use the build.sh script in this directory to build the required binaries and move them to this directory.
#
# The following other packages are needed:
# rtl-sdr (for the rtl_power and rtl_fm utilities)
# sox
#
# The following Python packages are needed:
# - numpy
# - crcmod
#
# Instructions:
# Modify config parameters below as required. Take note of the APRS_USER and APRS_PASS values.
# Run with: python auto_rx.py
@ -19,16 +21,9 @@
#
#
# TODO:
# [ ] Better handling of errors from the decoder sub-process.
# [x] Handle no lat/long better. [Decoder won't output data at all if CRC fails.]
# [-] Option to filter by DOP data
# [x] Automatic downloading of ephemeris data, instead of almanac.
# [ ] Fix user gain setting issues. (gain='automatic' = no decode?!)
# [ ] Better peak signal detection. (Maybe convolve known spectral masks over power data?)
# [ ] Habitat upload.
# [x] Move configuration parameters to a separate file.
# [x] Allow use of custom object name instead of sonde ID.
# [x] Build file.
# [x] RS41 support.
# [ ] Use FSK demod from codec2-dev ?
@ -50,6 +45,7 @@ from StringIO import StringIO
from findpeaks import *
from config_reader import *
from gps_grabber import *
from async_file_reader import AsynchronousFileReader
# Internet Push Globals
@ -59,12 +55,15 @@ HABITAT_OUTPUT_ENABLED = False
INTERNET_PUSH_RUNNING = True
internet_push_queue = Queue.Queue(1)
# Latest sonde data. Used on exiting.
latest_sonde_data = None
def run_rtl_power(start, stop, step, filename="log_power.csv", dwell = 20):
def run_rtl_power(start, stop, step, filename="log_power.csv", dwell = 20, ppm = 0, gain = 'automatic'):
""" Run rtl_power, with a timeout"""
# rtl_power -f 400400000:403500000:800 -i20 -1 log_power.csv
rtl_power_cmd = "timeout %d rtl_power -f %d:%d:%d -i %d -1 %s" % (dwell+10, start, stop, step, dwell, filename)
rtl_power_cmd = "timeout %d rtl_power -f %d:%d:%d -i %d -1 -p %d %s" % (dwell+10, start, stop, step, dwell, int(ppm), filename)
logging.info("Running frequency scan.")
ret_code = os.system(rtl_power_cmd)
if ret_code == 1:
@ -119,9 +118,10 @@ def quantize_freq(freq_list, quantize=5000):
""" Quantise a list of frequencies to steps of <quantize> Hz """
return np.round(freq_list/quantize)*quantize
def detect_sonde(frequency, ppm=0, gain=0):
def detect_sonde(frequency, ppm=0, gain='automatic'):
""" Receive some FM and attempt to detect the presence of a radiosonde. """
rx_test_command = "timeout 10s rtl_fm -p %d -M fm -s 15k -f %d 2>/dev/null |" % (ppm, frequency)
rx_test_command = "timeout 10s rtl_fm -p %d -M fm -s 15k -f %d 2>/dev/null |" % (int(ppm), frequency)
rx_test_command += "sox -t raw -r 15k -e s -b 16 -c 1 - -r 48000 -t wav - highpass 20 2>/dev/null |"
rx_test_command += "./rs_detect -z -t 8 2>/dev/null"
@ -140,6 +140,69 @@ def detect_sonde(frequency, ppm=0, gain=0):
return None
def sonde_search(config, attempts = 5):
""" Perform a frequency scan across the defined range, and test each frequency for a radiosonde's presence. """
search_attempts = attempts
sonde_freq = None
sonde_type = None
while search_attempts > 0:
# Scan Band
run_rtl_power(config['min_freq']*1e6, config['max_freq']*1e6, config['search_step'], ppm=config['rtlsdr_ppm'], gain=config['rtlsdr_gain'])
# Read in result
try:
(freq, power, step) = read_rtl_power('log_power.csv')
except Exception as e:
traceback.print_exc()
logging.debug("Failed to read log_power.csv. Attempting to run rtl_power again.")
search_attempts -= 1
time.sleep(10)
continue
# Rough approximation of the noise floor of the received power spectrum.
power_nf = np.mean(power)
# Detect peaks.
peak_indices = detect_peaks(power, mph=(power_nf+config['min_snr']), mpd=(config['min_distance']/step), show = False)
if len(peak_indices) == 0:
logging.info("No peaks found on this pass.")
search_attempts -= 1
time.sleep(10)
continue
# Sort peaks by power.
peak_powers = power[peak_indices]
peak_freqs = freq[peak_indices]
peak_frequencies = peak_freqs[np.argsort(peak_powers)][::-1]
# Quantize to nearest x kHz
peak_frequencies = quantize_freq(peak_frequencies, config['quantization'])
logging.info("Peaks found at (MHz): %s" % str(peak_frequencies/1e6))
# Run rs_detect on each peak frequency, to determine if there is a sonde there.
for freq in peak_frequencies:
detected = detect_sonde(freq, ppm=config['rtlsdr_ppm'], gain=config['rtlsdr_gain'])
if detected != None:
sonde_freq = freq
sonde_type = detected
break
if sonde_type != None:
# Found a sonde! Break out of the while loop and attempt to decode it.
return (sonde_freq, sonde_type)
else:
# No sondes found :-( Wait and try again.
search_attempts -= 1
logging.warning("Search attempt failed, %d attempts remaining. Waiting %d seconds." % (search_attempts, config['search_delay']))
time.sleep(config['search_delay'])
# If we get here, we have exhausted our search attempts.
logging.error("No sondes detected.")
return (None, None)
def process_rs_line(line):
""" Process a line of output from the rs92gps decoder, converting it to a dict """
# Sample output:
@ -177,8 +240,9 @@ def process_rs_line(line):
traceback.print_exc()
return None
def decode_rs92(frequency, ppm=0, rx_queue=None, almanac=None, ephemeris=None):
def decode_rs92(frequency, ppm=0, gain='automatic', rx_queue=None, almanac=None, ephemeris=None, timeout=120):
""" Decode a RS92 sonde """
global latest_sonde_data
# Before we get started, do we need to download GPS data?
if ephemeris == None:
@ -187,7 +251,7 @@ def decode_rs92(frequency, ppm=0, rx_queue=None, almanac=None, ephemeris=None):
ephemeris = get_ephemeris(destination="ephemeris.dat")
# If ephemeris is still None, then we failed to download the ephemeris data.
# Try and grab the almanac data instead.
# Try and grab the almanac data instead
if ephemeris == None:
logging.error("Could not obtain ephemeris data, trying to download an almanac.")
almanac = get_almanac(destination="almanac.txt")
@ -196,8 +260,7 @@ def decode_rs92(frequency, ppm=0, rx_queue=None, almanac=None, ephemeris=None):
logging.critical("Could not obtain GPS ephemeris or almanac data.")
return False
decode_cmd = "rtl_fm -p %d -M fm -s 12k -f %d 2>/dev/null |" % (ppm, frequency)
decode_cmd = "rtl_fm -p %d -M fm -s 12k -f %d 2>/dev/null |" % (int(ppm), frequency)
decode_cmd += "sox -t raw -r 12k -e s -b 16 -c 1 - -r 48000 -b 8 -t wav - lowpass 2500 highpass 20 2>/dev/null |"
# Note: I've got the check-CRC option hardcoded in here as always on.
@ -208,36 +271,54 @@ def decode_rs92(frequency, ppm=0, rx_queue=None, almanac=None, ephemeris=None):
elif almanac != None:
decode_cmd += "./rs92mod --crc --csv -a %s" % almanac
rx_start_time = time.time()
rx_last_line = time.time()
rx = subprocess.Popen(decode_cmd, shell=True, stdin=None, stdout=subprocess.PIPE, preexec_fn=os.setsid)
# Receiver subprocess. Discard stderr, and feed stdout into an asynchronous read class.
rx = subprocess.Popen(decode_cmd, shell=True, stdin=None, stdout=subprocess.PIPE, preexec_fn=os.setsid)
rx_stdout = AsynchronousFileReader(rx.stdout, autostart=True)
while True:
try:
line = rx.stdout.readline()
while not rx_stdout.eof():
for line in rx_stdout.readlines():
if (line != None) and (line != ""):
data = process_rs_line(line)
try:
data = process_rs_line(line)
# Reset timeout counter.
rx_last_line = time.time()
if data != None:
# Add in a few fields that don't come from the sonde telemetry.
data['freq'] = "%.3f MHz" % (frequency/1e6)
data['type'] = "RS92"
if data != None:
# Add in a few fields that don't come from the sonde telemetry.
data['freq'] = "%.3f MHz" % (frequency/1e6)
data['type'] = "RS92"
if rx_queue != None:
try:
rx_queue.put_nowait(data)
except:
pass
except:
traceback.print_exc()
logging.error("Could not read from rxer stdout?")
os.killpg(os.getpgid(rx.pid), signal.SIGTERM)
return
latest_sonde_data = data
if rx_queue != None:
try:
rx_queue.put_nowait(data)
except:
pass
except:
traceback.print_exc()
logging.error("Error parsing line: %s" % line)
# Check timeout counter.
if time.time() > (rx_last_line+timeout):
logging.error("RX Timed out.")
break
# Sleep for a short time.
time.sleep(0.1)
logging.error("Closing RX Thread.")
os.killpg(os.getpgid(rx.pid), signal.SIGTERM)
rx_stdout.stop()
rx_stdout.join()
return
def decode_rs41(frequency, ppm=0, rx_queue=None):
def decode_rs41(frequency, ppm=0, gain='automatic', rx_queue=None, timeout=120):
""" Decode a RS41 sonde """
decode_cmd = "rtl_fm -p %d -M fm -s 12k -f %d 2>/dev/null |" % (ppm, frequency)
global latest_sonde_data
decode_cmd = "rtl_fm -p %d -M fm -s 12k -f %d 2>/dev/null |" % (int(ppm), frequency)
decode_cmd += "sox -t raw -r 12k -e s -b 16 -c 1 - -r 48000 -b 8 -t wav - lowpass 2600 2>/dev/null |"
# Note: I've got the check-CRC option hardcoded in here as always on.
@ -245,31 +326,48 @@ def decode_rs41(frequency, ppm=0, rx_queue=None):
decode_cmd += "./rs41mod --crc --csv"
rx_start_time = time.time()
rx_last_line = time.time()
rx = subprocess.Popen(decode_cmd, shell=True, stdin=None, stdout=subprocess.PIPE, preexec_fn=os.setsid)
# Receiver subprocess. Discard stderr, and feed stdout into an asynchronous read class.
rx = subprocess.Popen(decode_cmd, shell=True, stdin=None, stdout=subprocess.PIPE, preexec_fn=os.setsid)
rx_stdout = AsynchronousFileReader(rx.stdout, autostart=True)
while True:
try:
line = rx.stdout.readline()
while not rx_stdout.eof():
for line in rx_stdout.readlines():
if (line != None) and (line != ""):
data = process_rs_line(line)
try:
data = process_rs_line(line)
# Reset timeout counter.
rx_last_line = time.time()
if data != None:
# Add in a few fields that don't come from the sonde telemetry.
data['freq'] = "%.3f MHz" % (frequency/1e6)
data['type'] = "RS41"
if data != None:
# Add in a few fields that don't come from the sonde telemetry.
data['freq'] = "%.3f MHz" % (frequency/1e6)
data['type'] = "RS41"
if rx_queue != None:
try:
rx_queue.put_nowait(data)
except:
pass
except:
traceback.print_exc()
logging.error("Could not read from rxer stdout?")
os.killpg(os.getpgid(rx.pid), signal.SIGTERM)
return
latest_sonde_data = data
if rx_queue != None:
try:
rx_queue.put_nowait(data)
except:
pass
except:
traceback.print_exc()
logging.error("Error parsing line: %s" % line)
# Check timeout counter.
if time.time() > (rx_last_line+timeout):
logging.error("RX Timed out.")
break
# Sleep for a short time.
time.sleep(0.1)
logging.error("Closing RX Thread.")
os.killpg(os.getpgid(rx.pid), signal.SIGTERM)
rx_stdout.stop()
rx_stdout.join()
return
def internet_push_thread(station_config):
""" Push a frame of sonde data into various internet services (APRS-IS, Habitat) """
@ -314,91 +412,76 @@ if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("-c" ,"--config", default="station.cfg", help="Receive Station Configuration File")
parser.add_argument("-f", "--frequency", default=0.0, help="Sonde Frequency (MHz) (bypass scan step).")
parser.add_argument("-t", "--timeout", default=180, help="Stop receiving after X minutes.")
args = parser.parse_args()
# Attempt to read in configuration file. Use default config if reading fails.
config = read_auto_rx_config(args.config)
# Pull some variables out of the config.
SEARCH_ATTEMPTS = config['search_attempts']
# Clean up gain value.
if config['rtlsdr_gain'] == '0' or config['rtlsdr_gain'] == 0:
config['rtlsdr_gain'] = 'automatic'
elif type(config['rtlsdr_gain']) == int:
config['rtlsdr_gain'] = str(config['rtlsdr_gain'])
else:
config['rtlsdr_gain'] = 'automatic'
timeout_time = time.time() + int(args.timeout)*60
#
# STEP 1: Search for a sonde.
#
# Internet push thread object.
push_thread = None
# Search variables.
sonde_freq = 0.0
# Sonde Frequency & Type variables.
sonde_freq = None
sonde_type = None
while SEARCH_ATTEMPTS>0:
# Scan Band
run_rtl_power(config['min_freq']*1e6, config['max_freq']*1e6, config['search_step'])
# Read in result
try:
(freq, power, step) = read_rtl_power('log_power.csv')
except Exception as e:
logging.debug("Failed to read log_power.csv. Attempting to run rtl_power again.")
SEARCH_ATTEMPTS -= 1
time.sleep(10)
# Main scan & track loop. We keep on doing this until we timeout (i.e. after we expect the sonde to have landed)
while time.time() < timeout_time:
# Attempt to detect a sonde on a supplied frequency.
if args.frequency != 0.0:
sonde_type = detect_sonde(int(float(args.frequency)*1e6), ppm=config['rtlsdr_ppm'], gain=config['rtlsdr_gain'])
if sonde_type != None:
sonde_freq = sonde_type
# If nothing is detected, or we haven't been supplied a frequency, perform a scan.
if sonde_type == None:
(sonde_freq, sonde_type) = sonde_search(config, config['search_attempts'])
# If we *still* haven't detected a sonde... just keep on trying, until we hit our timeout.
if sonde_type == None:
continue
# Rough approximation of the noise floor of the received power spectrum.
power_nf = np.mean(power)
logging.info("Starting decoding of %s on %.3f MHz" % (sonde_type, sonde_freq/1e6))
# Detect peaks.
peak_indices = detect_peaks(power, mph=(power_nf+config['min_snr']), mpd=(config['min_distance']/step), show = False)
# Start a thread to push data to the web, if it isn't started already.
if push_thread == None:
push_thread = Thread(target=internet_push_thread, kwargs={'station_config':config})
push_thread.start()
if len(peak_indices) == 0:
logging.info("No peaks found on this pass.")
SEARCH_ATTEMPTS -= 1
time.sleep(10)
continue
# Sort peaks by power.
peak_powers = power[peak_indices]
peak_freqs = freq[peak_indices]
peak_frequencies = peak_freqs[np.argsort(peak_powers)][::-1]
# Quantize to nearest x kHz
peak_frequencies = quantize_freq(peak_frequencies, config['quantization'])
logging.info("Peaks found at (MHz): %s" % str(peak_frequencies/1e6))
# Run rs_detect on each peak frequency, to determine if there is a sonde there.
for freq in peak_frequencies:
detected = detect_sonde(freq, ppm=config['rtlsdr_ppm'])
if detected != None:
sonde_freq = freq
sonde_type = detected
break
if sonde_type != None:
# Found a sonde! Break out of the while loop and attempt to decode it.
break
# Start decoding the sonde!
if sonde_type == "RS92":
decode_rs92(sonde_freq, ppm=config['rtlsdr_ppm'], gain=config['rtlsdr_gain'], rx_queue=internet_push_queue, timeout=config['rx_timeout'])
elif sonde_type == "RS41":
decode_rs41(sonde_freq, ppm=config['rtlsdr_ppm'], gain=config['rtlsdr_gain'], rx_queue=internet_push_queue, timeout=config['rx_timeout'])
else:
# No sondes found :-( Wait and try again.
SEARCH_ATTEMPTS -= 1
logging.warning("Search attempt failed, %d attempts remaining. Waiting %d seconds." % (SEARCH_ATTEMPTS, config['search_delay']))
time.sleep(config['search_delay'])
pass
if SEARCH_ATTEMPTS == 0:
logging.error("No sondes detcted, exiting.")
sys.exit(0)
# Receiver has timed out. Reset sonde type and frequency variables and loop.
logging.error("Receiver timed out. Re-starting scan.")
sonde_type = None
sonde_freq = None
logging.info("Starting decoding of %s on %.3f MHz" % (sonde_type, sonde_freq/1e6))
logging.info("Exceeded maximum receive time. Exiting.")
# Start a thread to push data to the web.
t = Thread(target=internet_push_thread, kwargs={'station_config':config})
t.start()
# Write last known sonde position to file.
if latest_sonde_data != None:
last_position_str = "%s,%s,%s,%s,%s,%.5f,%.5f,%.1f" % (data['date'], data['time'], data['type'], data['id'], data['freq'], data['lat'], data['lon'], data['alt'])
logging.info("Last Position: %s" % (last_position_str))
# Start decoding the sonde!
if sonde_type == "RS92":
decode_rs92(sonde_freq, ppm=config['rtlsdr_ppm'], rx_queue=internet_push_queue)
elif sonde_type == "RS41":
decode_rs41(sonde_freq, ppm=config['rtlsdr_ppm'], rx_queue=internet_push_queue)
else:
pass
f = open("last_positions.txt", 'a')
f.write(last_position_str + "\n")
f.close()
# Stop the APRS output thread.
INTERNET_PUSH_RUNNING = False

Wyświetl plik

@ -14,4 +14,4 @@ cd ~/RS/auto_rx/
rm log_power.csv
# Start auto_rx process with a 3 hour timeout.
timeout 10800 python auto_rx.py 2>error.log
timeout 14400 python auto_rx.py 2>error.log

Wyświetl plik

@ -21,6 +21,7 @@ def read_auto_rx_config(filename):
'min_snr' : 10,
'min_distance' : 1000,
'quantization' : 10000,
'rx_timeout' : 120,
'upload_rate' : 30,
'enable_aprs' : False,
'enable_habitat': False,
@ -48,6 +49,7 @@ def read_auto_rx_config(filename):
auto_rx_config['min_snr'] = config.getfloat('search_params', 'min_snr')
auto_rx_config['min_distance'] = config.getfloat('search_params', 'min_distance')
auto_rx_config['quantization'] = config.getint('search_params', 'quantization')
auto_rx_config['rx_timeout'] = config.getint('search_params', 'rx_timeout')
auto_rx_config['upload_rate'] = config.getint('upload', 'upload_rate')
auto_rx_config['enable_aprs'] = config.getboolean('upload', 'enable_aprs')
auto_rx_config['enable_habitat'] = config.getboolean('upload', 'enable_habitat')

Wyświetl plik

@ -13,7 +13,7 @@ rtlsdr_gain = 0
# Radiosonde Search Parameters
[search_params]
search_attempts = 5
search_attempts = 10
search_delay = 120
# Minimum and maximum search frequencies, in MHz
@ -27,6 +27,8 @@ min_snr = 10
min_distance = 1000
# Quantize search results to x Hz steps. Useful as most sondes are on 10 kHz frequency steps.
quantization = 10000
# Timeout and re-scan after X seconds of no data.
rx_timeout = 120
# Internet upload settings.