From 0c99770a73abeda84d3936765078ffcb3a045007 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Konstantin=20Gru=CC=88ndger?= Date: Mon, 4 Mar 2019 22:14:13 +0100 Subject: [PATCH] Fixed gateway --- ogn_python/collect/database.py | 77 ---- ogn_python/commands/__init__.py | 2 - ogn_python/commands/bulkimport.py | 578 ---------------------------- ogn_python/commands/database.py | 4 +- ogn_python/commands/gateway.py | 12 +- ogn_python/gateway/bulkimport.py | 283 ++++++++++++++ ogn_python/gateway/process_tools.py | 435 ++++++++++++++------- ogn_python/model/device.py | 1 + ogn_python/model/device_stats.py | 1 + 9 files changed, 582 insertions(+), 811 deletions(-) delete mode 100644 ogn_python/commands/bulkimport.py create mode 100644 ogn_python/gateway/bulkimport.py diff --git a/ogn_python/collect/database.py b/ogn_python/collect/database.py index 8de571b..d5de55b 100644 --- a/ogn_python/collect/database.py +++ b/ogn_python/collect/database.py @@ -69,83 +69,6 @@ def import_ddb(session=None): return "Imported {} devices.".format(counter) -@app.task -def add_missing_devices(session=None): - """Add/update entries in devices table and update foreign keys in aircraft beacons.""" - - if session is None: - session = app.session - - # Create missing Device from AircraftBeacon - available_devices = session.query(Device.address) \ - .subquery() - - missing_devices_query = session.query(distinct(AircraftBeacon.address)) \ - .filter(and_(AircraftBeacon.device_id == null(), not_(AircraftBeacon.address.like('00%')), AircraftBeacon.error_count == 0)) \ - .filter(~AircraftBeacon.address.in_(available_devices)) - - ins = insert(Device).from_select([Device.address], missing_devices_query) - res = session.execute(ins) - insert_count = res.rowcount - session.commit() - - # Update relations to aircraft beacons - upd = session.query(AircraftBeacon) \ - .filter(AircraftBeacon.device_id == null()) \ - .filter(AircraftBeacon.address == Device.address) \ - .update({ - AircraftBeacon.device_id: Device.id}, - synchronize_session='fetch') - - session.commit() - logger.info("Devices: {} inserted, {} updated".format(insert_count, add_missing_receivers)) - logger.info("Updated {} AircraftBeacons".format(upd)) - - return "{} Devices inserted, {} Devices updated, {} AircraftBeacons updated" \ - .format(insert_count, add_missing_receivers, upd) - - -@app.task -def add_missing_receivers(session=None): - """Add/add_missing_receivers entries in receiver table and update receivers foreign keys and distance in aircraft beacons and update foreign keys in receiver beacons.""" - - if session is None: - session = app.session - - # Create missing Receiver from ReceiverBeacon - available_receivers = session.query(Receiver.name) \ - .subquery() - - missing_receiver_query = session.query(distinct(ReceiverBeacon.name)) \ - .filter(ReceiverBeacon.receiver_id == null()) \ - .filter(~ReceiverBeacon.name.in_(available_receivers)) - - ins = insert(Receiver).from_select([Receiver.name], missing_receiver_query) - res = session.execute(ins) - insert_count = res.rowcount - - # Update relations to aircraft beacons - update_aircraft_beacons = session.query(AircraftBeacon) \ - .filter(and_(AircraftBeacon.receiver_id == null(), AircraftBeacon.receiver_name == Receiver.name)) \ - .update({AircraftBeacon.receiver_id: Receiver.id, - AircraftBeacon.distance: func.ST_Distance_Sphere(AircraftBeacon.location_wkt, Receiver.location_wkt)}, - synchronize_session='fetch') - - # Update relations to receiver beacons - update_receiver_beacons = session.query(ReceiverBeacon) \ - .filter(and_(ReceiverBeacon.receiver_id == null(), ReceiverBeacon.name == Receiver.name)) \ - .update({ReceiverBeacon.receiver_id: Receiver.id}, - synchronize_session='fetch') - - session.commit() - - logger.info("Receivers: {} inserted, {} updated.".format(insert_count, add_missing_receivers)) - logger.info("Updated relations: {} aircraft beacons, {} receiver beacons".format(update_aircraft_beacons, update_receiver_beacons)) - - return "{} Receivers inserted, {} Receivers updated, {} AircraftBeacons updated, {} ReceiverBeacons updated" \ - .format(insert_count, add_missing_receivers, update_aircraft_beacons, update_receiver_beacons) - - @app.task def update_country_code(session=None): """Update country code in receivers table if None.""" diff --git a/ogn_python/commands/__init__.py b/ogn_python/commands/__init__.py index d126d6e..469306a 100644 --- a/ogn_python/commands/__init__.py +++ b/ogn_python/commands/__init__.py @@ -1,6 +1,5 @@ from ogn_python import app -from .bulkimport import user_cli as bulkimport_cli from .database import user_cli as database_cli from .export import user_cli as export_cli from .flights import user_cli as flights_cli @@ -8,7 +7,6 @@ from .gateway import user_cli as gateway_cli from .logbook import user_cli as logbook_cli from .stats import user_cli as stats_cli -app.cli.add_command(bulkimport_cli) app.cli.add_command(database_cli) app.cli.add_command(export_cli) app.cli.add_command(flights_cli) diff --git a/ogn_python/commands/bulkimport.py b/ogn_python/commands/bulkimport.py deleted file mode 100644 index e83beae..0000000 --- a/ogn_python/commands/bulkimport.py +++ /dev/null @@ -1,578 +0,0 @@ -from flask.cli import AppGroup -import click - -import psycopg2 -from tqdm import tqdm -from io import StringIO - -from ogn_python.model import AircraftBeacon, ReceiverBeacon -from ogn_python.utils import open_file -from ogn_python.commands.database import get_database_days - -from ogn_python import db - -user_cli = AppGroup('bulkimport') -user_cli.help = "Tools for accelerated data import." - - -ALEMBIC_CONFIG_FILE = "alembic.ini" - - -class LogfileDbSaver(): - def __init__(self): - """Establish the database connection.""" - try: - self.conn = psycopg2.connect(database="ogn", user="postgres", password="postgres", host="localhost", port="5432") - except Exception as e: - raise Exception("I am unable to connect to the database") - self.cur = self.conn.cursor() - - def __enter__(self): - return self - - def __exit__(self, type, value, traceback): - """Closes the database connection.""" - - self.cur.close() - self.conn.close() - - def set_datestr(self, datestr): - """Sets the datestr of the current tables.""" - - self.prefix = datestr.replace('-', '_') - self.aircraft_table = 'aircraft_beacons_{}'.format(self.prefix) - self.receiver_table = 'receiver_beacons_{}'.format(self.prefix) - self.aircraft_buffer = StringIO() - self.receiver_buffer = StringIO() - - def get_datestrs(self, no_index_only=False): - """Get the date strings from imported log files.""" - - index_clause = " AND hasindexes = FALSE" if no_index_only else "" - - self.cur.execute((""" - SELECT DISTINCT(RIGHT(tablename, 10)) - FROM pg_catalog.pg_tables - WHERE schemaname = 'public' AND tablename LIKE 'aircraft_beacons_%'{} - ORDER BY RIGHT(tablename, 10); - """.format(index_clause))) - - return [datestr[0].replace('_', '-') for datestr in self.cur.fetchall()] - - def create_tables(self): - """Create date dependent tables for log file import.""" - - try: - self.cur.execute('CREATE EXTENSION IF NOT EXISTS postgis;') - self.cur.execute('CREATE EXTENSION IF NOT EXISTS btree_gist;') - self.cur.execute('DROP TABLE IF EXISTS "{0}"; CREATE TABLE {0} AS TABLE aircraft_beacons WITH NO DATA;'.format(self.aircraft_table)) - self.cur.execute('DROP TABLE IF EXISTS "{0}"; CREATE TABLE {0} AS TABLE receiver_beacons WITH NO DATA;'.format(self.receiver_table)) - self.conn.commit() - except Exception as e: - raise Exception("I can't create the tables") - - def add(self, beacon): - """Adds the values of the beacon to the buffer.""" - - value_string = ','.join([str(value) for value in beacon.get_values()]) + '\n' - if isinstance(beacon, AircraftBeacon): - self.aircraft_buffer.write(value_string) - elif isinstance(beacon, ReceiverBeacon): - self.receiver_buffer.write(value_string) - - def flush(self): - """Writes the buffer into the tables and reset the buffer.""" - - self.aircraft_buffer.seek(0) - self.receiver_buffer.seek(0) - self.cur.copy_from(self.aircraft_buffer, self.aircraft_table, sep=',', null='None', columns=AircraftBeacon.get_columns()) - self.cur.copy_from(self.receiver_buffer, self.receiver_table, sep=',', null='None', columns=ReceiverBeacon.get_columns()) - self.conn.commit() - self.aircraft_buffer = StringIO() - self.receiver_buffer = StringIO() - - def export_to_path(self, path): - import os, gzip - aircraft_beacons_file = os.path.join(path, self.aircraft_table + '.csv.gz') - with gzip.open(aircraft_beacons_file, 'wt', encoding='utf-8') as gzip_file: - self.cur.copy_expert("COPY ({}) TO STDOUT WITH (DELIMITER ',', FORMAT CSV, HEADER, ENCODING 'UTF-8');".format(self.get_merged_aircraft_beacons_subquery()), gzip_file) - receiver_beacons_file = os.path.join(path, self.receiver_table + '.csv.gz') - with gzip.open(receiver_beacons_file, 'wt') as gzip_file: - self.cur.copy_expert("COPY ({}) TO STDOUT WITH (DELIMITER ',', FORMAT CSV, HEADER, ENCODING 'UTF-8');".format(self.get_merged_receiver_beacons_subquery()), gzip_file) - - def create_indices(self): - """Creates indices for aircraft- and receiver-beacons.""" - - self.cur.execute(""" - CREATE INDEX IF NOT EXISTS ix_{0}_timestamp_name_receiver_name ON "{0}" (timestamp, name, receiver_name); - CREATE INDEX IF NOT EXISTS ix_{0}_device_id_timestamp_error_count ON "{0}" (device_id, timestamp, error_count); - CREATE INDEX IF NOT EXISTS ix_{1}_timestamp_name_receiver_name ON "{1}" (timestamp, name, receiver_name); - """.format(self.aircraft_table, self.receiver_table)) - self.conn.commit() - - def add_missing_devices(self): - """Add missing devices.""" - - self.cur.execute(""" - INSERT INTO devices(address) - SELECT DISTINCT(ab.address) - FROM "{}" AS ab - WHERE NOT EXISTS (SELECT 1 FROM devices AS d WHERE d.address = ab.address) - ORDER BY ab.address; - """.format(self.aircraft_table)) - self.conn.commit() - - def add_missing_receivers(self): - """Add missing receivers.""" - - self.cur.execute(""" - INSERT INTO receivers(name) - SELECT DISTINCT(rb.name) - FROM "{0}" AS rb - WHERE NOT EXISTS (SELECT 1 FROM receivers AS r WHERE r.name = rb.name) - ORDER BY name; - """.format(self.receiver_table)) - self.conn.commit() - - def update_receiver_location(self): - """Updates the receiver location. We need this because we want the actual location for distance calculations.""" - - self.cur.execute(""" - UPDATE receivers AS r - SET location = sq.location, - altitude = sq.altitude - FROM - (SELECT DISTINCT ON (rb.receiver_id) rb.receiver_id, rb.location, rb.altitude - FROM "{1}" AS rb - WHERE rb.location IS NOT NULL - ORDER BY rb.receiver_id, rb.timestamp - ) AS sq - WHERE r.id = sq.receiver_id; - """.format(self.aircraft_table, self.receiver_table)) - self.conn.commit() - - def update_receiver_beacons(self): - """Updates the foreign keys. Due to performance reasons we use a new table instead of updating the old.""" - - self.cur.execute(""" - SELECT - - rb.location, rb.altitude, rb.name, rb.receiver_name, rb.dstcall, rb.timestamp, - - rb.version, rb.platform, rb.cpu_load, rb.free_ram, rb.total_ram, rb.ntp_error, rb.rt_crystal_correction, rb.voltage, rb.amperage, - rb.cpu_temp, rb.senders_visible, rb.senders_total, rb.rec_input_noise, rb.senders_signal, rb.senders_messages, rb.good_senders_signal, - rb.good_senders, rb.good_and_bad_senders, - - r.id AS receiver_id - INTO "{0}_temp" - FROM "{0}" AS rb, receivers AS r - WHERE rb.name = r.name; - - DROP TABLE IF EXISTS "{0}"; - ALTER TABLE "{0}_temp" RENAME TO "{0}"; - """.format(self.receiver_table)) - self.conn.commit() - - def update_aircraft_beacons(self): - """Updates the foreign keys and calculates distance/radial and quality and computes the altitude above ground level. - Elevation data has to be in the table 'elevation' with srid 4326. - Due to performance reasons we use a new table instead of updating the old.""" - - self.cur.execute(""" - SELECT - ab.location, ab.altitude, ab.name, ab.dstcall, ab.relay, ab.receiver_name, ab.timestamp, ab.track, ab.ground_speed, - - ab.address_type, ab.aircraft_type, ab.stealth, ab.address, ab.climb_rate, ab.turn_rate, ab.signal_quality, ab.error_count, - ab.frequency_offset, ab.gps_quality_horizontal, ab.gps_quality_vertical, ab.software_version, ab.hardware_version, ab.real_address, ab.signal_power, - - ab.location_mgrs, - ab.location_mgrs_short, - - d.id AS device_id, - r.id AS receiver_id, - CASE WHEN ab.location IS NOT NULL AND r.location IS NOT NULL THEN CAST(ST_DistanceSphere(ab.location, r.location) AS REAL) ELSE NULL END AS distance, - CASE WHEN ab.location IS NOT NULL AND r.location IS NOT NULL THEN CAST(degrees(ST_Azimuth(ab.location, r.location)) AS SMALLINT) ELSE NULL END AS radial, - CASE WHEN ab.location IS NOT NULL AND r.location IS NOT NULL AND ST_DistanceSphere(ab.location, r.location) > 0 AND ab.signal_quality IS NOT NULL - THEN CAST(signal_quality + 20*log(ST_DistanceSphere(ab.location, r.location)/10000) AS REAL) - ELSE NULL - END AS quality, - CAST(ab.altitude - ST_Value(e.rast, ab.location) AS REAL) AS agl - - INTO "{0}_temp" - FROM "{0}" AS ab, devices AS d, receivers AS r, elevation AS e - WHERE ab.address = d.address AND receiver_name = r.name AND ST_Intersects(e.rast, ab.location); - - DROP TABLE IF EXISTS "{0}"; - ALTER TABLE "{0}_temp" RENAME TO "{0}"; - """.format(self.aircraft_table)) - self.conn.commit() - - def get_merged_aircraft_beacons_subquery(self): - """Some beacons are split into position and status beacon. With this query we merge them into one beacon.""" - - return """ - SELECT - ST_AsEWKT(MAX(location)) AS location, - MAX(altitude) AS altitude, - name, - MAX(dstcall) AS dstcall, - MAX(relay) AS relay, - receiver_name, - timestamp, - MAX(track) AS track, - MAX(ground_speed) AS ground_speed, - - MAX(address_type) AS address_type, - MAX(aircraft_type) AS aircraft_type, - CAST(MAX(CAST(stealth AS int)) AS boolean) AS stealth, - MAX(address) AS address, - MAX(climb_rate) AS climb_rate, - MAX(turn_rate) AS turn_rate, - MAX(signal_quality) AS signal_quality, - MAX(error_count) AS error_count, - MAX(frequency_offset) AS frequency_offset, - MAX(gps_quality_horizontal) AS gps_quality_horizontal, - MAX(gps_quality_vertical) AS gps_quality_vertical, - MAX(software_version) AS software_version, - MAX(hardware_version) AS hardware_version, - MAX(real_address) AS real_address, - MAX(signal_power) AS signal_power, - - CAST(MAX(distance) AS REAL) AS distance, - CAST(MAX(radial) AS REAL) AS radial, - CAST(MAX(quality) AS REAL) AS quality, - CAST(MAX(agl) AS REAL) AS agl, - MAX(location_mgrs) AS location_mgrs, - MAX(location_mgrs_short) AS location_mgrs_short, - - MAX(receiver_id) AS receiver_id, - MAX(device_id) AS device_id - FROM "{0}" AS ab - GROUP BY timestamp, name, receiver_name - ORDER BY timestamp, name, receiver_name - """.format(self.aircraft_table) - - def get_merged_receiver_beacons_subquery(self): - """Some beacons are split into position and status beacon. With this query we merge them into one beacon.""" - - return """ - SELECT - ST_AsEWKT(MAX(location)) AS location, - MAX(altitude) AS altitude, - name, - receiver_name, - MAX(dstcall) AS dstcall, - timestamp, - - MAX(version) AS version, - MAX(platform) AS platform, - MAX(cpu_load) AS cpu_load, - MAX(free_ram) AS free_ram, - MAX(total_ram) AS total_ram, - MAX(ntp_error) AS ntp_error, - MAX(rt_crystal_correction) AS rt_crystal_correction, - MAX(voltage) AS voltage, - MAX(amperage) AS amperage, - MAX(cpu_temp) AS cpu_temp, - MAX(senders_visible) AS senders_visible, - MAX(senders_total) AS senders_total, - MAX(rec_input_noise) AS rec_input_noise, - MAX(senders_signal) AS senders_signal, - MAX(senders_messages) AS senders_messages, - MAX(good_senders_signal) AS good_senders_signal, - MAX(good_senders) AS good_senders, - MAX(good_and_bad_senders) AS good_and_bad_senders, - - MAX(receiver_id) AS receiver_id - FROM "{0}" AS rb - GROUP BY timestamp, name, receiver_name - ORDER BY timestamp, name, receiver_name - """.format(self.receiver_table) - - def transfer_aircraft_beacons(self): - query = """ - INSERT INTO aircraft_beacons(location, altitude, name, dstcall, relay, receiver_name, timestamp, track, ground_speed, - address_type, aircraft_type, stealth, address, climb_rate, turn_rate, signal_quality, error_count, frequency_offset, gps_quality_horizontal, gps_quality_vertical, software_version, hardware_version, real_address, signal_power, - distance, radial, quality, agl, location_mgrs, location_mgrs_short, - receiver_id, device_id) - {} - ON CONFLICT DO NOTHING; - """.format(self.get_merged_aircraft_beacons_subquery()) - - self.cur.execute(query) - self.conn.commit() - - def transfer_receiver_beacons(self): - query = """ - INSERT INTO receiver_beacons(location, altitude, name, receiver_name, dstcall, timestamp, - - version, platform, cpu_load, free_ram, total_ram, ntp_error, rt_crystal_correction, voltage, - amperage, cpu_temp, senders_visible, senders_total, rec_input_noise, senders_signal, - senders_messages, good_senders_signal, good_senders, good_and_bad_senders, - - receiver_id) - {} - ON CONFLICT DO NOTHING; - """.format(self.get_merged_receiver_beacons_subquery()) - - self.cur.execute(query) - self.conn.commit() - - def create_flights2d(self): - query = """ - INSERT INTO flights2d - ( - date, - device_id, - path - ) - SELECT sq5.date, - sq5.device_id, - st_collect(sq5.linestring order BY sq5.part) multilinestring - FROM ( - SELECT sq4.timestamp::date AS date, - sq4.device_id, - sq4.part, - st_makeline(sq4.location ORDER BY sq4.timestamp) linestring - FROM ( - SELECT sq3.timestamp, - sq3.location, - sq3.device_id, - sum(sq3.ping) OVER (partition BY sq3.timestamp::date, sq3.device_id ORDER BY sq3.timestamp) part - FROM ( - SELECT sq2.t1 AS timestamp, - sq2.l1 AS location, - sq2.d1 device_id, - CASE - WHEN sq2.t1 - sq2.t2 < interval'100s' - AND st_distancesphere(sq2.l1, sq2.l2) < 1000 THEN 0 - ELSE 1 - END AS ping - FROM ( - SELECT sq.timestamp t1, - lag(sq.timestamp) OVER (partition BY sq.device_id ORDER BY sq.timestamp) t2, - sq.location l1, - lag(sq.location) OVER (partition BY sq.device_id ORDER BY sq.timestamp) l2, - sq.device_id d1, - lag(sq.device_id) OVER (partition BY sq.device_id ORDER BY sq.timestamp) d2 - FROM ( - SELECT DISTINCT ON (device_id, timestamp) timestamp, device_id, location - FROM {} - WHERE device_id IS NOT NULL AND ground_speed > 250 AND agl < 100 - ORDER BY device_id, timestamp, error_count) sq) sq2 ) sq3 ) sq4 - GROUP BY sq4.timestamp::date, - sq4.device_id, - sq4.part ) sq5 - GROUP BY sq5.date, - sq5.device_id - ON CONFLICT DO NOTHING; - """.format(self.aircraft_table) - - self.cur.execute(query) - self.conn.commit() - - def create_gaps2d(self): - query = """ - INSERT INTO gaps2d(date, device_id, path) - SELECT sq3.date, - sq3.device_id, - ST_Collect(sq3.path) - FROM ( - SELECT - sq2.t1::DATE AS date, - sq2.d1 device_id, - ST_MakeLine(sq2.l1, sq2.l2) path - FROM - ( - SELECT sq.timestamp t1, - LAG(sq.timestamp) OVER ( PARTITION BY sq.timestamp::DATE, sq.device_id ORDER BY sq.timestamp) t2, - sq.location l1, - LAG(sq.location) OVER ( PARTITION BY sq.timestamp::DATE, sq.device_id ORDER BY sq.timestamp) l2, - sq.device_id d1, - LAG(sq.device_id) OVER ( PARTITION BY sq.timestamp::DATE, sq.device_id ORDER BY sq.timestamp) d2, - sq.agl a1, - LAG(sq.agl) over ( PARTITION BY sq.timestamp::DATE, sq.device_id ORDER BY sq.timestamp) a2 - FROM - ( - SELECT DISTINCT ON (device_id, timestamp) timestamp, device_id, location, agl - FROM {} - ORDER BY device_id, timestamp, error_count - ) sq - ) sq2 - WHERE EXTRACT(epoch FROM sq2.t1 - sq2.t2) > 300 - AND ST_DistanceSphere(sq2.l1, sq2.l2) / EXTRACT(epoch FROM sq2.t1 - sq2.t2) BETWEEN 15 AND 50 - AND sq2.a1 > 300 AND sq2.a2 > 300 - ) sq3 - GROUP BY sq3.date, sq3.device_id - ON CONFLICT DO NOTHING; - """.format(self.aircraft_table) - - self.cur.execute(query) - self.conn.commit() - - -def convert(sourcefile, datestr, saver): - from ogn_python.gateway.process import string_to_message - from ogn_python.gateway.process_tools import AIRCRAFT_BEACON_TYPES, RECEIVER_BEACON_TYPES - from datetime import datetime - - fin = open_file(sourcefile) - - # get total lines of the input file - total_lines = 0 - for line in fin: - total_lines += 1 - fin.seek(0) - - current_line = 0 - steps = 100000 - reference_date = datetime.strptime(datestr + ' 12:00:00', '%Y-%m-%d %H:%M:%S') - - pbar = tqdm(fin, total=total_lines) - for line in pbar: - pbar.set_description('Importing {}'.format(sourcefile)) - - current_line += 1 - if current_line % steps == 0: - saver.flush() - - message = string_to_message(line.strip(), reference_date=reference_date) - if message is None: - continue - - dictfilt = lambda x, y: dict([(i, x[i]) for i in x if i in set(y)]) - - try: - if message['beacon_type'] in AIRCRAFT_BEACON_TYPES: - message = dictfilt(message, ('beacon_type', 'aprs_type', 'location_wkt', 'altitude', 'name', 'dstcall', 'relay', 'receiver_name', 'timestamp', 'track', 'ground_speed', - 'address_type', 'aircraft_type', 'stealth', 'address', 'climb_rate', 'turn_rate', 'signal_quality', 'error_count', 'frequency_offset', 'gps_quality_horizontal', 'gps_quality_vertical', 'software_version', 'hardware_version', 'real_address', 'signal_power', - 'distance', 'radial', 'quality', 'agl', 'location_mgrs', 'location_mgrs_short', - 'receiver_id', 'device_id')) - - beacon = AircraftBeacon(**message) - elif message['beacon_type'] in RECEIVER_BEACON_TYPES: - if 'rec_crystal_correction' in message: - del message['rec_crystal_correction'] - del message['rec_crystal_correction_fine'] - beacon = ReceiverBeacon(**message) - saver.add(beacon) - except Exception as e: - print(e) - - saver.flush() - fin.close() - - -@user_cli.command('file_import') -@click.argument('path') -def file_import(path): - """Import APRS logfiles into separate logfile tables.""" - - import os - import re - - # Get Filepaths and dates to import - results = list() - for (root, dirs, files) in os.walk(path): - for file in sorted(files): - match = re.match('OGN_log\.txt_([0-9]{4}\-[0-9]{2}\-[0-9]{2})\.gz$', file) - if match: - results.append({'filepath': os.path.join(root, file), - 'datestr': match.group(1)}) - - with LogfileDbSaver() as saver: - already_imported = saver.get_datestrs() - - results = list(filter(lambda x: x['datestr'] not in already_imported, results)) - - pbar = tqdm(results) - for result in pbar: - filepath = result['filepath'] - datestr = result['datestr'] - pbar.set_description("Importing data for {}".format(datestr)) - - saver.set_datestr(datestr) - saver.create_tables() - convert(filepath, datestr, saver) - saver.add_missing_devices() - saver.add_missing_receivers() - - -@user_cli.command('update') -def update(): - """Update beacons (add foreign keys, compute distance, bearing, ags, etc.) in separate logfile tables.""" - - with LogfileDbSaver() as saver: - datestrs = saver.get_datestrs(no_index_only=True) - datestrs = saver.get_datestrs() - pbar = tqdm(datestrs) - for datestr in pbar: - pbar.set_description("Updating relations for {}".format(datestr)) - saver.set_datestr(datestr) - saver.update_receiver_location() - saver.update_aircraft_beacons() - saver.update_receiver_beacons() - saver.create_indices() - - -@user_cli.command('transfer') -@click.argument('start') -@click.argument('end') -def transfer(start, end): - """Transfer beacons from separate logfile tables to beacon table.""" - - with LogfileDbSaver() as saver: - dates = get_database_days(start, end) - datestrs = [date.strftime('%Y_%m_%d') for date in dates] - pbar = tqdm(datestrs) - for datestr in pbar: - pbar.set_description("Transfer beacons for {}".format(datestr)) - saver.set_datestr(datestr) - saver.transfer_aircraft_beacons() - saver.transfer_receiver_beacons() - - -@user_cli.command('create_flights2d') -def create_flights2d(): - """Create complete flight traces from logfile tables.""" - - with LogfileDbSaver() as saver: - datestrs = saver.get_datestrs() - pbar = tqdm(datestrs) - for datestr in pbar: - pbar.set_description("Create Flights2D for {}".format(datestr)) - saver.set_datestr(datestr) - saver.create_flights2d() - - -@user_cli.command('create_gaps2d') -def create_gaps2d(): - """Create 'gaps' from logfile tables.""" - - with LogfileDbSaver() as saver: - datestrs = saver.get_datestrs() - pbar = tqdm(datestrs) - for datestr in pbar: - pbar.set_description("Create Gaps2D for {}".format(datestr)) - saver.set_datestr(datestr) - saver.create_gaps2d() - - -@user_cli.command('file_export') -@click.argument('path') -def file_export(path): - """Export separate logfile tables to csv files. They can be used for fast bulk import with sql COPY command.""" - - import os - if not os.path.isdir(path): - print("'{}' is not a path. Exiting") - return - - with LogfileDbSaver() as saver: - datestrs = saver.get_datestrs() - pbar = tqdm(datestrs) - for datestr in pbar: - pbar.set_description("Exporting data for {}".format(datestr)) - saver.set_datestr(datestr) - saver.export_to_path(path) - diff --git a/ogn_python/commands/database.py b/ogn_python/commands/database.py index 8fa3340..0254711 100644 --- a/ogn_python/commands/database.py +++ b/ogn_python/commands/database.py @@ -51,7 +51,7 @@ def init(): db.session.execute('CREATE EXTENSION IF NOT EXISTS postgis;') db.session.execute('CREATE EXTENSION IF NOT EXISTS btree_gist;') db.session.commit() - Base.metadata.create_all(engine) + db.create_all() #alembic_cfg = Config(ALEMBIC_CONFIG_FILE) #command.stamp(alembic_cfg, "head") @@ -84,7 +84,7 @@ def upgrade(): def drop(sure): """Drop all tables.""" if sure == 'y': - Base.metadata.drop_all(engine) + db.drop_all() print('Dropped all tables.') else: print("Add argument '--sure y' to drop all tables.") diff --git a/ogn_python/commands/gateway.py b/ogn_python/commands/gateway.py index e553c53..2d1039c 100644 --- a/ogn_python/commands/gateway.py +++ b/ogn_python/commands/gateway.py @@ -2,9 +2,9 @@ from flask.cli import AppGroup import click from ogn.client import AprsClient -from ogn_python.gateway.process_tools import DbSaver +from ogn_python.gateway.bulkimport import ContinuousDbFeeder -from ogn_python import db +from ogn_python import app user_cli = AppGroup('gateway') user_cli.help = "Connection to APRS servers." @@ -14,21 +14,21 @@ user_cli.help = "Connection to APRS servers." def run(aprs_user='anon-dev'): """Run the aprs client.""" - saver = DbSaver(session=db.session) + saver = ContinuousDbFeeder() # User input validation if len(aprs_user) < 3 or len(aprs_user) > 9: print('aprs_user must be a string of 3-9 characters.') return - print('Start ogn gateway') + app.logger.warning('Start ogn gateway') client = AprsClient(aprs_user) client.connect() try: - client.run(callback=saver.add_raw_message, autoreconnect=True) + client.run(callback=saver.add, autoreconnect=True) except KeyboardInterrupt: - print('\nStop ogn gateway') + app.logger.warning('\nStop ogn gateway') saver.flush() client.disconnect() diff --git a/ogn_python/gateway/bulkimport.py b/ogn_python/gateway/bulkimport.py new file mode 100644 index 0000000..1d727fe --- /dev/null +++ b/ogn_python/gateway/bulkimport.py @@ -0,0 +1,283 @@ +from datetime import datetime, timedelta +from io import StringIO + +from flask.cli import AppGroup +import click +from tqdm import tqdm +from mgrs import MGRS + +from ogn.parser import parse, ParseError + +from ogn_python.model import AircraftBeacon, ReceiverBeacon, Location +from ogn_python.utils import open_file +from ogn_python.gateway.process_tools import * + +from ogn_python import db +from ogn_python import app + +user_cli = AppGroup('bulkimport') +user_cli.help = "Tools for accelerated data import." + + +# define message types we want to proceed +AIRCRAFT_BEACON_TYPES = ['aprs_aircraft', 'flarm', 'tracker', 'fanet', 'lt24', 'naviter', 'skylines', 'spider', 'spot'] +RECEIVER_BEACON_TYPES = ['aprs_receiver', 'receiver'] + +# define fields we want to proceed +BEACON_KEY_FIELDS = ['name', 'receiver_name', 'timestamp'] +AIRCRAFT_BEACON_FIELDS = ['location', 'altitude', 'dstcall', 'relay', 'track', 'ground_speed', 'address_type', 'aircraft_type', 'stealth', 'address', 'climb_rate', 'turn_rate', 'signal_quality', 'error_count', 'frequency_offset', 'gps_quality_horizontal', 'gps_quality_vertical', 'software_version', 'hardware_version', 'real_address', 'signal_power', 'distance', 'radial', 'quality', 'location_mgrs', 'location_mgrs_short', 'agl', 'receiver_id', 'device_id'] +RECEIVER_BEACON_FIELDS = ['location', 'altitude', 'dstcall', 'relay', 'version', 'platform', 'cpu_load', 'free_ram', 'total_ram', 'ntp_error', 'rt_crystal_correction', 'voltage', 'amperage', 'cpu_temp', 'senders_visible', 'senders_total', 'rec_input_noise', 'senders_signal', 'senders_messages', 'good_senders_signal', 'good_senders', 'good_and_bad_senders'] + + +myMGRS = MGRS() + + +def string_to_message(raw_string, reference_date): + global receivers + + try: + message = parse(raw_string, reference_date) + except NotImplementedError as e: + app.logger.error('No parser implemented for message: {}'.format(raw_string)) + return None + except ParseError as e: + app.logger.error('Parsing error with message: {}'.format(raw_string)) + return None + except TypeError as e: + app.logger.error('TypeError with message: {}'.format(raw_string)) + return None + except Exception as e: + app.logger.error('Other Exception with string: {}'.format(raw_string)) + return None + + # update reference receivers and distance to the receiver + if message['aprs_type'] == 'position': + if message['beacon_type'] in AIRCRAFT_BEACON_TYPES + RECEIVER_BEACON_TYPES: + latitude = message['latitude'] + longitude = message['longitude'] + + location = Location(longitude, latitude) + message['location'] = location.to_wkt() + location_mgrs = myMGRS.toMGRS(latitude, longitude).decode('utf-8') + message['location_mgrs'] = location_mgrs + message['location_mgrs_short'] = location_mgrs[0:5] + location_mgrs[5:7] + location_mgrs[10:12] + + if message['beacon_type'] in AIRCRAFT_BEACON_TYPES and 'gps_quality' in message: + if message['gps_quality'] is not None and 'horizontal' in message['gps_quality']: + message['gps_quality_horizontal'] = message['gps_quality']['horizontal'] + message['gps_quality_vertical'] = message['gps_quality']['vertical'] + del message['gps_quality'] + + # TODO: Fix python-ogn-client 0.91 + if 'senders_messages' in message and message['senders_messages'] is not None: + message['senders_messages'] = int(message['senders_messages']) + if 'good_senders' in message and message['good_senders'] is not None: + message['good_senders'] = int(message['good_senders']) + if 'good_and_bad_senders' in message and message['good_and_bad_senders'] is not None: + message['good_and_bad_senders'] = int(message['good_and_bad_senders']) + + return message + + +class ContinuousDbFeeder: + def __init__(self,): + self.postfix = 'continuous_import' + self.last_flush = datetime.utcnow() + + self.aircraft_buffer = StringIO() + self.receiver_buffer = StringIO() + + create_tables(self.postfix) + create_indices(self.postfix) + + def add(self, raw_string): + message = string_to_message(raw_string, reference_date=datetime.utcnow()) + + if message is None or ('raw_message' in message and message['raw_message'][0] == '#') or 'beacon_type' not in message: + return + + if message['beacon_type'] in AIRCRAFT_BEACON_TYPES: + complete_message = ','.join([str(message[k]) if k in message and message[k] is not None else '\\N' for k in BEACON_KEY_FIELDS + AIRCRAFT_BEACON_FIELDS]) + self.aircraft_buffer.write(complete_message) + self.aircraft_buffer.write('\n') + elif message['beacon_type'] in RECEIVER_BEACON_TYPES: + complete_message = ','.join([str(message[k]) if k in message and message[k] is not None else '\\N' for k in BEACON_KEY_FIELDS + RECEIVER_BEACON_FIELDS]) + self.receiver_buffer.write(complete_message) + self.receiver_buffer.write('\n') + else: + app.logger.error("Ignore beacon_type: {}".format(message['beacon_type'])) + return + + if datetime.utcnow() - self.last_flush >= timedelta(seconds=1): + self.flush() + self.prepare() + self.transfer() + self.delete_beacons() + + self.aircraft_buffer = StringIO() + self.receiver_buffer = StringIO() + + self.last_flush = datetime.utcnow() + + def flush(self): + self.aircraft_buffer.seek(0) + self.receiver_buffer.seek(0) + + connection = db.engine.raw_connection() + cursor = connection.cursor() + cursor.copy_from(self.aircraft_buffer, 'aircraft_beacons_{0}'.format(self.postfix), sep=',', columns=BEACON_KEY_FIELDS + AIRCRAFT_BEACON_FIELDS) + cursor.copy_from(self.receiver_buffer, 'receiver_beacons_{0}'.format(self.postfix), sep=',', columns=BEACON_KEY_FIELDS + RECEIVER_BEACON_FIELDS) + connection.commit() + + self.aircraft_buffer = StringIO() + self.receiver_buffer = StringIO() + + def prepare(self): + # make receivers complete + add_missing_receivers(self.postfix) + update_receiver_beacons(self.postfix) + update_receiver_location(self.postfix) + + # make devices complete + add_missing_devices(self.postfix) + + # prepare beacons for transfer + update_aircraft_beacons(self.postfix) + + def transfer(self): + # tranfer beacons + transfer_aircraft_beacons(self.postfix) + transfer_receiver_beacons(self.postfix) + + def delete_beacons(self): + # delete already transfered beacons + delete_receiver_beacons(self.postfix) + delete_aircraft_beacons(self.postfix) + + +def prepare_bigdata(postfix): + # make receivers complete + add_missing_receivers(postfix) + update_receiver_location(postfix) + + # make devices complete + add_missing_devices(postfix) + + # prepare beacons for transfer + create_indices(postfix) + update_receiver_beacons_bigdata(postfix) + update_aircraft_beacons_bigdata(postfix) + + +def get_aircraft_beacons_postfixes(): + """Get the postfixes from imported aircraft_beacons logs.""" + + postfixes = db.session.execute(""" + SELECT DISTINCT(RIGHT(tablename, 8)) + FROM pg_catalog.pg_tables + WHERE schemaname = 'public' AND tablename LIKE 'aircraft\_beacons\_20______' + ORDER BY RIGHT(tablename, 10); + """).fetchall() + + return [postfix for postfix in postfixes] + + + + + +def export_to_path(postfix): + import os, gzip + aircraft_beacons_file = os.path.join(path, 'aircraft_beacons_{0}.csv.gz'.format(postfix)) + with gzip.open(aircraft_beacons_file, 'wt', encoding='utf-8') as gzip_file: + self.cur.copy_expert("COPY ({}) TO STDOUT WITH (DELIMITER ',', FORMAT CSV, HEADER, ENCODING 'UTF-8');".format(self.get_merged_aircraft_beacons_subquery()), gzip_file) + receiver_beacons_file = os.path.join(path, 'receiver_beacons_{0}.csv.gz'.format(postfix)) + with gzip.open(receiver_beacons_file, 'wt') as gzip_file: + self.cur.copy_expert("COPY ({}) TO STDOUT WITH (DELIMITER ',', FORMAT CSV, HEADER, ENCODING 'UTF-8');".format(self.get_merged_receiver_beacons_subquery()), gzip_file) + + + +def convert(sourcefile, datestr, saver): + from ogn_python.gateway.process import string_to_message + from ogn_python.gateway.process_tools import AIRCRAFT_BEACON_TYPES, RECEIVER_BEACON_TYPES + from datetime import datetime + + fin = open_file(sourcefile) + + # get total lines of the input file + total_lines = 0 + for line in fin: + total_lines += 1 + fin.seek(0) + + current_line = 0 + steps = 100000 + reference_date = datetime.strptime(datestr + ' 12:00:00', '%Y-%m-%d %H:%M:%S') + + pbar = tqdm(fin, total=total_lines) + for line in pbar: + pbar.set_description('Importing {}'.format(sourcefile)) + + current_line += 1 + if current_line % steps == 0: + saver.flush() + + message = string_to_message(line.strip(), reference_date=reference_date) + if message is None: + continue + + dictfilt = lambda x, y: dict([(i, x[i]) for i in x if i in set(y)]) + + try: + if message['beacon_type'] in AIRCRAFT_BEACON_TYPES: + message = dictfilt(message, ('beacon_type', 'aprs_type', 'location_wkt', 'altitude', 'name', 'dstcall', 'relay', 'receiver_name', 'timestamp', 'track', 'ground_speed', + 'address_type', 'aircraft_type', 'stealth', 'address', 'climb_rate', 'turn_rate', 'signal_quality', 'error_count', 'frequency_offset', 'gps_quality_horizontal', 'gps_quality_vertical', 'software_version', 'hardware_version', 'real_address', 'signal_power', + 'distance', 'radial', 'quality', 'agl', 'location_mgrs', 'location_mgrs_short', + 'receiver_id', 'device_id')) + + beacon = AircraftBeacon(**message) + elif message['beacon_type'] in RECEIVER_BEACON_TYPES: + if 'rec_crystal_correction' in message: + del message['rec_crystal_correction'] + del message['rec_crystal_correction_fine'] + beacon = ReceiverBeacon(**message) + saver.add(beacon) + except Exception as e: + print(e) + + saver.flush() + fin.close() + + +@user_cli.command('file_import') +@click.argument('path') +def file_import(path): + """Import APRS logfiles into separate logfile tables.""" + + import os + import re + + # Get Filepaths and dates to import + results = list() + for (root, dirs, files) in os.walk(path): + for file in sorted(files): + match = re.match('OGN_log\.txt_([0-9]{4}\-[0-9]{2}\-[0-9]{2})\.gz$', file) + if match: + results.append({'filepath': os.path.join(root, file), + 'datestr': match.group(1)}) + + with LogfileDbSaver() as saver: + already_imported = saver.get_datestrs() + + results = list(filter(lambda x: x['datestr'] not in already_imported, results)) + + pbar = tqdm(results) + for result in pbar: + filepath = result['filepath'] + datestr = result['datestr'] + pbar.set_description("Importing data for {}".format(datestr)) + + saver.set_datestr(datestr) + saver.create_tables() + convert(filepath, datestr, saver) + saver.add_missing_devices() + saver.add_missing_receivers() diff --git a/ogn_python/gateway/process_tools.py b/ogn_python/gateway/process_tools.py index 3971a36..4999ae6 100644 --- a/ogn_python/gateway/process_tools.py +++ b/ogn_python/gateway/process_tools.py @@ -1,173 +1,316 @@ -from datetime import datetime, timedelta -from ogn.parser import parse, ParseError -from ogn_python.model import AircraftBeacon, ReceiverBeacon, Location -from ogn_python.collect.database import upsert +from ogn_python import db -from mgrs import MGRS +def create_tables(postfix): + """Create tables for log file import.""" + + db.session.execute('DROP TABLE IF EXISTS "aircraft_beacons_{0}"; CREATE TABLE aircraft_beacons_{0} AS TABLE aircraft_beacons WITH NO DATA;'.format(postfix)) + db.session.execute('DROP TABLE IF EXISTS "receiver_beacons_{0}"; CREATE TABLE receiver_beacons_{0} AS TABLE receiver_beacons WITH NO DATA;'.format(postfix)) + db.session.commit() -# define message types we want to proceed -AIRCRAFT_BEACON_TYPES = ['aprs_aircraft', 'flarm', 'tracker', 'fanet', 'lt24', 'naviter', 'skylines', 'spider', 'spot'] -RECEIVER_BEACON_TYPES = ['aprs_receiver', 'receiver'] +def create_indices(postfix): + """Creates indices for aircraft- and receiver-beacons.""" -# define fields we want to proceed -BEACON_KEY_FIELDS = ['name', 'receiver_name', 'timestamp'] -AIRCRAFT_BEACON_FIELDS = ['location', 'altitude', 'dstcall', 'relay', 'track', 'ground_speed', 'address_type', 'aircraft_type', 'stealth', 'address', 'climb_rate', 'turn_rate', 'signal_quality', 'error_count', 'frequency_offset', 'gps_quality_horizontal', 'gps_quality_vertical', 'software_version', 'hardware_version', 'real_address', 'signal_power', 'distance', 'radial', 'quality', 'location_mgrs', 'location_mgrs_short', 'agl', 'receiver_id', 'device_id'] -RECEIVER_BEACON_FIELDS = ['location', 'altitude', 'dstcall', 'relay', 'version', 'platform', 'cpu_load', 'free_ram', 'total_ram', 'ntp_error', 'rt_crystal_correction', 'voltage', 'amperage', 'cpu_temp', 'senders_visible', 'senders_total', 'rec_input_noise', 'senders_signal', 'senders_messages', 'good_senders_signal', 'good_senders', 'good_and_bad_senders'] + db.session.execute(""" + CREATE INDEX IF NOT EXISTS ix_aircraft_beacons_{0}_device_id ON "aircraft_beacons_{0}" (device_id NULLS FIRST); + CREATE INDEX IF NOT EXISTS ix_aircraft_beacons_{0}_receiver_id ON "aircraft_beacons_{0}" (receiver_id NULLS FIRST); + CREATE INDEX IF NOT EXISTS ix_aircraft_beacons_{0}_timestamp_name_receiver_name ON "aircraft_beacons_{0}" (timestamp, name, receiver_name); + CREATE INDEX IF NOT EXISTS ix_receiver_beacons_{0}_timestamp_name_receiver_name ON "receiver_beacons_{0}" (timestamp, name, receiver_name); + """.format(postfix)) + db.session.commit() -myMGRS = MGRS() +def create_indices_bigdata(postfix): + """Creates indices for aircraft- and receiver-beacons.""" + + db.session.execute(""" + CREATE INDEX IF NOT EXISTS ix_aircraft_beacons_{0}_timestamp_name_receiver_name ON "aircraft_beacons_{0}" (timestamp, name, receiver_name); + CREATE INDEX IF NOT EXISTS ix_receiver_beacons_{0}_timestamp_name_receiver_name ON "receiver_beacons_{0}" (timestamp, name, receiver_name); + """.format(postfix)) + db.session.commit() -def _replace_lonlat_with_wkt(message): - latitude = message['latitude'] - longitude = message['longitude'] +def add_missing_devices(postfix): + """Add missing devices.""" - location = Location(longitude, latitude) - message['location_wkt'] = location.to_wkt() - location_mgrs = myMGRS.toMGRS(latitude, longitude).decode('utf-8') - message['location_mgrs'] = location_mgrs - message['location_mgrs_short'] = location_mgrs[0:5] + location_mgrs[5:7] + location_mgrs[10:12] - del message['latitude'] - del message['longitude'] - return message + db.session.execute(""" + INSERT INTO devices(address) + SELECT DISTINCT(ab.address) + FROM "aircraft_beacons_{0}" AS ab + WHERE NOT EXISTS (SELECT 1 FROM devices AS d WHERE d.address = ab.address) + ORDER BY ab.address; + """.format(postfix)) + db.session.commit() -def string_to_message(raw_string, reference_date): - global receivers +def add_missing_receivers(postfix): + """Add missing receivers.""" - try: - message = parse(raw_string, reference_date) - except NotImplementedError as e: - print('No parser implemented for message: {}'.format(raw_string)) - return None - except ParseError as e: - print('Parsing error with message: {}'.format(raw_string)) - return None - except TypeError as e: - print('TypeError with message: {}'.format(raw_string)) - return None - except Exception as e: - print(raw_string) - print(e) - return None - - # update reference receivers and distance to the receiver - if message['aprs_type'] == 'position': - if message['beacon_type'] in AIRCRAFT_BEACON_TYPES + RECEIVER_BEACON_TYPES: - message = _replace_lonlat_with_wkt(message) - - if message['beacon_type'] in AIRCRAFT_BEACON_TYPES and 'gps_quality' in message: - if message['gps_quality'] is not None and 'horizontal' in message['gps_quality']: - message['gps_quality_horizontal'] = message['gps_quality']['horizontal'] - message['gps_quality_vertical'] = message['gps_quality']['vertical'] - del message['gps_quality'] - - # update raw_message - message['raw_message'] = raw_string - - return message + db.session.execute(""" + INSERT INTO receivers(name) + SELECT DISTINCT(rb.name) + FROM "receiver_beacons_{0}" AS rb + WHERE NOT EXISTS (SELECT 1 FROM receivers AS r WHERE r.name = rb.name) + ORDER BY name; + """.format(postfix)) + db.session.commit() -class DbSaver: - def __init__(self, session): - self.session = session - self.aircraft_message_map = dict() - self.receiver_message_map = dict() - self.last_commit = datetime.utcnow() +def update_receiver_location(postfix): + """Updates the receiver location. We need this because we want the actual location for distance calculations.""" - def _put_in_map(self, message, my_map): - key = message['name'] + message['receiver_name'] + message['timestamp'].strftime('%s') - - if key in my_map: - other = my_map[key] - merged = {k: message[k] if message[k] is not None else other[k] for k in message.keys()} - my_map[key] = merged - else: - my_map[key] = message - - def add_raw_message(self, raw_string, reference_date=None): - if not reference_date: - reference_date=datetime.utcnow() - message = string_to_message(raw_string, reference_date=reference_date) - if message is not None: - self.add_message(message) - else: - print(raw_string) - - def add_message(self, message): - if message is None or ('raw_message' in message and message['raw_message'][0] == '#') or 'beacon_type' not in message: - return - - if 'location_wkt' in message: - message['location'] = message.pop('location_wkt') # total_time_wasted_here = 3 - - if message['beacon_type'] in AIRCRAFT_BEACON_TYPES: - complete_message = {k: message[k] if k in message else None for k in BEACON_KEY_FIELDS + AIRCRAFT_BEACON_FIELDS} - self._put_in_map(message=complete_message, my_map=self.aircraft_message_map) - elif message['beacon_type'] in RECEIVER_BEACON_TYPES: - complete_message = {k: message[k] if k in message else None for k in BEACON_KEY_FIELDS + RECEIVER_BEACON_FIELDS} - self._put_in_map(message=complete_message, my_map=self.receiver_message_map) - else: - print("Ignore beacon_type: {}".format(message['beacon_type'])) - return - - elapsed_time = datetime.utcnow() - self.last_commit - if elapsed_time >= timedelta(seconds=5): - self.flush() - - def flush(self): - if len(self.aircraft_message_map) > 0: - messages = list(self.aircraft_message_map.values()) - upsert(session=self.session, model=AircraftBeacon, rows=messages, update_cols=AIRCRAFT_BEACON_FIELDS) - if len(self.receiver_message_map) > 0: - messages = list(self.receiver_message_map.values()) - upsert(session=self.session, model=ReceiverBeacon, rows=messages, update_cols=RECEIVER_BEACON_FIELDS) - self.session.commit() - - self.aircraft_message_map = dict() - self.receiver_message_map = dict() - self.last_commit = datetime.utcnow() + db.session.execute(""" + UPDATE receivers AS r + SET + location = sq.location, + altitude = sq.altitude + FROM ( + SELECT DISTINCT ON (rb.receiver_id) rb.receiver_id, rb.location, rb.altitude + FROM "receiver_beacons_{0}" AS rb + WHERE rb.location IS NOT NULL + ORDER BY rb.receiver_id, rb.timestamp + ) AS sq + WHERE r.id = sq.receiver_id; + """.format(postfix)) + db.session.commit() -import os, gzip, csv +def update_receiver_beacons(postfix): + """Updates the foreign keys.""" + + db.session.execute(""" + UPDATE receiver_beacons_{0} AS rb + SET receiver_id = r.id + FROM receivers AS r + WHERE rb.name = r.name; + """.format(postfix)) + db.session.commit() -class FileSaver: - def __init__(self): - self.aircraft_messages = list() - self.receiver_messages = list() +def update_receiver_beacons_bigdata(postfix): + """Updates the foreign keys. + Due to performance reasons we use a new table instead of updating the old.""" - def open(self, path, reference_date_string): - aircraft_beacon_filename = os.path.join(path, 'aircraft_beacons.csv_' + reference_date_string + '.gz') - receiver_beacon_filename = os.path.join(path, 'receiver_beacons.csv_' + reference_date_string + '.gz') + db.session.execute(""" + SELECT + rb.location, rb.altitude, rb.name, rb.receiver_name, rb.dstcall, rb.timestamp, - if not os.path.exists(aircraft_beacon_filename) and not os.path.exists(receiver_beacon_filename): - self.fout_ab = gzip.open(aircraft_beacon_filename, 'wt') - self.fout_rb = gzip.open(receiver_beacon_filename, 'wt') - else: - raise FileExistsError + rb.version, rb.platform, rb.cpu_load, rb.free_ram, rb.total_ram, rb.ntp_error, rb.rt_crystal_correction, rb.voltage, rb.amperage, + rb.cpu_temp, rb.senders_visible, rb.senders_total, rb.rec_input_noise, rb.senders_signal, rb.senders_messages, rb.good_senders_signal, + rb.good_senders, rb.good_and_bad_senders, - self.aircraft_writer = csv.writer(self.fout_ab, delimiter=',') - self.aircraft_writer.writerow(AircraftBeacon.get_columns()) + r.id AS receiver_id + INTO "receiver_beacons_{0}_temp" + FROM "receiver_beacons_{0}" AS rb, receivers AS r + WHERE rb.name = r.name; - self.receiver_writer = csv.writer(self.fout_rb, delimiter=',') - self.receiver_writer.writerow(ReceiverBeacon.get_columns()) + DROP TABLE IF EXISTS "receiver_beacons_{0}"; + ALTER TABLE "receiver_beacons_{0}_temp" RENAME TO "receiver_beacons_{0}"; + """.format(postfix)) + db.session.commit() - return 1 - def add_message(self, beacon): - if isinstance(beacon, AircraftBeacon): - self.aircraft_messages.append(beacon.get_values()) - elif isinstance(beacon, ReceiverBeacon): - self.receiver_messages.append(beacon.get_values()) +def update_aircraft_beacons(postfix): + """Updates the foreign keys and calculates distance/radial and quality and computes the altitude above ground level. + Elevation data has to be in the table 'elevation' with srid 4326.""" - def flush(self): - self.aircraft_writer.writerows(self.aircraft_messages) - self.receiver_writer.writerows(self.receiver_messages) - self.aircraft_messages = list() - self.receiver_messages = list() + db.session.execute(""" + UPDATE aircraft_beacons_{0} AS ab + SET + device_id = d.id, + receiver_id = r.id, + distance = CASE WHEN ab.location IS NOT NULL AND r.location IS NOT NULL THEN CAST(ST_DistanceSphere(ab.location, r.location) AS REAL) ELSE NULL END, + radial = CASE WHEN ab.location IS NOT NULL AND r.location IS NOT NULL THEN CAST(degrees(ST_Azimuth(ab.location, r.location)) AS SMALLINT) ELSE NULL END, + quality = CASE WHEN ab.location IS NOT NULL AND r.location IS NOT NULL AND ST_DistanceSphere(ab.location, r.location) > 0 AND ab.signal_quality IS NOT NULL + THEN CAST(signal_quality + 20*log(ST_DistanceSphere(ab.location, r.location)/10000) AS REAL) + ELSE NULL + END - def close(self): - self.fout_ab.close() - self.fout_rb.close() + FROM devices AS d, receivers AS r + WHERE ab.address = d.address AND ab.receiver_name = r.name; + """.format(postfix)) + db.session.commit() + + +def update_aircraft_beacons_bigdata(postfix): + """Updates the foreign keys and calculates distance/radial and quality and computes the altitude above ground level. + Elevation data has to be in the table 'elevation' with srid 4326. + Due to performance reasons we use a new table instead of updating the old.""" + + db.session.execute(""" + SELECT + ab.location, ab.altitude, ab.name, ab.dstcall, ab.relay, ab.receiver_name, ab.timestamp, ab.track, ab.ground_speed, + + ab.address_type, ab.aircraft_type, ab.stealth, ab.address, ab.climb_rate, ab.turn_rate, ab.signal_quality, ab.error_count, + ab.frequency_offset, ab.gps_quality_horizontal, ab.gps_quality_vertical, ab.software_version, ab.hardware_version, ab.real_address, ab.signal_power, + + ab.location_mgrs, + ab.location_mgrs_short, + + d.id AS device_id, + r.id AS receiver_id, + CASE WHEN ab.location IS NOT NULL AND r.location IS NOT NULL THEN CAST(ST_DistanceSphere(ab.location, r.location) AS REAL) ELSE NULL END AS distance, + CASE WHEN ab.location IS NOT NULL AND r.location IS NOT NULL THEN CAST(degrees(ST_Azimuth(ab.location, r.location)) AS SMALLINT) ELSE NULL END AS radial, + CASE WHEN ab.location IS NOT NULL AND r.location IS NOT NULL AND ST_DistanceSphere(ab.location, r.location) > 0 AND ab.signal_quality IS NOT NULL + THEN CAST(signal_quality + 20*log(ST_DistanceSphere(ab.location, r.location)/10000) AS REAL) + ELSE NULL + END AS quality, + CAST(ab.altitude - ST_Value(e.rast, ab.location) AS REAL) AS agl + + INTO "aircraft_beacons_{0}_temp" + FROM "aircraft_beacons_{0}" AS ab, devices AS d, receivers AS r, elevation AS e + WHERE ab.address = d.address AND receiver_name = r.name AND ST_Intersects(e.rast, ab.location); + + DROP TABLE IF EXISTS "aircraft_beacons_{0}"; + ALTER TABLE "aircraft_beacons_{0}_temp" RENAME TO "aircraft_beacons_{0}"; + """.format(postfix)) + db.session.commit() + + +def delete_receiver_beacons(postfix): + """Delete beacons from table.""" + + db.session.execute(""" + DELETE FROM receiver_beacons_continuous_import AS rb + USING ( + SELECT name, receiver_name, timestamp + FROM receiver_beacons_continuous_import + WHERE receiver_id IS NOT NULL + ) AS sq + WHERE rb.name = sq.name AND rb.receiver_name = sq.receiver_name AND rb.timestamp = sq.timestamp + """.format(postfix)) + db.session.commit() + + +def delete_aircraft_beacons(postfix): + """Delete beacons from table.""" + + db.session.execute(""" + DELETE FROM aircraft_beacons_continuous_import AS ab + USING ( + SELECT name, receiver_name, timestamp + FROM aircraft_beacons_continuous_import + WHERE receiver_id IS NOT NULL and device_id IS NOT NULL + ) AS sq + WHERE ab.name = sq.name AND ab.receiver_name = sq.receiver_name AND ab.timestamp = sq.timestamp + """.format(postfix)) + db.session.commit() + + +def get_merged_aircraft_beacons_subquery(postfix): + """Some beacons are split into position and status beacon. With this query we merge them into one beacon.""" + + return """ + SELECT + ST_AsEWKT(MAX(location)) AS location, + MAX(altitude) AS altitude, + name, + MAX(dstcall) AS dstcall, + MAX(relay) AS relay, + receiver_name, + timestamp, + MAX(track) AS track, + MAX(ground_speed) AS ground_speed, + + MAX(address_type) AS address_type, + MAX(aircraft_type) AS aircraft_type, + CAST(MAX(CAST(stealth AS int)) AS boolean) AS stealth, + MAX(address) AS address, + MAX(climb_rate) AS climb_rate, + MAX(turn_rate) AS turn_rate, + MAX(signal_quality) AS signal_quality, + MAX(error_count) AS error_count, + MAX(frequency_offset) AS frequency_offset, + MAX(gps_quality_horizontal) AS gps_quality_horizontal, + MAX(gps_quality_vertical) AS gps_quality_vertical, + MAX(software_version) AS software_version, + MAX(hardware_version) AS hardware_version, + MAX(real_address) AS real_address, + MAX(signal_power) AS signal_power, + + CAST(MAX(distance) AS REAL) AS distance, + CAST(MAX(radial) AS REAL) AS radial, + CAST(MAX(quality) AS REAL) AS quality, + CAST(MAX(agl) AS REAL) AS agl, + MAX(location_mgrs) AS location_mgrs, + MAX(location_mgrs_short) AS location_mgrs_short, + + MAX(receiver_id) AS receiver_id, + MAX(device_id) AS device_id + FROM "aircraft_beacons_{0}" AS ab + GROUP BY timestamp, name, receiver_name + ORDER BY timestamp, name, receiver_name + """.format(postfix) + + +def get_merged_receiver_beacons_subquery(postfix): + """Some beacons are split into position and status beacon. With this query we merge them into one beacon.""" + + return """ + SELECT + ST_AsEWKT(MAX(location)) AS location, + MAX(altitude) AS altitude, + name, + receiver_name, + MAX(dstcall) AS dstcall, + timestamp, + + MAX(version) AS version, + MAX(platform) AS platform, + MAX(cpu_load) AS cpu_load, + MAX(free_ram) AS free_ram, + MAX(total_ram) AS total_ram, + MAX(ntp_error) AS ntp_error, + MAX(rt_crystal_correction) AS rt_crystal_correction, + MAX(voltage) AS voltage, + MAX(amperage) AS amperage, + MAX(cpu_temp) AS cpu_temp, + MAX(senders_visible) AS senders_visible, + MAX(senders_total) AS senders_total, + MAX(rec_input_noise) AS rec_input_noise, + MAX(senders_signal) AS senders_signal, + MAX(senders_messages) AS senders_messages, + MAX(good_senders_signal) AS good_senders_signal, + MAX(good_senders) AS good_senders, + MAX(good_and_bad_senders) AS good_and_bad_senders, + + MAX(receiver_id) AS receiver_id + FROM "receiver_beacons_{0}" AS rb + GROUP BY timestamp, name, receiver_name + ORDER BY timestamp, name, receiver_name + """.format(postfix) + + +def transfer_aircraft_beacons(postfix): + query = """ + INSERT INTO aircraft_beacons(location, altitude, name, dstcall, relay, receiver_name, timestamp, track, ground_speed, + address_type, aircraft_type, stealth, address, climb_rate, turn_rate, signal_quality, error_count, frequency_offset, gps_quality_horizontal, gps_quality_vertical, software_version, hardware_version, real_address, signal_power, + distance, radial, quality, agl, location_mgrs, location_mgrs_short, + receiver_id, device_id) + SELECT sq.* + FROM ({}) sq + WHERE sq.receiver_id IS NOT NULL AND sq.device_id IS NOT NULL + ON CONFLICT DO NOTHING; + """.format(get_merged_aircraft_beacons_subquery(postfix)) + + db.session.execute(query) + db.session.commit() + + +def transfer_receiver_beacons(postfix): + query = """ + INSERT INTO receiver_beacons(location, altitude, name, receiver_name, dstcall, timestamp, + + version, platform, cpu_load, free_ram, total_ram, ntp_error, rt_crystal_correction, voltage, + amperage, cpu_temp, senders_visible, senders_total, rec_input_noise, senders_signal, + senders_messages, good_senders_signal, good_senders, good_and_bad_senders, + + receiver_id) + SELECT sq.* + FROM ({}) sq + WHERE sq.receiver_id IS NOT NULL + ON CONFLICT DO NOTHING; + """.format(get_merged_receiver_beacons_subquery(postfix)) + + db.session.execute(query) + db.session.commit() diff --git a/ogn_python/model/device.py b/ogn_python/model/device.py index 8251cf8..fc7d868 100644 --- a/ogn_python/model/device.py +++ b/ogn_python/model/device.py @@ -6,6 +6,7 @@ class Device(db.Model): id = db.Column(db.Integer, primary_key=True) + name = db.Column(db.String, index=True) #address = db.Column(db.String(6), index=True) address = db.Column(db.String, index=True) firstseen = db.Column(db.DateTime, index=True) diff --git a/ogn_python/model/device_stats.py b/ogn_python/model/device_stats.py index 34f5277..8865a7d 100644 --- a/ogn_python/model/device_stats.py +++ b/ogn_python/model/device_stats.py @@ -9,6 +9,7 @@ class DeviceStats(db.Model): date = db.Column(db.Date) # Static data + name = db.Column(db.String) firstseen = db.Column(db.DateTime) lastseen = db.Column(db.DateTime) aircraft_type = db.Column(db.SmallInteger)