2016-07-02 18:07:22 +00:00
from datetime import timedelta
from celery . utils . log import get_task_logger
2016-07-17 08:25:56 +00:00
from sqlalchemy import and_ , or_ , insert , between , exists
2016-11-01 07:30:54 +00:00
from sqlalchemy . sql import func , null
2016-07-02 18:07:22 +00:00
from sqlalchemy . sql . expression import case
from ogn . collect . celery import app
from ogn . model import AircraftBeacon , TakeoffLanding , Airport
# Module-level logger obtained from Celery's get_task_logger, named after this module.
logger = get_task_logger ( __name__ )
@app.task
def update_takeoff_landing(session=None):
    """Detect takeoffs and landings from aircraft beacons and store them.

    Up to 1,000,000 beacons whose ``status`` is NULL are examined, oldest
    first. Each beacon is compared with its predecessor and successor (by
    device, timestamp and receiver). A takeoff or landing is recorded when
    the ground speed crosses the respective threshold within the allowed
    time span and radius, and the position lies close to an airport, both
    horizontally and in altitude. Events already present in
    ``TakeoffLanding`` are not inserted again. Finally the beacons that were
    evaluated are marked with ``status = 1`` and the session is committed.

    :param session: SQLAlchemy session to use; defaults to ``app.session``.
    :returns: row count reported by the INSERT into ``TakeoffLanding``, or
        ``None`` if there are no airports in the database.
    """
    logger.info(" Compute takeoffs and landings. ")

    if session is None:
        session = app.session

    # check if we have any airport; fetching a single row is enough
    if session.query(Airport).first() is None:
        logger.warning(" Cannot calculate takeoff and landings without any airport! Please import airports first. ")
        return

    # takeoff / landing detection is based on 3 consecutive points
    takeoff_speed = 55  # takeoff detection: 1st point below, 2nd and 3rd above this limit
    landing_speed = 40  # landing detection: 1st point above, 2nd and 3rd below this limit
    duration = 100      # the points must not exceed this duration (seconds, see timedelta below)
    radius = 5000       # the points must not exceed this radius around the 2nd point

    # takeoff / landing has to be near an airport
    airport_radius = 2500  # takeoff / landing must not exceed this radius around the airport
    airport_delta = 100    # takeoff / landing must not exceed this altitude offset above/below the airport

    # 'wo' is the window order for the sql window function.
    # A plain list of columns is used (not and_(), which builds a boolean
    # conjunction and only worked as an ordering by accident).
    wo = [AircraftBeacon.device_id,
          AircraftBeacon.timestamp,
          AircraftBeacon.receiver_id]

    # pick the batch of not yet processed beacons (status IS NULL), oldest first
    beacon_selection = session.query(AircraftBeacon.id) \
        .filter(AircraftBeacon.status == null()) \
        .order_by(AircraftBeacon.timestamp) \
        .limit(1000000) \
        .subquery()

    # make a query with current, previous and next position
    sq = session.query(
        AircraftBeacon.id,
        func.lag(AircraftBeacon.id).over(order_by=wo).label('id_prev'),
        func.lead(AircraftBeacon.id).over(order_by=wo).label('id_next'),
        AircraftBeacon.device_id,
        func.lag(AircraftBeacon.device_id).over(order_by=wo).label('device_id_prev'),
        func.lead(AircraftBeacon.device_id).over(order_by=wo).label('device_id_next'),
        AircraftBeacon.timestamp,
        func.lag(AircraftBeacon.timestamp).over(order_by=wo).label('timestamp_prev'),
        func.lead(AircraftBeacon.timestamp).over(order_by=wo).label('timestamp_next'),
        AircraftBeacon.location_wkt,
        func.lag(AircraftBeacon.location_wkt).over(order_by=wo).label('location_wkt_prev'),
        func.lead(AircraftBeacon.location_wkt).over(order_by=wo).label('location_wkt_next'),
        AircraftBeacon.track,
        func.lag(AircraftBeacon.track).over(order_by=wo).label('track_prev'),
        func.lead(AircraftBeacon.track).over(order_by=wo).label('track_next'),
        AircraftBeacon.ground_speed,
        func.lag(AircraftBeacon.ground_speed).over(order_by=wo).label('ground_speed_prev'),
        func.lead(AircraftBeacon.ground_speed).over(order_by=wo).label('ground_speed_next'),
        AircraftBeacon.altitude,
        func.lag(AircraftBeacon.altitude).over(order_by=wo).label('altitude_prev'),
        func.lead(AircraftBeacon.altitude).over(order_by=wo).label('altitude_next')) \
        .filter(AircraftBeacon.id == beacon_selection.c.id) \
        .subquery()

    # consider only positions with the same device id.
    # Both comparisons must be combined with and_(): a chained Python
    # comparison (a == b == c) silently reduces to (a == b) for SQL
    # expressions, which would drop the check against the next point.
    sq2 = session.query(sq) \
        .filter(and_(sq.c.device_id_prev == sq.c.device_id,
                     sq.c.device_id == sq.c.device_id_next)) \
        .subquery()

    # find possible takeoffs and landings
    # NOTE(review): the current position is read as `sq2.c.location` while the
    # neighbours use the `location_wkt_*` labels - presumably the mapped column
    # behind AircraftBeacon.location_wkt is named `location`; confirm against the model.
    sq3 = session.query(
        sq2.c.id,
        sq2.c.timestamp,
        case([(sq2.c.ground_speed > takeoff_speed, sq2.c.location_wkt_prev),  # on takeoff we take the location from the previous fix because it is nearer to the airport
              (sq2.c.ground_speed < landing_speed, sq2.c.location)]).label('location'),
        case([(sq2.c.ground_speed > takeoff_speed, sq2.c.track),
              (sq2.c.ground_speed < landing_speed, sq2.c.track_prev)]).label('track'),  # on landing we take the track from the previous fix because gliders tend to leave the runway quickly
        sq2.c.ground_speed,
        sq2.c.altitude,
        case([(sq2.c.ground_speed > takeoff_speed, True),
              (sq2.c.ground_speed < landing_speed, False)]).label('is_takeoff'),
        sq2.c.device_id) \
        .filter(sq2.c.timestamp_next - sq2.c.timestamp_prev < timedelta(seconds=duration)) \
        .filter(and_(func.ST_Distance_Sphere(sq2.c.location, sq2.c.location_wkt_prev) < radius,
                     func.ST_Distance_Sphere(sq2.c.location, sq2.c.location_wkt_next) < radius)) \
        .filter(or_(and_(sq2.c.ground_speed_prev < takeoff_speed,    # takeoff
                         sq2.c.ground_speed > takeoff_speed,
                         sq2.c.ground_speed_next > takeoff_speed),
                    and_(sq2.c.ground_speed_prev > landing_speed,    # landing
                         sq2.c.ground_speed < landing_speed,
                         sq2.c.ground_speed_next < landing_speed))) \
        .subquery()

    # consider them if they are near an airport
    sq4 = session.query(
        sq3.c.timestamp,
        sq3.c.track,
        sq3.c.is_takeoff,
        sq3.c.device_id,
        Airport.id.label('airport_id')) \
        .filter(and_(func.ST_Distance_Sphere(sq3.c.location, Airport.location_wkt) < airport_radius,
                     between(sq3.c.altitude, Airport.altitude - airport_delta, Airport.altitude + airport_delta))) \
        .filter(between(Airport.style, 2, 5)) \
        .subquery()

    # consider them only if they are not already existing in db
    takeoff_landing_query = session.query(sq4) \
        .filter(~exists().where(
            and_(TakeoffLanding.timestamp == sq4.c.timestamp,
                 TakeoffLanding.device_id == sq4.c.device_id,
                 TakeoffLanding.airport_id == sq4.c.airport_id)))

    # ... and save them
    ins = insert(TakeoffLanding).from_select((TakeoffLanding.timestamp,
                                              TakeoffLanding.track,
                                              TakeoffLanding.is_takeoff,
                                              TakeoffLanding.device_id,
                                              TakeoffLanding.airport_id),
                                             takeoff_landing_query)
    result = session.execute(ins)
    counter = result.rowcount

    # mark the evaluated AircraftBeacons as 'used' (status = 1) so that they
    # are not selected again by the next run
    update_aircraft_beacons = session.query(AircraftBeacon) \
        .filter(AircraftBeacon.id == sq2.c.id) \
        .update({AircraftBeacon.status: 1},
                synchronize_session='fetch')

    session.commit()
    logger.debug(" Inserted {} TakeoffLandings, updated {} AircraftBeacons ".format(counter, update_aircraft_beacons))

    return counter