2015-12-09 02:41:58 +00:00
from celery . utils . log import get_task_logger
2016-02-03 22:09:11 +00:00
2017-12-08 18:24:33 +00:00
from sqlalchemy import insert , distinct
from sqlalchemy . sql import null , and_ , or_ , func , not_
from sqlalchemy . sql . expression import case
2016-02-03 22:09:11 +00:00
2017-12-16 20:08:01 +00:00
from ogn . collect . celery import app
2017-12-08 18:24:33 +00:00
from ogn . model import DeviceInfo , DeviceInfoOrigin , AircraftBeacon , ReceiverBeacon , Device , Receiver
from ogn . utils import get_ddb , get_country_code
2015-12-09 02:41:58 +00:00
2016-02-03 22:09:11 +00:00
2015-12-09 02:41:58 +00:00
logger = get_task_logger ( __name__ )
2016-05-22 05:26:02 +00:00
2016-07-17 20:57:54 +00:00
def update_device_infos ( session , address_origin , csvfile = None ) :
device_infos = get_ddb ( csvfile = csvfile , address_origin = address_origin )
2016-06-21 17:34:05 +00:00
session . query ( DeviceInfo ) \
. filter ( DeviceInfo . address_origin == address_origin ) \
2016-05-22 05:26:02 +00:00
. delete ( )
2016-06-21 17:34:05 +00:00
session . bulk_save_objects ( device_infos )
2015-12-09 02:41:58 +00:00
session . commit ( )
2016-06-21 17:34:05 +00:00
return len ( device_infos )
2015-12-09 02:41:58 +00:00
@app.task
2017-12-16 20:08:01 +00:00
def import_ddb ( session = None ) :
2015-12-09 02:41:58 +00:00
""" Import registered devices from the DDB. """
2017-12-16 20:08:01 +00:00
if session is None :
session = app . session
2015-12-09 02:41:58 +00:00
logger . info ( " Import registered devices fom the DDB... " )
2017-12-02 12:08:44 +00:00
address_origin = DeviceInfoOrigin . ogn_ddb
2016-07-01 05:00:17 +00:00
2017-12-16 20:08:01 +00:00
counter = update_device_infos ( session , address_origin )
2016-02-03 22:19:25 +00:00
logger . info ( " Imported {} devices. " . format ( counter ) )
2015-12-09 02:41:58 +00:00
2017-12-17 13:34:14 +00:00
return " Imported {} devices. " . format ( counter )
2015-12-09 02:41:58 +00:00
@app.task
2017-12-19 06:50:12 +00:00
def import_ddb_file ( session = None , path = ' tests/custom_ddb.txt ' ) :
2015-12-09 02:41:58 +00:00
""" Import registered devices from a local file. """
2017-12-16 20:08:01 +00:00
if session is None :
session = app . session
2015-12-09 02:41:58 +00:00
logger . info ( " Import registered devices from ' {} ' ... " . format ( path ) )
2017-12-02 12:08:44 +00:00
address_origin = DeviceInfoOrigin . user_defined
2016-07-01 05:00:17 +00:00
2017-12-16 20:08:01 +00:00
counter = update_device_infos ( session , address_origin , csvfile = path )
2016-02-03 22:19:25 +00:00
logger . info ( " Imported {} devices. " . format ( counter ) )
2017-12-08 18:24:33 +00:00
2017-12-17 13:34:14 +00:00
return " Imported {} devices. " . format ( counter )
2017-12-08 18:24:33 +00:00
@app.task
2017-12-16 20:08:01 +00:00
def update_devices ( session = None ) :
2017-12-08 18:24:33 +00:00
""" Add/update entries in devices table and update foreign keys in aircraft beacons. """
2017-12-16 20:08:01 +00:00
if session is None :
session = app . session
2017-12-08 18:24:33 +00:00
# Create missing Device from AircraftBeacon
2017-12-16 20:08:01 +00:00
available_devices = session . query ( Device . address ) \
2017-12-08 18:24:33 +00:00
. subquery ( )
2017-12-16 20:08:01 +00:00
missing_devices_query = session . query ( distinct ( AircraftBeacon . address ) ) \
2017-12-16 12:21:25 +00:00
. filter ( and_ ( AircraftBeacon . device_id == null ( ) , not_ ( AircraftBeacon . address . like ( ' 00 % ' ) ) , AircraftBeacon . error_count == 0 ) ) \
2017-12-08 18:24:33 +00:00
. filter ( ~ AircraftBeacon . address . in_ ( available_devices ) )
ins = insert ( Device ) . from_select ( [ Device . address ] , missing_devices_query )
2017-12-16 20:08:01 +00:00
res = session . execute ( ins )
2017-12-08 18:24:33 +00:00
insert_count = res . rowcount
2017-12-16 20:08:01 +00:00
session . commit ( )
2017-12-08 18:24:33 +00:00
# For each address in the new beacons: get firstseen, lastseen and last values != NULL
2017-12-16 20:08:01 +00:00
last_valid_values = session . query (
2017-12-08 20:32:50 +00:00
distinct ( AircraftBeacon . address ) . label ( ' address ' ) ,
2017-12-08 18:24:33 +00:00
func . first_value ( AircraftBeacon . timestamp )
2017-12-16 20:08:01 +00:00
. over ( partition_by = AircraftBeacon . address , order_by = case ( [ ( AircraftBeacon . timestamp == null ( ) , None ) ] , else_ = AircraftBeacon . timestamp ) . asc ( ) . nullslast ( ) )
2017-12-08 18:24:33 +00:00
. label ( ' firstseen ' ) ,
2017-12-16 20:08:01 +00:00
func . first_value ( AircraftBeacon . timestamp )
. over ( partition_by = AircraftBeacon . address , order_by = case ( [ ( AircraftBeacon . timestamp == null ( ) , None ) ] , else_ = AircraftBeacon . timestamp ) . desc ( ) . nullslast ( ) )
2017-12-08 18:24:33 +00:00
. label ( ' lastseen ' ) ,
func . first_value ( AircraftBeacon . aircraft_type )
2017-12-16 20:08:01 +00:00
. over ( partition_by = AircraftBeacon . address , order_by = case ( [ ( AircraftBeacon . aircraft_type == null ( ) , None ) ] , else_ = AircraftBeacon . timestamp ) . desc ( ) . nullslast ( ) )
2017-12-08 18:24:33 +00:00
. label ( ' aircraft_type ' ) ,
func . first_value ( AircraftBeacon . stealth )
2017-12-16 20:08:01 +00:00
. over ( partition_by = AircraftBeacon . address , order_by = case ( [ ( AircraftBeacon . stealth == null ( ) , None ) ] , else_ = AircraftBeacon . timestamp ) . desc ( ) . nullslast ( ) )
2017-12-08 18:24:33 +00:00
. label ( ' stealth ' ) ,
func . first_value ( AircraftBeacon . software_version )
2017-12-16 20:08:01 +00:00
. over ( partition_by = AircraftBeacon . address , order_by = case ( [ ( AircraftBeacon . software_version == null ( ) , None ) ] , else_ = AircraftBeacon . timestamp ) . desc ( ) . nullslast ( ) )
2017-12-08 18:24:33 +00:00
. label ( ' software_version ' ) ,
func . first_value ( AircraftBeacon . hardware_version )
2017-12-16 20:08:01 +00:00
. over ( partition_by = AircraftBeacon . address , order_by = case ( [ ( AircraftBeacon . hardware_version == null ( ) , None ) ] , else_ = AircraftBeacon . timestamp ) . desc ( ) . nullslast ( ) )
2017-12-08 18:24:33 +00:00
. label ( ' hardware_version ' ) ,
func . first_value ( AircraftBeacon . real_address )
2017-12-16 20:08:01 +00:00
. over ( partition_by = AircraftBeacon . address , order_by = case ( [ ( AircraftBeacon . real_address == null ( ) , None ) ] , else_ = AircraftBeacon . timestamp ) . desc ( ) . nullslast ( ) )
2017-12-08 18:24:33 +00:00
. label ( ' real_address ' ) ) \
2017-12-08 20:32:50 +00:00
. filter ( and_ ( AircraftBeacon . device_id == null ( ) , AircraftBeacon . error_count == 0 ) ) \
. subquery ( )
2017-12-16 20:08:01 +00:00
update_values = session . query (
2017-12-08 20:32:50 +00:00
Device . address ,
case ( [ ( or_ ( Device . firstseen == null ( ) , Device . firstseen > last_valid_values . c . firstseen ) , last_valid_values . c . firstseen ) ,
( Device . firstseen < = last_valid_values . c . firstseen , Device . firstseen ) ] ) . label ( ' firstseen ' ) ,
case ( [ ( or_ ( Device . lastseen == null ( ) , Device . lastseen < last_valid_values . c . lastseen ) , last_valid_values . c . lastseen ) ,
( Device . lastseen > = last_valid_values . c . lastseen , Device . lastseen ) ] ) . label ( ' lastseen ' ) ,
case ( [ ( or_ ( Device . aircraft_type == null ( ) , Device . lastseen < last_valid_values . c . lastseen ) , last_valid_values . c . aircraft_type ) ,
( Device . lastseen > = last_valid_values . c . lastseen , Device . aircraft_type ) ] ) . label ( ' aircraft_type ' ) ,
case ( [ ( or_ ( Device . stealth == null ( ) , Device . lastseen < last_valid_values . c . lastseen ) , last_valid_values . c . stealth ) ,
( Device . lastseen > = last_valid_values . c . lastseen , Device . stealth ) ] ) . label ( ' stealth ' ) ,
case ( [ ( or_ ( Device . software_version == null ( ) , Device . lastseen < last_valid_values . c . lastseen ) , last_valid_values . c . software_version ) ,
( Device . lastseen > = last_valid_values . c . lastseen , Device . software_version ) ] ) . label ( ' software_version ' ) ,
case ( [ ( or_ ( Device . hardware_version == null ( ) , Device . lastseen < last_valid_values . c . lastseen ) , last_valid_values . c . hardware_version ) ,
( Device . lastseen > = last_valid_values . c . lastseen , Device . hardware_version ) ] ) . label ( ' hardware_version ' ) ,
case ( [ ( or_ ( Device . real_address == null ( ) , Device . lastseen < last_valid_values . c . lastseen ) , last_valid_values . c . real_address ) ,
( Device . lastseen > = last_valid_values . c . lastseen , Device . real_address ) ] ) . label ( ' real_address ' ) ) \
. filter ( Device . address == last_valid_values . c . address ) \
. subquery ( )
2017-12-08 18:24:33 +00:00
2017-12-16 20:08:01 +00:00
update_receivers = session . query ( Device ) \
2017-12-08 18:24:33 +00:00
. filter ( Device . address == update_values . c . address ) \
2017-12-08 20:32:50 +00:00
. update ( {
Device . firstseen : update_values . c . firstseen ,
Device . lastseen : update_values . c . lastseen ,
Device . aircraft_type : update_values . c . aircraft_type ,
Device . stealth : update_values . c . stealth ,
Device . software_version : update_values . c . software_version ,
Device . hardware_version : update_values . c . hardware_version ,
Device . real_address : update_values . c . real_address } ,
synchronize_session = ' fetch ' )
2017-12-08 18:24:33 +00:00
# Update relations to aircraft beacons
2017-12-16 20:08:01 +00:00
upd = session . query ( AircraftBeacon ) \
2017-12-08 18:24:33 +00:00
. filter ( AircraftBeacon . device_id == null ( ) ) \
. filter ( AircraftBeacon . address == Device . address ) \
2017-12-08 20:32:50 +00:00
. update ( {
AircraftBeacon . device_id : Device . id } ,
synchronize_session = ' fetch ' )
2017-12-08 18:24:33 +00:00
2017-12-16 20:08:01 +00:00
session . commit ( )
2017-12-10 16:30:27 +00:00
logger . info ( " Devices: {} inserted, {} updated " . format ( insert_count , update_receivers ) )
logger . info ( " Updated {} AircraftBeacons " . format ( upd ) )
2017-12-08 18:24:33 +00:00
2017-12-12 08:15:31 +00:00
return " {} Devices inserted, {} Devices updated, {} AircraftBeacons updated " \
. format ( insert_count , update_receivers , upd )
2017-12-08 18:24:33 +00:00
@app.task
2017-12-16 20:08:01 +00:00
def update_receivers ( session = None ) :
2017-12-08 18:24:33 +00:00
""" Add/update_receivers entries in receiver table and update receivers foreign keys and distance in aircraft beacons and update foreign keys in receiver beacons. """
2017-12-16 20:08:01 +00:00
if session is None :
session = app . session
2017-12-08 18:24:33 +00:00
# Create missing Receiver from ReceiverBeacon
2017-12-16 20:08:01 +00:00
available_receivers = session . query ( Receiver . name ) \
2017-12-08 18:24:33 +00:00
. subquery ( )
2017-12-16 20:08:01 +00:00
missing_receiver_query = session . query ( distinct ( ReceiverBeacon . name ) ) \
2017-12-08 18:24:33 +00:00
. filter ( ReceiverBeacon . receiver_id == null ( ) ) \
. filter ( ~ ReceiverBeacon . name . in_ ( available_receivers ) )
ins = insert ( Receiver ) . from_select ( [ Receiver . name ] , missing_receiver_query )
2017-12-16 20:08:01 +00:00
res = session . execute ( ins )
2017-12-08 18:24:33 +00:00
insert_count = res . rowcount
# For each name in the new beacons: get firstseen, lastseen and last values != NULL
2017-12-16 20:08:01 +00:00
last_valid_values = session . query (
2017-12-08 20:32:50 +00:00
distinct ( ReceiverBeacon . name ) . label ( ' name ' ) ,
2017-12-08 18:24:33 +00:00
func . first_value ( ReceiverBeacon . timestamp )
2017-12-16 20:40:12 +00:00
. over ( partition_by = ReceiverBeacon . name , order_by = case ( [ ( ReceiverBeacon . timestamp == null ( ) , None ) ] , else_ = ReceiverBeacon . timestamp ) . desc ( ) . nullslast ( ) )
2017-12-08 18:24:33 +00:00
. label ( ' firstseen ' ) ,
func . last_value ( ReceiverBeacon . timestamp )
2017-12-16 20:40:12 +00:00
. over ( partition_by = ReceiverBeacon . name , order_by = case ( [ ( ReceiverBeacon . timestamp == null ( ) , None ) ] , else_ = ReceiverBeacon . timestamp ) . asc ( ) . nullslast ( ) )
2017-12-08 18:24:33 +00:00
. label ( ' lastseen ' ) ,
func . first_value ( ReceiverBeacon . location_wkt )
2017-12-16 20:40:12 +00:00
. over ( partition_by = ReceiverBeacon . name , order_by = case ( [ ( ReceiverBeacon . location_wkt == null ( ) , None ) ] , else_ = ReceiverBeacon . timestamp ) . desc ( ) . nullslast ( ) )
2017-12-08 18:24:33 +00:00
. label ( ' location_wkt ' ) ,
func . first_value ( ReceiverBeacon . altitude )
2017-12-16 20:40:12 +00:00
. over ( partition_by = ReceiverBeacon . name , order_by = case ( [ ( ReceiverBeacon . altitude == null ( ) , None ) ] , else_ = ReceiverBeacon . timestamp ) . desc ( ) . nullslast ( ) )
2017-12-08 18:24:33 +00:00
. label ( ' altitude ' ) ,
func . first_value ( ReceiverBeacon . version )
2017-12-16 20:40:12 +00:00
. over ( partition_by = ReceiverBeacon . name , order_by = case ( [ ( ReceiverBeacon . version == null ( ) , None ) ] , else_ = ReceiverBeacon . timestamp ) . desc ( ) . nullslast ( ) )
2017-12-08 18:24:33 +00:00
. label ( ' version ' ) ,
func . first_value ( ReceiverBeacon . platform )
2017-12-16 20:40:12 +00:00
. over ( partition_by = ReceiverBeacon . name , order_by = case ( [ ( ReceiverBeacon . platform == null ( ) , None ) ] , else_ = ReceiverBeacon . timestamp ) . desc ( ) . nullslast ( ) )
2017-12-08 18:24:33 +00:00
. label ( ' platform ' ) ) \
. filter ( ReceiverBeacon . receiver_id == null ( ) ) \
. subquery ( )
2017-12-16 20:08:01 +00:00
update_values = session . query (
2017-12-08 20:32:50 +00:00
Receiver . name ,
case ( [ ( or_ ( Receiver . firstseen == null ( ) , Receiver . firstseen > last_valid_values . c . firstseen ) , last_valid_values . c . firstseen ) ,
( Receiver . firstseen < = last_valid_values . c . firstseen , Receiver . firstseen ) ] ) . label ( ' firstseen ' ) ,
case ( [ ( or_ ( Receiver . lastseen == null ( ) , Receiver . lastseen < last_valid_values . c . lastseen ) , last_valid_values . c . lastseen ) ,
( Receiver . firstseen > = last_valid_values . c . firstseen , Receiver . firstseen ) ] ) . label ( ' lastseen ' ) ,
case ( [ ( or_ ( Receiver . lastseen == null ( ) , Receiver . lastseen < last_valid_values . c . lastseen ) , func . ST_Transform ( last_valid_values . c . location_wkt , 4326 ) ) ,
( Receiver . lastseen > = last_valid_values . c . lastseen , func . ST_Transform ( Receiver . location_wkt , 4326 ) ) ] ) . label ( ' location_wkt ' ) ,
case ( [ ( or_ ( Receiver . lastseen == null ( ) , Receiver . lastseen < last_valid_values . c . lastseen ) , last_valid_values . c . altitude ) ,
( Receiver . lastseen > = last_valid_values . c . lastseen , Receiver . altitude ) ] ) . label ( ' altitude ' ) ,
case ( [ ( or_ ( Receiver . lastseen == null ( ) , Receiver . lastseen < last_valid_values . c . lastseen ) , last_valid_values . c . version ) ,
( Receiver . lastseen > = last_valid_values . c . lastseen , Receiver . version ) ] ) . label ( ' version ' ) ,
case ( [ ( or_ ( Receiver . lastseen == null ( ) , Receiver . lastseen < last_valid_values . c . lastseen ) , last_valid_values . c . platform ) ,
( Receiver . lastseen > = last_valid_values . c . lastseen , Receiver . platform ) ] ) . label ( ' platform ' ) ,
case ( [ ( or_ ( Receiver . location_wkt == null ( ) , not_ ( func . ST_Equals ( Receiver . location_wkt , last_valid_values . c . location_wkt ) ) ) , None ) , # set country code to None if location changed
( func . ST_Equals ( Receiver . location_wkt , last_valid_values . c . location_wkt ) , Receiver . country_code ) ] ) . label ( ' country_code ' ) ) \
. filter ( Receiver . name == last_valid_values . c . name ) \
. subquery ( )
2017-12-08 18:24:33 +00:00
2017-12-16 20:08:01 +00:00
update_receivers = session . query ( Receiver ) \
2017-12-08 18:24:33 +00:00
. filter ( Receiver . name == update_values . c . name ) \
2017-12-08 20:32:50 +00:00
. update ( {
Receiver . firstseen : update_values . c . firstseen ,
Receiver . lastseen : update_values . c . lastseen ,
Receiver . location_wkt : update_values . c . location_wkt ,
Receiver . altitude : update_values . c . altitude ,
Receiver . version : update_values . c . version ,
Receiver . platform : update_values . c . platform ,
Receiver . country_code : update_values . c . country_code } ,
synchronize_session = ' fetch ' )
2017-12-08 18:24:33 +00:00
# Update relations to aircraft beacons
2017-12-16 20:08:01 +00:00
update_aircraft_beacons = session . query ( AircraftBeacon ) \
2017-12-08 18:24:33 +00:00
. filter ( and_ ( AircraftBeacon . receiver_id == null ( ) , AircraftBeacon . receiver_name == Receiver . name ) ) \
. update ( { AircraftBeacon . receiver_id : Receiver . id ,
AircraftBeacon . distance : func . ST_Distance_Sphere ( AircraftBeacon . location_wkt , Receiver . location_wkt ) } ,
synchronize_session = ' fetch ' )
# Update relations to receiver beacons
2017-12-16 20:08:01 +00:00
update_receiver_beacons = session . query ( ReceiverBeacon ) \
2017-12-08 18:24:33 +00:00
. filter ( and_ ( ReceiverBeacon . receiver_id == null ( ) , ReceiverBeacon . name == Receiver . name ) ) \
. update ( { ReceiverBeacon . receiver_id : Receiver . id } ,
synchronize_session = ' fetch ' )
2017-12-16 20:08:01 +00:00
session . commit ( )
2017-12-08 18:24:33 +00:00
2017-12-10 16:30:27 +00:00
logger . info ( " Receivers: {} inserted, {} updated. " . format ( insert_count , update_receivers ) )
logger . info ( " Updated relations: {} aircraft beacons, {} receiver beacons " . format ( update_aircraft_beacons , update_receiver_beacons ) )
2017-12-08 18:24:33 +00:00
2017-12-12 08:15:31 +00:00
return " {} Receivers inserted, {} Receivers updated, {} AircraftBeacons updated, {} ReceiverBeacons updated " \
. format ( insert_count , update_receivers , update_aircraft_beacons , update_receiver_beacons )
2017-12-08 18:24:33 +00:00
@app.task
2017-12-16 20:08:01 +00:00
def update_country_code ( session = None ) :
2017-12-16 21:18:04 +00:00
""" Update country code in receivers table if None. """
2017-12-16 20:08:01 +00:00
if session is None :
session = app . session
unknown_country_query = session . query ( Receiver ) \
2017-12-08 18:24:33 +00:00
. filter ( Receiver . country_code == null ( ) ) \
. filter ( Receiver . location_wkt != null ( ) ) \
. order_by ( Receiver . name )
2017-12-12 08:15:31 +00:00
counter = 0
2017-12-08 18:24:33 +00:00
for receiver in unknown_country_query . all ( ) :
location = receiver . location
country_code = get_country_code ( location . latitude , location . longitude )
if country_code is not None :
receiver . country_code = country_code
2017-12-12 08:15:31 +00:00
logger . info ( " Updated country_code for {} to {} " . format ( receiver . name , receiver . country_code ) )
counter + = 1
2017-12-08 18:24:33 +00:00
2017-12-16 20:08:01 +00:00
session . commit ( )
2017-12-12 08:15:31 +00:00
2017-12-13 09:58:48 +00:00
return " Updated country_code for {} Receivers " . format ( counter )