radiosonde_auto_rx/auto_rx/autorx/geometry.py

263 wiersze
9.3 KiB
Python

#!/usr/bin/env python
#
# Project Horus - Flight Data to Geometry
#
# Copyright (C) 2018 Mark Jessop <vk5qi@rfhead.net>
# Released under GNU GPL v3 or later
#
import math
import traceback
import logging
import numpy as np
from .utils import position_info
def getDensity(altitude):
'''
Calculate the atmospheric density for a given altitude in metres.
This is a direct port of the oziplotter Atmosphere class
'''
#Constants
airMolWeight = 28.9644 # Molecular weight of air
densitySL = 1.225 # Density at sea level [kg/m3]
pressureSL = 101325 # Pressure at sea level [Pa]
temperatureSL = 288.15 # Temperature at sea level [deg K]
gamma = 1.4
gravity = 9.80665 # Acceleration of gravity [m/s2]
tempGrad = -0.0065 # Temperature gradient [deg K/m]
RGas = 8.31432 # Gas constant [kg/Mol/K]
R = 287.053
deltaTemperature = 0.0;
# Lookup Tables
altitudes = [0, 11000, 20000, 32000, 47000, 51000, 71000, 84852]
pressureRels = [1, 2.23361105092158e-1, 5.403295010784876e-2, 8.566678359291667e-3, 1.0945601337771144e-3, 6.606353132858367e-4, 3.904683373343926e-5, 3.6850095235747942e-6]
temperatures = [288.15, 216.65, 216.65, 228.65, 270.65, 270.65, 214.65, 186.946]
tempGrads = [-6.5, 0, 1, 2.8, 0, -2.8, -2, 0]
gMR = gravity * airMolWeight / RGas;
# Pick a region to work in
i = 0
if(altitude > 0):
while (altitude > altitudes[i+1]):
i = i + 1
# Lookup based on region
baseTemp = temperatures[i]
tempGrad = tempGrads[i] / 1000.0
pressureRelBase = pressureRels[i]
deltaAltitude = altitude - altitudes[i]
temperature = baseTemp + tempGrad * deltaAltitude
# Calculate relative pressure
if(math.fabs(tempGrad) < 1e-10):
pressureRel = pressureRelBase * math.exp(-1 *gMR * deltaAltitude / 1000.0 / baseTemp)
else:
pressureRel = pressureRelBase * math.pow(baseTemp / temperature, gMR / tempGrad / 1000.0)
# Add temperature offset
temperature = temperature + deltaTemperature
# Finally, work out the density...
speedOfSound = math.sqrt(gamma * R * temperature)
pressure = pressureRel * pressureSL
density = densitySL * pressureRel * temperatureSL / temperature
return density
def seaLevelDescentRate(descent_rate, altitude):
''' Calculate the descent rate at sea level, for a given descent rate at altitude '''
rho = getDensity(altitude)
return math.sqrt((rho / 1.22) * math.pow(descent_rate, 2))
def time_to_landing(current_altitude, current_descent_rate=-5.0, ground_asl=0.0, step_size=1):
''' Calculate an estimated time to landing (in seconds) of a payload, based on its current altitude and descent rate '''
# A few checks on the input data.
if current_descent_rate > 0.0:
# If we are still ascending, return none.
return None
if current_altitude <= ground_asl:
# If the current altitude is *below* ground level, we have landed.
return 0
# Calculate the sea level descent rate.
_desc_rate = math.fabs(seaLevelDescentRate(current_descent_rate, current_altitude))
_drag_coeff = _desc_rate*1.1045 # Magic multiplier from predict.php
_alt = current_altitude
_start_time = 0
# Now step through the flight in <step_size> second steps.
# Once the altitude is below our ground level, stop, and return the elapsed time.
while _alt >= ground_asl:
_alt += step_size * -1*(_drag_coeff/math.sqrt(getDensity(_alt)))
_start_time += step_size
return _start_time
class GenericTrack(object):
"""
A Generic 'track' object, which stores track positions for a payload or chase car.
Telemetry is added using the add_telemetry method, which takes a dictionary with time/lat/lon/alt keys (at minimum).
This object performs a running average of the ascent/descent rate, and calculates the predicted landing rate if the payload
is in descent.
The track history can be exported to a LineString using the to_line_string method.
"""
def __init__(self,
ascent_averaging = 6,
landing_rate = 5.0):
''' Create a GenericTrack Object. '''
# Averaging rate.
self.ASCENT_AVERAGING = ascent_averaging
# Payload state.
self.landing_rate = landing_rate
self.ascent_rate = 0.0
self.heading = 0.0
self.speed = 0.0
self.is_descending = False
# Internal store of track history data.
# Data is stored as a list-of-lists, with elements of [datetime, lat, lon, alt, comment]
self.track_history = []
def add_telemetry(self,data_dict):
'''
Accept telemetry data as a dictionary with fields
datetime, lat, lon, alt, comment
'''
try:
_datetime = data_dict['time']
_lat = data_dict['lat']
_lon = data_dict['lon']
_alt = data_dict['alt']
if 'comment' in data_dict.keys():
_comment = data_dict['comment']
else:
_comment = ""
self.track_history.append([_datetime, _lat, _lon, _alt, _comment])
self.update_states()
return self.get_latest_state()
except ValueError:
# ValueErrors show up when the positions used are too close together, or when
# altitudes are the same between positions (divide-by-zero error)
# We can safely skip over these.
pass
except Exception as e:
logging.debug("Web - Error adding new telemetry to GenericTrack %s" % str(e))
def get_latest_state(self):
''' Get the latest position of the payload '''
if len(self.track_history) == 0:
return None
else:
_latest_position = self.track_history[-1]
_state = {
'time' : _latest_position[0],
'lat' : _latest_position[1],
'lon' : _latest_position[2],
'alt' : _latest_position[3],
'ascent_rate': self.ascent_rate,
'is_descending': self.is_descending,
'landing_rate': self.landing_rate,
'heading': self.heading,
'speed': self.speed
}
return _state
def calculate_ascent_rate(self):
''' Calculate the ascent/descent rate of the payload based on the available data '''
if len(self.track_history) <= 1:
return 0.0
elif len(self.track_history) == 2:
# Basic ascent rate case - only 2 samples.
_time_delta = (self.track_history[-1][0] - self.track_history[-2][0]).total_seconds()
_altitude_delta = self.track_history[-1][3] - self.track_history[-2][3]
return _altitude_delta/_time_delta
else:
_num_samples = min(len(self.track_history), self.ASCENT_AVERAGING)
_asc_rates = []
for _i in range(-1*(_num_samples-1), 0):
_time_delta = (self.track_history[_i][0] - self.track_history[_i-1][0]).total_seconds()
_altitude_delta = self.track_history[_i][3] - self.track_history[_i-1][3]
_asc_rates.append(_altitude_delta/_time_delta)
return np.mean(_asc_rates)
def calculate_heading(self):
''' Calculate the heading of the payload '''
if len(self.track_history) <= 1:
return 0.0
else:
_pos_1 = self.track_history[-2]
_pos_2 = self.track_history[-1]
_pos_info = position_info((_pos_1[1],_pos_1[2],_pos_1[3]), (_pos_2[1],_pos_2[2],_pos_2[3]))
return _pos_info['bearing']
def calculate_speed(self):
""" Calculate Payload Speed in metres per second """
if len(self.track_history)<=1:
return 0.0
else:
_time_delta = (self.track_history[-1][0] - self.track_history[-2][0]).total_seconds()
_pos_1 = self.track_history[-2]
_pos_2 = self.track_history[-1]
_pos_info = position_info((_pos_1[1],_pos_1[2],_pos_1[3]), (_pos_2[1],_pos_2[2],_pos_2[3]))
_speed = _pos_info['great_circle_distance']/_time_delta
return _speed
def update_states(self):
''' Update internal states based on the current data '''
self.ascent_rate = self.calculate_ascent_rate()
self.heading = self.calculate_heading()
self.speed = self.calculate_speed()
self.is_descending = self.ascent_rate < 0.0
if self.is_descending:
_current_alt = self.track_history[-1][3]
self.landing_rate = seaLevelDescentRate(self.ascent_rate, _current_alt)
def to_polyline(self):
''' Generate and return a Leaflet PolyLine compatible array '''
# Copy array into a numpy representation for easier slicing.
if len(self.track_history) == 0:
return []
elif len(self.track_history) == 1:
# LineStrings need at least 2 points. If we only have a single point,
# fudge it by duplicating the single point.
_track_data_np = np.array([self.track_history[0], self.track_history[0]])
else:
_track_data_np = np.array(self.track_history)
# Produce new array
_track_points = np.column_stack((_track_data_np[:,1], _track_data_np[:,2], _track_data_np[:,3]))
return _track_points.tolist()