Refactor bulk import tools

pull/78/head
Konstantin Gründger 2019-09-25 23:17:30 +02:00
rodzic 3d1e9908b3
commit 2b86617237
3 zmienionych plików z 223 dodań i 679 usunięć

Wyświetl plik

@ -1,8 +1,13 @@
import os
import datetime
from flask import current_app from flask import current_app
from flask.cli import AppGroup from flask.cli import AppGroup
import click
from ogn.client import AprsClient from ogn.client import AprsClient
from app.gateway.bulkimport import ContinuousDbFeeder
from app.gateway.bulkimport import convert, DbFeeder
user_cli = AppGroup("gateway") user_cli = AppGroup("gateway")
user_cli.help = "Connection to APRS servers." user_cli.help = "Connection to APRS servers."
@ -10,9 +15,7 @@ user_cli.help = "Connection to APRS servers."
@user_cli.command("run") @user_cli.command("run")
def run(aprs_user="anon-dev"): def run(aprs_user="anon-dev"):
"""Run the aprs client.""" """Run the aprs client and feed the DB with incoming data."""
saver = ContinuousDbFeeder()
# User input validation # User input validation
if len(aprs_user) < 3 or len(aprs_user) > 9: if len(aprs_user) < 3 or len(aprs_user) > 9:
@ -23,10 +26,24 @@ def run(aprs_user="anon-dev"):
client = AprsClient(aprs_user) client = AprsClient(aprs_user)
client.connect() client.connect()
try: with DbFeeder(prefix='continuous_import', reference_timestamp=datetime.utcnow, reference_timestamp_autoupdate=True) as feeder:
client.run(callback=saver.add, autoreconnect=True) try:
except KeyboardInterrupt: client.run(callback=lambda x: feeder.add(x), autoreconnect=True)
current_app.logger.warning("\nStop ogn gateway") except KeyboardInterrupt:
current_app.logger.warning("\nStop ogn gateway")
saver.flush()
client.disconnect() client.disconnect()
@user_cli.command("convert")
@click.argument("path")
def file_import(path):
"""Convert APRS logfiles into csv files for fast bulk import."""
logfiles = []
for (root, dirs, files) in os.walk(path):
for file in sorted(files):
logfiles.append(os.path.join(root, file))
for logfile in logfiles:
convert(logfile)

Wyświetl plik

@ -1,3 +1,5 @@
import os
import re
from datetime import datetime, timedelta from datetime import datetime, timedelta
from io import StringIO from io import StringIO
@ -9,9 +11,8 @@ from mgrs import MGRS
from ogn.parser import parse, ParseError from ogn.parser import parse, ParseError
from app.model import AircraftBeacon, AircraftType, ReceiverBeacon, Location from app.model import AircraftType, Location
from app.utils import open_file from app.gateway.process_tools import open_file, create_tables, drop_tables, update_aircraft_beacons_bigdata
from app.gateway.process_tools import create_indices, add_missing_devices, add_missing_receivers, update_aircraft_beacons, update_receiver_beacons, update_receiver_location, transfer_aircraft_beacons, transfer_receiver_beacons, delete_aircraft_beacons, delete_receiver_beacons, update_aircraft_beacons_bigdata, update_receiver_beacons_bigdata, create_tables
from app import db from app import db
@ -19,6 +20,8 @@ user_cli = AppGroup("bulkimport")
user_cli.help = "Tools for accelerated data import." user_cli.help = "Tools for accelerated data import."
basepath = os.path.dirname(os.path.realpath(__file__))
# define message types we want to proceed # define message types we want to proceed
AIRCRAFT_BEACON_TYPES = ["aprs_aircraft", "flarm", "tracker", "fanet", "lt24", "naviter", "skylines", "spider", "spot", "flymaster"] AIRCRAFT_BEACON_TYPES = ["aprs_aircraft", "flarm", "tracker", "fanet", "lt24", "naviter", "skylines", "spider", "spot", "flymaster"]
RECEIVER_BEACON_TYPES = ["aprs_receiver", "receiver"] RECEIVER_BEACON_TYPES = ["aprs_receiver", "receiver"]
@ -53,361 +56,151 @@ AIRCRAFT_BEACON_FIELDS = [
"location_mgrs", "location_mgrs",
"location_mgrs_short", "location_mgrs_short",
"agl", "agl",
"receiver_id",
"device_id",
] ]
RECEIVER_BEACON_FIELDS = [ RECEIVER_BEACON_FIELDS = [
"location", "location",
"altitude", "altitude",
"dstcall", "dstcall",
"relay",
"version",
"platform",
"cpu_load",
"free_ram",
"total_ram",
"ntp_error",
"rt_crystal_correction",
"voltage",
"amperage",
"cpu_temp",
"senders_visible",
"senders_total",
"rec_input_noise",
"senders_signal",
"senders_messages",
"good_senders_signal",
"good_senders",
"good_and_bad_senders",
] ]
myMGRS = MGRS() def initial_file_scan(file):
"""Scan file and get rowcount and first server timestamp."""
row_count = 0
timestamp = None
for row in file:
row_count += 1
if timestamp is None and row[0] == '#':
message = parse(row)
if message['aprs_type'] == 'server':
timestamp = message['timestamp']
file.seek(0)
return row_count, timestamp
def string_to_message(raw_string, reference_date): class DbFeeder:
global receivers def __init__(self, postfix, reference_timestamp, auto_update_timestamp):
self.postfix = postfix
self.reference_timestamp = reference_timestamp
self.auto_update_timestamp = auto_update_timestamp
try: self.last_flush = datetime.utcnow()
message = parse(raw_string, reference_date)
except NotImplementedError as e:
current_app.logger.error("No parser implemented for message: {}".format(raw_string))
return None
except ParseError as e:
current_app.logger.error("Parsing error with message: {}".format(raw_string))
return None
except TypeError as e:
current_app.logger.error("TypeError with message: {}".format(raw_string))
return None
except Exception as e:
current_app.logger.error("Other Exception with string: {}".format(raw_string))
return None
# update reference receivers and distance to the receiver self.aircraft_buffer = StringIO()
if message["aprs_type"] == "position": self.receiver_buffer = StringIO()
if message["beacon_type"] in AIRCRAFT_BEACON_TYPES + RECEIVER_BEACON_TYPES:
self.connection = db.engine.raw_connection()
self.cursor = self.connection.cursor()
self.mgrs = MGRS()
create_tables(self.postfix)
def __enter__(self):
return self
def __exit__(self, *args):
self._flush()
update_aircraft_beacons_bigdata(self.postfix)
self.connection.commit()
self.cursor.close()
self.connection.close()
def _flush(self):
self.aircraft_buffer.seek(0)
self.receiver_buffer.seek(0)
self.cursor.copy_from(self.aircraft_buffer, "aircraft_beacons_{postfix}".format(postfix=self.postfix), sep=",", columns=BEACON_KEY_FIELDS + AIRCRAFT_BEACON_FIELDS)
self.cursor.copy_from(self.receiver_buffer, "receiver_beacons_{postfix}".format(postfix=self.postfix), sep=",", columns=BEACON_KEY_FIELDS + RECEIVER_BEACON_FIELDS)
self.connection.commit()
self.aircraft_buffer = StringIO()
self.receiver_buffer = StringIO()
def add(self, raw_string):
try:
message = parse(raw_string, reference_timestamp=self.reference_timestamp)
except NotImplementedError as e:
current_app.logger.error("No parser implemented for message: {}".format(raw_string))
return
except ParseError as e:
current_app.logger.error("Parsing error with message: {}".format(raw_string))
return
except TypeError as e:
current_app.logger.error("TypeError with message: {}".format(raw_string))
return
except Exception as e:
current_app.logger.error("Other Exception with string: {}".format(raw_string))
return
if message['aprs_type'] not in ('server', 'position'):
return
elif message['aprs_type'] == 'server' and self.auto_update_timestamp is True:
self.reference_timestamp = message['timestamp']
return
elif message['aprs_type'] == 'position':
latitude = message["latitude"] latitude = message["latitude"]
longitude = message["longitude"] longitude = message["longitude"]
location = Location(longitude, latitude) location = Location(longitude, latitude)
message["location"] = location.to_wkt() message["location"] = location.to_wkt()
location_mgrs = myMGRS.toMGRS(latitude, longitude).decode("utf-8")
location_mgrs = self.mgrs.toMGRS(latitude, longitude).decode("utf-8")
message["location_mgrs"] = location_mgrs message["location_mgrs"] = location_mgrs
message["location_mgrs_short"] = location_mgrs[0:5] + location_mgrs[5:7] + location_mgrs[10:12] message["location_mgrs_short"] = location_mgrs[0:5] + location_mgrs[5:7] + location_mgrs[10:12]
if message["beacon_type"] in AIRCRAFT_BEACON_TYPES and "aircraft_type" in message: if "aircraft_type" in message:
message["aircraft_type"] = AircraftType(message["aircraft_type"]).name if message["aircraft_type"] else AircraftType.UNKNOWN.name message["aircraft_type"] = AircraftType(message["aircraft_type"]).name if message["aircraft_type"] in AircraftType.list() else AircraftType.UNKNOWN.name
if message["beacon_type"] in AIRCRAFT_BEACON_TYPES and "gps_quality" in message: if "gps_quality" in message:
if message["gps_quality"] is not None and "horizontal" in message["gps_quality"]: if message["gps_quality"] is not None and "horizontal" in message["gps_quality"]:
message["gps_quality_horizontal"] = message["gps_quality"]["horizontal"] message["gps_quality_horizontal"] = message["gps_quality"]["horizontal"]
message["gps_quality_vertical"] = message["gps_quality"]["vertical"] message["gps_quality_vertical"] = message["gps_quality"]["vertical"]
del message["gps_quality"] del message["gps_quality"]
# TODO: Fix python-ogn-client 0.91 if message["beacon_type"] in RECEIVER_BEACON_TYPES:
if "senders_messages" in message and message["senders_messages"] is not None:
message["senders_messages"] = int(message["senders_messages"])
if "good_senders" in message and message["good_senders"] is not None:
message["good_senders"] = int(message["good_senders"])
if "good_and_bad_senders" in message and message["good_and_bad_senders"] is not None:
message["good_and_bad_senders"] = int(message["good_and_bad_senders"])
return message
class ContinuousDbFeeder:
def __init__(self,):
self.postfix = "continuous_import"
self.last_flush = datetime.utcnow()
self.last_add_missing = datetime.utcnow()
self.last_transfer = datetime.utcnow()
self.aircraft_buffer = StringIO()
self.receiver_buffer = StringIO()
create_tables(self.postfix)
create_indices(self.postfix)
def add(self, raw_string):
message = string_to_message(raw_string, reference_date=datetime.utcnow())
if message is None or ("raw_message" in message and message["raw_message"][0] == "#") or "beacon_type" not in message:
return
if message["beacon_type"] in AIRCRAFT_BEACON_TYPES:
complete_message = ",".join([str(message[k]) if k in message and message[k] is not None else "\\N" for k in BEACON_KEY_FIELDS + AIRCRAFT_BEACON_FIELDS])
self.aircraft_buffer.write(complete_message)
self.aircraft_buffer.write("\n")
elif message["beacon_type"] in RECEIVER_BEACON_TYPES:
complete_message = ",".join([str(message[k]) if k in message and message[k] is not None else "\\N" for k in BEACON_KEY_FIELDS + RECEIVER_BEACON_FIELDS]) complete_message = ",".join([str(message[k]) if k in message and message[k] is not None else "\\N" for k in BEACON_KEY_FIELDS + RECEIVER_BEACON_FIELDS])
self.receiver_buffer.write(complete_message) self.receiver_buffer.write(complete_message)
self.receiver_buffer.write("\n") self.receiver_buffer.write("\n")
elif message["beacon_type"] in AIRCRAFT_BEACON_TYPES:
complete_message = ",".join([str(message[k]) if k in message and message[k] is not None else "\\N" for k in BEACON_KEY_FIELDS + AIRCRAFT_BEACON_FIELDS])
self.aircraft_buffer.write(complete_message)
self.aircraft_buffer.write("\n")
else: else:
current_app.logger.error("Ignore beacon_type: {}".format(message["beacon_type"])) current_app.logger.error("Ignore beacon_type: {}".format(message["beacon_type"]))
return return
if datetime.utcnow() - self.last_flush >= timedelta(seconds=20): if datetime.utcnow() - self.last_flush >= timedelta(seconds=5):
self.flush() self._flush()
self.prepare()
self.aircraft_buffer = StringIO()
self.receiver_buffer = StringIO()
self.last_flush = datetime.utcnow() self.last_flush = datetime.utcnow()
if datetime.utcnow() - self.last_add_missing >= timedelta(seconds=60):
self.add_missing()
self.last_add_missing = datetime.utcnow()
if datetime.utcnow() - self.last_transfer >= timedelta(seconds=30): def convert(sourcefile):
self.transfer() with open_file(sourcefile) as filehandler:
self.delete_beacons() total_lines, reference_timestamp = initial_file_scan(filehandler)
self.last_transfer = datetime.utcnow()
def flush(self): if reference_timestamp is not None:
self.aircraft_buffer.seek(0) auto_update_timestamp = True
self.receiver_buffer.seek(0) postfix = str(reference_timestamp.total_seconds())
else:
connection = db.engine.raw_connection() auto_update_timestamp = False
cursor = connection.cursor() match = re.match(r".*OGN_log\.txt_([0-9]{4}\-[0-9]{2}\-[0-9]{2})\.gz$", sourcefile)
cursor.copy_from(self.aircraft_buffer, "aircraft_beacons_{0}".format(self.postfix), sep=",", columns=BEACON_KEY_FIELDS + AIRCRAFT_BEACON_FIELDS) if match:
cursor.copy_from(self.receiver_buffer, "receiver_beacons_{0}".format(self.postfix), sep=",", columns=BEACON_KEY_FIELDS + RECEIVER_BEACON_FIELDS) reference_timestamp = datetime.strptime(match.group(1), "%Y-%m-%d")
connection.commit() postfix = reference_timestamp.strftime("%Y_%m_%d")
self.aircraft_buffer = StringIO()
self.receiver_buffer = StringIO()
def add_missing(self):
add_missing_receivers(self.postfix)
add_missing_devices(self.postfix)
def prepare(self):
# make receivers complete
update_receiver_beacons(self.postfix)
update_receiver_location(self.postfix)
# make devices complete
update_aircraft_beacons(self.postfix)
def transfer(self):
# tranfer beacons
transfer_aircraft_beacons(self.postfix)
transfer_receiver_beacons(self.postfix)
def delete_beacons(self):
# delete already transfered beacons
delete_receiver_beacons(self.postfix)
delete_aircraft_beacons(self.postfix)
class FileDbFeeder:
def __init__(self):
self.postfix = "continuous_import"
self.last_flush = datetime.utcnow()
self.aircraft_buffer = StringIO()
self.receiver_buffer = StringIO()
create_tables(self.postfix)
create_indices(self.postfix)
def add(self, raw_string):
message = string_to_message(raw_string, reference_date=datetime.utcnow())
if message is None or ("raw_message" in message and message["raw_message"][0] == "#") or "beacon_type" not in message:
return
if message["beacon_type"] in AIRCRAFT_BEACON_TYPES:
complete_message = ",".join([str(message[k]) if k in message and message[k] is not None else "\\N" for k in BEACON_KEY_FIELDS + AIRCRAFT_BEACON_FIELDS])
self.aircraft_buffer.write(complete_message)
self.aircraft_buffer.write("\n")
elif message["beacon_type"] in RECEIVER_BEACON_TYPES:
complete_message = ",".join([str(message[k]) if k in message and message[k] is not None else "\\N" for k in BEACON_KEY_FIELDS + RECEIVER_BEACON_FIELDS])
self.receiver_buffer.write(complete_message)
self.receiver_buffer.write("\n")
else: else:
current_app.logger.error("Ignore beacon_type: {}".format(message["beacon_type"])) current_app.logger.error("No reference time information. Skipping file: {}".format(sourcefile))
return return
def prepare(self): with open_file(sourcefile) as fin:
# make receivers complete with DbFeeder(postfix=postfix, reference_timestamp=reference_timestamp, auto_update_timestamp=auto_update_timestamp) as feeder:
add_missing_receivers(self.postfix) pbar = tqdm(fin, total=total_lines)
update_receiver_location(self.postfix) for line in pbar:
pbar.set_description("Importing {}".format(sourcefile))
# make devices complete feeder.add(raw_string=line)
add_missing_devices(self.postfix)
# prepare beacons for transfer
create_indices(self.postfix)
update_receiver_beacons_bigdata(self.postfix)
update_aircraft_beacons_bigdata(self.postfix)
def get_aircraft_beacons_postfixes():
"""Get the postfixes from imported aircraft_beacons logs."""
postfixes = db.session.execute(
r"""
SELECT DISTINCT(RIGHT(tablename, 8))
FROM pg_catalog.pg_tables
WHERE schemaname = 'public' AND tablename LIKE 'aircraft\_beacons\_20______'
ORDER BY RIGHT(tablename, 10);
"""
).fetchall()
return [postfix for postfix in postfixes]
def export_to_path(postfix):
import os
import gzip
pass # wtf is this?
# aircraft_beacons_file = os.path.join(path, "aircraft_beacons_{0}.csv.gz".format(postfix))
# with gzip.open(aircraft_beacons_file, "wt", encoding="utf-8") as gzip_file:
# self.cur.copy_expert("COPY ({}) TO STDOUT WITH (DELIMITER ',', FORMAT CSV, HEADER, ENCODING 'UTF-8');".format(self.get_merged_aircraft_beacons_subquery()), gzip_file)
# receiver_beacons_file = os.path.join(path, "receiver_beacons_{0}.csv.gz".format(postfix))
# with gzip.open(receiver_beacons_file, "wt") as gzip_file:
# self.cur.copy_expert("COPY ({}) TO STDOUT WITH (DELIMITER ',', FORMAT CSV, HEADER, ENCODING 'UTF-8');".format(self.get_merged_receiver_beacons_subquery()), gzip_file)
def convert(sourcefile, datestr, saver):
from app.gateway.process import string_to_message
from app.gateway.process_tools import AIRCRAFT_BEACON_TYPES, RECEIVER_BEACON_TYPES
from datetime import datetime
fin = open_file(sourcefile)
# get total lines of the input file
total_lines = 0
for line in fin:
total_lines += 1
fin.seek(0)
current_line = 0
steps = 100000
reference_date = datetime.strptime(datestr + " 12:00:00", "%Y-%m-%d %H:%M:%S")
pbar = tqdm(fin, total=total_lines)
for line in pbar:
pbar.set_description("Importing {}".format(sourcefile))
current_line += 1
if current_line % steps == 0:
saver.flush()
message = string_to_message(line.strip(), reference_date=reference_date)
if message is None:
continue
def dictfilt(x, y):
return dict([(i, x[i]) for i in x if i in set(y)])
try:
if message["beacon_type"] in AIRCRAFT_BEACON_TYPES:
message = dictfilt(
message,
(
"beacon_type",
"aprs_type",
"location_wkt",
"altitude",
"name",
"dstcall",
"relay",
"receiver_name",
"timestamp",
"track",
"ground_speed",
"address_type",
"aircraft_type",
"stealth",
"address",
"climb_rate",
"turn_rate",
"signal_quality",
"error_count",
"frequency_offset",
"gps_quality_horizontal",
"gps_quality_vertical",
"software_version",
"hardware_version",
"real_address",
"signal_power",
"distance",
"radial",
"quality",
"agl",
"location_mgrs",
"location_mgrs_short",
"receiver_id",
"device_id",
),
)
beacon = AircraftBeacon(**message)
elif message["beacon_type"] in RECEIVER_BEACON_TYPES:
if "rec_crystal_correction" in message:
del message["rec_crystal_correction"]
del message["rec_crystal_correction_fine"]
beacon = ReceiverBeacon(**message)
saver.add(beacon)
except Exception as e:
print(e)
saver.flush()
fin.close()
@user_cli.command("file_import")
@click.argument("path")
def file_import(path):
"""Import APRS logfiles into separate logfile tables."""
import os
import re
# Get Filepaths and dates to import
results = list()
for (root, dirs, files) in os.walk(path):
for file in sorted(files):
match = re.match(r"OGN_log\.txt_([0-9]{4}\-[0-9]{2}\-[0-9]{2})\.gz$", file)
if match:
results.append({"filepath": os.path.join(root, file), "datestr": match.group(1)})
with LogfileDbSaver() as saver: # noqa: F821
already_imported = saver.get_datestrs()
results = list(filter(lambda x: x["datestr"] not in already_imported, results))
pbar = tqdm(results)
for result in pbar:
filepath = result["filepath"]
datestr = result["datestr"]
pbar.set_description("Importing data for {}".format(datestr))
saver.set_datestr(datestr)
saver.create_tables()
convert(filepath, datestr, saver)
saver.add_missing_devices()
saver.add_missing_receivers()

Wyświetl plik

@ -1,184 +1,69 @@
import os
import gzip
import time
from contextlib import contextmanager
from flask import current_app
from app import db from app import db
@contextmanager
def open_file(filename):
"""Opens a regular OR gzipped textfile for reading."""
file = open(filename, "rb")
a = file.read(2)
file.close()
if a == b"\x1f\x8b":
file = gzip.open(filename, "rt", encoding="latin-1")
else:
file = open(filename, "rt", encoding="latin-1")
try:
yield file
finally:
file.close()
class Timer(object):
def __init__(self, name=None):
self.name = name
def __enter__(self):
self.tstart = time.time()
def __exit__(self, type, value, traceback):
if self.name:
print("[{}]".format(self.name))
print("Elapsed: {}".format(time.time() - self.tstart))
def drop_tables(postfix):
"""Drop tables for log file import."""
db.session.execute("""
DROP TABLE IF EXISTS "aircraft_beacons_{postfix}";
DROP TABLE IF EXISTS "receiver_beacons_{postfix}";
""".format(postfix=postfix))
db.session.commit()
def create_tables(postfix): def create_tables(postfix):
"""Create tables for log file import.""" """Create tables for log file import."""
db.session.execute('DROP TABLE IF EXISTS "aircraft_beacons_{0}"; CREATE TABLE aircraft_beacons_{0} AS TABLE aircraft_beacons WITH NO DATA;'.format(postfix)) drop_tables(postfix)
db.session.execute('DROP TABLE IF EXISTS "receiver_beacons_{0}"; CREATE TABLE receiver_beacons_{0} AS TABLE receiver_beacons WITH NO DATA;'.format(postfix)) db.session.execute("""
db.session.commit() CREATE TABLE aircraft_beacons_{postfix} AS TABLE aircraft_beacons WITH NO DATA;
CREATE TABLE receiver_beacons_{postfix} AS TABLE receiver_beacons WITH NO DATA;
""".format(postfix=postfix))
def create_indices(postfix):
"""Creates indices for aircraft- and receiver-beacons."""
db.session.execute(
"""
CREATE INDEX IF NOT EXISTS ix_aircraft_beacons_{0}_device_id ON "aircraft_beacons_{0}" (device_id NULLS FIRST);
CREATE INDEX IF NOT EXISTS ix_aircraft_beacons_{0}_receiver_id ON "aircraft_beacons_{0}" (receiver_id NULLS FIRST);
CREATE INDEX IF NOT EXISTS ix_aircraft_beacons_{0}_timestamp_name_receiver_name ON "aircraft_beacons_{0}" (timestamp, name, receiver_name);
CREATE INDEX IF NOT EXISTS ix_receiver_beacons_{0}_timestamp_name_receiver_name ON "receiver_beacons_{0}" (timestamp, name, receiver_name);
""".format(
postfix
)
)
db.session.commit()
def create_indices_bigdata(postfix):
"""Creates indices for aircraft- and receiver-beacons."""
db.session.execute(
"""
CREATE INDEX IF NOT EXISTS ix_aircraft_beacons_{0}_timestamp_name_receiver_name ON "aircraft_beacons_{0}" (timestamp, name, receiver_name);
CREATE INDEX IF NOT EXISTS ix_receiver_beacons_{0}_timestamp_name_receiver_name ON "receiver_beacons_{0}" (timestamp, name, receiver_name);
""".format(
postfix
)
)
db.session.commit()
def add_missing_devices(postfix):
"""Add missing devices."""
db.session.execute(
"""
INSERT INTO devices(address)
SELECT DISTINCT (ab.address)
FROM "aircraft_beacons_{0}" AS ab
WHERE ab.address IS NOT NULL AND NOT EXISTS (SELECT 1 FROM devices AS d WHERE d.address = ab.address)
ORDER BY ab.address;
""".format(
postfix
)
)
db.session.commit()
def add_missing_receivers(postfix):
"""Add missing receivers."""
db.session.execute(
"""
INSERT INTO receivers(name)
SELECT DISTINCT (rb.name)
FROM "receiver_beacons_{0}" AS rb
WHERE NOT EXISTS (SELECT 1 FROM receivers AS r WHERE r.name = rb.name)
ORDER BY rb.name;
INSERT INTO receivers(name)
SELECT DISTINCT (ab.receiver_name)
FROM "aircraft_beacons_{0}" AS ab
WHERE NOT EXISTS (SELECT 1 FROM receivers AS r WHERE r.name = ab.receiver_name)
ORDER BY ab.receiver_name;
""".format(
postfix
)
)
db.session.commit()
def update_receiver_location(postfix):
"""Updates the receiver location. We need this because we want the actual location for distance calculations."""
db.session.execute(
"""
UPDATE receivers AS r
SET
location = sq.location,
altitude = sq.altitude
FROM (
SELECT DISTINCT ON (rb.receiver_id) rb.receiver_id, rb.location, rb.altitude
FROM "receiver_beacons_{0}" AS rb
WHERE rb.location IS NOT NULL
ORDER BY rb.receiver_id, rb.timestamp
) AS sq
WHERE r.id = sq.receiver_id;
""".format(
postfix
)
)
db.session.commit()
def update_receiver_beacons(postfix):
"""Updates the foreign keys."""
db.session.execute(
"""
UPDATE receiver_beacons_{0} AS rb
SET receiver_id = r.id
FROM receivers AS r
WHERE rb.receiver_id IS NULL AND rb.name = r.name;
""".format(
postfix
)
)
db.session.commit()
def update_receiver_beacons_bigdata(postfix):
"""Updates the foreign keys.
Due to performance reasons we use a new table instead of updating the old."""
db.session.execute(
"""
SELECT
rb.location, rb.altitude, rb.name, rb.receiver_name, rb.dstcall, rb.timestamp,
rb.version, rb.platform, rb.cpu_load, rb.free_ram, rb.total_ram, rb.ntp_error, rb.rt_crystal_correction, rb.voltage, rb.amperage,
rb.cpu_temp, rb.senders_visible, rb.senders_total, rb.rec_input_noise, rb.senders_signal, rb.senders_messages, rb.good_senders_signal,
rb.good_senders, rb.good_and_bad_senders,
r.id AS receiver_id
INTO "receiver_beacons_{0}_temp"
FROM "receiver_beacons_{0}" AS rb, receivers AS r
WHERE rb.name = r.name;
DROP TABLE IF EXISTS "receiver_beacons_{0}";
ALTER TABLE "receiver_beacons_{0}_temp" RENAME TO "receiver_beacons_{0}";
""".format(
postfix
)
)
db.session.commit()
def update_aircraft_beacons(postfix):
"""Updates the foreign keys and calculates distance/radial and quality and computes the altitude above ground level.
Elevation data has to be in the table 'elevation' with srid 4326."""
db.session.execute(
"""
UPDATE aircraft_beacons_{0} AS ab
SET
device_id = d.id,
receiver_id = r.id,
distance = CASE WHEN ab.location IS NOT NULL AND r.location IS NOT NULL THEN CAST(ST_DistanceSphere(ab.location, r.location) AS REAL) ELSE NULL END,
radial = CASE WHEN ab.location IS NOT NULL AND r.location IS NOT NULL THEN CAST(degrees(ST_Azimuth(ab.location, r.location)) AS SMALLINT) ELSE NULL END,
quality = CASE WHEN ab.location IS NOT NULL AND r.location IS NOT NULL AND ST_DistanceSphere(ab.location, r.location) > 0 AND ab.signal_quality IS NOT NULL
THEN CAST(signal_quality + 20*log(ST_DistanceSphere(ab.location, r.location)/10000) AS REAL)
ELSE NULL
END,
agl = CAST(ab.altitude - ST_Value(e.rast, ab.location) AS REAL)
FROM devices AS d, receivers AS r, elevation AS e
WHERE ab.device_id IS NULL and ab.receiver_id IS NULL AND ab.address = d.address AND ab.receiver_name = r.name AND ST_Intersects(e.rast, ab.location);
""".format(
postfix
)
)
db.session.commit() db.session.commit()
def update_aircraft_beacons_bigdata(postfix): def update_aircraft_beacons_bigdata(postfix):
"""Updates the foreign keys and calculates distance/radial and quality and computes the altitude above ground level. """Calculates distance/radial and quality and computes the altitude above ground level.
Elevation data has to be in the table 'elevation' with srid 4326.
Due to performance reasons we use a new table instead of updating the old.""" Due to performance reasons we use a new table instead of updating the old."""
db.session.execute( db.session.execute("""
"""
SELECT SELECT
ab.location, ab.altitude, ab.name, ab.dstcall, ab.relay, ab.receiver_name, ab.timestamp, ab.track, ab.ground_speed, ab.location, ab.altitude, ab.name, ab.dstcall, ab.relay, ab.receiver_name, ab.timestamp, ab.track, ab.ground_speed,
@ -188,189 +73,38 @@ def update_aircraft_beacons_bigdata(postfix):
ab.location_mgrs, ab.location_mgrs,
ab.location_mgrs_short, ab.location_mgrs_short,
d.id AS device_id,
r.id AS receiver_id,
CASE WHEN ab.location IS NOT NULL AND r.location IS NOT NULL THEN CAST(ST_DistanceSphere(ab.location, r.location) AS REAL) ELSE NULL END AS distance, CASE WHEN ab.location IS NOT NULL AND r.location IS NOT NULL THEN CAST(ST_DistanceSphere(ab.location, r.location) AS REAL) ELSE NULL END AS distance,
CASE WHEN ab.location IS NOT NULL AND r.location IS NOT NULL THEN CAST(degrees(ST_Azimuth(ab.location, r.location)) AS SMALLINT) ELSE NULL END AS radial, CASE WHEN ab.location IS NOT NULL AND r.location IS NOT NULL THEN CAST(degrees(ST_Azimuth(ab.location, r.location)) AS SMALLINT) % 360 ELSE NULL END AS radial,
CASE WHEN ab.location IS NOT NULL AND r.location IS NOT NULL AND ST_DistanceSphere(ab.location, r.location) > 0 AND ab.signal_quality IS NOT NULL CASE WHEN ab.location IS NOT NULL AND r.location IS NOT NULL AND ST_DistanceSphere(ab.location, r.location) > 0 AND ab.signal_quality IS NOT NULL
THEN CAST(signal_quality + 20*log(ST_DistanceSphere(ab.location, r.location)/10000) AS REAL) THEN CAST(signal_quality + 20*log(ST_DistanceSphere(ab.location, r.location)/10000) AS REAL)
ELSE NULL ELSE NULL
END AS quality, END AS quality,
CAST(ab.altitude - ST_Value(e.rast, ab.location) AS REAL) AS agl CAST((ab.altitude - subtable.elev_m) AS REAL) AS agl
INTO aircraft_beacons_{postfix}_temp
FROM
aircraft_beacons_{postfix} AS ab
JOIN LATERAL (
SELECT ab.location, MAX(ST_NearestValue(e.rast, ab.location)) as elev_m
FROM elevation e
WHERE ST_Intersects(ab.location, e.rast)
GROUP BY ab.location
) AS subtable ON TRUE,
(SELECT name, last(location, timestamp) AS location FROM receiver_beacons_{postfix} GROUP BY name) AS r
WHERE ab.receiver_name = r.name;
INTO "aircraft_beacons_{0}_temp" DROP TABLE IF EXISTS "aircraft_beacons_{postfix}";
FROM "aircraft_beacons_{0}" AS ab, devices AS d, receivers AS r, elevation AS e ALTER TABLE "aircraft_beacons_{postfix}_temp" RENAME TO "aircraft_beacons_{postfix}";
WHERE ab.address = d.address AND receiver_name = r.name AND ST_Intersects(e.rast, ab.location); """.format(postfix=postfix))
DROP TABLE IF EXISTS "aircraft_beacons_{0}";
ALTER TABLE "aircraft_beacons_{0}_temp" RENAME TO "aircraft_beacons_{0}";
""".format(
postfix
)
)
db.session.commit()
def delete_receiver_beacons(postfix): def export_to_path(postfix, path):
"""Delete beacons from table.""" connection = db.engine.raw_connection()
cursor = connection.cursor()
db.session.execute( aircraft_beacons_file = os.path.join(path, "aircraft_beacons_{postfix}.csv.gz".format(postfix=postfix))
""" with gzip.open(aircraft_beacons_file, "wt", encoding="utf-8") as gzip_file:
DELETE FROM receiver_beacons_continuous_import AS rb cursor.copy_expert("COPY ({}) TO STDOUT WITH (DELIMITER ',', FORMAT CSV, HEADER, ENCODING 'UTF-8');".format("SELECT * FROM aircraft_beacons_{postfix}".format(postfix=postfix)), gzip_file)
USING (
SELECT name, receiver_name, timestamp
FROM receiver_beacons_continuous_import
WHERE receiver_id IS NOT NULL
) AS sq
WHERE rb.name = sq.name AND rb.receiver_name = sq.receiver_name AND rb.timestamp = sq.timestamp
""".format(
postfix
)
)
db.session.commit()
receiver_beacons_file = os.path.join(path, "receiver_beacons_{postfix}.csv.gz".format(postfix=postfix))
def delete_aircraft_beacons(postfix): with gzip.open(receiver_beacons_file, "wt") as gzip_file:
"""Delete beacons from table.""" cursor.copy_expert("COPY ({}) TO STDOUT WITH (DELIMITER ',', FORMAT CSV, HEADER, ENCODING 'UTF-8');".format("SELECT * FROM receiver_beacons_{postfix}".format(postfix=postfix)), gzip_file)
db.session.execute(
"""
DELETE FROM aircraft_beacons_continuous_import AS ab
USING (
SELECT name, receiver_name, timestamp
FROM aircraft_beacons_continuous_import
WHERE receiver_id IS NOT NULL and device_id IS NOT NULL
) AS sq
WHERE ab.name = sq.name AND ab.receiver_name = sq.receiver_name AND ab.timestamp = sq.timestamp
""".format(
postfix
)
)
db.session.commit()
def get_merged_aircraft_beacons_subquery(postfix):
"""Some beacons are split into position and status beacon. With this query we merge them into one beacon."""
return """
SELECT
ST_AsEWKT(MAX(location)) AS location,
MAX(altitude) AS altitude,
name,
MAX(dstcall) AS dstcall,
MAX(relay) AS relay,
receiver_name,
timestamp,
MAX(track) AS track,
MAX(ground_speed) AS ground_speed,
MAX(address_type) AS address_type,
MAX(aircraft_type) AS aircraft_type,
CAST(MAX(CAST(stealth AS int)) AS boolean) AS stealth,
MAX(address) AS address,
MAX(climb_rate) AS climb_rate,
MAX(turn_rate) AS turn_rate,
MAX(signal_quality) AS signal_quality,
MAX(error_count) AS error_count,
MAX(frequency_offset) AS frequency_offset,
MAX(gps_quality_horizontal) AS gps_quality_horizontal,
MAX(gps_quality_vertical) AS gps_quality_vertical,
MAX(software_version) AS software_version,
MAX(hardware_version) AS hardware_version,
MAX(real_address) AS real_address,
MAX(signal_power) AS signal_power,
CAST(MAX(distance) AS REAL) AS distance,
CAST(MAX(radial) AS REAL) AS radial,
CAST(MAX(quality) AS REAL) AS quality,
CAST(MAX(agl) AS REAL) AS agl,
MAX(location_mgrs) AS location_mgrs,
MAX(location_mgrs_short) AS location_mgrs_short,
MAX(receiver_id) AS receiver_id,
MAX(device_id) AS device_id
FROM "aircraft_beacons_{0}" AS ab
GROUP BY timestamp, name, receiver_name
ORDER BY timestamp, name, receiver_name
""".format(
postfix
)
def get_merged_receiver_beacons_subquery(postfix):
"""Some beacons are split into position and status beacon. With this query we merge them into one beacon."""
return """
SELECT
ST_AsEWKT(MAX(location)) AS location,
MAX(altitude) AS altitude,
name,
receiver_name,
MAX(dstcall) AS dstcall,
timestamp,
MAX(version) AS version,
MAX(platform) AS platform,
MAX(cpu_load) AS cpu_load,
MAX(free_ram) AS free_ram,
MAX(total_ram) AS total_ram,
MAX(ntp_error) AS ntp_error,
MAX(rt_crystal_correction) AS rt_crystal_correction,
MAX(voltage) AS voltage,
MAX(amperage) AS amperage,
MAX(cpu_temp) AS cpu_temp,
MAX(senders_visible) AS senders_visible,
MAX(senders_total) AS senders_total,
MAX(rec_input_noise) AS rec_input_noise,
MAX(senders_signal) AS senders_signal,
MAX(senders_messages) AS senders_messages,
MAX(good_senders_signal) AS good_senders_signal,
MAX(good_senders) AS good_senders,
MAX(good_and_bad_senders) AS good_and_bad_senders,
MAX(receiver_id) AS receiver_id
FROM "receiver_beacons_{0}" AS rb
GROUP BY timestamp, name, receiver_name
ORDER BY timestamp, name, receiver_name
""".format(
postfix
)
def transfer_aircraft_beacons(postfix):
query = """
INSERT INTO aircraft_beacons(location, altitude, name, dstcall, relay, receiver_name, timestamp, track, ground_speed,
address_type, aircraft_type, stealth, address, climb_rate, turn_rate, signal_quality, error_count, frequency_offset, gps_quality_horizontal, gps_quality_vertical, software_version, hardware_version, real_address, signal_power,
distance, radial, quality, agl, location_mgrs, location_mgrs_short,
receiver_id, device_id)
SELECT sq.*
FROM ({}) sq
WHERE sq.receiver_id IS NOT NULL AND sq.device_id IS NOT NULL
ON CONFLICT DO NOTHING;
""".format(
get_merged_aircraft_beacons_subquery(postfix)
)
db.session.execute(query)
db.session.commit()
def transfer_receiver_beacons(postfix):
query = """
INSERT INTO receiver_beacons(location, altitude, name, receiver_name, dstcall, timestamp,
version, platform, cpu_load, free_ram, total_ram, ntp_error, rt_crystal_correction, voltage,
amperage, cpu_temp, senders_visible, senders_total, rec_input_noise, senders_signal,
senders_messages, good_senders_signal, good_senders, good_and_bad_senders,
receiver_id)
SELECT sq.*
FROM ({}) sq
WHERE sq.receiver_id IS NOT NULL
ON CONFLICT DO NOTHING;
""".format(
get_merged_receiver_beacons_subquery(postfix)
)
db.session.execute(query)
db.session.commit()