ogn-python/ogn/collect/logbook.py

108 wiersze
6.3 KiB
Python
Czysty Zwykły widok Historia

2016-04-29 12:03:21 +00:00
from datetime import timedelta
2015-11-11 07:04:42 +00:00
from celery.utils.log import get_task_logger
from ogn.collect.celery import app
2015-11-15 18:54:11 +00:00
from sqlalchemy.sql import func
from sqlalchemy import and_, or_, insert
from sqlalchemy.sql.expression import case
2015-11-11 07:04:42 +00:00
2015-11-15 18:31:58 +00:00
from ogn.model import AircraftBeacon, TakeoffLanding
2015-11-11 07:04:42 +00:00
logger = get_task_logger(__name__)
2015-11-11 07:04:42 +00:00
@app.task
2015-11-11 07:04:42 +00:00
def compute_takeoff_and_landing():
2015-11-24 22:29:27 +00:00
logger.info("Compute takeoffs and landings.")
2016-04-29 12:03:21 +00:00
# takeoff / landing detection is based on 3 consecutive points
takeoff_speed = 55 # takeoff detection: 1st point below, 2nd and 3rd above this limit
landing_speed = 40 # landing detection: 1st point above, 2nd and 3rd below this limit
duration = 100 # the points must not exceed this duration
radius = 0.05 # the points must not exceed this radius (degree!) around the 2nd point
2015-11-11 07:04:42 +00:00
2015-11-24 22:29:27 +00:00
# calculate the time where the computation starts
last_takeoff_landing_query = app.session.query(func.max(TakeoffLanding.timestamp))
2016-04-29 12:03:21 +00:00
begin_computation = last_takeoff_landing_query.one()[0]
if begin_computation is None:
2015-11-24 22:29:27 +00:00
# if the table is empty
2016-04-29 12:03:21 +00:00
last_takeoff_landing_query = app.session.query(func.min(AircraftBeacon.timestamp))
begin_computation = last_takeoff_landing_query.one()[0]
if begin_computation is None:
return 0
2015-11-24 22:29:27 +00:00
else:
2016-04-29 12:03:21 +00:00
# we get the beacons async. to be safe we delete takeoffs/landings from last 24 hours and recalculate from then
begin_computation = begin_computation - timedelta(hours=24)
2015-11-24 22:29:27 +00:00
app.session.query(TakeoffLanding) \
2016-04-29 12:03:21 +00:00
.filter(TakeoffLanding.timestamp >= begin_computation) \
2015-11-24 22:29:27 +00:00
.delete()
2016-04-29 12:03:21 +00:00
end_computation = begin_computation + timedelta(days=30)
2015-11-11 07:04:42 +00:00
2016-04-29 12:03:21 +00:00
logger.debug("Calculate takeoffs and landings between {} and {}"
.format(begin_computation, end_computation))
# make a query with current, previous and next position
2015-11-15 18:31:58 +00:00
sq = app.session.query(
AircraftBeacon.address,
func.lag(AircraftBeacon.address).over(order_by=and_(AircraftBeacon.address, AircraftBeacon.timestamp)).label('address_prev'),
func.lead(AircraftBeacon.address).over(order_by=and_(AircraftBeacon.address, AircraftBeacon.timestamp)).label('address_next'),
AircraftBeacon.timestamp,
func.lag(AircraftBeacon.timestamp).over(order_by=and_(AircraftBeacon.address, AircraftBeacon.timestamp)).label('timestamp_prev'),
func.lead(AircraftBeacon.timestamp).over(order_by=and_(AircraftBeacon.address, AircraftBeacon.timestamp)).label('timestamp_next'),
2016-04-28 10:38:48 +00:00
AircraftBeacon.name,
func.lag(AircraftBeacon.name).over(order_by=and_(AircraftBeacon.address, AircraftBeacon.timestamp)).label('name_prev'),
func.lead(AircraftBeacon.name).over(order_by=and_(AircraftBeacon.address, AircraftBeacon.timestamp)).label('name_next'),
AircraftBeacon.receiver_name,
func.lag(AircraftBeacon.receiver_name).over(order_by=and_(AircraftBeacon.address, AircraftBeacon.timestamp)).label('receiver_name_prev'),
func.lead(AircraftBeacon.receiver_name).over(order_by=and_(AircraftBeacon.address, AircraftBeacon.timestamp)).label('receiver_name_next'),
AircraftBeacon.location_wkt,
func.lag(AircraftBeacon.location_wkt).over(order_by=and_(AircraftBeacon.address, AircraftBeacon.timestamp)).label('location_wkt_prev'),
func.lead(AircraftBeacon.location_wkt).over(order_by=and_(AircraftBeacon.address, AircraftBeacon.timestamp)).label('location_wkt_next'),
2015-11-15 18:31:58 +00:00
AircraftBeacon.track,
func.lag(AircraftBeacon.track).over(order_by=and_(AircraftBeacon.address, AircraftBeacon.timestamp)).label('track_prev'),
func.lead(AircraftBeacon.track).over(order_by=and_(AircraftBeacon.address, AircraftBeacon.timestamp)).label('track_next'),
AircraftBeacon.ground_speed,
func.lag(AircraftBeacon.ground_speed).over(order_by=and_(AircraftBeacon.address, AircraftBeacon.timestamp)).label('ground_speed_prev'),
func.lead(AircraftBeacon.ground_speed).over(order_by=and_(AircraftBeacon.address, AircraftBeacon.timestamp)).label('ground_speed_next'),
AircraftBeacon.altitude,
func.lag(AircraftBeacon.altitude).over(order_by=and_(AircraftBeacon.address, AircraftBeacon.timestamp)).label('altitude_prev'),
func.lead(AircraftBeacon.altitude).over(order_by=and_(AircraftBeacon.address, AircraftBeacon.timestamp)).label('altitude_next')) \
2016-04-29 12:03:21 +00:00
.filter(AircraftBeacon.timestamp >= begin_computation) \
.filter(AircraftBeacon.timestamp <= end_computation) \
.order_by(func.date(AircraftBeacon.timestamp), AircraftBeacon.address, AircraftBeacon.timestamp) \
2015-11-11 07:04:42 +00:00
.subquery()
2016-04-29 12:03:21 +00:00
# find takeoffs and landings
2015-11-15 18:31:58 +00:00
takeoff_landing_query = app.session.query(
sq.c.address,
2016-04-28 10:38:48 +00:00
sq.c.name,
sq.c.receiver_name,
2015-11-15 18:31:58 +00:00
sq.c.timestamp,
sq.c.location,
2015-11-15 18:31:58 +00:00
sq.c.track,
sq.c.ground_speed,
sq.c.altitude,
case([(sq.c.ground_speed > takeoff_speed, True),
(sq.c.ground_speed < landing_speed, False)]).label('is_takeoff')) \
2015-11-11 07:04:42 +00:00
.filter(sq.c.address_prev == sq.c.address == sq.c.address_next) \
.filter(or_(and_(sq.c.ground_speed_prev < takeoff_speed, # takeoff
sq.c.ground_speed > takeoff_speed,
sq.c.ground_speed_next > takeoff_speed),
and_(sq.c.ground_speed_prev > landing_speed, # landing
sq.c.ground_speed < landing_speed,
sq.c.ground_speed_next < landing_speed))) \
2016-04-29 12:03:21 +00:00
.filter(sq.c.timestamp_next - sq.c.timestamp_prev < timedelta(seconds=duration)) \
.filter(and_(func.ST_DFullyWithin(sq.c.location, sq.c.location_wkt_prev, radius),
func.ST_DFullyWithin(sq.c.location, sq.c.location_wkt_next, radius))) \
2015-11-11 07:04:42 +00:00
.order_by(func.date(sq.c.timestamp), sq.c.timestamp)
2016-04-29 12:03:21 +00:00
# ... and save them
2016-04-28 10:38:48 +00:00
ins = insert(TakeoffLanding).from_select((TakeoffLanding.address, TakeoffLanding.name, TakeoffLanding.receiver_name, TakeoffLanding.timestamp, TakeoffLanding.location_wkt, TakeoffLanding.track, TakeoffLanding.ground_speed, TakeoffLanding.altitude, TakeoffLanding.is_takeoff), takeoff_landing_query)
2015-11-24 22:29:27 +00:00
result = app.session.execute(ins)
counter = result.rowcount
app.session.commit()
logger.debug("New/recalculated takeoffs and landings: {}".format(counter))
2015-11-24 22:29:27 +00:00
return counter