# ogn-python/app/gateway/bulkimport.py
# 411 lines, 14 KiB, Python

from datetime import datetime, timedelta
from io import StringIO
from flask import current_app
from flask.cli import AppGroup
import click
from tqdm import tqdm
from mgrs import MGRS
from ogn.parser import parse, ParseError
from app.model import AircraftBeacon, ReceiverBeacon, Location
from app.utils import open_file
from app.gateway.process_tools import create_indices, add_missing_devices, add_missing_receivers, update_aircraft_beacons, update_receiver_beacons, update_receiver_location, transfer_aircraft_beacons, transfer_receiver_beacons, delete_aircraft_beacons, delete_receiver_beacons, update_aircraft_beacons_bigdata, update_receiver_beacons_bigdata, create_tables
from app import db
# Flask CLI command group; the commands below register themselves on it.
user_cli = AppGroup("bulkimport", help="Tools for accelerated data import.")
# Beacon types we want to process; messages of any other type are not imported.
AIRCRAFT_BEACON_TYPES = [
    "aprs_aircraft",
    "flarm",
    "tracker",
    "fanet",
    "lt24",
    "naviter",
    "skylines",
    "spider",
    "spot",
    "flymaster",
]
RECEIVER_BEACON_TYPES = [
    "aprs_receiver",
    "receiver",
]

# Fields we want to process for every beacon; they are placed in front of the
# type specific field lists when rows are built.
BEACON_KEY_FIELDS = [
    "name",
    "receiver_name",
    "timestamp",
]
# Aircraft beacon fields, in addition to BEACON_KEY_FIELDS.
# The order is significant: the same sequence is used to serialise each row
# and as the column list of the bulk copy, so both must stay in sync.
AIRCRAFT_BEACON_FIELDS = [
    # position and routing
    "location",
    "altitude",
    "dstcall",
    "relay",
    # movement and identification
    "track",
    "ground_speed",
    "address_type",
    "aircraft_type",
    "stealth",
    "address",
    "climb_rate",
    "turn_rate",
    # reception details
    "signal_quality",
    "error_count",
    "frequency_offset",
    "gps_quality_horizontal",
    "gps_quality_vertical",
    "software_version",
    "hardware_version",
    "real_address",
    "signal_power",
    # values not taken from the parsed message in this module
    "distance",
    "radial",
    "quality",
    "location_mgrs",
    "location_mgrs_short",
    "agl",
    "receiver_id",
    "device_id",
]
# Receiver beacon fields, in addition to BEACON_KEY_FIELDS.
# The order is significant: the same sequence is used to serialise each row
# and as the column list of the bulk copy, so both must stay in sync.
RECEIVER_BEACON_FIELDS = [
    # position and routing
    "location",
    "altitude",
    "dstcall",
    "relay",
    # receiver status
    "version",
    "platform",
    "cpu_load",
    "free_ram",
    "total_ram",
    "ntp_error",
    "rt_crystal_correction",
    "voltage",
    "amperage",
    "cpu_temp",
    # sender statistics
    "senders_visible",
    "senders_total",
    "rec_input_noise",
    "senders_signal",
    "senders_messages",
    "good_senders_signal",
    "good_senders",
    "good_and_bad_senders",
]
# Module-wide MGRS converter, created once and reused by string_to_message().
myMGRS = MGRS()
def string_to_message(raw_string, reference_date):
    """Parse one raw APRS line and enrich the result for the bulk import.

    Args:
        raw_string: The raw message line handed to ``ogn.parser.parse``.
        reference_date: Reference datetime handed to ``ogn.parser.parse``.

    Returns:
        The parsed message dict, or ``None`` if parsing raised (the failure is
        logged). For position messages of a known beacon type the keys
        ``location`` (WKT), ``location_mgrs`` and ``location_mgrs_short`` are
        added. For aircraft position messages ``gps_quality`` is replaced by
        ``gps_quality_horizontal`` / ``gps_quality_vertical``.
    """
    try:
        message = parse(raw_string, reference_date)
    except NotImplementedError:
        current_app.logger.error("No parser implemented for message: {}".format(raw_string))
        return None
    except ParseError:
        current_app.logger.error("Parsing error with message: {}".format(raw_string))
        return None
    except TypeError:
        current_app.logger.error("TypeError with message: {}".format(raw_string))
        return None
    except Exception:
        # Catch-all so that a single unparsable line does not abort the import.
        current_app.logger.error("Other Exception with string: {}".format(raw_string))
        return None

    # .get() keeps messages without these keys from raising KeyError here;
    # callers decide what to do with them.
    if message.get("aprs_type") == "position":
        beacon_type = message.get("beacon_type")
        is_aircraft = beacon_type in AIRCRAFT_BEACON_TYPES

        if is_aircraft or beacon_type in RECEIVER_BEACON_TYPES:
            latitude = message["latitude"]
            longitude = message["longitude"]

            message["location"] = Location(longitude, latitude).to_wkt()

            location_mgrs = myMGRS.toMGRS(latitude, longitude).decode("utf-8")
            message["location_mgrs"] = location_mgrs
            # Short form: characters 0-6 and 10-11 of the full MGRS string.
            message["location_mgrs_short"] = location_mgrs[0:5] + location_mgrs[5:7] + location_mgrs[10:12]

        if is_aircraft and "gps_quality" in message:
            # Flatten the nested dict; the nested key itself is always removed.
            gps_quality = message.pop("gps_quality")
            if gps_quality is not None and "horizontal" in gps_quality:
                message["gps_quality_horizontal"] = gps_quality["horizontal"]
                message["gps_quality_vertical"] = gps_quality["vertical"]

    # TODO: Fix python-ogn-client 0.91
    # Until then these counters are converted to int here.
    for key in ("senders_messages", "good_senders", "good_and_bad_senders"):
        if message.get(key) is not None:
            message[key] = int(message[key])

    return message
class ContinuousDbFeeder:
    """Buffer parsed beacons in memory and push them to the database in batches.

    Each accepted message is appended as one text row to an in-memory buffer
    (one buffer for aircraft beacons, one for receiver beacons). While
    messages keep arriving, ``add`` periodically copies the buffers into the
    ``aircraft_beacons_<postfix>`` / ``receiver_beacons_<postfix>`` tables and
    runs the follow-up steps from ``app.gateway.process_tools``.
    """

    # Minimum time between two runs of the respective step in add().
    FLUSH_INTERVAL = timedelta(seconds=20)
    ADD_MISSING_INTERVAL = timedelta(seconds=60)
    TRANSFER_INTERVAL = timedelta(seconds=30)

    # In COPY text format the backslash, the delimiter and line breaks must be
    # backslash-escaped inside a value, otherwise the row is split wrongly.
    _COPY_ESCAPES = str.maketrans({"\\": "\\\\", ",": "\\,", "\n": "\\n", "\r": "\\r"})

    def __init__(self):
        """Create empty buffers and the tables and indices for the postfix."""
        self.postfix = "continuous_import"

        self.last_flush = datetime.utcnow()
        self.last_add_missing = datetime.utcnow()
        self.last_transfer = datetime.utcnow()

        self.aircraft_buffer = StringIO()
        self.receiver_buffer = StringIO()

        create_tables(self.postfix)
        create_indices(self.postfix)

    @classmethod
    def _copy_row(cls, message, fields):
        """Serialise ``fields`` of ``message`` into one comma separated COPY row.

        Missing or ``None`` values become the NULL marker ``\\N``; every other
        value is converted with ``str`` and escaped for COPY text format.
        """
        return ",".join(
            "\\N" if message.get(field) is None else str(message[field]).translate(cls._COPY_ESCAPES)
            for field in fields
        )

    def add(self, raw_string):
        """Parse ``raw_string``, buffer it and run whichever periodic steps are due.

        Lines that cannot be parsed, lines starting with ``#`` and messages
        without a ``beacon_type`` are dropped silently. Messages with an
        unhandled beacon type are logged and dropped, without triggering the
        periodic steps.
        """
        message = string_to_message(raw_string, reference_date=datetime.utcnow())
        if message is None or "beacon_type" not in message:
            return
        # startswith() also copes with an empty or missing raw message.
        if (message.get("raw_message") or "").startswith("#"):
            return

        beacon_type = message["beacon_type"]
        if beacon_type in AIRCRAFT_BEACON_TYPES:
            row = self._copy_row(message, BEACON_KEY_FIELDS + AIRCRAFT_BEACON_FIELDS)
            self.aircraft_buffer.write(row + "\n")
        elif beacon_type in RECEIVER_BEACON_TYPES:
            row = self._copy_row(message, BEACON_KEY_FIELDS + RECEIVER_BEACON_FIELDS)
            self.receiver_buffer.write(row + "\n")
        else:
            current_app.logger.error("Ignore beacon_type: {}".format(beacon_type))
            return

        # The clock is read again before each check because the previous step
        # may have taken a while.
        if datetime.utcnow() - self.last_flush >= self.FLUSH_INTERVAL:
            self.flush()  # also replaces both buffers with empty ones
            self.prepare()
            self.last_flush = datetime.utcnow()

        if datetime.utcnow() - self.last_add_missing >= self.ADD_MISSING_INTERVAL:
            self.add_missing()
            self.last_add_missing = datetime.utcnow()

        if datetime.utcnow() - self.last_transfer >= self.TRANSFER_INTERVAL:
            self.transfer()
            self.delete_beacons()
            self.last_transfer = datetime.utcnow()

    def flush(self):
        """Copy both buffers into their tables, commit and start new buffers.

        If the copy raises, the exception propagates and the buffers are kept.
        The cursor and the connection are closed in either case.
        """
        self.aircraft_buffer.seek(0)
        self.receiver_buffer.seek(0)

        connection = db.engine.raw_connection()
        try:
            cursor = connection.cursor()
            try:
                cursor.copy_from(self.aircraft_buffer, "aircraft_beacons_{0}".format(self.postfix), sep=",", columns=BEACON_KEY_FIELDS + AIRCRAFT_BEACON_FIELDS)
                cursor.copy_from(self.receiver_buffer, "receiver_beacons_{0}".format(self.postfix), sep=",", columns=BEACON_KEY_FIELDS + RECEIVER_BEACON_FIELDS)
            finally:
                cursor.close()
            connection.commit()
        finally:
            # Hand the connection back instead of relying on garbage collection.
            connection.close()

        self.aircraft_buffer = StringIO()
        self.receiver_buffer = StringIO()

    def add_missing(self):
        """Add receivers and devices that are referenced but not yet known."""
        add_missing_receivers(self.postfix)
        add_missing_devices(self.postfix)

    def prepare(self):
        """Complete the freshly copied rows before they are transferred."""
        # make receivers complete
        update_receiver_beacons(self.postfix)
        update_receiver_location(self.postfix)

        # make devices complete
        update_aircraft_beacons(self.postfix)

    def transfer(self):
        """Transfer the aircraft and receiver beacons of this postfix."""
        transfer_aircraft_beacons(self.postfix)
        transfer_receiver_beacons(self.postfix)

    def delete_beacons(self):
        """Delete the beacons that have already been transferred."""
        delete_receiver_beacons(self.postfix)
        delete_aircraft_beacons(self.postfix)
class FileDbFeeder:
    """Buffer parsed beacons in memory for a file based import.

    Each accepted message is appended as one text row to an in-memory buffer
    (one buffer for aircraft beacons, one for receiver beacons).

    NOTE(review): unlike ContinuousDbFeeder, this class has no ``flush``
    method and ``last_flush`` is never read here, so the buffers are only
    filled, never written out - confirm who is expected to consume them.
    """

    # In COPY text format the backslash, the delimiter and line breaks must be
    # backslash-escaped inside a value, otherwise the row is split wrongly.
    _COPY_ESCAPES = str.maketrans({"\\": "\\\\", ",": "\\,", "\n": "\\n", "\r": "\\r"})

    def __init__(self):
        """Create empty buffers and the tables and indices for the postfix."""
        self.postfix = "continuous_import"
        self.last_flush = datetime.utcnow()

        self.aircraft_buffer = StringIO()
        self.receiver_buffer = StringIO()

        create_tables(self.postfix)
        create_indices(self.postfix)

    @classmethod
    def _copy_row(cls, message, fields):
        """Serialise ``fields`` of ``message`` into one comma separated COPY row.

        Missing or ``None`` values become the NULL marker ``\\N``; every other
        value is converted with ``str`` and escaped for COPY text format.
        """
        return ",".join(
            "\\N" if message.get(field) is None else str(message[field]).translate(cls._COPY_ESCAPES)
            for field in fields
        )

    def add(self, raw_string):
        """Parse ``raw_string`` and append it to the matching buffer.

        Lines that cannot be parsed, lines starting with ``#`` and messages
        without a ``beacon_type`` are dropped silently. Messages with an
        unhandled beacon type are logged and dropped.
        """
        message = string_to_message(raw_string, reference_date=datetime.utcnow())
        if message is None or "beacon_type" not in message:
            return
        # startswith() also copes with an empty or missing raw message.
        if (message.get("raw_message") or "").startswith("#"):
            return

        beacon_type = message["beacon_type"]
        if beacon_type in AIRCRAFT_BEACON_TYPES:
            row = self._copy_row(message, BEACON_KEY_FIELDS + AIRCRAFT_BEACON_FIELDS)
            self.aircraft_buffer.write(row + "\n")
        elif beacon_type in RECEIVER_BEACON_TYPES:
            row = self._copy_row(message, BEACON_KEY_FIELDS + RECEIVER_BEACON_FIELDS)
            self.receiver_buffer.write(row + "\n")
        else:
            current_app.logger.error("Ignore beacon_type: {}".format(beacon_type))
            return

    def prepare(self):
        """Run the process_tools steps that prepare the beacons for transfer."""
        # make receivers complete
        add_missing_receivers(self.postfix)
        update_receiver_location(self.postfix)

        # make devices complete
        add_missing_devices(self.postfix)

        # prepare beacons for transfer
        create_indices(self.postfix)
        update_receiver_beacons_bigdata(self.postfix)
        update_aircraft_beacons_bigdata(self.postfix)
def get_aircraft_beacons_postfixes():
    """Get the postfixes from imported aircraft_beacons logs.

    Looks for tables in the ``public`` schema named ``aircraft_beacons_20``
    followed by exactly six more characters.

    Returns:
        A list of result rows in ascending postfix order. Each row holds one
        column: the last eight characters of the table name.
    """
    # With SELECT DISTINCT, PostgreSQL requires the ORDER BY expression to be
    # part of the select list, so the ordering uses the same RIGHT(..., 8).
    rows = db.session.execute(
        r"""
        SELECT DISTINCT(RIGHT(tablename, 8))
        FROM pg_catalog.pg_tables
        WHERE schemaname = 'public' AND tablename LIKE 'aircraft\_beacons\_20______'
        ORDER BY RIGHT(tablename, 8);
        """
    ).fetchall()

    return list(rows)
def export_to_path(postfix):
    """Placeholder for exporting the beacon tables of ``postfix``; does nothing yet.

    TODO: implement. An earlier draft, kept only as commented-out code, wrote
    gzip compressed CSV files named ``aircraft_beacons_<postfix>.csv.gz`` and
    ``receiver_beacons_<postfix>.csv.gz`` using
    ``COPY (...) TO STDOUT WITH (DELIMITER ',', FORMAT CSV, HEADER, ENCODING 'UTF-8')``.
    It depended on a target ``path`` and on ``self.cur`` /
    ``self.get_merged_*_beacons_subquery()``, none of which exist in this
    function, so it could not run here.

    Args:
        postfix: Table postfix to export; currently unused.

    Returns:
        None.
    """
    return None
def convert(sourcefile, datestr, saver):
    """Read one logfile, turn each line into a beacon object and hand it to ``saver``.

    Args:
        sourcefile: Path of the logfile; opened with ``app.utils.open_file``.
        datestr: Date of the logfile as ``YYYY-MM-DD``. Noon of that day is
            used as reference date for parsing.
        saver: Object providing ``add(beacon)`` and ``flush()``.

    Lines that cannot be parsed and messages whose beacon type is neither an
    aircraft nor a receiver type are skipped. Errors while building or adding
    a single beacon are printed and the import continues with the next line.
    """
    # NOTE(review): these local imports shadow the names of the same spelling
    # defined at module level in this file - confirm which ones are intended.
    from app.gateway.process import string_to_message
    from app.gateway.process_tools import AIRCRAFT_BEACON_TYPES, RECEIVER_BEACON_TYPES

    # Only these message keys are passed on to AircraftBeacon.
    aircraft_keys = frozenset(
        (
            "beacon_type",
            "aprs_type",
            "location_wkt",
            "altitude",
            "name",
            "dstcall",
            "relay",
            "receiver_name",
            "timestamp",
            "track",
            "ground_speed",
            "address_type",
            "aircraft_type",
            "stealth",
            "address",
            "climb_rate",
            "turn_rate",
            "signal_quality",
            "error_count",
            "frequency_offset",
            "gps_quality_horizontal",
            "gps_quality_vertical",
            "software_version",
            "hardware_version",
            "real_address",
            "signal_power",
            "distance",
            "radial",
            "quality",
            "agl",
            "location_mgrs",
            "location_mgrs_short",
            "receiver_id",
            "device_id",
        )
    )

    steps = 100000  # flush the saver every `steps` lines
    reference_date = datetime.strptime(datestr + " 12:00:00", "%Y-%m-%d %H:%M:%S")

    fin = open_file(sourcefile)
    try:
        # First pass only counts the lines so the progress bar knows its total.
        total_lines = sum(1 for _ in fin)
        fin.seek(0)

        pbar = tqdm(fin, total=total_lines)
        pbar.set_description("Importing {}".format(sourcefile))
        for current_line, line in enumerate(pbar, start=1):
            if current_line % steps == 0:
                saver.flush()

            message = string_to_message(line.strip(), reference_date=reference_date)
            if message is None:
                continue

            try:
                beacon_type = message["beacon_type"]
                if beacon_type in AIRCRAFT_BEACON_TYPES:
                    filtered = {key: value for key, value in message.items() if key in aircraft_keys}
                    beacon = AircraftBeacon(**filtered)
                elif beacon_type in RECEIVER_BEACON_TYPES:
                    # Drop both crystal correction keys if present before
                    # building the ReceiverBeacon.
                    message.pop("rec_crystal_correction", None)
                    message.pop("rec_crystal_correction_fine", None)
                    beacon = ReceiverBeacon(**message)
                else:
                    # Unhandled type: skip, so the previous beacon is not
                    # added a second time.
                    continue

                saver.add(beacon)
            except Exception as e:
                # Best effort: report the problem and go on with the next line.
                print(e)

        saver.flush()
    finally:
        fin.close()
@user_cli.command("file_import")
@click.argument("path")
def file_import(path):
    """Import APRS logfiles into separate logfile tables."""
    import os
    import re

    # Logfile names look like "OGN_log.txt_YYYY-MM-DD.gz"; group 1 is the date.
    logfile_pattern = r"OGN_log\.txt_([0-9]{4}\-[0-9]{2}\-[0-9]{2})\.gz$"

    # Collect every matching logfile below `path` together with its date.
    candidates = []
    for root, _dirs, filenames in os.walk(path):
        for filename in sorted(filenames):
            found = re.match(logfile_pattern, filename)
            if found is None:
                continue
            candidates.append({"filepath": os.path.join(root, filename), "datestr": found.group(1)})

    # NOTE(review): LogfileDbSaver is not defined or imported in the visible
    # part of this module (hence the noqa) - confirm where it comes from.
    with LogfileDbSaver() as saver:  # noqa: F821
        known_datestrs = saver.get_datestrs()

        # Skip the dates the saver reports as already imported.
        pending = [entry for entry in candidates if entry["datestr"] not in known_datestrs]

        progress = tqdm(pending)
        for entry in progress:
            datestr = entry["datestr"]
            progress.set_description("Importing data for {}".format(datestr))

            saver.set_datestr(datestr)
            saver.create_tables()
            convert(entry["filepath"], datestr, saver)
            saver.add_missing_devices()
            saver.add_missing_receivers()