diff --git a/app/gateway/bulkimport.py b/app/gateway/bulkimport.py index 7ba2cef..fba9f6c 100644 --- a/app/gateway/bulkimport.py +++ b/app/gateway/bulkimport.py @@ -1,6 +1,7 @@ import os import re from datetime import datetime, timedelta +import time from io import StringIO import gzip @@ -290,28 +291,31 @@ class DbFeeder(StringConverter): self.aircraft_position_beacons_buffer.seek(0) self.receiver_position_beacons_buffer.seek(0) - cursor.execute("CREATE TEMPORARY TABLE aircraft_position_beacons_temp (LIKE aircraft_beacons) ON COMMIT DROP;") - cursor.execute("CREATE TEMPORARY TABLE receiver_position_beacons_temp (LIKE receiver_beacons) ON COMMIT DROP;") + aircraft_position_beacons_temp_table_name = f"aircraft_position_beacons_temp_{str(time.time()).replace('.', '_')}" + receiver_position_beacons_temp_table_name = f"receiver_position_beacons_temp_{str(time.time()).replace('.', '_')}" + + cursor.execute(f"CREATE TEMPORARY TABLE {aircraft_position_beacons_temp_table_name} (LIKE aircraft_beacons) ON COMMIT DROP;") + cursor.execute(f"CREATE TEMPORARY TABLE {receiver_position_beacons_temp_table_name} (LIKE receiver_beacons) ON COMMIT DROP;") - cursor.copy_from(file=self.aircraft_position_beacons_buffer, table="aircraft_position_beacons_temp", sep=",", columns=AIRCRAFT_POSITION_BEACON_FIELDS) - cursor.copy_from(file=self.receiver_position_beacons_buffer, table="receiver_position_beacons_temp", sep=",", columns=RECEIVER_POSITION_BEACON_FIELDS) + cursor.copy_from(file=self.aircraft_position_beacons_buffer, table=aircraft_position_beacons_temp_table_name, sep=",", columns=AIRCRAFT_POSITION_BEACON_FIELDS) + cursor.copy_from(file=self.receiver_position_beacons_buffer, table=receiver_position_beacons_temp_table_name, sep=",", columns=RECEIVER_POSITION_BEACON_FIELDS) # Update receivers - cursor.execute(""" + cursor.execute(f""" INSERT INTO receivers AS r (name, location, altitude, firstseen, lastseen, timestamp) SELECT DISTINCT ON (rpbt.name) rpbt.name, rpbt.location, rpbt.altitude, - timezone('utc', NOW()) AS firstseen, - timezone('utc', NOW()) AS lastseen, + rpbt.reference_timestamp AS firstseen, + rpbt.reference_timestamp AS lastseen, rpbt.timestamp - FROM receiver_position_beacons_temp AS rpbt, + FROM {receiver_position_beacons_temp_table_name} AS rpbt, ( SELECT rpbt.name, MAX(timestamp) AS timestamp - FROM receiver_position_beacons_temp AS rpbt + FROM {receiver_position_beacons_temp_table_name} AS rpbt GROUP BY rpbt.name ) AS sq WHERE rpbt.name = sq.name AND rpbt.timestamp = sq.timestamp @@ -319,13 +323,13 @@ class DbFeeder(StringConverter): SET location = EXCLUDED.location, altitude = EXCLUDED.altitude, - lastseen = timezone('utc', NOW()), + lastseen = EXCLUDED.lastseen, timestamp = EXCLUDED.timestamp """) # Update agl - cursor.execute(""" - UPDATE aircraft_position_beacons_temp AS apbt + cursor.execute(f""" + UPDATE {aircraft_position_beacons_temp_table_name} AS apbt SET agl = ST_Value(e.rast, apbt.location) FROM elevation AS e @@ -333,8 +337,8 @@ class DbFeeder(StringConverter): """) # ... update receiver related attributes: distance, radial, quality - cursor.execute(""" - UPDATE aircraft_position_beacons_temp AS apbt + cursor.execute(f""" + UPDATE {aircraft_position_beacons_temp_table_name} AS apbt SET distance = CAST(ST_DistanceSphere(r.location, apbt.location) AS REAL), radial = CASE WHEN Degrees(ST_Azimuth(r.location, apbt.location)) >= 359.5 THEN 0 ELSE CAST(Degrees(ST_Azimuth(r.location, apbt.location)) AS INT) END, @@ -344,30 +348,30 @@ class DbFeeder(StringConverter): """) # Update devices - cursor.execute(""" + cursor.execute(f""" INSERT INTO devices AS d (name, address, firstseen, lastseen, aircraft_type, stealth, software_version, hardware_version, real_address) SELECT DISTINCT ON (apbt.name) apbt.name, apbt.address, - timezone('utc', NOW()) AS firstseen, - timezone('utc', NOW()) AS lastseen, + apbt.reference_timestamp AS firstseen, + apbt.reference_timestamp AS lastseen, apbt.aircraft_type, apbt.stealth, apbt.software_version, apbt.hardware_version, apbt.real_address - FROM aircraft_position_beacons_temp AS apbt, + FROM {aircraft_position_beacons_temp_table_name} AS apbt, ( SELECT apbt.name, MAX(timestamp) AS timestamp - FROM aircraft_position_beacons_temp AS apbt + FROM {aircraft_position_beacons_temp_table_name} AS apbt GROUP BY apbt.name ) AS sq WHERE apbt.name = sq.name AND apbt.timestamp = sq.timestamp ON CONFLICT (name) DO UPDATE SET - lastseen = timezone('utc', NOW()), + lastseen = EXCLUDED.lastseen, aircraft_type = EXCLUDED.aircraft_type, stealth = EXCLUDED.stealth, software_version = COALESCE(EXCLUDED.software_version, d.software_version), @@ -376,14 +380,14 @@ class DbFeeder(StringConverter): """) # Insert all the beacons - cursor.execute(""" + cursor.execute(f""" INSERT INTO aircraft_beacons - SELECT * FROM aircraft_position_beacons_temp + SELECT * FROM {aircraft_position_beacons_temp_table_name} ON CONFLICT DO NOTHING; """) - cursor.execute(""" + cursor.execute(f""" INSERT INTO receiver_beacons - SELECT * FROM receiver_position_beacons_temp + SELECT * FROM {receiver_position_beacons_temp_table_name} ON CONFLICT DO NOTHING; """) connection.commit() @@ -401,26 +405,29 @@ class DbFeeder(StringConverter): self.aircraft_status_beacons_buffer.seek(0) self.receiver_status_beacons_buffer.seek(0) - cursor.execute("CREATE TEMPORARY TABLE aircraft_status_beacons_temp (LIKE aircraft_beacons) ON COMMIT DROP;") - cursor.execute("CREATE TEMPORARY TABLE receiver_status_beacons_temp (LIKE receiver_beacons) ON COMMIT DROP;") + aircraft_status_beacons_temp_table_name = f"aircraft_status_beacons_temp_{str(time.time()).replace('.', '_')}" + receiver_status_beacons_temp_table_name = f"receiver_status_beacons_temp_{str(time.time()).replace('.', '_')}" + + cursor.execute(f"CREATE TEMPORARY TABLE {aircraft_status_beacons_temp_table_name} (LIKE aircraft_beacons) ON COMMIT DROP;") + cursor.execute(f"CREATE TEMPORARY TABLE {receiver_status_beacons_temp_table_name} (LIKE receiver_beacons) ON COMMIT DROP;") #cursor.copy_from(file=self.aircraft_status_beacons_buffer, table="aircraft_status_beacons_temp", sep=",", columns=AIRCRAFT_STATUS_BEACON_FIELDS) - cursor.copy_from(file=self.receiver_status_beacons_buffer, table="receiver_status_beacons_temp", sep=",", columns=RECEIVER_STATUS_BEACON_FIELDS) + cursor.copy_from(file=self.receiver_status_beacons_buffer, table=receiver_status_beacons_temp_table_name, sep=",", columns=RECEIVER_STATUS_BEACON_FIELDS) # Update receivers - cursor.execute(""" + cursor.execute(f""" INSERT INTO receivers AS r (name, timestamp, version, platform) SELECT DISTINCT ON (rsbt.name) rsbt.name, rsbt.timestamp, rsbt.version, rsbt.platform - FROM receiver_status_beacons_temp AS rsbt, + FROM {receiver_status_beacons_temp_table_name} AS rsbt, ( SELECT rsbt.name, MAX(timestamp) AS timestamp - FROM receiver_status_beacons_temp AS rsbt + FROM {receiver_status_beacons_temp_table_name} AS rsbt GROUP BY rsbt.name ) AS sq WHERE rsbt.name = sq.name AND rsbt.timestamp = sq.timestamp @@ -431,7 +438,7 @@ class DbFeeder(StringConverter): """) # Update receiver_beacons - cursor.execute(""" + cursor.execute(f""" INSERT INTO receiver_beacons AS rb (name, dstcall, receiver_name, timestamp, version, platform, reference_timestamp) SELECT DISTINCT ON (rsbt.name) rsbt.name, @@ -441,12 +448,12 @@ class DbFeeder(StringConverter): rsbt.version, rsbt.platform, rsbt.reference_timestamp - FROM receiver_status_beacons_temp AS rsbt, + FROM {receiver_status_beacons_temp_table_name} AS rsbt, ( SELECT rsbt.name, MAX(timestamp) AS timestamp - FROM receiver_status_beacons_temp AS rsbt + FROM {receiver_status_beacons_temp_table_name} AS rsbt GROUP BY rsbt.name ) AS sq WHERE rsbt.name = sq.name AND rsbt.timestamp = sq.timestamp