diff --git a/app/commands/gateway.py b/app/commands/gateway.py index 7d942e3..7691e0a 100644 --- a/app/commands/gateway.py +++ b/app/commands/gateway.py @@ -1,8 +1,13 @@ +import os +import datetime + from flask import current_app from flask.cli import AppGroup +import click from ogn.client import AprsClient -from app.gateway.bulkimport import ContinuousDbFeeder + +from app.gateway.bulkimport import convert, DbFeeder user_cli = AppGroup("gateway") user_cli.help = "Connection to APRS servers." @@ -10,9 +15,7 @@ user_cli.help = "Connection to APRS servers." @user_cli.command("run") def run(aprs_user="anon-dev"): - """Run the aprs client.""" - - saver = ContinuousDbFeeder() + """Run the aprs client and feed the DB with incoming data.""" # User input validation if len(aprs_user) < 3 or len(aprs_user) > 9: @@ -23,10 +26,24 @@ def run(aprs_user="anon-dev"): client = AprsClient(aprs_user) client.connect() - try: - client.run(callback=saver.add, autoreconnect=True) - except KeyboardInterrupt: - current_app.logger.warning("\nStop ogn gateway") + with DbFeeder(prefix='continuous_import', reference_timestamp=datetime.utcnow, reference_timestamp_autoupdate=True) as feeder: + try: + client.run(callback=lambda x: feeder.add(x), autoreconnect=True) + except KeyboardInterrupt: + current_app.logger.warning("\nStop ogn gateway") - saver.flush() client.disconnect() + + +@user_cli.command("convert") +@click.argument("path") +def file_import(path): + """Convert APRS logfiles into csv files for fast bulk import.""" + + logfiles = [] + for (root, dirs, files) in os.walk(path): + for file in sorted(files): + logfiles.append(os.path.join(root, file)) + + for logfile in logfiles: + convert(logfile) diff --git a/app/gateway/bulkimport.py b/app/gateway/bulkimport.py index 69f0b8d..4e0a88f 100644 --- a/app/gateway/bulkimport.py +++ b/app/gateway/bulkimport.py @@ -1,3 +1,5 @@ +import os +import re from datetime import datetime, timedelta from io import StringIO @@ -9,9 +11,8 @@ from mgrs import MGRS from ogn.parser import parse, ParseError -from app.model import AircraftBeacon, AircraftType, ReceiverBeacon, Location -from app.utils import open_file -from app.gateway.process_tools import create_indices, add_missing_devices, add_missing_receivers, update_aircraft_beacons, update_receiver_beacons, update_receiver_location, transfer_aircraft_beacons, transfer_receiver_beacons, delete_aircraft_beacons, delete_receiver_beacons, update_aircraft_beacons_bigdata, update_receiver_beacons_bigdata, create_tables +from app.model import AircraftType, Location +from app.gateway.process_tools import open_file, create_tables, drop_tables, update_aircraft_beacons_bigdata from app import db @@ -19,6 +20,8 @@ user_cli = AppGroup("bulkimport") user_cli.help = "Tools for accelerated data import." +basepath = os.path.dirname(os.path.realpath(__file__)) + # define message types we want to proceed AIRCRAFT_BEACON_TYPES = ["aprs_aircraft", "flarm", "tracker", "fanet", "lt24", "naviter", "skylines", "spider", "spot", "flymaster"] RECEIVER_BEACON_TYPES = ["aprs_receiver", "receiver"] @@ -53,361 +56,151 @@ AIRCRAFT_BEACON_FIELDS = [ "location_mgrs", "location_mgrs_short", "agl", - "receiver_id", - "device_id", ] RECEIVER_BEACON_FIELDS = [ "location", "altitude", "dstcall", - "relay", - "version", - "platform", - "cpu_load", - "free_ram", - "total_ram", - "ntp_error", - "rt_crystal_correction", - "voltage", - "amperage", - "cpu_temp", - "senders_visible", - "senders_total", - "rec_input_noise", - "senders_signal", - "senders_messages", - "good_senders_signal", - "good_senders", - "good_and_bad_senders", ] -myMGRS = MGRS() +def initial_file_scan(file): + """Scan file and get rowcount and first server timestamp.""" + + row_count = 0 + timestamp = None + + for row in file: + row_count += 1 + if timestamp is None and row[0] == '#': + message = parse(row) + if message['aprs_type'] == 'server': + timestamp = message['timestamp'] + + file.seek(0) + return row_count, timestamp -def string_to_message(raw_string, reference_date): - global receivers +class DbFeeder: + def __init__(self, postfix, reference_timestamp, auto_update_timestamp): + self.postfix = postfix + self.reference_timestamp = reference_timestamp + self.auto_update_timestamp = auto_update_timestamp - try: - message = parse(raw_string, reference_date) - except NotImplementedError as e: - current_app.logger.error("No parser implemented for message: {}".format(raw_string)) - return None - except ParseError as e: - current_app.logger.error("Parsing error with message: {}".format(raw_string)) - return None - except TypeError as e: - current_app.logger.error("TypeError with message: {}".format(raw_string)) - return None - except Exception as e: - current_app.logger.error("Other Exception with string: {}".format(raw_string)) - return None + self.last_flush = datetime.utcnow() - # update reference receivers and distance to the receiver - if message["aprs_type"] == "position": - if message["beacon_type"] in AIRCRAFT_BEACON_TYPES + RECEIVER_BEACON_TYPES: + self.aircraft_buffer = StringIO() + self.receiver_buffer = StringIO() + + self.connection = db.engine.raw_connection() + self.cursor = self.connection.cursor() + + self.mgrs = MGRS() + + create_tables(self.postfix) + + def __enter__(self): + return self + + def __exit__(self, *args): + self._flush() + update_aircraft_beacons_bigdata(self.postfix) + self.connection.commit() + + self.cursor.close() + self.connection.close() + + def _flush(self): + self.aircraft_buffer.seek(0) + self.receiver_buffer.seek(0) + + self.cursor.copy_from(self.aircraft_buffer, "aircraft_beacons_{postfix}".format(postfix=self.postfix), sep=",", columns=BEACON_KEY_FIELDS + AIRCRAFT_BEACON_FIELDS) + self.cursor.copy_from(self.receiver_buffer, "receiver_beacons_{postfix}".format(postfix=self.postfix), sep=",", columns=BEACON_KEY_FIELDS + RECEIVER_BEACON_FIELDS) + self.connection.commit() + + self.aircraft_buffer = StringIO() + self.receiver_buffer = StringIO() + + def add(self, raw_string): + try: + message = parse(raw_string, reference_timestamp=self.reference_timestamp) + except NotImplementedError as e: + current_app.logger.error("No parser implemented for message: {}".format(raw_string)) + return + except ParseError as e: + current_app.logger.error("Parsing error with message: {}".format(raw_string)) + return + except TypeError as e: + current_app.logger.error("TypeError with message: {}".format(raw_string)) + return + except Exception as e: + current_app.logger.error("Other Exception with string: {}".format(raw_string)) + return + + if message['aprs_type'] not in ('server', 'position'): + return + + elif message['aprs_type'] == 'server' and self.auto_update_timestamp is True: + self.reference_timestamp = message['timestamp'] + return + + elif message['aprs_type'] == 'position': latitude = message["latitude"] longitude = message["longitude"] location = Location(longitude, latitude) message["location"] = location.to_wkt() - location_mgrs = myMGRS.toMGRS(latitude, longitude).decode("utf-8") + + location_mgrs = self.mgrs.toMGRS(latitude, longitude).decode("utf-8") message["location_mgrs"] = location_mgrs message["location_mgrs_short"] = location_mgrs[0:5] + location_mgrs[5:7] + location_mgrs[10:12] - if message["beacon_type"] in AIRCRAFT_BEACON_TYPES and "aircraft_type" in message: - message["aircraft_type"] = AircraftType(message["aircraft_type"]).name if message["aircraft_type"] else AircraftType.UNKNOWN.name + if "aircraft_type" in message: + message["aircraft_type"] = AircraftType(message["aircraft_type"]).name if message["aircraft_type"] in AircraftType.list() else AircraftType.UNKNOWN.name - if message["beacon_type"] in AIRCRAFT_BEACON_TYPES and "gps_quality" in message: - if message["gps_quality"] is not None and "horizontal" in message["gps_quality"]: - message["gps_quality_horizontal"] = message["gps_quality"]["horizontal"] - message["gps_quality_vertical"] = message["gps_quality"]["vertical"] - del message["gps_quality"] + if "gps_quality" in message: + if message["gps_quality"] is not None and "horizontal" in message["gps_quality"]: + message["gps_quality_horizontal"] = message["gps_quality"]["horizontal"] + message["gps_quality_vertical"] = message["gps_quality"]["vertical"] + del message["gps_quality"] - # TODO: Fix python-ogn-client 0.91 - if "senders_messages" in message and message["senders_messages"] is not None: - message["senders_messages"] = int(message["senders_messages"]) - if "good_senders" in message and message["good_senders"] is not None: - message["good_senders"] = int(message["good_senders"]) - if "good_and_bad_senders" in message and message["good_and_bad_senders"] is not None: - message["good_and_bad_senders"] = int(message["good_and_bad_senders"]) - - return message - - -class ContinuousDbFeeder: - def __init__(self,): - self.postfix = "continuous_import" - self.last_flush = datetime.utcnow() - self.last_add_missing = datetime.utcnow() - self.last_transfer = datetime.utcnow() - - self.aircraft_buffer = StringIO() - self.receiver_buffer = StringIO() - - create_tables(self.postfix) - create_indices(self.postfix) - - def add(self, raw_string): - message = string_to_message(raw_string, reference_date=datetime.utcnow()) - - if message is None or ("raw_message" in message and message["raw_message"][0] == "#") or "beacon_type" not in message: - return - - if message["beacon_type"] in AIRCRAFT_BEACON_TYPES: - complete_message = ",".join([str(message[k]) if k in message and message[k] is not None else "\\N" for k in BEACON_KEY_FIELDS + AIRCRAFT_BEACON_FIELDS]) - self.aircraft_buffer.write(complete_message) - self.aircraft_buffer.write("\n") - elif message["beacon_type"] in RECEIVER_BEACON_TYPES: + if message["beacon_type"] in RECEIVER_BEACON_TYPES: complete_message = ",".join([str(message[k]) if k in message and message[k] is not None else "\\N" for k in BEACON_KEY_FIELDS + RECEIVER_BEACON_FIELDS]) self.receiver_buffer.write(complete_message) self.receiver_buffer.write("\n") + elif message["beacon_type"] in AIRCRAFT_BEACON_TYPES: + complete_message = ",".join([str(message[k]) if k in message and message[k] is not None else "\\N" for k in BEACON_KEY_FIELDS + AIRCRAFT_BEACON_FIELDS]) + self.aircraft_buffer.write(complete_message) + self.aircraft_buffer.write("\n") else: current_app.logger.error("Ignore beacon_type: {}".format(message["beacon_type"])) return - if datetime.utcnow() - self.last_flush >= timedelta(seconds=20): - self.flush() - self.prepare() - - self.aircraft_buffer = StringIO() - self.receiver_buffer = StringIO() - + if datetime.utcnow() - self.last_flush >= timedelta(seconds=5): + self._flush() self.last_flush = datetime.utcnow() - if datetime.utcnow() - self.last_add_missing >= timedelta(seconds=60): - self.add_missing() - self.last_add_missing = datetime.utcnow() - if datetime.utcnow() - self.last_transfer >= timedelta(seconds=30): - self.transfer() - self.delete_beacons() - self.last_transfer = datetime.utcnow() +def convert(sourcefile): + with open_file(sourcefile) as filehandler: + total_lines, reference_timestamp = initial_file_scan(filehandler) - def flush(self): - self.aircraft_buffer.seek(0) - self.receiver_buffer.seek(0) - - connection = db.engine.raw_connection() - cursor = connection.cursor() - cursor.copy_from(self.aircraft_buffer, "aircraft_beacons_{0}".format(self.postfix), sep=",", columns=BEACON_KEY_FIELDS + AIRCRAFT_BEACON_FIELDS) - cursor.copy_from(self.receiver_buffer, "receiver_beacons_{0}".format(self.postfix), sep=",", columns=BEACON_KEY_FIELDS + RECEIVER_BEACON_FIELDS) - connection.commit() - - self.aircraft_buffer = StringIO() - self.receiver_buffer = StringIO() - - def add_missing(self): - add_missing_receivers(self.postfix) - add_missing_devices(self.postfix) - - def prepare(self): - # make receivers complete - update_receiver_beacons(self.postfix) - update_receiver_location(self.postfix) - - # make devices complete - update_aircraft_beacons(self.postfix) - - def transfer(self): - # tranfer beacons - transfer_aircraft_beacons(self.postfix) - transfer_receiver_beacons(self.postfix) - - def delete_beacons(self): - # delete already transfered beacons - delete_receiver_beacons(self.postfix) - delete_aircraft_beacons(self.postfix) - - -class FileDbFeeder: - def __init__(self): - self.postfix = "continuous_import" - self.last_flush = datetime.utcnow() - - self.aircraft_buffer = StringIO() - self.receiver_buffer = StringIO() - - create_tables(self.postfix) - create_indices(self.postfix) - - def add(self, raw_string): - message = string_to_message(raw_string, reference_date=datetime.utcnow()) - - if message is None or ("raw_message" in message and message["raw_message"][0] == "#") or "beacon_type" not in message: - return - - if message["beacon_type"] in AIRCRAFT_BEACON_TYPES: - complete_message = ",".join([str(message[k]) if k in message and message[k] is not None else "\\N" for k in BEACON_KEY_FIELDS + AIRCRAFT_BEACON_FIELDS]) - self.aircraft_buffer.write(complete_message) - self.aircraft_buffer.write("\n") - elif message["beacon_type"] in RECEIVER_BEACON_TYPES: - complete_message = ",".join([str(message[k]) if k in message and message[k] is not None else "\\N" for k in BEACON_KEY_FIELDS + RECEIVER_BEACON_FIELDS]) - self.receiver_buffer.write(complete_message) - self.receiver_buffer.write("\n") + if reference_timestamp is not None: + auto_update_timestamp = True + postfix = str(reference_timestamp.total_seconds()) + else: + auto_update_timestamp = False + match = re.match(r".*OGN_log\.txt_([0-9]{4}\-[0-9]{2}\-[0-9]{2})\.gz$", sourcefile) + if match: + reference_timestamp = datetime.strptime(match.group(1), "%Y-%m-%d") + postfix = reference_timestamp.strftime("%Y_%m_%d") else: - current_app.logger.error("Ignore beacon_type: {}".format(message["beacon_type"])) + current_app.logger.error("No reference time information. Skipping file: {}".format(sourcefile)) return - def prepare(self): - # make receivers complete - add_missing_receivers(self.postfix) - update_receiver_location(self.postfix) - - # make devices complete - add_missing_devices(self.postfix) - - # prepare beacons for transfer - create_indices(self.postfix) - update_receiver_beacons_bigdata(self.postfix) - update_aircraft_beacons_bigdata(self.postfix) - - -def get_aircraft_beacons_postfixes(): - """Get the postfixes from imported aircraft_beacons logs.""" - - postfixes = db.session.execute( - r""" - SELECT DISTINCT(RIGHT(tablename, 8)) - FROM pg_catalog.pg_tables - WHERE schemaname = 'public' AND tablename LIKE 'aircraft\_beacons\_20______' - ORDER BY RIGHT(tablename, 10); - """ - ).fetchall() - - return [postfix for postfix in postfixes] - - -def export_to_path(postfix): - import os - import gzip - - pass # wtf is this? - # aircraft_beacons_file = os.path.join(path, "aircraft_beacons_{0}.csv.gz".format(postfix)) - # with gzip.open(aircraft_beacons_file, "wt", encoding="utf-8") as gzip_file: - # self.cur.copy_expert("COPY ({}) TO STDOUT WITH (DELIMITER ',', FORMAT CSV, HEADER, ENCODING 'UTF-8');".format(self.get_merged_aircraft_beacons_subquery()), gzip_file) - # receiver_beacons_file = os.path.join(path, "receiver_beacons_{0}.csv.gz".format(postfix)) - # with gzip.open(receiver_beacons_file, "wt") as gzip_file: - # self.cur.copy_expert("COPY ({}) TO STDOUT WITH (DELIMITER ',', FORMAT CSV, HEADER, ENCODING 'UTF-8');".format(self.get_merged_receiver_beacons_subquery()), gzip_file) - - -def convert(sourcefile, datestr, saver): - from app.gateway.process import string_to_message - from app.gateway.process_tools import AIRCRAFT_BEACON_TYPES, RECEIVER_BEACON_TYPES - from datetime import datetime - - fin = open_file(sourcefile) - - # get total lines of the input file - total_lines = 0 - for line in fin: - total_lines += 1 - fin.seek(0) - - current_line = 0 - steps = 100000 - reference_date = datetime.strptime(datestr + " 12:00:00", "%Y-%m-%d %H:%M:%S") - - pbar = tqdm(fin, total=total_lines) - for line in pbar: - pbar.set_description("Importing {}".format(sourcefile)) - - current_line += 1 - if current_line % steps == 0: - saver.flush() - - message = string_to_message(line.strip(), reference_date=reference_date) - if message is None: - continue - - def dictfilt(x, y): - return dict([(i, x[i]) for i in x if i in set(y)]) - - try: - if message["beacon_type"] in AIRCRAFT_BEACON_TYPES: - message = dictfilt( - message, - ( - "beacon_type", - "aprs_type", - "location_wkt", - "altitude", - "name", - "dstcall", - "relay", - "receiver_name", - "timestamp", - "track", - "ground_speed", - "address_type", - "aircraft_type", - "stealth", - "address", - "climb_rate", - "turn_rate", - "signal_quality", - "error_count", - "frequency_offset", - "gps_quality_horizontal", - "gps_quality_vertical", - "software_version", - "hardware_version", - "real_address", - "signal_power", - "distance", - "radial", - "quality", - "agl", - "location_mgrs", - "location_mgrs_short", - "receiver_id", - "device_id", - ), - ) - - beacon = AircraftBeacon(**message) - elif message["beacon_type"] in RECEIVER_BEACON_TYPES: - if "rec_crystal_correction" in message: - del message["rec_crystal_correction"] - del message["rec_crystal_correction_fine"] - beacon = ReceiverBeacon(**message) - saver.add(beacon) - except Exception as e: - print(e) - - saver.flush() - fin.close() - - -@user_cli.command("file_import") -@click.argument("path") -def file_import(path): - """Import APRS logfiles into separate logfile tables.""" - - import os - import re - - # Get Filepaths and dates to import - results = list() - for (root, dirs, files) in os.walk(path): - for file in sorted(files): - match = re.match(r"OGN_log\.txt_([0-9]{4}\-[0-9]{2}\-[0-9]{2})\.gz$", file) - if match: - results.append({"filepath": os.path.join(root, file), "datestr": match.group(1)}) - - with LogfileDbSaver() as saver: # noqa: F821 - already_imported = saver.get_datestrs() - - results = list(filter(lambda x: x["datestr"] not in already_imported, results)) - - pbar = tqdm(results) - for result in pbar: - filepath = result["filepath"] - datestr = result["datestr"] - pbar.set_description("Importing data for {}".format(datestr)) - - saver.set_datestr(datestr) - saver.create_tables() - convert(filepath, datestr, saver) - saver.add_missing_devices() - saver.add_missing_receivers() + with open_file(sourcefile) as fin: + with DbFeeder(postfix=postfix, reference_timestamp=reference_timestamp, auto_update_timestamp=auto_update_timestamp) as feeder: + pbar = tqdm(fin, total=total_lines) + for line in pbar: + pbar.set_description("Importing {}".format(sourcefile)) + feeder.add(raw_string=line) diff --git a/app/gateway/process_tools.py b/app/gateway/process_tools.py index 4e9d927..9cc46f0 100644 --- a/app/gateway/process_tools.py +++ b/app/gateway/process_tools.py @@ -1,184 +1,69 @@ +import os +import gzip +import time +from contextlib import contextmanager + +from flask import current_app from app import db +@contextmanager +def open_file(filename): + """Opens a regular OR gzipped textfile for reading.""" + + file = open(filename, "rb") + a = file.read(2) + file.close() + if a == b"\x1f\x8b": + file = gzip.open(filename, "rt", encoding="latin-1") + else: + file = open(filename, "rt", encoding="latin-1") + + try: + yield file + finally: + file.close() + + +class Timer(object): + def __init__(self, name=None): + self.name = name + + def __enter__(self): + self.tstart = time.time() + + def __exit__(self, type, value, traceback): + if self.name: + print("[{}]".format(self.name)) + print("Elapsed: {}".format(time.time() - self.tstart)) + + +def drop_tables(postfix): + """Drop tables for log file import.""" + + db.session.execute(""" + DROP TABLE IF EXISTS "aircraft_beacons_{postfix}"; + DROP TABLE IF EXISTS "receiver_beacons_{postfix}"; + """.format(postfix=postfix)) + db.session.commit() + + def create_tables(postfix): """Create tables for log file import.""" - db.session.execute('DROP TABLE IF EXISTS "aircraft_beacons_{0}"; CREATE TABLE aircraft_beacons_{0} AS TABLE aircraft_beacons WITH NO DATA;'.format(postfix)) - db.session.execute('DROP TABLE IF EXISTS "receiver_beacons_{0}"; CREATE TABLE receiver_beacons_{0} AS TABLE receiver_beacons WITH NO DATA;'.format(postfix)) - db.session.commit() - - -def create_indices(postfix): - """Creates indices for aircraft- and receiver-beacons.""" - - db.session.execute( - """ - CREATE INDEX IF NOT EXISTS ix_aircraft_beacons_{0}_device_id ON "aircraft_beacons_{0}" (device_id NULLS FIRST); - CREATE INDEX IF NOT EXISTS ix_aircraft_beacons_{0}_receiver_id ON "aircraft_beacons_{0}" (receiver_id NULLS FIRST); - CREATE INDEX IF NOT EXISTS ix_aircraft_beacons_{0}_timestamp_name_receiver_name ON "aircraft_beacons_{0}" (timestamp, name, receiver_name); - CREATE INDEX IF NOT EXISTS ix_receiver_beacons_{0}_timestamp_name_receiver_name ON "receiver_beacons_{0}" (timestamp, name, receiver_name); - """.format( - postfix - ) - ) - db.session.commit() - - -def create_indices_bigdata(postfix): - """Creates indices for aircraft- and receiver-beacons.""" - - db.session.execute( - """ - CREATE INDEX IF NOT EXISTS ix_aircraft_beacons_{0}_timestamp_name_receiver_name ON "aircraft_beacons_{0}" (timestamp, name, receiver_name); - CREATE INDEX IF NOT EXISTS ix_receiver_beacons_{0}_timestamp_name_receiver_name ON "receiver_beacons_{0}" (timestamp, name, receiver_name); - """.format( - postfix - ) - ) - db.session.commit() - - -def add_missing_devices(postfix): - """Add missing devices.""" - - db.session.execute( - """ - INSERT INTO devices(address) - SELECT DISTINCT (ab.address) - FROM "aircraft_beacons_{0}" AS ab - WHERE ab.address IS NOT NULL AND NOT EXISTS (SELECT 1 FROM devices AS d WHERE d.address = ab.address) - ORDER BY ab.address; - """.format( - postfix - ) - ) - db.session.commit() - - -def add_missing_receivers(postfix): - """Add missing receivers.""" - - db.session.execute( - """ - INSERT INTO receivers(name) - SELECT DISTINCT (rb.name) - FROM "receiver_beacons_{0}" AS rb - WHERE NOT EXISTS (SELECT 1 FROM receivers AS r WHERE r.name = rb.name) - ORDER BY rb.name; - - INSERT INTO receivers(name) - SELECT DISTINCT (ab.receiver_name) - FROM "aircraft_beacons_{0}" AS ab - WHERE NOT EXISTS (SELECT 1 FROM receivers AS r WHERE r.name = ab.receiver_name) - ORDER BY ab.receiver_name; - """.format( - postfix - ) - ) - db.session.commit() - - -def update_receiver_location(postfix): - """Updates the receiver location. We need this because we want the actual location for distance calculations.""" - - db.session.execute( - """ - UPDATE receivers AS r - SET - location = sq.location, - altitude = sq.altitude - FROM ( - SELECT DISTINCT ON (rb.receiver_id) rb.receiver_id, rb.location, rb.altitude - FROM "receiver_beacons_{0}" AS rb - WHERE rb.location IS NOT NULL - ORDER BY rb.receiver_id, rb.timestamp - ) AS sq - WHERE r.id = sq.receiver_id; - """.format( - postfix - ) - ) - db.session.commit() - - -def update_receiver_beacons(postfix): - """Updates the foreign keys.""" - - db.session.execute( - """ - UPDATE receiver_beacons_{0} AS rb - SET receiver_id = r.id - FROM receivers AS r - WHERE rb.receiver_id IS NULL AND rb.name = r.name; - """.format( - postfix - ) - ) - db.session.commit() - - -def update_receiver_beacons_bigdata(postfix): - """Updates the foreign keys. - Due to performance reasons we use a new table instead of updating the old.""" - - db.session.execute( - """ - SELECT - rb.location, rb.altitude, rb.name, rb.receiver_name, rb.dstcall, rb.timestamp, - - rb.version, rb.platform, rb.cpu_load, rb.free_ram, rb.total_ram, rb.ntp_error, rb.rt_crystal_correction, rb.voltage, rb.amperage, - rb.cpu_temp, rb.senders_visible, rb.senders_total, rb.rec_input_noise, rb.senders_signal, rb.senders_messages, rb.good_senders_signal, - rb.good_senders, rb.good_and_bad_senders, - - r.id AS receiver_id - INTO "receiver_beacons_{0}_temp" - FROM "receiver_beacons_{0}" AS rb, receivers AS r - WHERE rb.name = r.name; - - DROP TABLE IF EXISTS "receiver_beacons_{0}"; - ALTER TABLE "receiver_beacons_{0}_temp" RENAME TO "receiver_beacons_{0}"; - """.format( - postfix - ) - ) - db.session.commit() - - -def update_aircraft_beacons(postfix): - """Updates the foreign keys and calculates distance/radial and quality and computes the altitude above ground level. - Elevation data has to be in the table 'elevation' with srid 4326.""" - - db.session.execute( - """ - UPDATE aircraft_beacons_{0} AS ab - SET - device_id = d.id, - receiver_id = r.id, - distance = CASE WHEN ab.location IS NOT NULL AND r.location IS NOT NULL THEN CAST(ST_DistanceSphere(ab.location, r.location) AS REAL) ELSE NULL END, - radial = CASE WHEN ab.location IS NOT NULL AND r.location IS NOT NULL THEN CAST(degrees(ST_Azimuth(ab.location, r.location)) AS SMALLINT) ELSE NULL END, - quality = CASE WHEN ab.location IS NOT NULL AND r.location IS NOT NULL AND ST_DistanceSphere(ab.location, r.location) > 0 AND ab.signal_quality IS NOT NULL - THEN CAST(signal_quality + 20*log(ST_DistanceSphere(ab.location, r.location)/10000) AS REAL) - ELSE NULL - END, - agl = CAST(ab.altitude - ST_Value(e.rast, ab.location) AS REAL) - - FROM devices AS d, receivers AS r, elevation AS e - WHERE ab.device_id IS NULL and ab.receiver_id IS NULL AND ab.address = d.address AND ab.receiver_name = r.name AND ST_Intersects(e.rast, ab.location); - """.format( - postfix - ) - ) + drop_tables(postfix) + db.session.execute(""" + CREATE TABLE aircraft_beacons_{postfix} AS TABLE aircraft_beacons WITH NO DATA; + CREATE TABLE receiver_beacons_{postfix} AS TABLE receiver_beacons WITH NO DATA; + """.format(postfix=postfix)) db.session.commit() def update_aircraft_beacons_bigdata(postfix): - """Updates the foreign keys and calculates distance/radial and quality and computes the altitude above ground level. - Elevation data has to be in the table 'elevation' with srid 4326. + """Calculates distance/radial and quality and computes the altitude above ground level. Due to performance reasons we use a new table instead of updating the old.""" - db.session.execute( - """ + db.session.execute(""" SELECT ab.location, ab.altitude, ab.name, ab.dstcall, ab.relay, ab.receiver_name, ab.timestamp, ab.track, ab.ground_speed, @@ -188,189 +73,38 @@ def update_aircraft_beacons_bigdata(postfix): ab.location_mgrs, ab.location_mgrs_short, - d.id AS device_id, - r.id AS receiver_id, CASE WHEN ab.location IS NOT NULL AND r.location IS NOT NULL THEN CAST(ST_DistanceSphere(ab.location, r.location) AS REAL) ELSE NULL END AS distance, - CASE WHEN ab.location IS NOT NULL AND r.location IS NOT NULL THEN CAST(degrees(ST_Azimuth(ab.location, r.location)) AS SMALLINT) ELSE NULL END AS radial, + CASE WHEN ab.location IS NOT NULL AND r.location IS NOT NULL THEN CAST(degrees(ST_Azimuth(ab.location, r.location)) AS SMALLINT) % 360 ELSE NULL END AS radial, CASE WHEN ab.location IS NOT NULL AND r.location IS NOT NULL AND ST_DistanceSphere(ab.location, r.location) > 0 AND ab.signal_quality IS NOT NULL THEN CAST(signal_quality + 20*log(ST_DistanceSphere(ab.location, r.location)/10000) AS REAL) ELSE NULL END AS quality, - CAST(ab.altitude - ST_Value(e.rast, ab.location) AS REAL) AS agl + CAST((ab.altitude - subtable.elev_m) AS REAL) AS agl + INTO aircraft_beacons_{postfix}_temp + FROM + aircraft_beacons_{postfix} AS ab + JOIN LATERAL ( + SELECT ab.location, MAX(ST_NearestValue(e.rast, ab.location)) as elev_m + FROM elevation e + WHERE ST_Intersects(ab.location, e.rast) + GROUP BY ab.location + ) AS subtable ON TRUE, + (SELECT name, last(location, timestamp) AS location FROM receiver_beacons_{postfix} GROUP BY name) AS r + WHERE ab.receiver_name = r.name; - INTO "aircraft_beacons_{0}_temp" - FROM "aircraft_beacons_{0}" AS ab, devices AS d, receivers AS r, elevation AS e - WHERE ab.address = d.address AND receiver_name = r.name AND ST_Intersects(e.rast, ab.location); - - DROP TABLE IF EXISTS "aircraft_beacons_{0}"; - ALTER TABLE "aircraft_beacons_{0}_temp" RENAME TO "aircraft_beacons_{0}"; - """.format( - postfix - ) - ) - db.session.commit() + DROP TABLE IF EXISTS "aircraft_beacons_{postfix}"; + ALTER TABLE "aircraft_beacons_{postfix}_temp" RENAME TO "aircraft_beacons_{postfix}"; + """.format(postfix=postfix)) -def delete_receiver_beacons(postfix): - """Delete beacons from table.""" +def export_to_path(postfix, path): + connection = db.engine.raw_connection() + cursor = connection.cursor() - db.session.execute( - """ - DELETE FROM receiver_beacons_continuous_import AS rb - USING ( - SELECT name, receiver_name, timestamp - FROM receiver_beacons_continuous_import - WHERE receiver_id IS NOT NULL - ) AS sq - WHERE rb.name = sq.name AND rb.receiver_name = sq.receiver_name AND rb.timestamp = sq.timestamp - """.format( - postfix - ) - ) - db.session.commit() + aircraft_beacons_file = os.path.join(path, "aircraft_beacons_{postfix}.csv.gz".format(postfix=postfix)) + with gzip.open(aircraft_beacons_file, "wt", encoding="utf-8") as gzip_file: + cursor.copy_expert("COPY ({}) TO STDOUT WITH (DELIMITER ',', FORMAT CSV, HEADER, ENCODING 'UTF-8');".format("SELECT * FROM aircraft_beacons_{postfix}".format(postfix=postfix)), gzip_file) - -def delete_aircraft_beacons(postfix): - """Delete beacons from table.""" - - db.session.execute( - """ - DELETE FROM aircraft_beacons_continuous_import AS ab - USING ( - SELECT name, receiver_name, timestamp - FROM aircraft_beacons_continuous_import - WHERE receiver_id IS NOT NULL and device_id IS NOT NULL - ) AS sq - WHERE ab.name = sq.name AND ab.receiver_name = sq.receiver_name AND ab.timestamp = sq.timestamp - """.format( - postfix - ) - ) - db.session.commit() - - -def get_merged_aircraft_beacons_subquery(postfix): - """Some beacons are split into position and status beacon. With this query we merge them into one beacon.""" - - return """ - SELECT - ST_AsEWKT(MAX(location)) AS location, - MAX(altitude) AS altitude, - name, - MAX(dstcall) AS dstcall, - MAX(relay) AS relay, - receiver_name, - timestamp, - MAX(track) AS track, - MAX(ground_speed) AS ground_speed, - - MAX(address_type) AS address_type, - MAX(aircraft_type) AS aircraft_type, - CAST(MAX(CAST(stealth AS int)) AS boolean) AS stealth, - MAX(address) AS address, - MAX(climb_rate) AS climb_rate, - MAX(turn_rate) AS turn_rate, - MAX(signal_quality) AS signal_quality, - MAX(error_count) AS error_count, - MAX(frequency_offset) AS frequency_offset, - MAX(gps_quality_horizontal) AS gps_quality_horizontal, - MAX(gps_quality_vertical) AS gps_quality_vertical, - MAX(software_version) AS software_version, - MAX(hardware_version) AS hardware_version, - MAX(real_address) AS real_address, - MAX(signal_power) AS signal_power, - - CAST(MAX(distance) AS REAL) AS distance, - CAST(MAX(radial) AS REAL) AS radial, - CAST(MAX(quality) AS REAL) AS quality, - CAST(MAX(agl) AS REAL) AS agl, - MAX(location_mgrs) AS location_mgrs, - MAX(location_mgrs_short) AS location_mgrs_short, - - MAX(receiver_id) AS receiver_id, - MAX(device_id) AS device_id - FROM "aircraft_beacons_{0}" AS ab - GROUP BY timestamp, name, receiver_name - ORDER BY timestamp, name, receiver_name - """.format( - postfix - ) - - -def get_merged_receiver_beacons_subquery(postfix): - """Some beacons are split into position and status beacon. With this query we merge them into one beacon.""" - - return """ - SELECT - ST_AsEWKT(MAX(location)) AS location, - MAX(altitude) AS altitude, - name, - receiver_name, - MAX(dstcall) AS dstcall, - timestamp, - - MAX(version) AS version, - MAX(platform) AS platform, - MAX(cpu_load) AS cpu_load, - MAX(free_ram) AS free_ram, - MAX(total_ram) AS total_ram, - MAX(ntp_error) AS ntp_error, - MAX(rt_crystal_correction) AS rt_crystal_correction, - MAX(voltage) AS voltage, - MAX(amperage) AS amperage, - MAX(cpu_temp) AS cpu_temp, - MAX(senders_visible) AS senders_visible, - MAX(senders_total) AS senders_total, - MAX(rec_input_noise) AS rec_input_noise, - MAX(senders_signal) AS senders_signal, - MAX(senders_messages) AS senders_messages, - MAX(good_senders_signal) AS good_senders_signal, - MAX(good_senders) AS good_senders, - MAX(good_and_bad_senders) AS good_and_bad_senders, - - MAX(receiver_id) AS receiver_id - FROM "receiver_beacons_{0}" AS rb - GROUP BY timestamp, name, receiver_name - ORDER BY timestamp, name, receiver_name - """.format( - postfix - ) - - -def transfer_aircraft_beacons(postfix): - query = """ - INSERT INTO aircraft_beacons(location, altitude, name, dstcall, relay, receiver_name, timestamp, track, ground_speed, - address_type, aircraft_type, stealth, address, climb_rate, turn_rate, signal_quality, error_count, frequency_offset, gps_quality_horizontal, gps_quality_vertical, software_version, hardware_version, real_address, signal_power, - distance, radial, quality, agl, location_mgrs, location_mgrs_short, - receiver_id, device_id) - SELECT sq.* - FROM ({}) sq - WHERE sq.receiver_id IS NOT NULL AND sq.device_id IS NOT NULL - ON CONFLICT DO NOTHING; - """.format( - get_merged_aircraft_beacons_subquery(postfix) - ) - - db.session.execute(query) - db.session.commit() - - -def transfer_receiver_beacons(postfix): - query = """ - INSERT INTO receiver_beacons(location, altitude, name, receiver_name, dstcall, timestamp, - - version, platform, cpu_load, free_ram, total_ram, ntp_error, rt_crystal_correction, voltage, - amperage, cpu_temp, senders_visible, senders_total, rec_input_noise, senders_signal, - senders_messages, good_senders_signal, good_senders, good_and_bad_senders, - - receiver_id) - SELECT sq.* - FROM ({}) sq - WHERE sq.receiver_id IS NOT NULL - ON CONFLICT DO NOTHING; - """.format( - get_merged_receiver_beacons_subquery(postfix) - ) - - db.session.execute(query) - db.session.commit() + receiver_beacons_file = os.path.join(path, "receiver_beacons_{postfix}.csv.gz".format(postfix=postfix)) + with gzip.open(receiver_beacons_file, "wt") as gzip_file: + cursor.copy_expert("COPY ({}) TO STDOUT WITH (DELIMITER ',', FORMAT CSV, HEADER, ENCODING 'UTF-8');".format("SELECT * FROM receiver_beacons_{postfix}".format(postfix=postfix)), gzip_file)