kopia lustrzana https://github.com/glidernet/ogn-python
137 wiersze
4.5 KiB
Python
137 wiersze
4.5 KiB
Python
import csv
|
|
import gzip
|
|
from io import StringIO
|
|
from datetime import datetime, timedelta
|
|
|
|
from aerofiles.seeyou import Reader
|
|
from ogn.parser.utils import FEETS_TO_METER
|
|
import requests
|
|
|
|
from .model import AircraftType, DeviceInfoOrigin, DeviceInfo, Airport, Location
|
|
|
|
|
|
DDB_URL = "http://ddb.glidernet.org/download/?t=1"
|
|
FLARMNET_URL = "http://www.flarmnet.org/files/data.fln"
|
|
|
|
|
|
address_prefixes = {"F": "FLR", "O": "OGN", "I": "ICA"}
|
|
|
|
nm2m = 1852
|
|
mi2m = 1609.34
|
|
|
|
|
|
def get_days(start, end):
|
|
days = [start + timedelta(days=x) for x in range(0, (end - start).days + 1)]
|
|
return days
|
|
|
|
|
|
def date_to_timestamps(date):
|
|
start = datetime(date.year, date.month, date.day, 0, 0, 0)
|
|
end = datetime(date.year, date.month, date.day, 23, 59, 59)
|
|
return (start, end)
|
|
|
|
|
|
def get_ddb(csv_file=None, address_origin=DeviceInfoOrigin.UNKNOWN):
|
|
if csv_file is None:
|
|
r = requests.get(DDB_URL)
|
|
rows = "\n".join(i for i in r.text.splitlines() if i[0] != "#")
|
|
else:
|
|
r = open(csv_file, "r")
|
|
rows = "".join(i for i in r.readlines() if i[0] != "#")
|
|
|
|
data = csv.reader(StringIO(rows), quotechar="'", quoting=csv.QUOTE_ALL)
|
|
|
|
device_infos = list()
|
|
for row in data:
|
|
device_info = DeviceInfo()
|
|
device_info.address_type = row[0]
|
|
device_info.address = row[1]
|
|
device_info.aircraft = row[2]
|
|
device_info.registration = row[3]
|
|
device_info.competition = row[4]
|
|
device_info.tracked = row[5] == "Y"
|
|
device_info.identified = row[6] == "Y"
|
|
device_info.aircraft_type = AircraftType(int(row[7]))
|
|
device_info.address_origin = address_origin
|
|
|
|
device_infos.append(device_info)
|
|
|
|
return device_infos
|
|
|
|
|
|
def get_flarmnet(fln_file=None, address_origin=DeviceInfoOrigin.FLARMNET):
|
|
if fln_file is None:
|
|
r = requests.get(FLARMNET_URL)
|
|
rows = [bytes.fromhex(line).decode("latin1") for line in r.text.split("\n") if len(line) == 172]
|
|
else:
|
|
with open(fln_file, "r") as file:
|
|
rows = [bytes.fromhex(line.strip()).decode("latin1") for line in file.readlines() if len(line) == 172]
|
|
|
|
device_infos = list()
|
|
for row in rows:
|
|
device_info = DeviceInfo()
|
|
device_info.address = row[0:6].strip()
|
|
device_info.aircraft = row[48:69].strip()
|
|
device_info.registration = row[69:76].strip()
|
|
device_info.competition = row[76:79].strip()
|
|
|
|
device_infos.append(device_info)
|
|
|
|
return device_infos
|
|
|
|
|
|
def get_trackable(ddb):
|
|
result = []
|
|
for i in ddb:
|
|
if i.tracked and i.address_type in address_prefixes:
|
|
result.append("{}{}".format(address_prefixes[i.address_type], i.address))
|
|
return result
|
|
|
|
|
|
def get_airports(cupfile):
|
|
airports = list()
|
|
with open(cupfile) as f:
|
|
for line in f:
|
|
try:
|
|
for waypoint in Reader([line]):
|
|
if waypoint["style"] > 5: # reject unlandable places
|
|
continue
|
|
|
|
airport = Airport()
|
|
airport.name = waypoint["name"]
|
|
airport.code = waypoint["code"]
|
|
airport.country_code = waypoint["country"]
|
|
airport.style = waypoint["style"]
|
|
airport.description = waypoint["description"]
|
|
location = Location(waypoint["longitude"], waypoint["latitude"])
|
|
airport.location_wkt = location.to_wkt()
|
|
airport.altitude = waypoint["elevation"]["value"]
|
|
if waypoint["elevation"]["unit"] == "ft":
|
|
airport.altitude = airport.altitude * FEETS_TO_METER
|
|
airport.runway_direction = waypoint["runway_direction"]
|
|
airport.runway_length = waypoint["runway_length"]["value"]
|
|
if waypoint["runway_length"]["unit"] == "nm":
|
|
airport.altitude = airport.altitude * nm2m
|
|
elif waypoint["runway_length"]["unit"] == "ml":
|
|
airport.altitude = airport.altitude * mi2m
|
|
airport.frequency = waypoint["frequency"]
|
|
|
|
airports.append(airport)
|
|
except AttributeError as e:
|
|
print("Failed to parse line: {} {}".format(line, e))
|
|
|
|
return airports
|
|
|
|
|
|
def open_file(filename):
|
|
"""Opens a regular or unzipped textfile for reading."""
|
|
f = open(filename, "rb")
|
|
a = f.read(2)
|
|
f.close()
|
|
if a == b"\x1f\x8b":
|
|
f = gzip.open(filename, "rt", encoding="latin-1")
|
|
return f
|
|
else:
|
|
f = open(filename, "rt", encoding="latin-1")
|
|
return f
|