2015-12-01 22:04:13 +00:00
from ogn . commands . dbutils import engine , session
2016-10-31 12:47:27 +00:00
from ogn . model import Base , AddressOrigin , AircraftBeacon , ReceiverBeacon , Device , Receiver
2017-06-12 17:25:24 +00:00
from ogn . utils import get_airports , open_file
2016-06-21 17:34:05 +00:00
from ogn . collect . database import update_device_infos
2015-11-15 08:10:46 +00:00
2016-10-31 12:47:27 +00:00
from sqlalchemy import insert , distinct
from sqlalchemy . sql import null
2015-11-15 08:10:46 +00:00
from manager import Manager
# Command registry; the functions below register themselves with it through
# the @manager.command decorator.
manager = Manager ( )
2016-01-31 01:25:21 +00:00
# Path handed to alembic.config.Config by init() and upgrade().
ALEMBIC_CONFIG_FILE = " alembic.ini "
2015-11-15 18:31:58 +00:00
2015-11-15 08:10:46 +00:00
@manager.command
def init():
    """Initialize the database.

    Makes sure the PostGIS extension exists, creates all tables registered
    on the model metadata and stamps the Alembic version with "head".
    """
    from alembic.config import Config
    from alembic import command

    # The extension has to be committed before the tables are created.
    session.execute(' CREATE EXTENSION IF NOT EXISTS postgis; ')
    session.commit()

    Base.metadata.create_all(engine)

    # Record the revision without running any migration script.
    command.stamp(Config(ALEMBIC_CONFIG_FILE), " head ")

    print(" Done. ")
2015-11-15 08:23:57 +00:00
2016-01-31 01:25:21 +00:00
@manager.command
def upgrade():
    """Upgrade database to the latest version.

    Runs the Alembic ``upgrade`` command against the configured ini file.
    """
    from alembic.config import Config as AlembicConfig
    from alembic import command as alembic_command

    alembic_command.upgrade(AlembicConfig(ALEMBIC_CONFIG_FILE), ' head ')
2016-01-12 17:36:08 +00:00
@manager.command
def drop(sure=' n '):
    """Drop all tables.

    Tables are only dropped when ``sure`` carries the confirming value;
    otherwise a hint on how to confirm is printed and nothing is touched.
    """
    if sure != ' y ':
        print(" Add argument ' --sure y ' to drop all tables. ")
        return

    Base.metadata.drop_all(engine)
    print(' Dropped all tables. ')
2015-11-15 08:23:57 +00:00
@manager.command
def import_ddb():
    """Import registered devices from the DDB.

    Calls ``update_device_infos`` with the ``ogn_ddb`` address origin and
    prints the count it returns.
    """
    # Fixed typo in the progress message ("fom" -> "from").
    print(" Import registered devices from the DDB... ")

    counter = update_device_infos(session, AddressOrigin.ogn_ddb)
    print(" Imported %i devices. " % counter)
2015-12-01 22:04:13 +00:00
@manager.command
def import_file(path=' tests/custom_ddb.txt '):
    """Import registered devices from a local file."""
    # (flushes previously manually imported entries)

    print(" Import registered devices from ' {} ' ... ".format(path))

    imported = update_device_infos(
        session,
        AddressOrigin.user_defined,
        csvfile=path,
    )
    print(" Imported %i devices. " % imported)
2016-04-22 08:44:39 +00:00
@manager.command
def import_airports(path=' tests/SeeYou.cup '):
    """Import airports from a ".cup" file.

    The airports returned by ``get_airports`` are bulk-saved and committed
    in one go.
    """
    print(" Import airports from ' {} ' ... ".format(path))

    airports = get_airports(path)
    session.bulk_save_objects(airports)
    session.commit()

    airport_count = len(airports)
    print(" Imported {} airports. ".format(airport_count))
2016-10-31 12:47:27 +00:00
@manager.command
def update_relations():
    """Update AircraftBeacon and ReceiverBeacon relations.

    Inserts a Receiver for every distinct ReceiverBeacon name, and a Device
    for every distinct AircraftBeacon address, that belongs to a beacon with
    a NULL foreign key and has no matching row yet. Prints how many rows
    each INSERT affected.

    NOTE: this command only creates the missing Receiver/Device rows; it
    does not write receiver_id/device_id back onto the beacons.
    """
    # Create missing Receiver from ReceiverBeacon
    available_receivers = session.query(Receiver.name).subquery()
    missing_receiver_query = session.query(distinct(ReceiverBeacon.name)) \
        .filter(ReceiverBeacon.receiver_id == null()) \
        .filter(~ReceiverBeacon.name.in_(available_receivers))
    receiver_insert = insert(Receiver).from_select([Receiver.name], missing_receiver_query)
    # Read rowcount right away, while the result's cursor is still current.
    inserted_receivers = session.execute(receiver_insert).rowcount

    # Create missing Device from AircraftBeacon
    available_addresses = session.query(Device.address).subquery()
    missing_addresses_query = session.query(distinct(AircraftBeacon.address)) \
        .filter(AircraftBeacon.device_id == null()) \
        .filter(~AircraftBeacon.address.in_(available_addresses))
    device_insert = insert(Device).from_select([Device.address], missing_addresses_query)
    inserted_devices = session.execute(device_insert).rowcount

    session.commit()

    # Report affected row counts (previously the Insert statement objects
    # themselves were formatted into the message).
    print(" Inserted {} Receivers and {} Devices ".format(inserted_receivers, inserted_devices))
2017-06-03 09:38:55 +00:00
@manager.command
def import_csv_logfile(path, logfile=' main.log ', loglevel=' INFO '):
    """Import csv logfile <arg: csv logfile>.

    ``path`` may name a single file or a directory. For a directory every
    entry is passed to ``import_logfile``. ``logfile`` and ``loglevel`` are
    accepted but not used by this function.
    """
    import datetime
    import os

    now = datetime.datetime.now

    if os.path.isfile(path):
        print(" {} : Importing file: {} ".format(now(), path))
        import_logfile(path)
    elif os.path.isdir(path):
        print(" {} : Scanning path: {} ".format(now(), path))
        for entry in os.listdir(path):
            print(" {} : Importing file: {} ".format(now(), entry))
            import_logfile(os.path.join(path, entry))
    else:
        print(" {} : Path {} not found. ".format(now(), path))

    print(" {} : Finished. ".format(now()))
def import_logfile(path):
    """Import one csv logfile into the beacon table matching its header.

    The file name must contain ``.csv_YYYY-MM-DD`` followed by at least one
    more character; the date part is used as the reference day. The first
    line of the file is compared with the csv column lists of AircraftBeacon
    and ReceiverBeacon to pick the importer. The import is skipped when
    ``check_no_beacons`` reports that beacons already exist for that day.

    :param path: path of the logfile to import
    """
    import os
    import re

    tail = os.path.basename(path)

    # Raw string, without the stray whitespace that made the previous
    # pattern impossible to match.
    match = re.search(r'^.+\.csv_(\d{4}-\d{2}-\d{2}).+?$', tail)
    if not match:
        print(" filename ' {} ' does not match pattern. Skipping ".format(path))
        return
    reference_date_string = match.group(1)

    # Only the header line is needed; close the handle even if reading fails.
    f = open_file(path)
    try:
        header = f.readline().strip()
    finally:
        f.close()

    aircraft_beacon_header = ' , '.join(AircraftBeacon.get_csv_columns())
    receiver_beacon_header = ' , '.join(ReceiverBeacon.get_csv_columns())

    if header == aircraft_beacon_header:
        if check_no_beacons(' aircraft_beacon ', reference_date_string):
            import_aircraft_beacon_logfile(path)
        else:
            print(" For {} beacons already exist. Skipping ".format(reference_date_string))
    elif header == receiver_beacon_header:
        if check_no_beacons(' receiver_beacon ', reference_date_string):
            import_receiver_beacon_logfile(path)
        else:
            print(" For {} beacons already exist. Skipping ".format(reference_date_string))
    else:
        # format() was called without an argument here, raising IndexError.
        print(" Unknown file type: {} ".format(path))
2017-06-12 18:52:19 +00:00
def check_no_beacons(tablename, reference_date_string):
    """Return True when ``tablename`` has no row on the given day.

    :param tablename: table to probe; it is interpolated into the statement
        (identifiers cannot be bound), so only pass trusted names
    :param reference_date_string: day to check, formatted ``YYYY-MM-DD``
    :returns: False as soon as one row with a timestamp on that day exists
    """
    # Half-open range [day, day + 1): unlike an inclusive upper bound of
    # 23:59:59 it also covers timestamps in the last fractional second.
    # The date is sent as a bound parameter, not formatted into the SQL.
    statement = """
        SELECT 1 FROM {}
        WHERE timestamp >= CAST(:ref_date AS date)
          AND timestamp < CAST(:ref_date AS date) + INTERVAL '1 day'
        LIMIT 1
    """.format(tablename)
    result = session.execute(statement, {'ref_date': reference_date_string})
    return result.first() is None
2017-06-04 09:52:48 +00:00
def import_aircraft_beacon_logfile(csv_logfile):
    """Bulk-import a csv logfile of aircraft beacons.

    Steps:
      1. (re)create the staging table ``aircraft_beacon_temp``,
      2. COPY the csv file into it,
      3. insert devices and receivers that do not exist yet,
      4. copy the staged rows into ``aircraft_beacon`` together with the
         matching receiver and device ids and status 0,
      5. drop the staging table and commit.

    :param csv_logfile: path handed to ``open_file``
    """
    # The quoted identifier was padded with blanks, which names a different
    # column than the ``timestamp`` referenced by the statements below.
    SQL_TEMPTABLE_STATEMENT = """
    DROP TABLE IF EXISTS aircraft_beacon_temp ;
    CREATE TABLE aircraft_beacon_temp (
        location geometry ,
        altitude integer ,
        name character varying ,
        receiver_name character varying ( 9 ) ,
        "timestamp" timestamp without time zone ,
        track integer ,
        ground_speed double precision ,
        address_type smallint ,
        aircraft_type smallint ,
        stealth boolean ,
        address character varying ( 6 ) ,
        climb_rate double precision ,
        turn_rate double precision ,
        flightlevel double precision ,
        signal_quality double precision ,
        error_count integer ,
        frequency_offset double precision ,
        gps_status character varying ,
        software_version double precision ,
        hardware_version smallint ,
        real_address character varying ( 6 ) ,
        signal_power double precision
    ) ;
    """
    session.execute(SQL_TEMPTABLE_STATEMENT)

    # COPY only accepts a single one-byte delimiter character.
    SQL_COPY_STATEMENT = """
    COPY aircraft_beacon_temp ( % s ) FROM STDIN WITH
        CSV
        HEADER
        DELIMITER AS ','
    """

    column_names = ' , '.join(AircraftBeacon.get_csv_columns())
    sql = SQL_COPY_STATEMENT % column_names

    print(" Start importing logfile: {} ".format(csv_logfile))

    # Close the file and the cursor even when the COPY fails.
    csv_file = open_file(csv_logfile)
    try:
        conn = session.connection().connection
        cursor = conn.cursor()
        try:
            cursor.copy_expert(sql=sql, file=csv_file)
            conn.commit()
        finally:
            cursor.close()
    finally:
        csv_file.close()
    print(" Read logfile into temporary table ")

    # create device if not exist
    session.execute("""
        INSERT INTO device ( address )
        SELECT DISTINCT ( t . address )
        FROM aircraft_beacon_temp t
        WHERE NOT EXISTS ( SELECT 1 FROM device d WHERE d . address = t . address )
    """)
    print(" Inserted missing Devices ")

    # create receiver if not exist
    session.execute("""
        INSERT INTO receiver ( name )
        SELECT DISTINCT ( t . receiver_name )
        FROM aircraft_beacon_temp t
        WHERE NOT EXISTS ( SELECT 1 FROM receiver r WHERE r . name = t . receiver_name )
    """)
    print(" Inserted missing Receivers ")

    # Move the staged rows into the final table, resolving the foreign keys.
    session.execute("""
        INSERT INTO aircraft_beacon ( location , altitude , name , receiver_name , timestamp , track , ground_speed ,
        address_type , aircraft_type , stealth , address , climb_rate , turn_rate , flightlevel , signal_quality , error_count , frequency_offset , gps_status , software_version , hardware_version , real_address , signal_power ,
        status , receiver_id , device_id )
        SELECT t . location , t . altitude , t . name , t . receiver_name , t . timestamp , t . track , t . ground_speed ,
        t . address_type , t . aircraft_type , t . stealth , t . address , t . climb_rate , t . turn_rate , t . flightlevel , t . signal_quality , t . error_count , t . frequency_offset , t . gps_status , t . software_version , t . hardware_version , t . real_address , t . signal_power ,
        0 , r . id , d . id
        FROM aircraft_beacon_temp t , receiver r , device d
        WHERE t . receiver_name = r . name AND t . address = d . address
    """)
    print(" Wrote AircraftBeacons from temporary table into final table ")

    session.execute(""" DROP TABLE aircraft_beacon_temp """)
    print(" Dropped temporary table ")

    session.commit()
    print(" Finished ")
2017-06-04 09:52:48 +00:00
def import_receiver_beacon_logfile(csv_logfile):
    """Bulk-import a csv logfile of receiver beacons.

    Steps:
      1. (re)create the staging table ``receiver_beacon_temp``,
      2. COPY the csv file into it,
      3. insert receivers that do not exist yet,
      4. copy the staged rows into ``receiver_beacon`` together with the
         matching receiver id and status 0,
      5. drop the staging table and commit.

    :param csv_logfile: path handed to ``open_file``
    """
    # The quoted identifier was padded with blanks, which names a different
    # column than the ``timestamp`` referenced by the statements below.
    SQL_TEMPTABLE_STATEMENT = """
    DROP TABLE IF EXISTS receiver_beacon_temp ;
    CREATE TABLE receiver_beacon_temp (
        location geometry ,
        altitude integer ,
        name character varying ,
        receiver_name character varying ( 9 ) ,
        "timestamp" timestamp without time zone ,
        track integer ,
        ground_speed double precision ,
        version character varying ,
        platform character varying ,
        cpu_load double precision ,
        free_ram double precision ,
        total_ram double precision ,
        ntp_error double precision ,
        rt_crystal_correction double precision ,
        voltage double precision ,
        amperage double precision ,
        cpu_temp double precision ,
        senders_visible integer ,
        senders_total integer ,
        rec_input_noise double precision ,
        senders_signal double precision ,
        senders_messages integer ,
        good_senders_signal double precision ,
        good_senders integer ,
        good_and_bad_senders integer
    ) ;
    """
    session.execute(SQL_TEMPTABLE_STATEMENT)

    # COPY only accepts a single one-byte delimiter character.
    SQL_COPY_STATEMENT = """
    COPY receiver_beacon_temp ( % s ) FROM STDIN WITH
        CSV
        HEADER
        DELIMITER AS ','
    """

    column_names = ' , '.join(ReceiverBeacon.get_csv_columns())
    sql = SQL_COPY_STATEMENT % column_names

    print(" Start importing logfile: {} ".format(csv_logfile))

    # Close the file and the cursor even when the COPY fails.
    csv_file = open_file(csv_logfile)
    try:
        conn = session.connection().connection
        cursor = conn.cursor()
        try:
            cursor.copy_expert(sql=sql, file=csv_file)
            conn.commit()
        finally:
            cursor.close()
    finally:
        csv_file.close()
    print(" Read logfile into temporary table ")

    # create receiver if not exist
    session.execute("""
        INSERT INTO receiver ( name )
        SELECT DISTINCT ( t . name )
        FROM receiver_beacon_temp t
        WHERE NOT EXISTS ( SELECT 1 FROM receiver r WHERE r . name = t . name )
    """)
    print(" Inserted missing Receivers ")

    # Move the staged rows into the final table, resolving the foreign key.
    # ``amperage`` is now qualified with ``t`` like every other column.
    session.execute("""
        INSERT INTO receiver_beacon ( location , altitude , name , receiver_name , timestamp , track , ground_speed ,
        version , platform , cpu_load , free_ram , total_ram , ntp_error , rt_crystal_correction , voltage , amperage , cpu_temp , senders_visible , senders_total , rec_input_noise , senders_signal , senders_messages , good_senders_signal , good_senders , good_and_bad_senders ,
        status , receiver_id )
        SELECT t . location , t . altitude , t . name , t . receiver_name , t . timestamp , t . track , t . ground_speed ,
        t . version , t . platform , t . cpu_load , t . free_ram , t . total_ram , t . ntp_error , t . rt_crystal_correction , t . voltage , t . amperage , t . cpu_temp , t . senders_visible , t . senders_total , t . rec_input_noise , t . senders_signal , t . senders_messages , t . good_senders_signal , t . good_senders , t . good_and_bad_senders ,
        0 , r . id
        FROM receiver_beacon_temp t , receiver r
        WHERE t . name = r . name
    """)
    print(" Wrote ReceiverBeacons from temporary table into final table ")

    session.execute(""" DROP TABLE receiver_beacon_temp """)
    print(" Dropped temporary table ")

    session.commit()
    print(" Finished ")