from flask.cli import AppGroup
import click

import psycopg2
from tqdm import tqdm
from io import StringIO

from ogn_python.model import AircraftBeacon, ReceiverBeacon
from ogn_python.utils import open_file
from ogn_python.commands.database import get_database_days

from ogn_python import db

user_cli = AppGroup('bulkimport')
user_cli.help = "Tools for accelerated data import."

ALEMBIC_CONFIG_FILE = "alembic.ini"
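
# Typical invocation, assuming this AppGroup gets registered on the Flask app
# (e.g. via app.cli.add_command(user_cli)); the command names below are the
# ones defined further down in this module:
#
#   flask bulkimport file_import /path/to/logfiles
#   flask bulkimport update
#   flask bulkimport transfer <start> <end>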


class LogfileDbSaver():
    def __init__(self):
        """Establish the database connection."""
        try:
            self.conn = psycopg2.connect(database="ogn", user="postgres", password="postgres", host="localhost", port="5432")
        except Exception as e:
            raise Exception("I am unable to connect to the database")
        self.cur = self.conn.cursor()

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        """Closes the database connection."""
        self.cur.close()
        self.conn.close()

    def set_datestr(self, datestr):
        """Sets the datestr of the current tables."""
        self.prefix = datestr.replace('-', '_')
        self.aircraft_table = 'aircraft_beacons_{}'.format(self.prefix)
        self.receiver_table = 'receiver_beacons_{}'.format(self.prefix)
        self.aircraft_buffer = StringIO()
        self.receiver_buffer = StringIO()

    def get_datestrs(self, no_index_only=False):
        """Get the date strings from imported log files."""
        index_clause = " AND hasindexes = FALSE" if no_index_only else ""

        self.cur.execute("""
            SELECT DISTINCT(RIGHT(tablename, 10))
            FROM pg_catalog.pg_tables
            WHERE schemaname = 'public' AND tablename LIKE 'aircraft_beacons_%'{}
            ORDER BY RIGHT(tablename, 10);
        """.format(index_clause))

        return [datestr[0].replace('_', '-') for datestr in self.cur.fetchall()]

    def create_tables(self):
        """Create date dependent tables for log file import."""
        try:
            self.cur.execute('CREATE EXTENSION IF NOT EXISTS postgis;')
            self.cur.execute('CREATE EXTENSION IF NOT EXISTS btree_gist;')

            self.cur.execute('DROP TABLE IF EXISTS "{0}"; CREATE TABLE {0} AS TABLE aircraft_beacons WITH NO DATA;'.format(self.aircraft_table))
            self.cur.execute('DROP TABLE IF EXISTS "{0}"; CREATE TABLE {0} AS TABLE receiver_beacons WITH NO DATA;'.format(self.receiver_table))

            self.conn.commit()
        except Exception as e:
            raise Exception("I can't create the tables")

    def add(self, beacon):
        """Adds the values of the beacon to the buffer."""
        value_string = ','.join([str(value) for value in beacon.get_values()]) + '\n'
        if isinstance(beacon, AircraftBeacon):
            self.aircraft_buffer.write(value_string)
        elif isinstance(beacon, ReceiverBeacon):
            self.receiver_buffer.write(value_string)

    def flush(self):
        """Writes the buffer into the tables and resets the buffer."""
        self.aircraft_buffer.seek(0)
        self.receiver_buffer.seek(0)
        self.cur.copy_from(self.aircraft_buffer, self.aircraft_table, sep=',', null='None', columns=AircraftBeacon.get_columns())
        self.cur.copy_from(self.receiver_buffer, self.receiver_table, sep=',', null='None', columns=ReceiverBeacon.get_columns())
        self.conn.commit()

        self.aircraft_buffer = StringIO()
        self.receiver_buffer = StringIO()
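
    # Note: copy_from() streams the buffered rows via PostgreSQL COPY; each
    # buffered row must be comma-separated, use the literal string 'None' for
    # NULL values, and list its fields in the same order as get_columns() on
    # the corresponding model.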

    def export_to_path(self, path):
        import os
        import gzip

        aircraft_beacons_file = os.path.join(path, self.aircraft_table + '.csv.gz')
        with gzip.open(aircraft_beacons_file, 'wt', encoding='utf-8') as gzip_file:
            self.cur.copy_expert("COPY ({}) TO STDOUT WITH (DELIMITER ',', FORMAT CSV, HEADER, ENCODING 'UTF-8');".format(self.get_merged_aircraft_beacons_subquery()), gzip_file)

        receiver_beacons_file = os.path.join(path, self.receiver_table + '.csv.gz')
        with gzip.open(receiver_beacons_file, 'wt') as gzip_file:
            self.cur.copy_expert("COPY ({}) TO STDOUT WITH (DELIMITER ',', FORMAT CSV, HEADER, ENCODING 'UTF-8');".format(self.get_merged_receiver_beacons_subquery()), gzip_file)

    def create_indices(self):
        """Creates indices for aircraft- and receiver-beacons."""
        self.cur.execute("""
            CREATE INDEX IF NOT EXISTS ix_{0}_timestamp_name_receiver_name ON "{0}" (timestamp, name, receiver_name);
            CREATE INDEX IF NOT EXISTS ix_{0}_device_id_timestamp_error_count ON "{0}" (device_id, timestamp, error_count);
            CREATE INDEX IF NOT EXISTS ix_{1}_timestamp_name_receiver_name ON "{1}" (timestamp, name, receiver_name);
        """.format(self.aircraft_table, self.receiver_table))
        self.conn.commit()

    def add_missing_devices(self):
        """Add missing devices."""
        self.cur.execute("""
            INSERT INTO devices(address)
            SELECT DISTINCT(ab.address)
            FROM "{}" AS ab
            WHERE NOT EXISTS (SELECT 1 FROM devices AS d WHERE d.address = ab.address)
            ORDER BY ab.address;
        """.format(self.aircraft_table))
        self.conn.commit()

    def add_missing_receivers(self):
        """Add missing receivers."""
        self.cur.execute("""
            INSERT INTO receivers(name)
            SELECT DISTINCT(rb.name)
            FROM "{0}" AS rb
            WHERE NOT EXISTS (SELECT 1 FROM receivers AS r WHERE r.name = rb.name)
            ORDER BY name;
        """.format(self.receiver_table))
        self.conn.commit()

    def update_receiver_location(self):
        """Updates the receiver location. We need this because we want the actual location for distance calculations."""
        self.cur.execute("""
            UPDATE receivers AS r
            SET location = sq.location,
                altitude = sq.altitude
            FROM (
                SELECT DISTINCT ON (rb.receiver_id) rb.receiver_id, rb.location, rb.altitude
                FROM "{1}" AS rb
                WHERE rb.location IS NOT NULL
                ORDER BY rb.receiver_id, rb.timestamp
            ) AS sq
            WHERE r.id = sq.receiver_id;
        """.format(self.aircraft_table, self.receiver_table))
        self.conn.commit()

    def update_receiver_beacons(self):
        """Updates the foreign keys. Due to performance reasons we use a new table instead of updating the old."""
        self.cur.execute("""
            SELECT
                rb.location, rb.altitude, rb.name, rb.receiver_name, rb.dstcall, rb.timestamp,
                rb.version, rb.platform, rb.cpu_load, rb.free_ram, rb.total_ram, rb.ntp_error, rb.rt_crystal_correction, rb.voltage, rb.amperage,
                rb.cpu_temp, rb.senders_visible, rb.senders_total, rb.rec_input_noise, rb.senders_signal, rb.senders_messages, rb.good_senders_signal,
                rb.good_senders, rb.good_and_bad_senders,
                r.id AS receiver_id
            INTO "{0}_temp"
            FROM "{0}" AS rb, receivers AS r
            WHERE rb.name = r.name;

            DROP TABLE IF EXISTS "{0}";
            ALTER TABLE "{0}_temp" RENAME TO "{0}";
        """.format(self.receiver_table))
        self.conn.commit()

    def update_aircraft_beacons(self):
        """Updates the foreign keys and calculates distance/radial and quality and computes the altitude above ground level.
           Elevation data has to be in the table 'elevation' with srid 4326.
           Due to performance reasons we use a new table instead of updating the old."""
        self.cur.execute("""
            SELECT
                ab.location, ab.altitude, ab.name, ab.dstcall, ab.relay, ab.receiver_name, ab.timestamp, ab.track, ab.ground_speed,
                ab.address_type, ab.aircraft_type, ab.stealth, ab.address, ab.climb_rate, ab.turn_rate, ab.signal_quality, ab.error_count,
                ab.frequency_offset, ab.gps_quality_horizontal, ab.gps_quality_vertical, ab.software_version, ab.hardware_version, ab.real_address, ab.signal_power,
                ab.location_mgrs,
                ab.location_mgrs_short,
                d.id AS device_id,
                r.id AS receiver_id,
                CASE WHEN ab.location IS NOT NULL AND r.location IS NOT NULL THEN CAST(ST_DistanceSphere(ab.location, r.location) AS REAL) ELSE NULL END AS distance,
                CASE WHEN ab.location IS NOT NULL AND r.location IS NOT NULL THEN CAST(degrees(ST_Azimuth(ab.location, r.location)) AS SMALLINT) ELSE NULL END AS radial,
                CASE WHEN ab.location IS NOT NULL AND r.location IS NOT NULL AND ST_DistanceSphere(ab.location, r.location) > 0 AND ab.signal_quality IS NOT NULL
                     THEN CAST(signal_quality + 20 * log(ST_DistanceSphere(ab.location, r.location) / 10000) AS REAL)
                     ELSE NULL
                END AS quality,
                CAST(ab.altitude - ST_Value(e.rast, ab.location) AS REAL) AS agl
            INTO "{0}_temp"
            FROM "{0}" AS ab, devices AS d, receivers AS r, elevation AS e
            WHERE ab.address = d.address AND receiver_name = r.name AND ST_Intersects(e.rast, ab.location);

            DROP TABLE IF EXISTS "{0}";
            ALTER TABLE "{0}_temp" RENAME TO "{0}";
        """.format(self.aircraft_table))
        self.conn.commit()

    def get_merged_aircraft_beacons_subquery(self):
        """Some beacons are split into position and status beacon. With this query we merge them into one beacon."""
        return """
            SELECT
                ST_AsEWKT(MAX(location)) AS location,
                MAX(altitude) AS altitude,
                name,
                MAX(dstcall) AS dstcall,
                MAX(relay) AS relay,
                receiver_name,
                timestamp,
                MAX(track) AS track,
                MAX(ground_speed) AS ground_speed,
                MAX(address_type) AS address_type,
                MAX(aircraft_type) AS aircraft_type,
                CAST(MAX(CAST(stealth AS int)) AS boolean) AS stealth,
                MAX(address) AS address,
                MAX(climb_rate) AS climb_rate,
                MAX(turn_rate) AS turn_rate,
                MAX(signal_quality) AS signal_quality,
                MAX(error_count) AS error_count,
                MAX(frequency_offset) AS frequency_offset,
                MAX(gps_quality_horizontal) AS gps_quality_horizontal,
                MAX(gps_quality_vertical) AS gps_quality_vertical,
                MAX(software_version) AS software_version,
                MAX(hardware_version) AS hardware_version,
                MAX(real_address) AS real_address,
                MAX(signal_power) AS signal_power,
                CAST(MAX(distance) AS REAL) AS distance,
                CAST(MAX(radial) AS REAL) AS radial,
                CAST(MAX(quality) AS REAL) AS quality,
                CAST(MAX(agl) AS REAL) AS agl,
                MAX(location_mgrs) AS location_mgrs,
                MAX(location_mgrs_short) AS location_mgrs_short,
                MAX(receiver_id) AS receiver_id,
                MAX(device_id) AS device_id
            FROM "{0}" AS ab
            GROUP BY timestamp, name, receiver_name
            ORDER BY timestamp, name, receiver_name
        """.format(self.aircraft_table)

    def get_merged_receiver_beacons_subquery(self):
        """Some beacons are split into position and status beacon. With this query we merge them into one beacon."""
        return """
            SELECT
                ST_AsEWKT(MAX(location)) AS location,
                MAX(altitude) AS altitude,
                name,
                receiver_name,
                MAX(dstcall) AS dstcall,
                timestamp,
                MAX(version) AS version,
                MAX(platform) AS platform,
                MAX(cpu_load) AS cpu_load,
                MAX(free_ram) AS free_ram,
                MAX(total_ram) AS total_ram,
                MAX(ntp_error) AS ntp_error,
                MAX(rt_crystal_correction) AS rt_crystal_correction,
                MAX(voltage) AS voltage,
                MAX(amperage) AS amperage,
                MAX(cpu_temp) AS cpu_temp,
                MAX(senders_visible) AS senders_visible,
                MAX(senders_total) AS senders_total,
                MAX(rec_input_noise) AS rec_input_noise,
                MAX(senders_signal) AS senders_signal,
                MAX(senders_messages) AS senders_messages,
                MAX(good_senders_signal) AS good_senders_signal,
                MAX(good_senders) AS good_senders,
                MAX(good_and_bad_senders) AS good_and_bad_senders,
                MAX(receiver_id) AS receiver_id
            FROM "{0}" AS rb
            GROUP BY timestamp, name, receiver_name
            ORDER BY timestamp, name, receiver_name
        """.format(self.receiver_table)

    def is_transfered(self):
        """Checks whether beacons of the current date are already in the production table."""
        query = """
            SELECT 1
            FROM ({} LIMIT 1) AS sq, aircraft_beacons AS ab
            WHERE ab.timestamp = sq.timestamp AND ab.name = sq.name AND ab.receiver_name = sq.receiver_name;
        """.format(self.get_merged_aircraft_beacons_subquery())

        self.cur.execute(query)
        return len(self.cur.fetchall()) == 1

    def transfer_aircraft_beacons(self):
        query = """
            INSERT INTO aircraft_beacons(location, altitude, name, dstcall, relay, receiver_name, timestamp, track, ground_speed,
                address_type, aircraft_type, stealth, address, climb_rate, turn_rate, signal_quality, error_count, frequency_offset, gps_quality_horizontal, gps_quality_vertical, software_version, hardware_version, real_address, signal_power,
                distance, radial, quality, agl, location_mgrs, location_mgrs_short,
                receiver_id, device_id)
            {}
            ON CONFLICT DO NOTHING;
        """.format(self.get_merged_aircraft_beacons_subquery())

        self.cur.execute(query)
        self.conn.commit()

    def transfer_receiver_beacons(self):
        query = """
            INSERT INTO receiver_beacons(location, altitude, name, receiver_name, dstcall, timestamp,
                version, platform, cpu_load, free_ram, total_ram, ntp_error, rt_crystal_correction, voltage,
                amperage, cpu_temp, senders_visible, senders_total, rec_input_noise, senders_signal,
                senders_messages, good_senders_signal, good_senders, good_and_bad_senders,
                receiver_id)
            {}
            ON CONFLICT DO NOTHING;
        """.format(self.get_merged_receiver_beacons_subquery())

        self.cur.execute(query)
        self.conn.commit()

    def create_flights2d(self):
        query = """
            INSERT INTO flights2d
            (
                date,
                device_id,
                path
            )
            SELECT sq5.date,
                   sq5.device_id,
                   ST_Collect(sq5.linestring ORDER BY sq5.part) multilinestring
            FROM (
                SELECT sq4.timestamp::date AS date,
                       sq4.device_id,
                       sq4.part,
                       ST_MakeLine(sq4.location ORDER BY sq4.timestamp) linestring
                FROM (
                    SELECT sq3.timestamp,
                           sq3.location,
                           sq3.device_id,
                           SUM(sq3.ping) OVER (PARTITION BY sq3.timestamp::date, sq3.device_id ORDER BY sq3.timestamp) part
                    FROM (
                        SELECT sq2.t1 AS timestamp,
                               sq2.l1 AS location,
                               sq2.d1 device_id,
                               CASE
                                   WHEN sq2.t1 - sq2.t2 < INTERVAL '100s' AND ST_DistanceSphere(sq2.l1, sq2.l2) < 1000 THEN 0
                                   ELSE 1
                               END AS ping
                        FROM (
                            SELECT sq.timestamp t1,
                                   LAG(sq.timestamp) OVER (PARTITION BY sq.device_id ORDER BY sq.timestamp) t2,
                                   sq.location l1,
                                   LAG(sq.location) OVER (PARTITION BY sq.device_id ORDER BY sq.timestamp) l2,
                                   sq.device_id d1,
                                   LAG(sq.device_id) OVER (PARTITION BY sq.device_id ORDER BY sq.timestamp) d2
                            FROM (
                                SELECT DISTINCT ON (device_id, timestamp) timestamp, device_id, location
                                FROM {}
                                WHERE device_id IS NOT NULL AND ground_speed > 250 AND agl < 100
                                ORDER BY device_id, timestamp, error_count
                            ) sq
                        ) sq2
                    ) sq3
                ) sq4
                GROUP BY sq4.timestamp::date, sq4.device_id, sq4.part
            ) sq5
            GROUP BY sq5.date, sq5.device_id
            ON CONFLICT DO NOTHING;
        """.format(self.aircraft_table)

        self.cur.execute(query)
        self.conn.commit()

    def create_gaps2d(self):
        query = """
            INSERT INTO gaps2d(date, device_id, path)
            SELECT sq3.date,
                   sq3.device_id,
                   ST_Collect(sq3.path)
            FROM (
                SELECT
                    sq2.t1::DATE AS date,
                    sq2.d1 device_id,
                    ST_MakeLine(sq2.l1, sq2.l2) path
                FROM (
                    SELECT sq.timestamp t1,
                           LAG(sq.timestamp) OVER (PARTITION BY sq.timestamp::DATE, sq.device_id ORDER BY sq.timestamp) t2,
                           sq.location l1,
                           LAG(sq.location) OVER (PARTITION BY sq.timestamp::DATE, sq.device_id ORDER BY sq.timestamp) l2,
                           sq.device_id d1,
                           LAG(sq.device_id) OVER (PARTITION BY sq.timestamp::DATE, sq.device_id ORDER BY sq.timestamp) d2,
                           sq.agl a1,
                           LAG(sq.agl) OVER (PARTITION BY sq.timestamp::DATE, sq.device_id ORDER BY sq.timestamp) a2
                    FROM (
                        SELECT DISTINCT ON (device_id, timestamp) timestamp, device_id, location, agl
                        FROM {}
                        ORDER BY device_id, timestamp, error_count
                    ) sq
                ) sq2
                WHERE EXTRACT(epoch FROM sq2.t1 - sq2.t2) > 300
                    AND ST_DistanceSphere(sq2.l1, sq2.l2) / EXTRACT(epoch FROM sq2.t1 - sq2.t2) BETWEEN 15 AND 50
                    AND sq2.a1 > 300 AND sq2.a2 > 300
            ) sq3
            GROUP BY sq3.date, sq3.device_id
            ON CONFLICT DO NOTHING;
        """.format(self.aircraft_table)

        self.cur.execute(query)
        self.conn.commit()
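
# Rough usage sketch of LogfileDbSaver (illustrative only; the date string is a
# placeholder, not taken from real data):
#
#   with LogfileDbSaver() as saver:
#       saver.set_datestr('2018-12-01')
#       saver.create_tables()
#       # ... saver.add(beacon) for every parsed beacon ...
#       saver.flush()
#       saver.add_missing_devices()
#       saver.add_missing_receivers()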


def convert(sourcefile, datestr, saver):
    from ogn_python.gateway.process import string_to_message
    from ogn_python.gateway.process_tools import AIRCRAFT_BEACON_TYPES, RECEIVER_BEACON_TYPES
    from datetime import datetime

    fin = open_file(sourcefile)

    # get total lines of the input file
    total_lines = 0
    for line in fin:
        total_lines += 1
    fin.seek(0)

    current_line = 0
    steps = 100000
    reference_date = datetime.strptime(datestr + ' 12:00:00', '%Y-%m-%d %H:%M:%S')

    pbar = tqdm(fin, total=total_lines)
    for line in pbar:
        pbar.set_description('Importing {}'.format(sourcefile))

        current_line += 1
        if current_line % steps == 0:
            saver.flush()

        message = string_to_message(line.strip(), reference_date=reference_date)
        if message is None:
            continue

        dictfilt = lambda x, y: dict([(i, x[i]) for i in x if i in set(y)])

        try:
            if message['beacon_type'] in AIRCRAFT_BEACON_TYPES:
                message = dictfilt(message, ('beacon_type', 'aprs_type', 'location_wkt', 'altitude', 'name', 'dstcall', 'relay', 'receiver_name', 'timestamp', 'track', 'ground_speed',
                                             'address_type', 'aircraft_type', 'stealth', 'address', 'climb_rate', 'turn_rate', 'signal_quality', 'error_count', 'frequency_offset', 'gps_quality_horizontal', 'gps_quality_vertical', 'software_version', 'hardware_version', 'real_address', 'signal_power',
                                             'distance', 'radial', 'quality', 'agl', 'location_mgrs', 'location_mgrs_short',
                                             'receiver_id', 'device_id'))
                beacon = AircraftBeacon(**message)
            elif message['beacon_type'] in RECEIVER_BEACON_TYPES:
                if 'rec_crystal_correction' in message:
                    del message['rec_crystal_correction']
                    del message['rec_crystal_correction_fine']
                beacon = ReceiverBeacon(**message)
            saver.add(beacon)
        except Exception as e:
            print(e)

    saver.flush()
    fin.close()


@user_cli.command('file_import')
@click.argument('path')
def file_import(path):
    """Import APRS logfiles into separate logfile tables."""
    import os
    import re

    # Get filepaths and dates to import
    results = list()
    for (root, dirs, files) in os.walk(path):
        for file in sorted(files):
            match = re.match(r'OGN_log\.txt_([0-9]{4}\-[0-9]{2}\-[0-9]{2})\.gz$', file)
            if match:
                results.append({'filepath': os.path.join(root, file),
                                'datestr': match.group(1)})

    with LogfileDbSaver() as saver:
        already_imported = saver.get_datestrs()

        results = list(filter(lambda x: x['datestr'] not in already_imported, results))

        pbar = tqdm(results)
        for result in pbar:
            filepath = result['filepath']
            datestr = result['datestr']
            pbar.set_description("Importing data for {}".format(datestr))
            saver.set_datestr(datestr)
            saver.create_tables()
            convert(filepath, datestr, saver)
            saver.add_missing_devices()
            saver.add_missing_receivers()


@user_cli.command('update')
def update():
    """Update beacons (add foreign keys, compute distance, bearing, agl, etc.) in separate logfile tables."""
    with LogfileDbSaver() as saver:
        datestrs = saver.get_datestrs(no_index_only=True)
        datestrs = saver.get_datestrs()

        pbar = tqdm(datestrs)
        for datestr in pbar:
            pbar.set_description("Updating relations for {}".format(datestr))

            saver.set_datestr(datestr)
            saver.update_receiver_location()
            saver.update_aircraft_beacons()
            saver.update_receiver_beacons()
            saver.create_indices()


@user_cli.command('transfer')
@click.argument('start')
@click.argument('end')
def transfer(start, end):
    """Transfer beacons from separate logfile tables to beacon table."""
    with LogfileDbSaver() as saver:
        if start is not None and end is not None:
            dates = get_database_days(start, end)
            datestrs = [date.strftime('%Y_%m_%d') for date in dates]
        else:
            datestrs = saver.get_datestrs()

        pbar = tqdm(datestrs)
        for datestr in pbar:
            pbar.set_description("Transfer beacons for {}".format(datestr))
            saver.set_datestr(datestr)
            if not saver.is_transfered():
                saver.transfer_aircraft_beacons()
                saver.transfer_receiver_beacons()


@user_cli.command('create_flights2d')
def create_flights2d():
    """Create complete flight traces from logfile tables."""
    with LogfileDbSaver() as saver:
        datestrs = saver.get_datestrs()

        pbar = tqdm(datestrs)
        for datestr in pbar:
            pbar.set_description("Create Flights2D for {}".format(datestr))
            saver.set_datestr(datestr)
            saver.create_flights2d()


@user_cli.command('create_gaps2d')
def create_gaps2d():
    """Create 'gaps' from logfile tables."""
    with LogfileDbSaver() as saver:
        datestrs = saver.get_datestrs()

        pbar = tqdm(datestrs)
        for datestr in pbar:
            pbar.set_description("Create Gaps2D for {}".format(datestr))
            saver.set_datestr(datestr)
            saver.create_gaps2d()


@user_cli.command('file_export')
@click.argument('path')
def file_export(path):
    """Export separate logfile tables to csv files. They can be used for fast bulk import with sql COPY command."""
    import os

    if not os.path.isdir(path):
        print("'{}' is not a path. Exiting".format(path))
        return

    with LogfileDbSaver() as saver:
        datestrs = saver.get_datestrs()
        datestrs = filter(lambda x: x.startswith('2018-12'), datestrs)  # only export tables from December 2018

        pbar = tqdm(datestrs)
        for datestr in pbar:
            pbar.set_description("Exporting data for {}".format(datestr))
            saver.set_datestr(datestr)
            saver.export_to_path(path)
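
# The exported .csv.gz files are meant for bulk re-import with the SQL COPY
# command, roughly like the sketch below (placeholder file name; the column
# list must match the CSV header produced by the merged-beacons query):
#
#   COPY aircraft_beacons (location, altitude, name, ...)
#       FROM PROGRAM 'zcat /path/to/aircraft_beacons_2018_12_01.csv.gz'
#       WITH (FORMAT CSV, HEADER);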