Merge pull request #135 from darksidelemm/testing

Add command-line type option. Make the habitat uploader always upload…
pull/136/head
Mark Jessop 2019-03-10 14:52:48 +10:30 zatwierdzone przez GitHub
commit ee295d34c1
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: 4AEE18F83AFDEB23
2 zmienionych plików z 70 dodań i 37 usunięć

Wyświetl plik

@ -278,7 +278,7 @@ def clean_task_list():
continue
if _running == False:
# This task has stopped. Release it's associated SDR.
# This task has stopped. Release its associated SDR.
autorx.sdr_list[_task_sdr]['in_use'] = False
autorx.sdr_list[_task_sdr]['task'] = None
# Pop the task from the task list.
@ -389,6 +389,7 @@ def main():
parser.add_argument("-c" ,"--config", default="station.cfg", help="Receive Station Configuration File. Default: station.cfg")
parser.add_argument("-l" ,"--log", default="./log/", help="Receive Station Log Path. Default: ./log/")
parser.add_argument("-f", "--frequency", type=float, default=0.0, help="Sonde Frequency Override (MHz). This overrides the scan whitelist with the supplied frequency.")
parser.add_argument("-m", "--type", type=str, default=None, help="Immediately start a decoder for a provided sonde type (RS41, RS92, DFM, M10, etc)")
parser.add_argument("-t", "--timeout", type=int, default=0, help="Close auto_rx system after N minutes. Use 0 to run continuously.")
parser.add_argument("-v", "--verbose", help="Enable debug output.", action="store_true")
parser.add_argument("-e", "--ephemeris", type=str, default="None", help="Use a manually obtained ephemeris file when decoding RS92 Sondes.")
@ -589,6 +590,13 @@ def main():
# Note the start time.
_start_time = time.time()
# If a sonde type has been provided, insert an entry into the scan results,
# and immediately start a decoder. If decoding fails, then we continue into
# the main scanning loop.
if args.type != None:
scan_results.put([[args.frequency*1e6, args.type]])
handle_scan_results()
# Loop.
while True:
# Check for finished tasks.

Wyświetl plik

@ -15,7 +15,7 @@ import traceback
import json
from base64 import b64encode
from hashlib import sha256
from threading import Thread
from threading import Thread, Lock
from . import __version__ as auto_rx_version
try:
# Python 2
@ -457,6 +457,9 @@ class HabitatUploader(object):
# Record of when we last uploaded a user station position to Habitat.
self.last_user_position_upload = 0
# Lock for dealing with telemetry uploads.
self.upload_lock = Lock()
# Start the uploader thread.
self.upload_thread_running = True
self.upload_thread = Thread(target=self.habitat_upload_thread)
@ -580,6 +583,55 @@ class HabitatUploader(object):
self.log_debug("Stopped Habitat Uploader Thread.")
def handle_telem_dict(self, telem, immediate=False):
# Try and convert it to a UKHAS sentence
try:
_sentence = sonde_telemetry_to_sentence(telem)
except Exception as e:
self.log_error("Error converting telemetry to sentence - %s" % str(e))
return
_callsign = "RS_" + telem['id']
# Wait for the upload_lock to be available, to ensure we don't end up with
# race conditions resulting in multiple payload docs being created.
self.upload_lock.acquire()
# Create a habitat document if one does not already exist:
if not self.observed_payloads[telem['id']]['habitat_document']:
# Check if there has already been telemetry from this ID observed on Habhub
_document_exists = check_callsign(_callsign)
# If so, we don't need to create a new document
if _document_exists:
self.observed_payloads[telem['id']]['habitat_document'] = True
else:
# Otherwise, we attempt to create a new document.
if self.inhibit:
# If we have an upload inhibit, don't create a payload doc.
_created = True
else:
_created = initPayloadDoc(_callsign, description="Meteorology Radiosonde", frequency=telem['freq_float'])
if _created:
self.observed_payloads[telem['id']]['habitat_document'] = True
else:
self.log_error("Error creating payload document!")
self.upload_lock.release()
return
if immediate:
self.log_info("Performing immediate upload for first telemetry sentence of %s." % telem['id'])
self.habitat_upload(_sentence)
else:
# Attept to add it to the habitat uploader queue.
try:
self.habitat_upload_queue.put_nowait(_sentence)
except Exception as e:
self.log_error("Error adding sentence to queue: %s" % str(e))
self.upload_lock.release()
def upload_timer(self):
""" Add packets to the habitat upload queue if it is time for us to upload. """
@ -596,40 +648,8 @@ class HabitatUploader(object):
while not self.observed_payloads[_id]['data'].empty():
_telem = self.observed_payloads[_id]['data'].get()
# Try and convert it to a UKHAS sentence
try:
_sentence = sonde_telemetry_to_sentence(_telem)
except Exception as e:
self.log_error("Error converting telemetry to sentence - %s" % str(e))
continue
_callsign = "RS_" + _id
# Create a habitat document if one does not already exist:
if not self.observed_payloads[_id]['habitat_document']:
# Check if there has already been telemetry from this ID observed on Habhub
_document_exists = check_callsign(_callsign)
# If so, we don't need to create a new document
if _document_exists:
self.observed_payloads[_id]['habitat_document'] = True
else:
# Otherwise, we attempt to create a new document.
if self.inhibit:
# If we have an upload inhibit, don't create a payload doc.
_created = True
else:
_created = initPayloadDoc(_callsign, description="Meteorology Radiosonde", frequency=_telem['freq_float'])
if _created:
self.observed_payloads[_id]['habitat_document'] = True
else:
self.log_error("Error creating payload document!")
continue
# Attept to add it to the habitat uploader queue.
try:
self.habitat_upload_queue.put_nowait(_sentence)
except Exception as e:
self.log_error("Error adding sentence to queue: %s" % str(e))
self.handle_telem_dict(_telem)
# Sleep a second so we don't hit the synchronous upload time again.
time.sleep(1)
@ -666,12 +686,17 @@ class HabitatUploader(object):
# If we have seen this particular ID enough times, add the data to the ID's queue.
if self.observed_payloads[_id]['count'] >= self.callsign_validity_threshold:
# Add the telemetry to the queue
self.observed_payloads[_id]['data'].put(_telem)
# If this is the first time we have observed this payload, update the listener position.
if (self.observed_payloads[_id]['listener_updated'] == False) and (self.user_position is not None):
self.observed_payloads[_id]['listener_updated'] = self.user_position_upload()
# Because receiving balloon telemetry appears to be a competition, immediately upload the
# first valid position received.
self.handle_telem_dict(_telem, immediate=True)
else:
# Otherwise, add the telemetry to the upload queue
self.observed_payloads[_id]['data'].put(_telem)
else:
self.log_debug("Payload ID %s not observed enough to allow upload." % _id)