ogn-python/ogn/collect/logbook.py

310 wiersze
16 KiB
Python
Czysty Zwykły widok Historia

2016-04-29 12:03:21 +00:00
from datetime import timedelta
2015-11-11 07:04:42 +00:00
from celery.utils.log import get_task_logger
from ogn.collect.celery import app
2016-06-29 21:26:30 +00:00
from sqlalchemy.sql import func, null
from sqlalchemy import and_, or_, insert, update, between, exists
from sqlalchemy.sql.expression import case, true, false, label
2015-11-11 07:04:42 +00:00
2016-06-29 21:26:30 +00:00
from ogn.model import AircraftBeacon, TakeoffLanding, Airport, Logbook
2015-11-11 07:04:42 +00:00
logger = get_task_logger(__name__)
2015-11-11 07:04:42 +00:00
2016-06-30 19:10:46 +00:00
def get_aircraft_beacon_start_id():
    """Return the AircraftBeacon id at which takeoff/landing detection should resume.

    Looks up the AircraftBeacon that matches the most recently stored
    TakeoffLanding (same device_id and timestamp) and returns its id + 1.
    If no such beacon is found, the smallest AircraftBeacon id is returned.

    Returns:
        The start id, or 0 if there are no AircraftBeacons at all.
    """
    # id of the last stored TakeoffLanding
    last_takeoff_landing_query = app.session.query(func.max(TakeoffLanding.id).label('max_id')) \
        .subquery()

    # the AircraftBeacon this TakeoffLanding was derived from
    last_used_aircraft_beacon_query = app.session.query(AircraftBeacon.id) \
        .filter(TakeoffLanding.id == last_takeoff_landing_query.c.max_id) \
        .filter(and_(AircraftBeacon.timestamp == TakeoffLanding.timestamp,
                     AircraftBeacon.device_id == TakeoffLanding.device_id))

    last_used_aircraft_beacon_id = last_used_aircraft_beacon_query.first()
    if last_used_aircraft_beacon_id is not None:
        return last_used_aircraft_beacon_id[0] + 1

    # An aggregate query always yields exactly one row -- (NULL,) for an empty
    # table -- so the value itself has to be checked, not the row.
    min_aircraft_beacon_id = app.session.query(func.min(AircraftBeacon.id)).scalar()
    if min_aircraft_beacon_id is None:
        return 0
    return min_aircraft_beacon_id
@app.task
def compute_takeoff_and_landing():
    """Detect takeoffs and landings in AircraftBeacon and store them as TakeoffLanding.

    Works on a window of at most ``max_id_offset`` AircraftBeacon ids starting at
    ``get_aircraft_beacon_start_id()``. A takeoff/landing is detected from three
    consecutive fixes of the same device whose ground speed crosses the
    takeoff/landing threshold, which are close together in time and space and
    which lie near an Airport (horizontally and in altitude).

    Returns:
        The rowcount reported for the INSERT into TakeoffLanding.
    """
    logger.info("Compute takeoffs and landings.")

    # takeoff / landing detection is based on 3 consecutive points
    takeoff_speed = 55  # takeoff detection: 1st point below, 2nd and 3rd above this limit
    landing_speed = 40  # landing detection: 1st point above, 2nd and 3rd below this limit
    duration = 100      # the points must not exceed this duration (seconds, see timedelta below)
    radius = 0.05       # the points must not exceed this radius (degree!) around the 2nd point

    # takeoff / landing has to be near an airport
    airport_radius = 0.025  # takeoff / landing must not exceed this radius (degree!) around the airport
    airport_delta = 100     # takeoff / landing must not exceed this altitude offset above/below the airport

    # AircraftBeacon start id and max id offset
    aircraft_beacon_start_id = get_aircraft_beacon_start_id()
    max_id_offset = 500000

    # 'wo' is the window order for the sql window function: an explicit list of
    # columns (ORDER BY device_id, timestamp). Wrapping the columns in and_()
    # only works by accident on SQLAlchemy versions that iterate the clause list.
    wo = [AircraftBeacon.device_id, AircraftBeacon.timestamp]

    # make a query with current, previous and next position
    sq = app.session.query(
        AircraftBeacon.id,
        AircraftBeacon.timestamp,
        func.lag(AircraftBeacon.timestamp).over(order_by=wo).label('timestamp_prev'),
        func.lead(AircraftBeacon.timestamp).over(order_by=wo).label('timestamp_next'),
        AircraftBeacon.location_wkt,
        func.lag(AircraftBeacon.location_wkt).over(order_by=wo).label('location_wkt_prev'),
        func.lead(AircraftBeacon.location_wkt).over(order_by=wo).label('location_wkt_next'),
        AircraftBeacon.track,
        func.lag(AircraftBeacon.track).over(order_by=wo).label('track_prev'),
        func.lead(AircraftBeacon.track).over(order_by=wo).label('track_next'),
        AircraftBeacon.ground_speed,
        func.lag(AircraftBeacon.ground_speed).over(order_by=wo).label('ground_speed_prev'),
        func.lead(AircraftBeacon.ground_speed).over(order_by=wo).label('ground_speed_next'),
        AircraftBeacon.altitude,
        func.lag(AircraftBeacon.altitude).over(order_by=wo).label('altitude_prev'),
        func.lead(AircraftBeacon.altitude).over(order_by=wo).label('altitude_next'),
        AircraftBeacon.device_id,
        func.lag(AircraftBeacon.device_id).over(order_by=wo).label('device_id_prev'),
        func.lead(AircraftBeacon.device_id).over(order_by=wo).label('device_id_next')) \
        .filter(between(AircraftBeacon.id, aircraft_beacon_start_id, aircraft_beacon_start_id + max_id_offset)) \
        .subquery()

    # find possible takeoffs and landings
    sq2 = app.session.query(
        sq.c.id,
        sq.c.timestamp,
        case([(sq.c.ground_speed > takeoff_speed, sq.c.location_wkt_prev),  # on takeoff we take the location from the previous fix because it is nearer to the airport
              (sq.c.ground_speed < landing_speed, sq.c.location)]).label('location'),
        case([(sq.c.ground_speed > takeoff_speed, sq.c.track),
              (sq.c.ground_speed < landing_speed, sq.c.track_prev)]).label('track'),    # on landing we take the track from the previous fix because gliders tend to leave the runway quickly
        sq.c.ground_speed,
        sq.c.altitude,
        case([(sq.c.ground_speed > takeoff_speed, True),
              (sq.c.ground_speed < landing_speed, False)]).label('is_takeoff'),
        sq.c.device_id) \
        .filter(and_(sq.c.device_id_prev == sq.c.device_id,  # all three fixes must belong to the same device;
                     sq.c.device_id == sq.c.device_id_next)) \
        .filter(or_(and_(sq.c.ground_speed_prev < takeoff_speed,    # takeoff
                         sq.c.ground_speed > takeoff_speed,
                         sq.c.ground_speed_next > takeoff_speed),
                    and_(sq.c.ground_speed_prev > landing_speed,    # landing
                         sq.c.ground_speed < landing_speed,
                         sq.c.ground_speed_next < landing_speed))) \
        .filter(sq.c.timestamp_next - sq.c.timestamp_prev < timedelta(seconds=duration)) \
        .filter(and_(func.ST_DFullyWithin(sq.c.location, sq.c.location_wkt_prev, radius),
                     func.ST_DFullyWithin(sq.c.location, sq.c.location_wkt_next, radius))) \
        .subquery()
    # NOTE: the device filter above must use and_(). A Python chained comparison
    # (a == b == c) calls bool() on the first SQL expression and silently drops
    # the second comparison.

    # consider them if they are near an airport
    # NOTE(review): Airport.style is restricted to 2..5 -- meaning of these codes is not visible here.
    takeoff_landing_query = app.session.query(
        sq2.c.timestamp,
        sq2.c.track,
        sq2.c.is_takeoff,
        sq2.c.device_id,
        Airport.id) \
        .filter(and_(func.ST_DFullyWithin(sq2.c.location, Airport.location_wkt, airport_radius),
                     between(sq2.c.altitude, Airport.altitude - airport_delta, Airport.altitude + airport_delta))) \
        .filter(between(Airport.style, 2, 5)) \
        .order_by(sq2.c.id)

    # ... and save them
    ins = insert(TakeoffLanding).from_select((TakeoffLanding.timestamp,
                                              TakeoffLanding.track,
                                              TakeoffLanding.is_takeoff,
                                              TakeoffLanding.device_id,
                                              TakeoffLanding.airport_id),
                                             takeoff_landing_query)
    result = app.session.execute(ins)
    counter = result.rowcount
    app.session.commit()
    logger.debug("New takeoffs and landings: {}".format(counter))

    return counter
2016-06-29 21:26:30 +00:00
@app.task
def compute_logbook():
    """Derive Logbook entries from the stored TakeoffLanding events.

    Every TakeoffLanding row is paired with its predecessor and successor
    (ordered by day, device and time). From these triples the function builds:

    * complete flights (takeoff followed by a landing of the same device on the same day),
    * split flights (takeoff and landing on different days -> one start and one landing entry),
    * landings without a start, and
    * starts without a landing.

    Existing Logbook rows matching a complete flight are updated first; then all
    entries that are not yet stored are inserted.

    Returns:
        The rowcount reported for the INSERT into Logbook.
    """
    logger.info("Compute logbook.")

    # 'wo' is the window order for the sql window function: an explicit list of
    # columns (ORDER BY date, device_id, timestamp). Wrapping the columns in
    # and_() only works by accident on SQLAlchemy versions that iterate the clause list.
    wo = [func.date(TakeoffLanding.timestamp),
          TakeoffLanding.device_id,
          TakeoffLanding.timestamp]

    # make a query with current, previous and next "takeoff_landing" event, so we can find complete flights
    sq = app.session.query(
        TakeoffLanding.device_id,
        func.lag(TakeoffLanding.device_id).over(order_by=wo).label('device_id_prev'),
        func.lead(TakeoffLanding.device_id).over(order_by=wo).label('device_id_next'),
        TakeoffLanding.timestamp,
        func.lag(TakeoffLanding.timestamp).over(order_by=wo).label('timestamp_prev'),
        func.lead(TakeoffLanding.timestamp).over(order_by=wo).label('timestamp_next'),
        TakeoffLanding.track,
        func.lag(TakeoffLanding.track).over(order_by=wo).label('track_prev'),
        func.lead(TakeoffLanding.track).over(order_by=wo).label('track_next'),
        TakeoffLanding.is_takeoff,
        func.lag(TakeoffLanding.is_takeoff).over(order_by=wo).label('is_takeoff_prev'),
        func.lead(TakeoffLanding.is_takeoff).over(order_by=wo).label('is_takeoff_next'),
        TakeoffLanding.airport_id,
        func.lag(TakeoffLanding.airport_id).over(order_by=wo).label('airport_id_prev'),
        func.lead(TakeoffLanding.airport_id).over(order_by=wo).label('airport_id_next')) \
        .subquery()

    # find complete flights (with takeoff and landing on the same day)
    complete_flight_query = app.session.query(
        sq.c.timestamp.label('reftime'),
        sq.c.device_id.label('device_id'),
        sq.c.timestamp.label('takeoff_timestamp'), sq.c.track.label('takeoff_track'), sq.c.airport_id.label('takeoff_airport_id'),
        sq.c.timestamp_next.label('landing_timestamp'), sq.c.track_next.label('landing_track'), sq.c.airport_id_next.label('landing_airport_id'),
        label('duration', sq.c.timestamp_next - sq.c.timestamp)) \
        .filter(and_(sq.c.is_takeoff == true(), sq.c.is_takeoff_next == false())) \
        .filter(sq.c.device_id == sq.c.device_id_next) \
        .filter(func.date(sq.c.timestamp_next) == func.date(sq.c.timestamp))

    # split complete flights (with takeoff and landing on different days) into one takeoff and one landing
    split_start_query = app.session.query(
        sq.c.timestamp.label('reftime'),
        sq.c.device_id.label('device_id'),
        sq.c.timestamp.label('takeoff_timestamp'), sq.c.track.label('takeoff_track'), sq.c.airport_id.label('takeoff_airport_id'),
        null().label('landing_timestamp'), null().label('landing_track'), null().label('landing_airport_id'),
        null().label('duration')) \
        .filter(and_(sq.c.is_takeoff == true(), sq.c.is_takeoff_next == false())) \
        .filter(sq.c.device_id == sq.c.device_id_next) \
        .filter(func.date(sq.c.timestamp_next) != func.date(sq.c.timestamp))

    split_landing_query = app.session.query(
        sq.c.timestamp_next.label('reftime'),
        sq.c.device_id.label('device_id'),
        null().label('takeoff_timestamp'), null().label('takeoff_track'), null().label('takeoff_airport_id'),
        sq.c.timestamp_next.label('landing_timestamp'), sq.c.track_next.label('landing_track'), sq.c.airport_id_next.label('landing_airport_id'),
        null().label('duration')) \
        .filter(and_(sq.c.is_takeoff == true(), sq.c.is_takeoff_next == false())) \
        .filter(sq.c.device_id == sq.c.device_id_next) \
        .filter(func.date(sq.c.timestamp_next) != func.date(sq.c.timestamp))

    # find landings without start
    # Here the *current* row is the landing, so track and airport come from the
    # current row (not from the next one). The explicit NULL test covers the
    # very first row of the window, which has no predecessor.
    only_landings_query = app.session.query(
        sq.c.timestamp.label('reftime'),
        sq.c.device_id.label('device_id'),
        null().label('takeoff_timestamp'), null().label('takeoff_track'), null().label('takeoff_airport_id'),
        sq.c.timestamp.label('landing_timestamp'), sq.c.track.label('landing_track'), sq.c.airport_id.label('landing_airport_id'),
        null().label('duration')) \
        .filter(sq.c.is_takeoff == false()) \
        .filter(or_(sq.c.device_id_prev == null(),
                    sq.c.device_id != sq.c.device_id_prev,
                    sq.c.is_takeoff_prev == false()))

    # find starts without landing
    # The explicit NULL test covers the very last row of the window, which has no successor.
    only_starts_query = app.session.query(
        sq.c.timestamp.label('reftime'),
        sq.c.device_id.label('device_id'),
        sq.c.timestamp.label('takeoff_timestamp'), sq.c.track.label('takeoff_track'), sq.c.airport_id.label('takeoff_airport_id'),
        null().label('landing_timestamp'), null().label('landing_track'), null().label('landing_airport_id'),
        null().label('duration')) \
        .filter(sq.c.is_takeoff == true()) \
        .filter(or_(sq.c.device_id_next == null(),
                    sq.c.device_id != sq.c.device_id_next,
                    sq.c.is_takeoff_next == true()))

    # update existing logbook entries that match a complete flight
    # (same reftime, device and airports; NULL airports are treated as equal)
    complete_flights = complete_flight_query.subquery()

    upd = update(Logbook) \
        .where(and_(Logbook.reftime == complete_flights.c.reftime,
                    Logbook.device_id == complete_flights.c.device_id,
                    or_(Logbook.takeoff_airport_id == complete_flights.c.takeoff_airport_id,
                        and_(Logbook.takeoff_airport_id == null(),
                             complete_flights.c.takeoff_airport_id == null())),
                    or_(Logbook.landing_airport_id == complete_flights.c.landing_airport_id,
                        and_(Logbook.landing_airport_id == null(),
                             complete_flights.c.landing_airport_id == null())))) \
        .values({"takeoff_timestamp": complete_flights.c.takeoff_timestamp,
                 "takeoff_track": complete_flights.c.takeoff_track,
                 "takeoff_airport_id": complete_flights.c.takeoff_airport_id,
                 "landing_timestamp": complete_flights.c.landing_timestamp,
                 "landing_track": complete_flights.c.landing_track,
                 "landing_airport_id": complete_flights.c.landing_airport_id,
                 "duration": complete_flights.c.duration})

    result = app.session.execute(upd)
    counter = result.rowcount
    app.session.commit()
    logger.debug("Updated logbook entries: {}".format(counter))

    # unite all
    union_query = complete_flight_query.union(
        split_start_query,
        split_landing_query,
        only_landings_query,
        only_starts_query) \
        .subquery()

    # consider only if not already stored
    new_logbook_entries = app.session.query(union_query) \
        .filter(~exists().where(
            and_(Logbook.reftime == union_query.c.reftime,
                 Logbook.device_id == union_query.c.device_id,
                 or_(Logbook.takeoff_airport_id == union_query.c.takeoff_airport_id,
                     and_(Logbook.takeoff_airport_id == null(),
                          union_query.c.takeoff_airport_id == null())),
                 or_(Logbook.landing_airport_id == union_query.c.landing_airport_id,
                     and_(Logbook.landing_airport_id == null(),
                          union_query.c.landing_airport_id == null())))))

    # ... and save them
    ins = insert(Logbook).from_select((Logbook.reftime,
                                       Logbook.device_id,
                                       Logbook.takeoff_timestamp,
                                       Logbook.takeoff_track,
                                       Logbook.takeoff_airport_id,
                                       Logbook.landing_timestamp,
                                       Logbook.landing_track,
                                       Logbook.landing_airport_id,
                                       Logbook.duration),
                                      new_logbook_entries)

    result = app.session.execute(ins)
    counter = result.rowcount
    app.session.commit()
    logger.debug("New logbook entries: {}".format(counter))

    return counter
@app.task
def compute_altitudes():
    """Fill in Logbook.max_altitude for flights that have both a takeoff and a landing airport.

    Processes at most 100 Logbook entries per call. For each of them the maximum
    AircraftBeacon.altitude of the same device between takeoff_timestamp and
    landing_timestamp is written to the entry.

    Returns:
        The rowcount reported for the UPDATE of Logbook.
    """
    logger.info("Compute maximum altitudes.")

    # Pick a batch of complete flights. Entries that already have a max_altitude
    # are skipped so that successive runs advance through the logbook instead of
    # possibly recomputing the same 100 rows every time.
    logbook_query = app.session.query(Logbook.id, Logbook.device_id, Logbook.takeoff_timestamp, Logbook.landing_timestamp) \
        .filter(and_(Logbook.takeoff_airport_id != null(),
                     Logbook.landing_airport_id != null(),
                     Logbook.max_altitude == null())) \
        .limit(100) \
        .subquery()

    # highest altitude reported by the device during each selected flight
    max_altitude_query = app.session.query(logbook_query.c.id, func.max(AircraftBeacon.altitude).label('max_altitude')) \
        .filter(and_(between(AircraftBeacon.timestamp, logbook_query.c.takeoff_timestamp, logbook_query.c.landing_timestamp),
                     AircraftBeacon.device_id == logbook_query.c.device_id)) \
        .group_by(logbook_query.c.id) \
        .subquery()

    upd = update(Logbook) \
        .values({'max_altitude': max_altitude_query.c.max_altitude}) \
        .where(Logbook.id == max_altitude_query.c.id)

    result = app.session.execute(upd)
    counter = result.rowcount
    app.session.commit()
    logger.debug("Updated logbook entries: {}".format(counter))

    return counter