radiosonde_auto_rx/auto_rx/autorx/habitat.py

797 wiersze
30 KiB
Python

#!/usr/bin/env python
#
# radiosonde_auto_rx - Habitat Exporter
#
# Copyright (C) 2018 Mark Jessop <vk5qi@rfhead.net>
# Released under GNU GPL v3 or later
#
import crcmod
import datetime
import logging
import random
import requests
import time
import traceback
import json
from base64 import b64encode
from hashlib import sha256
from threading import Thread, Lock
from . import __version__ as auto_rx_version
try:
# Python 2
from Queue import Queue
except ImportError:
# Python 3
from queue import Queue
# URL for the Habitat DB Server.
# As of July 2018 we send via sondehub.org, which will allow us to eventually transition away
# from using the habhub.org tracker, and leave it for use by High-Altitude Balloon Hobbyists.
# For now, sondehub.org just acts as a proxy to habhub.org.
HABITAT_URL = "http://habitat.sondehub.org/"
# HABITAT_URL = "http://habitat.habhub.org/"
# CRC16 function
def crc16_ccitt(data):
"""
Calculate the CRC16 CCITT checksum of *data*.
(CRC16 CCITT: start 0xFFFF, poly 0x1021)
Args:
data (str): String to be CRC'd. The string will be encoded to ASCII prior to CRCing.
Return:
str: Resultant checksum as two bytes of hexadecimal.
"""
crc16 = crcmod.predefined.mkCrcFun('crc-ccitt-false')
# Encode to ASCII.
_data_ascii = data.encode('ascii')
return hex(crc16(_data_ascii))[2:].upper().zfill(4)
def sonde_telemetry_to_sentence(telemetry, payload_callsign=None, comment=None):
""" Convert a telemetry data dictionary into a UKHAS-compliant telemetry sentence.
Args:
telemetry (dict): A sonde telemetry dictionary. Refer to the description in the autorx.decode.SondeDecoder docs.
payload_callsign (str): If supplied, override the callsign field with this string.
comment (str): Optional data to add to the comment field of the output sentence.
Returns:
str: UKHAS-compliant telemetry sentence for uploading to Habitat
"""
# We only want HH:MM:SS for uploading to habitat.
_short_time = telemetry['datetime_dt'].strftime("%H:%M:%S")
if payload_callsign is None:
# If we haven't been supplied a callsign, we generate one based on the serial number.
_callsign = "RS_" + telemetry['id']
else:
_callsign = payload_callsign
_sentence = "$$%s,%d,%s,%.5f,%.5f,%d,%.1f,%.1f,%.1f" % (_callsign,
telemetry['frame'],
_short_time,
telemetry['lat'],
telemetry['lon'],
int(telemetry['alt']), # Round to the nearest metre.
telemetry['vel_h'],
telemetry['temp'],
telemetry['humidity'])
# Add in a comment field, containing the sonde type, serial number, and frequency.
_sentence += ",%s %s %s" % (telemetry['type'], telemetry['id'], telemetry['freq'])
# Check for Burst/Kill timer data, and add in.
if 'bt' in telemetry:
if (telemetry['bt'] != -1) and (telemetry['bt'] != 65535):
_sentence += " BT %s" % time.strftime("%H:%M:%S", time.gmtime(telemetry['bt']))
# Add on the station code, which will only be present if we are receiving an iMet sonde.
# This may assist multiple receiving stations in the vicinity of an iMet launch site coordinate
# the iMet unique ID generation.
if 'station_code' in telemetry:
_sentence += " LOC: %s" % telemetry['station_code']
# Add on any custom comment data if provided.
if comment != None:
comment = comment.replace(',','_')
_sentence += " " + comment
_checksum = crc16_ccitt(_sentence[2:])
_output = _sentence + "*" + _checksum + "\n"
return _output
#
# Functions for uploading a listener position to Habitat.
# Derived from https://raw.githubusercontent.com/rossengeorgiev/hab-tools/master/spot2habitat_chase.py
#
callsign_init = False
url_habitat_uuids = HABITAT_URL + "_uuids?count=%d"
url_habitat_db = HABITAT_URL + "habitat/"
uuids = []
def check_callsign(callsign, timeout=10):
"""
Check if a payload document exists for a given callsign.
This is done in a bit of a hack-ish way at the moment. We just check to see if there have
been any reported packets for the payload callsign on the tracker.
This should really be replaced with the correct call into the habitat tracker.
Args:
callsign (str): Payload callsign to search for.
timeout (int): Timeout for the search, in seconds. Defaults to 10 seconds.
Returns:
bool: True if callsign has been observed within the last 6 hour, False otherwise.
"""
_url_check_callsign = "http://spacenear.us/tracker/datanew.php?mode=6hours&type=positions&format=json&max_positions=10&position_id=0&vehicle=%s"
logging.debug("Habitat - Checking if %s has been observed recently..." % callsign)
# Perform the request
_r = requests.get(_url_check_callsign % callsign, timeout=timeout)
try:
# Read the response in as JSON
_r_json = _r.json()
# Read out the list of positions for the requested callsign
_positions = _r_json['positions']['position']
# If there is at least one position returned, we assume there is a valid payload document.
if len(_positions) > 0:
logging.info("Habitat - Callsign %s already present in Habitat DB, not creating new payload doc." % callsign)
return True
else:
# Otherwise, we don't, and go create one.
return False
except Exception as e:
# Handle errors with JSON parsing.
logging.error("Habitat - Unable to request payload positions from spacenear.us - %s" % str(e))
return False
# Keep an internal cache for which payload docs we've created so we don't spam couchdb with updates
payload_config_cache = {}
def ISOStringNow():
return "%sZ" % datetime.datetime.utcnow().isoformat()
def initPayloadDoc(serial, description="Meteorology Radiosonde", frequency=401.5, timeout=20):
"""Creates a payload in Habitat for the radiosonde before uploading"""
global url_habitat_db
payload_data = {
"type": "payload_configuration",
"name": serial,
"time_created": ISOStringNow(),
"metadata": {
"description": description
},
"transmissions": [
{
"frequency": frequency,
"modulation": "RTTY",
"mode": "USB",
"encoding": "ASCII-8",
"parity": "none",
"stop": 2,
"shift": 350,
"baud": 50,
"description": "DUMMY ENTRY, DATA IS VIA radiosonde_auto_rx"
}
],
"sentences": [
{
"protocol": "UKHAS",
"callsign": serial,
"checksum":"crc16-ccitt",
"fields":[
{
"name": "sentence_id",
"sensor": "base.ascii_int"
},
{
"name": "time",
"sensor": "stdtelem.time"
},
{
"name": "latitude",
"sensor": "stdtelem.coordinate",
"format": "dd.dddd"
},
{
"name": "longitude",
"sensor": "stdtelem.coordinate",
"format": "dd.dddd"
},
{
"name": "altitude",
"sensor": "base.ascii_int"
},
{
"name": "speed",
"sensor": "base.ascii_float"
},
{
"name": "temperature_external",
"sensor": "base.ascii_float"
},
{
"name": "humidity",
"sensor": "base.ascii_float"
},
{
"name": "comment",
"sensor": "base.string"
}
],
"filters":
{
"post": [
{
"filter": "common.invalid_location_zero",
"type": "normal"
}
]
},
"description": "radiosonde_auto_rx to Habitat Bridge"
}
]
}
# Perform the POST request to the Habitat DB.
try:
_r = requests.post(url_habitat_db, json=payload_data, timeout=timeout)
if _r.json()['ok'] is True:
logging.info("Habitat - Created a payload document for %s" % serial)
return True
else:
logging.error("Habitat - Failed to create a payload document for %s" % serial)
return False
except Exception as e:
logging.error("Habitat - Failed to create a payload document for %s - %s" % (serial, str(e)))
return False
def postListenerData(doc, timeout=10):
global uuids, url_habitat_db
# do we have at least one uuid, if not go get more
if len(uuids) < 1:
fetchUuids()
# Attempt to add UUID and time data to document.
try:
doc['_id'] = uuids.pop()
except IndexError:
logging.error("Habitat - Unable to post listener data - no UUIDs available.")
return False
doc['time_uploaded'] = ISOStringNow()
try:
_r = requests.post(url_habitat_db, json=doc, timeout=timeout)
return True
except Exception as e:
logging.error("Habitat - Could not post listener data - %s" % str(e))
return False
def fetchUuids(timeout=10):
global uuids, url_habitat_uuids
_retries = 5
while _retries > 0:
try:
_r = requests.get(url_habitat_uuids % 10, timeout=timeout)
uuids.extend(_r.json()['uuids'])
#logging.debug("Habitat - Got UUIDs")
return
except Exception as e:
logging.error("Habitat - Unable to fetch UUIDs, retrying in 10 seconds - %s" % str(e))
time.sleep(10)
_retries = _retries - 1
continue
logging.error("Habitat - Gave up trying to get UUIDs.")
return
def initListenerCallsign(callsign, version='', antenna=''):
doc = {
'type': 'listener_information',
'time_created' : ISOStringNow(),
'data': {
'callsign': callsign,
'antenna': antenna,
'radio': 'radiosonde_auto_rx %s' % version,
}
}
resp = postListenerData(doc)
if resp is True:
#logging.debug("Habitat - Listener Callsign Initialized.")
return True
else:
logging.error("Habitat - Unable to initialize callsign.")
return False
def uploadListenerPosition(callsign, lat, lon, version='', antenna=''):
""" Initializer Listener Callsign, and upload Listener Position """
# Attempt to initialize the listeners callsign
resp = initListenerCallsign(callsign, version=version, antenna=antenna)
# If this fails, it means we can't contact the Habitat server,
# so there is no point continuing.
if resp is False:
return False
doc = {
'type': 'listener_telemetry',
'time_created': ISOStringNow(),
'data': {
'callsign': callsign,
'chase': False,
'latitude': lat,
'longitude': lon,
'altitude': 0,
'speed': 0,
}
}
# post position to habitat
resp = postListenerData(doc)
if resp is True:
logging.info("Habitat - Station position uploaded.")
return True
else:
logging.error("Habitat - Unable to upload station position.")
return False
#
# Habitat Uploader Class
#
class HabitatUploader(object):
'''
Queued Habitat Telemetry Uploader class
This performs uploads to the Habitat servers, and also handles generation of flight documents.
Incoming telemetry packets are fed into queue, which is checked regularly.
If a new callsign is sighted, a payload document is created in the Habitat DB.
The telemetry data is then converted into a UKHAS-compatible format, before being added to queue to be
uploaded as network speed permits.
If an upload attempt times out, the packet is discarded.
If the queue fills up (probably indicating no network connection, and a fast packet downlink rate),
it is immediately emptied, to avoid upload of out-of-date packets.
Note that this uploader object is intended to handle telemetry from multiple sondes
'''
# We require the following fields to be present in the incoming telemetry dictionary data
REQUIRED_FIELDS = ['frame', 'id', 'datetime', 'lat', 'lon', 'alt', 'temp', 'type', 'freq', 'freq_float', 'datetime_dt']
def __init__(self,
user_callsign = 'N0CALL',
user_position = None,
user_antenna = "",
payload_callsign_override = None,
synchronous_upload_time = 30,
callsign_validity_threshold = 5,
upload_queue_size = 16,
upload_timeout = 10,
upload_retries = 5,
upload_retry_interval = 0.25,
user_position_update_rate = 6,
inhibit = False
):
""" Initialise a Habitat Uploader object.
Args:
user_callsign (str): Callsign of the uploader.
user_position (tuple): Optional - a tuple consisting of (lat, lon, alt), which if populated,
is used to plot the listener's position on the Habitat map, both when this class is initialised, and
when a new sonde ID is observed.
payload_callsign_override (str): Override the payload callsign in the uploaded sentence with this value.
WARNING: This will horribly break the tracker map if multiple sondes are uploaded under the same callsign.
USE WITH CAUTION!!!
synchronous_upload_time (int): Upload the most recent telemetry when time.time()%synchronous_upload_time == 0
This is done in an attempt to get multiple stations uploading the same telemetry sentence simultaneously,
and also acts as decimation on the number of sentences uploaded to Habitat.
callsign_validity_threshold (int): Only upload telemetry data if the callsign has been observed more than N times. Default = 5
upload_queue_size (int): Maximum umber of sentences to keep in the upload queue. If the queue is filled,
it will be emptied (discarding the queue contents).
upload_timeout (int): Timeout (Seconds) when performing uploads to Habitat. Default: 10 seconds.
upload_retries (int): Retry an upload up to this many times. Default: 5
upload_retry_interval (int): Time interval between upload retries. Default: 0.25 seconds.
user_position_update_rate (int): Time interval between automatic station position updates, hours.
Set to 6 hours by default, updating any more often than this is not really useful.
inhibit (bool): Inhibit all uploads. Mainly intended for debugging.
"""
self.user_callsign = user_callsign
self.user_position = user_position
self.user_antenna = user_antenna
self.payload_callsign_override = payload_callsign_override
self.upload_timeout = upload_timeout
self.upload_retries = upload_retries
self.upload_retry_interval = upload_retry_interval
self.upload_queue_size = upload_queue_size
self.synchronous_upload_time = synchronous_upload_time
self.callsign_validity_threshold = callsign_validity_threshold
self.inhibit = inhibit
self.user_position_update_rate = user_position_update_rate
# Our two Queues - one to hold sentences to be upload, the other to temporarily hold
# input telemetry dictionaries before they are converted and processed.
self.habitat_upload_queue = Queue(upload_queue_size)
self.input_queue = Queue()
# Dictionary where we store sorted telemetry data for upload when required.
# Elements will be named after payload IDs, and will contain:
# 'count' (int): Number of times this callsign has been observed. Uploads will only occur when
# this number rises above callsign_validity_threshold.
# 'data' (Queue): A queue of telemetry sentences to be uploaded. When the upload timer fires,
# this queue will be dumped, and the most recent telemetry uploaded.
# 'habitat_document' (bool): Indicates if a habitat document has been created for this payload ID.
# 'listener_updated' (bool): Indicates if the listener position has been updated for the start of this ID's flight.
self.observed_payloads = {}
# Record of when we last uploaded a user station position to Habitat.
self.last_user_position_upload = 0
# Lock for dealing with telemetry uploads.
self.upload_lock = Lock()
# Start the uploader thread.
self.upload_thread_running = True
self.upload_thread = Thread(target=self.habitat_upload_thread)
self.upload_thread.start()
# Start the input queue processing thread.
self.input_processing_running = True
self.input_thread = Thread(target=self.process_queue)
self.input_thread.start()
self.timer_thread_running = True
self.timer_thread = Thread(target=self.upload_timer)
self.timer_thread.start()
def user_position_upload(self):
""" Upload the the station position to Habitat. """
if self.user_position is not None:
_success = uploadListenerPosition(self.user_callsign, self.user_position[0], self.user_position[1], version=auto_rx_version, antenna=self.user_antenna)
self.last_user_position_upload = time.time()
return _success
else:
return False
def habitat_upload(self, sentence):
''' Upload a UKHAS-standard telemetry sentence to Habitat
Args:
sentence (str): The UKHAS-standard telemetry sentence to upload.
'''
if self.inhibit:
self.log_info("Upload inhibited.")
return
# Generate payload to be uploaded
_sentence_b64 = b64encode(sentence.encode('ascii')) # Encode to ASCII to be able to perform B64 encoding...
_date = datetime.datetime.utcnow().isoformat("T") + "Z"
_user_call = self.user_callsign
_data = {
"type": "payload_telemetry",
"data": {
"_raw": _sentence_b64.decode('ascii') # ... but decode back to a string to enable JSON serialisation.
},
"receivers": {
_user_call: {
"time_created": _date,
"time_uploaded": _date,
},
},
}
# The URL to upload to.
_url = HABITAT_URL + "habitat/_design/payload_telemetry/_update/add_listener/%s" % sha256(_sentence_b64).hexdigest()
# Delay for a random amount of time between 0 and upload_retry_interval*2 seconds.
time.sleep(random.random()*self.upload_retry_interval*2.0)
_retries = 0
# When uploading, we have three possible outcomes:
# - Can't connect. No point immediately re-trying in this situation.
# - The packet is uploaded successfuly (201 / 403)
# - There is a upload conflict on the Habitat DB end (409). We can retry and it might work.
while _retries < self.upload_retries:
# Run the request.
try:
headers = {"User-Agent": "autorx-" + auto_rx_version}
_req = requests.put(_url, data=json.dumps(_data), timeout=self.upload_timeout, headers=headers)
except Exception as e:
self.log_error("Upload Failed: %s" % str(e))
break
if _req.status_code == 201 or _req.status_code == 403:
# 201 = Success, 403 = Success, sentence has already seen by others.
self.log_info("Uploaded sentence to Habitat successfully: %s" % sentence.strip())
_upload_success = True
break
elif _req.status_code == 409:
# 409 = Upload conflict (server busy). Sleep for a moment, then retry.
self.log_debug("Upload conflict.. retrying.")
time.sleep(random.random()*self.upload_retry_interval)
_retries += 1
else:
self.log_error("Error uploading to Habitat. Status Code: %d %s." % (_req.status_code, _req.text))
break
if _retries == self.upload_retries:
self.log_error("Upload conflict not resolved with %d retries." % self.upload_retries)
return
def habitat_upload_thread(self):
''' Handle uploading of packets to Habitat '''
self.log_debug("Started Habitat Uploader Thread.")
while self.upload_thread_running:
if self.habitat_upload_queue.qsize() > 0:
# If the queue is completely full, jump to the most recent telemetry sentence.
if self.habitat_upload_queue.qsize() == self.upload_queue_size:
while not self.habitat_upload_queue.empty():
sentence = self.habitat_upload_queue.get()
self.log_warning("Uploader queue was full - possible connectivity issue.")
else:
# Otherwise, get the first item in the queue.
sentence = self.habitat_upload_queue.get()
# Attempt to upload it.
self.habitat_upload(sentence)
else:
# Wait for a short time before checking the queue again.
time.sleep(0.1)
self.log_debug("Stopped Habitat Uploader Thread.")
def handle_telem_dict(self, telem, immediate=False):
# Try and convert it to a UKHAS sentence
try:
_sentence = sonde_telemetry_to_sentence(telem)
except Exception as e:
self.log_error("Error converting telemetry to sentence - %s" % str(e))
return
_callsign = "RS_" + telem['id']
# Wait for the upload_lock to be available, to ensure we don't end up with
# race conditions resulting in multiple payload docs being created.
self.upload_lock.acquire()
# Create a habitat document if one does not already exist:
if not self.observed_payloads[telem['id']]['habitat_document']:
# Check if there has already been telemetry from this ID observed on Habhub
_document_exists = check_callsign(_callsign)
# If so, we don't need to create a new document
if _document_exists:
self.observed_payloads[telem['id']]['habitat_document'] = True
else:
# Otherwise, we attempt to create a new document.
if self.inhibit:
# If we have an upload inhibit, don't create a payload doc.
_created = True
else:
_created = initPayloadDoc(_callsign, description="Meteorology Radiosonde", frequency=telem['freq_float'])
if _created:
self.observed_payloads[telem['id']]['habitat_document'] = True
else:
self.log_error("Error creating payload document!")
self.upload_lock.release()
return
if immediate:
self.log_info("Performing immediate upload for first telemetry sentence of %s." % telem['id'])
self.habitat_upload(_sentence)
else:
# Attept to add it to the habitat uploader queue.
try:
self.habitat_upload_queue.put_nowait(_sentence)
except Exception as e:
self.log_error("Error adding sentence to queue: %s" % str(e))
self.upload_lock.release()
def upload_timer(self):
""" Add packets to the habitat upload queue if it is time for us to upload. """
while self.timer_thread_running:
if int(time.time()) % self.synchronous_upload_time == 0:
# Time to upload!
for _id in self.observed_payloads.keys():
# If no data, continue...
if self.observed_payloads[_id]['data'].empty():
continue
else:
# Otherwise, dump the queue and keep the latest telemetry.
while not self.observed_payloads[_id]['data'].empty():
_telem = self.observed_payloads[_id]['data'].get()
self.handle_telem_dict(_telem)
# Sleep a second so we don't hit the synchronous upload time again.
time.sleep(1)
else:
# Not yet time to upload, wait for a bit.
time.sleep(0.1)
def process_queue(self):
""" Process packets from the input queue.
This thread handles packets from the input queue (provided by the decoders)
Packets are sorted by ID, and a dictionary entry is created.
"""
while self.input_processing_running:
# Process everything in the queue.
while self.input_queue.qsize() > 0:
# Grab latest telem dictionary.
_telem = self.input_queue.get_nowait()
_id = _telem['id']
if _id not in self.observed_payloads:
# We haven't seen this ID before, so create a new dictionary entry for it.
self.observed_payloads[_id] = {'count':1, 'data':Queue(), 'habitat_document': False, 'listener_updated': False}
self.log_debug("New Payload %s. Not observed enough to allow upload." % _id)
# However, we don't yet add anything to the queue for this payload...
else:
# We have seen this payload before!
# Increment the 'seen' counter.
self.observed_payloads[_id]['count'] += 1
# If we have seen this particular ID enough times, add the data to the ID's queue.
if self.observed_payloads[_id]['count'] >= self.callsign_validity_threshold:
# If this is the first time we have observed this payload, update the listener position.
if (self.observed_payloads[_id]['listener_updated'] == False) and (self.user_position is not None):
self.observed_payloads[_id]['listener_updated'] = self.user_position_upload()
# Because receiving balloon telemetry appears to be a competition, immediately upload the
# first valid position received.
self.handle_telem_dict(_telem, immediate=True)
else:
# Otherwise, add the telemetry to the upload queue
self.observed_payloads[_id]['data'].put(_telem)
else:
self.log_debug("Payload ID %s not observed enough to allow upload." % _id)
# If we haven't uploaded our station position recently, re-upload it.
if (time.time() - self.last_user_position_upload) > self.user_position_update_rate*3600:
self.user_position_upload()
time.sleep(0.1)
def add(self, telemetry):
""" Add a dictionary of telemetry to the input queue.
Args:
telemetry (dict): Telemetry dictionary to add to the input queue.
"""
# Check the telemetry dictionary contains the required fields.
for _field in self.REQUIRED_FIELDS:
if _field not in telemetry:
self.log_error("JSON object missing required field %s" % _field)
return
# Add it to the queue if we are running.
if self.input_processing_running:
self.input_queue.put(telemetry)
else:
self.log_error("Processing not running, discarding.")
def close(self):
''' Shutdown uploader and processing threads. '''
self.log_debug("Waiting for threads to close...")
self.input_processing_running = False
self.timer_thread_running = False
self.upload_thread_running = False
# Wait for all threads to close.
if self.upload_thread is not None:
self.upload_thread.join()
if self.timer_thread is not None:
self.timer_thread.join()
if self.input_thread is not None:
self.input_thread.join()
def log_debug(self, line):
""" Helper function to log a debug message with a descriptive heading.
Args:
line (str): Message to be logged.
"""
logging.debug("Habitat - %s" % line)
def log_info(self, line):
""" Helper function to log an informational message with a descriptive heading.
Args:
line (str): Message to be logged.
"""
logging.info("Habitat - %s" % line)
def log_error(self, line):
""" Helper function to log an error message with a descriptive heading.
Args:
line (str): Message to be logged.
"""
logging.error("Habitat - %s" % line)
def log_warning(self, line):
""" Helper function to log a warning message with a descriptive heading.
Args:
line (str): Message to be logged.
"""
logging.warning("Habitat - %s" % line)