ogn-python/app/gateway/bulkimport.py

408 wiersze
14 KiB
Python

from datetime import datetime, timedelta
from io import StringIO
from flask.cli import AppGroup
import click
from tqdm import tqdm
from mgrs import MGRS
from ogn.parser import parse, ParseError
from app.model import AircraftBeacon, ReceiverBeacon, Location
from app.utils import open_file
from app.gateway.process_tools import *
from app import db
from app import app
user_cli = AppGroup("bulkimport")
user_cli.help = "Tools for accelerated data import."
# define message types we want to proceed
AIRCRAFT_BEACON_TYPES = ["aprs_aircraft", "flarm", "tracker", "fanet", "lt24", "naviter", "skylines", "spider", "spot"]
RECEIVER_BEACON_TYPES = ["aprs_receiver", "receiver"]
# define fields we want to proceed
BEACON_KEY_FIELDS = ["name", "receiver_name", "timestamp"]
AIRCRAFT_BEACON_FIELDS = [
"location",
"altitude",
"dstcall",
"relay",
"track",
"ground_speed",
"address_type",
"aircraft_type",
"stealth",
"address",
"climb_rate",
"turn_rate",
"signal_quality",
"error_count",
"frequency_offset",
"gps_quality_horizontal",
"gps_quality_vertical",
"software_version",
"hardware_version",
"real_address",
"signal_power",
"distance",
"radial",
"quality",
"location_mgrs",
"location_mgrs_short",
"agl",
"receiver_id",
"device_id",
]
RECEIVER_BEACON_FIELDS = [
"location",
"altitude",
"dstcall",
"relay",
"version",
"platform",
"cpu_load",
"free_ram",
"total_ram",
"ntp_error",
"rt_crystal_correction",
"voltage",
"amperage",
"cpu_temp",
"senders_visible",
"senders_total",
"rec_input_noise",
"senders_signal",
"senders_messages",
"good_senders_signal",
"good_senders",
"good_and_bad_senders",
]
myMGRS = MGRS()
def string_to_message(raw_string, reference_date):
global receivers
try:
message = parse(raw_string, reference_date)
except NotImplementedError as e:
app.logger.error("No parser implemented for message: {}".format(raw_string))
return None
except ParseError as e:
app.logger.error("Parsing error with message: {}".format(raw_string))
return None
except TypeError as e:
app.logger.error("TypeError with message: {}".format(raw_string))
return None
except Exception as e:
app.logger.error("Other Exception with string: {}".format(raw_string))
return None
# update reference receivers and distance to the receiver
if message["aprs_type"] == "position":
if message["beacon_type"] in AIRCRAFT_BEACON_TYPES + RECEIVER_BEACON_TYPES:
latitude = message["latitude"]
longitude = message["longitude"]
location = Location(longitude, latitude)
message["location"] = location.to_wkt()
location_mgrs = myMGRS.toMGRS(latitude, longitude).decode("utf-8")
message["location_mgrs"] = location_mgrs
message["location_mgrs_short"] = location_mgrs[0:5] + location_mgrs[5:7] + location_mgrs[10:12]
if message["beacon_type"] in AIRCRAFT_BEACON_TYPES and "gps_quality" in message:
if message["gps_quality"] is not None and "horizontal" in message["gps_quality"]:
message["gps_quality_horizontal"] = message["gps_quality"]["horizontal"]
message["gps_quality_vertical"] = message["gps_quality"]["vertical"]
del message["gps_quality"]
# TODO: Fix python-ogn-client 0.91
if "senders_messages" in message and message["senders_messages"] is not None:
message["senders_messages"] = int(message["senders_messages"])
if "good_senders" in message and message["good_senders"] is not None:
message["good_senders"] = int(message["good_senders"])
if "good_and_bad_senders" in message and message["good_and_bad_senders"] is not None:
message["good_and_bad_senders"] = int(message["good_and_bad_senders"])
return message
class ContinuousDbFeeder:
def __init__(self,):
self.postfix = "continuous_import"
self.last_flush = datetime.utcnow()
self.last_add_missing = datetime.utcnow()
self.last_transfer = datetime.utcnow()
self.aircraft_buffer = StringIO()
self.receiver_buffer = StringIO()
create_tables(self.postfix)
create_indices(self.postfix)
def add(self, raw_string):
message = string_to_message(raw_string, reference_date=datetime.utcnow())
if message is None or ("raw_message" in message and message["raw_message"][0] == "#") or "beacon_type" not in message:
return
if message["beacon_type"] in AIRCRAFT_BEACON_TYPES:
complete_message = ",".join([str(message[k]) if k in message and message[k] is not None else "\\N" for k in BEACON_KEY_FIELDS + AIRCRAFT_BEACON_FIELDS])
self.aircraft_buffer.write(complete_message)
self.aircraft_buffer.write("\n")
elif message["beacon_type"] in RECEIVER_BEACON_TYPES:
complete_message = ",".join([str(message[k]) if k in message and message[k] is not None else "\\N" for k in BEACON_KEY_FIELDS + RECEIVER_BEACON_FIELDS])
self.receiver_buffer.write(complete_message)
self.receiver_buffer.write("\n")
else:
app.logger.error("Ignore beacon_type: {}".format(message["beacon_type"]))
return
if datetime.utcnow() - self.last_flush >= timedelta(seconds=20):
self.flush()
self.prepare()
self.aircraft_buffer = StringIO()
self.receiver_buffer = StringIO()
self.last_flush = datetime.utcnow()
if datetime.utcnow() - self.last_add_missing >= timedelta(seconds=60):
self.add_missing()
self.last_add_missing = datetime.utcnow()
if datetime.utcnow() - self.last_transfer >= timedelta(seconds=30):
self.transfer()
self.delete_beacons()
self.last_transfer = datetime.utcnow()
def flush(self):
self.aircraft_buffer.seek(0)
self.receiver_buffer.seek(0)
connection = db.engine.raw_connection()
cursor = connection.cursor()
cursor.copy_from(self.aircraft_buffer, "aircraft_beacons_{0}".format(self.postfix), sep=",", columns=BEACON_KEY_FIELDS + AIRCRAFT_BEACON_FIELDS)
cursor.copy_from(self.receiver_buffer, "receiver_beacons_{0}".format(self.postfix), sep=",", columns=BEACON_KEY_FIELDS + RECEIVER_BEACON_FIELDS)
connection.commit()
self.aircraft_buffer = StringIO()
self.receiver_buffer = StringIO()
def add_missing(self):
add_missing_receivers(self.postfix)
add_missing_devices(self.postfix)
def prepare(self):
# make receivers complete
update_receiver_beacons(self.postfix)
update_receiver_location(self.postfix)
# make devices complete
update_aircraft_beacons(self.postfix)
def transfer(self):
# tranfer beacons
transfer_aircraft_beacons(self.postfix)
transfer_receiver_beacons(self.postfix)
def delete_beacons(self):
# delete already transfered beacons
delete_receiver_beacons(self.postfix)
delete_aircraft_beacons(self.postfix)
class FileDbFeeder:
def __init__(self):
self.postfix = "continuous_import"
self.last_flush = datetime.utcnow()
self.aircraft_buffer = StringIO()
self.receiver_buffer = StringIO()
create_tables(self.postfix)
create_indices(self.postfix)
def add(self, raw_string):
message = string_to_message(raw_string, reference_date=datetime.utcnow())
if message is None or ("raw_message" in message and message["raw_message"][0] == "#") or "beacon_type" not in message:
return
if message["beacon_type"] in AIRCRAFT_BEACON_TYPES:
complete_message = ",".join([str(message[k]) if k in message and message[k] is not None else "\\N" for k in BEACON_KEY_FIELDS + AIRCRAFT_BEACON_FIELDS])
self.aircraft_buffer.write(complete_message)
self.aircraft_buffer.write("\n")
elif message["beacon_type"] in RECEIVER_BEACON_TYPES:
complete_message = ",".join([str(message[k]) if k in message and message[k] is not None else "\\N" for k in BEACON_KEY_FIELDS + RECEIVER_BEACON_FIELDS])
self.receiver_buffer.write(complete_message)
self.receiver_buffer.write("\n")
else:
app.logger.error("Ignore beacon_type: {}".format(message["beacon_type"]))
return
def prepare(self):
# make receivers complete
add_missing_receivers(self.postfix)
update_receiver_location(self.postfix)
# make devices complete
add_missing_devices(self.postfix)
# prepare beacons for transfer
create_indices(self.postfix)
update_receiver_beacons_bigdata(self.postfix)
update_aircraft_beacons_bigdata(self.postfix)
def get_aircraft_beacons_postfixes():
"""Get the postfixes from imported aircraft_beacons logs."""
postfixes = db.session.execute(
"""
SELECT DISTINCT(RIGHT(tablename, 8))
FROM pg_catalog.pg_tables
WHERE schemaname = 'public' AND tablename LIKE 'aircraft\_beacons\_20______'
ORDER BY RIGHT(tablename, 10);
"""
).fetchall()
return [postfix for postfix in postfixes]
def export_to_path(postfix):
import os, gzip
aircraft_beacons_file = os.path.join(path, "aircraft_beacons_{0}.csv.gz".format(postfix))
with gzip.open(aircraft_beacons_file, "wt", encoding="utf-8") as gzip_file:
self.cur.copy_expert("COPY ({}) TO STDOUT WITH (DELIMITER ',', FORMAT CSV, HEADER, ENCODING 'UTF-8');".format(self.get_merged_aircraft_beacons_subquery()), gzip_file)
receiver_beacons_file = os.path.join(path, "receiver_beacons_{0}.csv.gz".format(postfix))
with gzip.open(receiver_beacons_file, "wt") as gzip_file:
self.cur.copy_expert("COPY ({}) TO STDOUT WITH (DELIMITER ',', FORMAT CSV, HEADER, ENCODING 'UTF-8');".format(self.get_merged_receiver_beacons_subquery()), gzip_file)
def convert(sourcefile, datestr, saver):
from app.gateway.process import string_to_message
from app.gateway.process_tools import AIRCRAFT_BEACON_TYPES, RECEIVER_BEACON_TYPES
from datetime import datetime
fin = open_file(sourcefile)
# get total lines of the input file
total_lines = 0
for line in fin:
total_lines += 1
fin.seek(0)
current_line = 0
steps = 100000
reference_date = datetime.strptime(datestr + " 12:00:00", "%Y-%m-%d %H:%M:%S")
pbar = tqdm(fin, total=total_lines)
for line in pbar:
pbar.set_description("Importing {}".format(sourcefile))
current_line += 1
if current_line % steps == 0:
saver.flush()
message = string_to_message(line.strip(), reference_date=reference_date)
if message is None:
continue
dictfilt = lambda x, y: dict([(i, x[i]) for i in x if i in set(y)])
try:
if message["beacon_type"] in AIRCRAFT_BEACON_TYPES:
message = dictfilt(
message,
(
"beacon_type",
"aprs_type",
"location_wkt",
"altitude",
"name",
"dstcall",
"relay",
"receiver_name",
"timestamp",
"track",
"ground_speed",
"address_type",
"aircraft_type",
"stealth",
"address",
"climb_rate",
"turn_rate",
"signal_quality",
"error_count",
"frequency_offset",
"gps_quality_horizontal",
"gps_quality_vertical",
"software_version",
"hardware_version",
"real_address",
"signal_power",
"distance",
"radial",
"quality",
"agl",
"location_mgrs",
"location_mgrs_short",
"receiver_id",
"device_id",
),
)
beacon = AircraftBeacon(**message)
elif message["beacon_type"] in RECEIVER_BEACON_TYPES:
if "rec_crystal_correction" in message:
del message["rec_crystal_correction"]
del message["rec_crystal_correction_fine"]
beacon = ReceiverBeacon(**message)
saver.add(beacon)
except Exception as e:
print(e)
saver.flush()
fin.close()
@user_cli.command("file_import")
@click.argument("path")
def file_import(path):
"""Import APRS logfiles into separate logfile tables."""
import os
import re
# Get Filepaths and dates to import
results = list()
for (root, dirs, files) in os.walk(path):
for file in sorted(files):
match = re.match("OGN_log\.txt_([0-9]{4}\-[0-9]{2}\-[0-9]{2})\.gz$", file)
if match:
results.append({"filepath": os.path.join(root, file), "datestr": match.group(1)})
with LogfileDbSaver() as saver:
already_imported = saver.get_datestrs()
results = list(filter(lambda x: x["datestr"] not in already_imported, results))
pbar = tqdm(results)
for result in pbar:
filepath = result["filepath"]
datestr = result["datestr"]
pbar.set_description("Importing data for {}".format(datestr))
saver.set_datestr(datestr)
saver.create_tables()
convert(filepath, datestr, saver)
saver.add_missing_devices()
saver.add_missing_receivers()