2015-12-09 02:41:58 +00:00
from celery . utils . log import get_task_logger
2016-02-03 22:09:11 +00:00
2017-12-08 18:24:33 +00:00
from sqlalchemy import insert , distinct
2019-01-01 19:13:08 +00:00
from sqlalchemy . sql import null , and_ , func , not_
2016-02-03 22:09:11 +00:00
2017-12-16 20:08:01 +00:00
from ogn . collect . celery import app
2018-12-08 19:04:41 +00:00
from ogn . model import Country , DeviceInfo , DeviceInfoOrigin , AircraftBeacon , ReceiverBeacon , Device , Receiver
from ogn . utils import get_ddb , get_flarmnet
2015-12-09 02:41:58 +00:00
2016-02-03 22:09:11 +00:00
2015-12-09 02:41:58 +00:00
logger = get_task_logger ( __name__ )
2016-05-22 05:26:02 +00:00
2018-10-21 15:34:03 +00:00
def update_device_infos ( session , address_origin , path = None ) :
if address_origin == DeviceInfoOrigin . flarmnet :
device_infos = get_flarmnet ( fln_file = path )
else :
device_infos = get_ddb ( csv_file = path )
2016-07-17 20:57:54 +00:00
2016-06-21 17:34:05 +00:00
session . query ( DeviceInfo ) \
. filter ( DeviceInfo . address_origin == address_origin ) \
2017-12-29 22:54:11 +00:00
. delete ( synchronize_session = ' fetch ' )
2018-10-21 15:34:03 +00:00
session . commit ( )
for device_info in device_infos :
device_info . address_origin = address_origin
2016-05-22 05:26:02 +00:00
2016-06-21 17:34:05 +00:00
session . bulk_save_objects ( device_infos )
2015-12-09 02:41:58 +00:00
session . commit ( )
2016-06-21 17:34:05 +00:00
return len ( device_infos )
2015-12-09 02:41:58 +00:00
@app.task
2017-12-16 20:08:01 +00:00
def import_ddb ( session = None ) :
2015-12-09 02:41:58 +00:00
""" Import registered devices from the DDB. """
2017-12-16 20:08:01 +00:00
if session is None :
session = app . session
2015-12-09 02:41:58 +00:00
logger . info ( " Import registered devices fom the DDB... " )
2018-10-21 15:34:03 +00:00
counter = update_device_infos ( session , DeviceInfoOrigin . ogn_ddb )
2016-02-03 22:19:25 +00:00
logger . info ( " Imported {} devices. " . format ( counter ) )
2017-12-08 18:24:33 +00:00
2017-12-17 13:34:14 +00:00
return " Imported {} devices. " . format ( counter )
2017-12-08 18:24:33 +00:00
@app.task
2018-11-28 06:37:35 +00:00
def add_missing_devices ( session = None ) :
2017-12-08 18:24:33 +00:00
""" Add/update entries in devices table and update foreign keys in aircraft beacons. """
2017-12-16 20:08:01 +00:00
if session is None :
session = app . session
2017-12-08 18:24:33 +00:00
# Create missing Device from AircraftBeacon
2017-12-16 20:08:01 +00:00
available_devices = session . query ( Device . address ) \
2017-12-08 18:24:33 +00:00
. subquery ( )
2017-12-16 20:08:01 +00:00
missing_devices_query = session . query ( distinct ( AircraftBeacon . address ) ) \
2017-12-16 12:21:25 +00:00
. filter ( and_ ( AircraftBeacon . device_id == null ( ) , not_ ( AircraftBeacon . address . like ( ' 00 % ' ) ) , AircraftBeacon . error_count == 0 ) ) \
2017-12-08 18:24:33 +00:00
. filter ( ~ AircraftBeacon . address . in_ ( available_devices ) )
ins = insert ( Device ) . from_select ( [ Device . address ] , missing_devices_query )
2017-12-16 20:08:01 +00:00
res = session . execute ( ins )
2017-12-08 18:24:33 +00:00
insert_count = res . rowcount
2017-12-16 20:08:01 +00:00
session . commit ( )
2017-12-08 18:24:33 +00:00
# Update relations to aircraft beacons
2017-12-16 20:08:01 +00:00
upd = session . query ( AircraftBeacon ) \
2017-12-08 18:24:33 +00:00
. filter ( AircraftBeacon . device_id == null ( ) ) \
. filter ( AircraftBeacon . address == Device . address ) \
2017-12-08 20:32:50 +00:00
. update ( {
AircraftBeacon . device_id : Device . id } ,
synchronize_session = ' fetch ' )
2017-12-08 18:24:33 +00:00
2017-12-16 20:08:01 +00:00
session . commit ( )
2018-11-28 06:37:35 +00:00
logger . info ( " Devices: {} inserted, {} updated " . format ( insert_count , add_missing_receivers ) )
2017-12-10 16:30:27 +00:00
logger . info ( " Updated {} AircraftBeacons " . format ( upd ) )
2017-12-08 18:24:33 +00:00
2017-12-12 08:15:31 +00:00
return " {} Devices inserted, {} Devices updated, {} AircraftBeacons updated " \
2018-11-28 06:37:35 +00:00
. format ( insert_count , add_missing_receivers , upd )
2017-12-12 08:15:31 +00:00
2017-12-08 18:24:33 +00:00
@app.task
2018-11-28 06:37:35 +00:00
def add_missing_receivers ( session = None ) :
""" Add/add_missing_receivers entries in receiver table and update receivers foreign keys and distance in aircraft beacons and update foreign keys in receiver beacons. """
2017-12-16 20:08:01 +00:00
if session is None :
session = app . session
2017-12-08 18:24:33 +00:00
# Create missing Receiver from ReceiverBeacon
2017-12-16 20:08:01 +00:00
available_receivers = session . query ( Receiver . name ) \
2017-12-08 18:24:33 +00:00
. subquery ( )
2017-12-16 20:08:01 +00:00
missing_receiver_query = session . query ( distinct ( ReceiverBeacon . name ) ) \
2017-12-08 18:24:33 +00:00
. filter ( ReceiverBeacon . receiver_id == null ( ) ) \
. filter ( ~ ReceiverBeacon . name . in_ ( available_receivers ) )
ins = insert ( Receiver ) . from_select ( [ Receiver . name ] , missing_receiver_query )
2017-12-16 20:08:01 +00:00
res = session . execute ( ins )
2017-12-08 18:24:33 +00:00
insert_count = res . rowcount
# Update relations to aircraft beacons
2017-12-16 20:08:01 +00:00
update_aircraft_beacons = session . query ( AircraftBeacon ) \
2017-12-08 18:24:33 +00:00
. filter ( and_ ( AircraftBeacon . receiver_id == null ( ) , AircraftBeacon . receiver_name == Receiver . name ) ) \
. update ( { AircraftBeacon . receiver_id : Receiver . id ,
AircraftBeacon . distance : func . ST_Distance_Sphere ( AircraftBeacon . location_wkt , Receiver . location_wkt ) } ,
synchronize_session = ' fetch ' )
# Update relations to receiver beacons
2017-12-16 20:08:01 +00:00
update_receiver_beacons = session . query ( ReceiverBeacon ) \
2017-12-08 18:24:33 +00:00
. filter ( and_ ( ReceiverBeacon . receiver_id == null ( ) , ReceiverBeacon . name == Receiver . name ) ) \
. update ( { ReceiverBeacon . receiver_id : Receiver . id } ,
synchronize_session = ' fetch ' )
2017-12-16 20:08:01 +00:00
session . commit ( )
2017-12-08 18:24:33 +00:00
2018-11-28 06:37:35 +00:00
logger . info ( " Receivers: {} inserted, {} updated. " . format ( insert_count , add_missing_receivers ) )
2017-12-10 16:30:27 +00:00
logger . info ( " Updated relations: {} aircraft beacons, {} receiver beacons " . format ( update_aircraft_beacons , update_receiver_beacons ) )
2017-12-08 18:24:33 +00:00
2017-12-12 08:15:31 +00:00
return " {} Receivers inserted, {} Receivers updated, {} AircraftBeacons updated, {} ReceiverBeacons updated " \
2018-11-28 06:37:35 +00:00
. format ( insert_count , add_missing_receivers , update_aircraft_beacons , update_receiver_beacons )
2017-12-12 08:15:31 +00:00
2017-12-08 18:24:33 +00:00
@app.task
2017-12-16 20:08:01 +00:00
def update_country_code ( session = None ) :
2017-12-16 21:18:04 +00:00
""" Update country code in receivers table if None. """
2017-12-16 20:08:01 +00:00
if session is None :
session = app . session
2018-12-08 19:04:41 +00:00
update_receivers = session . query ( Receiver ) \
. filter ( and_ ( Receiver . country_id == null ( ) , Receiver . location_wkt != null ( ) , func . st_within ( Receiver . location_wkt , Country . geom ) ) ) \
. update ( { Receiver . country_id : Country . gid } ,
synchronize_session = ' fetch ' )
2017-12-08 18:24:33 +00:00
2017-12-16 20:08:01 +00:00
session . commit ( )
2018-12-08 19:04:41 +00:00
logger . info ( " Updated {} AircraftBeacons " . format ( update_receivers ) )
2017-12-12 08:15:31 +00:00
2018-12-08 19:04:41 +00:00
return " Updated country for {} Receivers " . format ( update_receivers )