From f443294553c81b24fbe2d782bb62459fcae9fab0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Konstantin=20Gru=CC=88ndger?= Date: Fri, 22 May 2020 13:59:25 +0200 Subject: [PATCH] Added receiver status beacon and merge them to the recever_beacon table --- app/gateway/bulkimport.py | 238 +++++++++++++++++++++++++++-------- app/model/beacon.py | 2 +- app/model/receiver_beacon.py | 10 +- 3 files changed, 199 insertions(+), 51 deletions(-) diff --git a/app/gateway/bulkimport.py b/app/gateway/bulkimport.py index a5eb8d3..939bec1 100644 --- a/app/gateway/bulkimport.py +++ b/app/gateway/bulkimport.py @@ -28,7 +28,7 @@ AIRCRAFT_BEACON_TYPES = ["aprs_aircraft", "flarm", "tracker", "fanet", "lt24", " RECEIVER_BEACON_TYPES = ["aprs_receiver", "receiver"] # define fields we want to proceed -AIRCRAFT_BEACON_FIELDS = [ +AIRCRAFT_POSITION_BEACON_FIELDS = [ "location", "altitude", "name", @@ -59,15 +59,29 @@ AIRCRAFT_BEACON_FIELDS = [ "location_mgrs", "location_mgrs_short", "agl", + + "reference_timestamp" ] -RECEIVER_BEACON_FIELDS = [ +RECEIVER_POSITION_BEACON_FIELDS = [ "location", "altitude", "name", "dstcall", "receiver_name", "timestamp", + + "reference_timestamp" +] + +RECEIVER_STATUS_BEACON_FIELDS = [ + "name", + "dstcall", + "receiver_name", + "timestamp", + + "version", + "platform", ] @@ -122,7 +136,7 @@ class StringConverter: current_app.logger.error("Other Exception with string: {}".format(raw_string)) return - if message['aprs_type'] not in ('server', 'position'): + if message['aprs_type'] not in ('server', 'position', 'status'): return elif message['aprs_type'] == 'server' and self.auto_update_timestamp is True: @@ -150,8 +164,8 @@ class StringConverter: return message - def _get_aircraft_beacon_csv_string(self, message, none_character=''): - csv_string = "{0},{1},{2},{3},{4},{5},{6},{7},{8},{9},{10},{11},{12},{13},{14},{15},{16},{17},{18},{19},{20},{21},{22},{23},{24},{25},{26},{27},{28},{29}\n".format( + def _get_aircraft_position_beacon_csv_string(self, message, none_character=''): + csv_string = "{0},{1},{2},{3},{4},{5},{6},{7},{8},{9},{10},{11},{12},{13},{14},{15},{16},{17},{18},{19},{20},{21},{22},{23},{24},{25},{26},{27},{28},{29},{30}\n".format( message['location'], int(message['altitude']) if message['altitude'] else none_character, message['name'], @@ -182,17 +196,33 @@ class StringConverter: message['location_mgrs'], message['location_mgrs_short'], message['agl'] if 'agl' in message else none_character, #29 + + message['reference_timestamp'], ) return csv_string - def _get_receiver_beacon_csv_string(self, message, none_character=''): - csv_string = "{0},{1},{2},{3},{4},{5}\n".format( + def _get_receiver_position_beacon_csv_string(self, message, none_character=''): + csv_string = "{0},{1},{2},{3},{4},{5},{6}\n".format( message['location'], int(message['altitude']) if message['altitude'] else none_character, message['name'], message['dstcall'], message['receiver_name'], message['timestamp'], + + message['reference_timestamp'], + ) + return csv_string + + def _get_receiver_status_beacon_csv_string(self, message, none_character=''): + csv_string = "{0},{1},{2},{3},{4},{5}\n".format( + message['name'], + message['dstcall'], + message['receiver_name'], + message['timestamp'], + + message['version'] if 'version' in message and message['version'] else none_character, + message['platform'] if 'platform' in message and message['platform'] else none_character, ) return csv_string @@ -208,8 +238,8 @@ class FileFeeder(StringConverter): super().__init__(reference_timestamp, reference_timestamp_autoupdate) def __enter__(self): - self.aircraft_beacons_file.write(','.join(AIRCRAFT_BEACON_FIELDS)) - self.receiver_beacons_file.write(','.join(RECEIVER_BEACON_FIELDS)) + self.aircraft_beacons_file.write(','.join(AIRCRAFT_POSITION_BEACON_FIELDS)) + self.receiver_beacons_file.write(','.join(RECEIVER_POSITION_BEACON_FIELDS)) return self def __exit__(self, *args): @@ -219,10 +249,10 @@ class FileFeeder(StringConverter): def add(self, raw_string): message = self._convert(raw_string) if message['beacon_type'] in AIRCRAFT_BEACON_TYPES: - csv_string = self._get_aircraft_beacon_csv_string(message) + csv_string = self._get_aircraft_position_beacon_csv_string(message) self.aircraft_beacons_file.write(csv_string) elif message['beacon_type'] in RECEIVER_BEACON_TYPES: - csv_string = self._get_receiver_beacon_csv_string(message) + csv_string = self._get_receiver_position_beacon_csv_string(message) self.receiver_beacons_file.write(csv_string) @@ -231,8 +261,10 @@ class DbFeeder(StringConverter): self.reference_timestamp = reference_timestamp self.reference_timestamp_autoupdate = reference_timestamp_autoupdate - self.aircraft_beacons_buffer = StringIO() - self.receiver_beacons_buffer = StringIO() + self.aircraft_position_beacons_buffer = StringIO() + self.aircraft_status_beacons_buffer = StringIO() + self.receiver_position_beacons_buffer = StringIO() + self.receiver_status_beacons_buffer = StringIO() self.last_flush = datetime.utcnow() @@ -248,51 +280,56 @@ class DbFeeder(StringConverter): if not message: return - if message['beacon_type'] in AIRCRAFT_BEACON_TYPES: - csv_string = self._get_aircraft_beacon_csv_string(message, none_character=r'\N') - self.aircraft_beacons_buffer.write(csv_string) - elif message['beacon_type'] in RECEIVER_BEACON_TYPES: - csv_string = self._get_receiver_beacon_csv_string(message, none_character=r'\N') - self.receiver_beacons_buffer.write(csv_string) + if message['beacon_type'] in AIRCRAFT_BEACON_TYPES and message['aprs_type'] == 'position': + csv_string = self._get_aircraft_position_beacon_csv_string(message, none_character=r'\N') + self.aircraft_position_beacons_buffer.write(csv_string) + elif message['beacon_type'] in AIRCRAFT_BEACON_TYPES and message['aprs_type'] == 'status': + pass # ignore it + elif message['beacon_type'] in RECEIVER_BEACON_TYPES and message['aprs_type'] == 'position': + csv_string = self._get_receiver_position_beacon_csv_string(message, none_character=r'\N') + self.receiver_position_beacons_buffer.write(csv_string) + elif message['beacon_type'] in RECEIVER_BEACON_TYPES and message['aprs_type'] == 'status': + csv_string = self._get_receiver_status_beacon_csv_string(message, none_character=r'\N') + self.receiver_status_beacons_buffer.write(csv_string) else: - current_app.logger.error(f"Not supported beacon type, skipped: {raw_string}") + current_app.logger.error(f"Not supported. beacon_type: '{message['beacon_type']}', aprs_type: '{message['aprs_type']}', skipped: {raw_string}") if datetime.utcnow() - self.last_flush >= timedelta(seconds=1): self.flush() self.last_flush = datetime.utcnow() - def flush(self): + def _flush_position_beacons(self): connection = db.engine.raw_connection() cursor = connection.cursor() - self.aircraft_beacons_buffer.seek(0) - self.receiver_beacons_buffer.seek(0) + self.aircraft_position_beacons_buffer.seek(0) + self.receiver_position_beacons_buffer.seek(0) - cursor.execute("CREATE TEMPORARY TABLE aircraft_beacons_temp (LIKE aircraft_beacons) ON COMMIT DROP;") - cursor.execute("CREATE TEMPORARY TABLE receiver_beacons_temp (LIKE receiver_beacons) ON COMMIT DROP;") + cursor.execute("CREATE TEMPORARY TABLE aircraft_position_beacons_temp (LIKE aircraft_beacons) ON COMMIT DROP;") + cursor.execute("CREATE TEMPORARY TABLE receiver_position_beacons_temp (LIKE receiver_beacons) ON COMMIT DROP;") - cursor.copy_from(file=self.aircraft_beacons_buffer, table="aircraft_beacons_temp", sep=",", columns=AIRCRAFT_BEACON_FIELDS) - cursor.copy_from(file=self.receiver_beacons_buffer, table="receiver_beacons_temp", sep=",", columns=RECEIVER_BEACON_FIELDS) + cursor.copy_from(file=self.aircraft_position_beacons_buffer, table="aircraft_position_beacons_temp", sep=",", columns=AIRCRAFT_POSITION_BEACON_FIELDS) + cursor.copy_from(file=self.receiver_position_beacons_buffer, table="receiver_position_beacons_temp", sep=",", columns=RECEIVER_POSITION_BEACON_FIELDS) # Update receivers cursor.execute(""" INSERT INTO receivers AS r (name, location, altitude, firstseen, lastseen, timestamp) - SELECT DISTINCT ON (rbt.name) - rbt.name, - rbt.location, - rbt.altitude, + SELECT DISTINCT ON (rpbt.name) + rpbt.name, + rpbt.location, + rpbt.altitude, timezone('utc', NOW()) AS firstseen, timezone('utc', NOW()) AS lastseen, - rbt.timestamp - FROM receiver_beacons_temp AS rbt, + rpbt.timestamp + FROM receiver_position_beacons_temp AS rpbt, ( SELECT - rbt.name, + rpbt.name, MAX(timestamp) AS timestamp - FROM receiver_beacons_temp AS rbt - GROUP BY rbt.name + FROM receiver_position_beacons_temp AS rpbt + GROUP BY rpbt.name ) AS sq - WHERE rbt.name = sq.name AND rbt.timestamp = sq.timestamp + WHERE rpbt.name = sq.name AND rpbt.timestamp = sq.timestamp ON CONFLICT (name) DO UPDATE SET location = EXCLUDED.location, @@ -303,33 +340,65 @@ class DbFeeder(StringConverter): # Update agl cursor.execute(""" - UPDATE aircraft_beacons_temp AS abt + UPDATE aircraft_position_beacons_temp AS apbt SET - agl = ST_Value(e.rast, abt.location) + agl = ST_Value(e.rast, apbt.location) FROM elevation AS e - WHERE ST_Intersects(abt.location, e.rast) + WHERE ST_Intersects(apbt.location, e.rast) """) # ... update receiver related attributes: distance, radial, quality cursor.execute(""" - UPDATE aircraft_beacons_temp AS abt + UPDATE aircraft_position_beacons_temp AS apbt SET - distance = CAST(ST_DistanceSphere(r.location, abt.location) AS REAL), - radial = CASE WHEN Degrees(ST_Azimuth(r.location, abt.location)) >= 359.5 THEN 0 ELSE CAST(Degrees(ST_Azimuth(r.location, abt.location)) AS INT) END, - quality = CASE WHEN ST_DistanceSphere(r.location, abt.location) > 0 THEN CAST(abt.signal_quality + 20.0 * LOG(ST_DistanceSphere(r.location, abt.location) / 10000) AS REAL) ELSE NULL END + distance = CAST(ST_DistanceSphere(r.location, apbt.location) AS REAL), + radial = CASE WHEN Degrees(ST_Azimuth(r.location, apbt.location)) >= 359.5 THEN 0 ELSE CAST(Degrees(ST_Azimuth(r.location, apbt.location)) AS INT) END, + quality = CASE WHEN ST_DistanceSphere(r.location, apbt.location) > 0 THEN CAST(apbt.signal_quality + 20.0 * LOG(ST_DistanceSphere(r.location, apbt.location) / 10000) AS REAL) ELSE NULL END FROM receivers AS r - WHERE abt.receiver_name = r.name + WHERE apbt.receiver_name = r.name + """) + + # Update devices + cursor.execute(""" + INSERT INTO devices AS d (name, address, firstseen, lastseen, aircraft_type, stealth, software_version, hardware_version, real_address) + SELECT DISTINCT ON (apbt.name) + apbt.name, + apbt.address, + timezone('utc', NOW()) AS firstseen, + timezone('utc', NOW()) AS lastseen, + apbt.aircraft_type, + apbt.stealth, + apbt.software_version, + apbt.hardware_version, + apbt.real_address + FROM aircraft_position_beacons_temp AS apbt, + ( + SELECT + apbt.name, + MAX(timestamp) AS timestamp + FROM aircraft_position_beacons_temp AS apbt + GROUP BY apbt.name + ) AS sq + WHERE apbt.name = sq.name AND apbt.timestamp = sq.timestamp + ON CONFLICT (name) DO UPDATE + SET + lastseen = timezone('utc', NOW()), + aircraft_type = EXCLUDED.aircraft_type, + stealth = EXCLUDED.stealth, + software_version = COALESCE(EXCLUDED.software_version, d.software_version), + hardware_version = COALESCE(EXCLUDED.hardware_version, d.hardware_version), + real_address = COALESCE(EXCLUDED.real_address, d.real_address); """) # Insert all the beacons cursor.execute(""" INSERT INTO aircraft_beacons - SELECT * FROM aircraft_beacons_temp + SELECT * FROM aircraft_position_beacons_temp ON CONFLICT DO NOTHING; """) cursor.execute(""" INSERT INTO receiver_beacons - SELECT * FROM receiver_beacons_temp + SELECT * FROM receiver_position_beacons_temp ON CONFLICT DO NOTHING; """) connection.commit() @@ -337,9 +406,80 @@ class DbFeeder(StringConverter): cursor.close() connection.close() - self.aircraft_beacons_buffer = StringIO() - self.receiver_beacons_buffer = StringIO() + self.aircraft_position_beacons_buffer = StringIO() + self.receiver_position_beacons_buffer = StringIO() + def _flush_status_beacons(self): + connection = db.engine.raw_connection() + cursor = connection.cursor() + + self.aircraft_status_beacons_buffer.seek(0) + self.receiver_status_beacons_buffer.seek(0) + + cursor.execute("CREATE TEMPORARY TABLE aircraft_status_beacons_temp (LIKE aircraft_beacons) ON COMMIT DROP;") + cursor.execute("CREATE TEMPORARY TABLE receiver_status_beacons_temp (LIKE receiver_beacons) ON COMMIT DROP;") + + #cursor.copy_from(file=self.aircraft_status_beacons_buffer, table="aircraft_status_beacons_temp", sep=",", columns=AIRCRAFT_STATUS_BEACON_FIELDS) + cursor.copy_from(file=self.receiver_status_beacons_buffer, table="receiver_status_beacons_temp", sep=",", columns=RECEIVER_STATUS_BEACON_FIELDS) + + # Update receivers + cursor.execute(""" + INSERT INTO receivers AS r (name, timestamp, version, platform) + SELECT DISTINCT ON (rsbt.name) + rsbt.name, + rsbt.timestamp, + rsbt.version, + rsbt.platform + FROM receiver_status_beacons_temp AS rsbt, + ( + SELECT + rsbt.name, + MAX(timestamp) AS timestamp + FROM receiver_status_beacons_temp AS rsbt + GROUP BY rsbt.name + ) AS sq + WHERE rsbt.name = sq.name AND rsbt.timestamp = sq.timestamp + ON CONFLICT (name) DO UPDATE + SET + version = EXCLUDED.version, + platform = EXCLUDED.platform; + """) + + # Update receiver_beacons + cursor.execute(""" + INSERT INTO receiver_beacons AS rb (name, dstcall, receiver_name, timestamp, version, platform) + SELECT DISTINCT ON (rsbt.name) + rsbt.name, + rsbt.dstcall, + rsbt.receiver_name, + rsbt.timestamp, + rsbt.version, + rsbt.platform + FROM receiver_status_beacons_temp AS rsbt, + ( + SELECT + rsbt.name, + MAX(timestamp) AS timestamp + FROM receiver_status_beacons_temp AS rsbt + GROUP BY rsbt.name + ) AS sq + WHERE rsbt.name = sq.name AND rsbt.timestamp = sq.timestamp + ON CONFLICT (name, receiver_name, timestamp) DO UPDATE + SET + version = EXCLUDED.version, + platform = EXCLUDED.platform; + """) + connection.commit() + + cursor.close() + connection.close() + + self.aircraft_status_beacons_buffer = StringIO() + self.receiver_status_beacons_buffer = StringIO() + + def flush(self): + self._flush_position_beacons() + self._flush_status_beacons() def convert(sourcefile): print("Fast scan of file '{}'...".format(sourcefile), end='') diff --git a/app/model/beacon.py b/app/model/beacon.py index a7cfc2d..9bb4ab6 100644 --- a/app/model/beacon.py +++ b/app/model/beacon.py @@ -30,4 +30,4 @@ class Beacon(AbstractConcreteBase, db.Model): # Debug information raw_message = None - reference_timestamp = None + reference_timestamp = db.Column(db.DateTime, nullable=False) diff --git a/app/model/receiver_beacon.py b/app/model/receiver_beacon.py index 509d17e..4374e82 100644 --- a/app/model/receiver_beacon.py +++ b/app/model/receiver_beacon.py @@ -1,3 +1,5 @@ +from app import db + from .beacon import Beacon @@ -9,12 +11,18 @@ class ReceiverBeacon(Beacon): track = None ground_speed = None + # Receiver specific data + version = db.Column(db.String) + platform = db.Column(db.String) + def __repr__(self): - return "" % ( + return "" % ( self.name, self.location, self.altitude, self.dstcall, self.receiver_name, self.timestamp, + self.version, + self.platform )