Split logbook and make it testable

pull/56/head
Konstantin Gründger 2016-07-02 20:07:22 +02:00
rodzic 6d1d68e948
commit acd1128dc0
3 zmienionych plików z 156 dodań i 139 usunięć

Wyświetl plik

@ -1,141 +1,22 @@
from datetime import timedelta
from celery.utils.log import get_task_logger
from ogn.collect.celery import app
from sqlalchemy.sql import func, null
from sqlalchemy import and_, or_, insert, update, between, exists
from sqlalchemy.sql.expression import case, true, false, label
from sqlalchemy.sql import func, null
from sqlalchemy.sql.expression import true, false, label
from ogn.model import AircraftBeacon, TakeoffLanding, Airport, Logbook
from ogn.collect.celery import app
from ogn.model import TakeoffLanding, Logbook
logger = get_task_logger(__name__)
def get_aircraft_beacon_start_id():
# returns the last AircraftBeacon used for TakeoffLanding
last_takeoff_landing_query = app.session.query(func.max(TakeoffLanding.id).label('max_id')) \
.subquery()
last_used_aircraft_beacon_query = app.session.query(AircraftBeacon.id) \
.filter(TakeoffLanding.id == last_takeoff_landing_query.c.max_id) \
.filter(and_(AircraftBeacon.timestamp == TakeoffLanding.timestamp,
AircraftBeacon.device_id == TakeoffLanding.device_id))
last_used_aircraft_beacon_id = last_used_aircraft_beacon_query.first()
if last_used_aircraft_beacon_id is None:
min_aircraft_beacon_id = app.session.query(func.min(AircraftBeacon.id)).first()
if min_aircraft_beacon_id is None:
start_id = 0
else:
start_id = min_aircraft_beacon_id[0]
else:
start_id = last_used_aircraft_beacon_id[0] + 1
return start_id
@app.task
def compute_takeoff_and_landing():
logger.info("Compute takeoffs and landings.")
# takeoff / landing detection is based on 3 consecutive points
takeoff_speed = 55 # takeoff detection: 1st point below, 2nd and 3rd above this limit
landing_speed = 40 # landing detection: 1st point above, 2nd and 3rd below this limit
duration = 100 # the points must not exceed this duration
radius = 0.05 # the points must not exceed this radius (degree!) around the 2nd point
# takeoff / landing has to be near an airport
airport_radius = 0.025 # takeoff / landing must not exceed this radius (degree!) around the airport
airport_delta = 100 # takeoff / landing must not exceed this altitude offset above/below the airport
# AircraftBeacon start id and max id offset
aircraft_beacon_start_id = get_aircraft_beacon_start_id()
max_id_offset = 500000
# 'wo' is the window order for the sql window function
wo = and_(AircraftBeacon.device_id, AircraftBeacon.timestamp)
# make a query with current, previous and next position
sq = app.session.query(
AircraftBeacon.id,
AircraftBeacon.timestamp,
func.lag(AircraftBeacon.timestamp).over(order_by=wo).label('timestamp_prev'),
func.lead(AircraftBeacon.timestamp).over(order_by=wo).label('timestamp_next'),
AircraftBeacon.location_wkt,
func.lag(AircraftBeacon.location_wkt).over(order_by=wo).label('location_wkt_prev'),
func.lead(AircraftBeacon.location_wkt).over(order_by=wo).label('location_wkt_next'),
AircraftBeacon.track,
func.lag(AircraftBeacon.track).over(order_by=wo).label('track_prev'),
func.lead(AircraftBeacon.track).over(order_by=wo).label('track_next'),
AircraftBeacon.ground_speed,
func.lag(AircraftBeacon.ground_speed).over(order_by=wo).label('ground_speed_prev'),
func.lead(AircraftBeacon.ground_speed).over(order_by=wo).label('ground_speed_next'),
AircraftBeacon.altitude,
func.lag(AircraftBeacon.altitude).over(order_by=wo).label('altitude_prev'),
func.lead(AircraftBeacon.altitude).over(order_by=wo).label('altitude_next'),
AircraftBeacon.device_id,
func.lag(AircraftBeacon.device_id).over(order_by=wo).label('device_id_prev'),
func.lead(AircraftBeacon.device_id).over(order_by=wo).label('device_id_next')) \
.filter(between(AircraftBeacon.id, aircraft_beacon_start_id, aircraft_beacon_start_id + max_id_offset)) \
.subquery()
# find possible takeoffs and landings
sq2 = app.session.query(
sq.c.id,
sq.c.timestamp,
case([(sq.c.ground_speed > takeoff_speed, sq.c.location_wkt_prev), # on takeoff we take the location from the previous fix because it is nearer to the airport
(sq.c.ground_speed < landing_speed, sq.c.location)]).label('location'),
case([(sq.c.ground_speed > takeoff_speed, sq.c.track),
(sq.c.ground_speed < landing_speed, sq.c.track_prev)]).label('track'), # on landing we take the track from the previous fix because gliders tend to leave the runway quickly
sq.c.ground_speed,
sq.c.altitude,
case([(sq.c.ground_speed > takeoff_speed, True),
(sq.c.ground_speed < landing_speed, False)]).label('is_takeoff'),
sq.c.device_id) \
.filter(sq.c.device_id_prev == sq.c.device_id == sq.c.device_id_next) \
.filter(or_(and_(sq.c.ground_speed_prev < takeoff_speed, # takeoff
sq.c.ground_speed > takeoff_speed,
sq.c.ground_speed_next > takeoff_speed),
and_(sq.c.ground_speed_prev > landing_speed, # landing
sq.c.ground_speed < landing_speed,
sq.c.ground_speed_next < landing_speed))) \
.filter(sq.c.timestamp_next - sq.c.timestamp_prev < timedelta(seconds=duration)) \
.filter(and_(func.ST_DFullyWithin(sq.c.location, sq.c.location_wkt_prev, radius),
func.ST_DFullyWithin(sq.c.location, sq.c.location_wkt_next, radius))) \
.subquery()
# consider them if they are near a airport
takeoff_landing_query = app.session.query(
sq2.c.timestamp,
sq2.c.track,
sq2.c.is_takeoff,
sq2.c.device_id,
Airport.id) \
.filter(and_(func.ST_DFullyWithin(sq2.c.location, Airport.location_wkt, airport_radius),
between(sq2.c.altitude, Airport.altitude - airport_delta, Airport.altitude + airport_delta))) \
.filter(between(Airport.style, 2, 5)) \
.order_by(sq2.c.id)
# ... and save them
ins = insert(TakeoffLanding).from_select((TakeoffLanding.timestamp,
TakeoffLanding.track,
TakeoffLanding.is_takeoff,
TakeoffLanding.device_id,
TakeoffLanding.airport_id),
takeoff_landing_query)
result = app.session.execute(ins)
counter = result.rowcount
app.session.commit()
logger.debug("New takeoffs and landings: {}".format(counter))
return counter
@app.task
def compute_logbook_entries():
def compute_logbook_entries(session=None):
logger.info("Compute logbook.")
if session is None:
session = app.session
or_args = [between(TakeoffLanding.timestamp, '2016-06-28 00:00:00', '2016-06-28 23:59:59')]
or_args = []
@ -145,7 +26,7 @@ def compute_logbook_entries():
TakeoffLanding.timestamp)
# make a query with current, previous and next "takeoff_landing" event, so we can find complete flights
sq = app.session.query(
sq = session.query(
TakeoffLanding.device_id,
func.lag(TakeoffLanding.device_id).over(order_by=wo).label('device_id_prev'),
func.lead(TakeoffLanding.device_id).over(order_by=wo).label('device_id_next'),
@ -165,7 +46,7 @@ def compute_logbook_entries():
.subquery()
# find complete flights (with takeoff and landing on the same day)
complete_flight_query = app.session.query(
complete_flight_query = session.query(
sq.c.timestamp.label('reftime'),
sq.c.device_id.label('device_id'),
sq.c.timestamp.label('takeoff_timestamp'), sq.c.track.label('takeoff_track'), sq.c.airport_id.label('takeoff_airport_id'),
@ -176,7 +57,7 @@ def compute_logbook_entries():
.filter(func.date(sq.c.timestamp_next) == func.date(sq.c.timestamp))
# split complete flights (with takeoff and landing on different days) into one takeoff and one landing
split_start_query = app.session.query(
split_start_query = session.query(
sq.c.timestamp.label('reftime'),
sq.c.device_id.label('device_id'),
sq.c.timestamp.label('takeoff_timestamp'), sq.c.track.label('takeoff_track'), sq.c.airport_id.label('takeoff_airport_id'),
@ -186,7 +67,7 @@ def compute_logbook_entries():
.filter(sq.c.device_id == sq.c.device_id_next) \
.filter(func.date(sq.c.timestamp_next) != func.date(sq.c.timestamp))
split_landing_query = app.session.query(
split_landing_query = session.query(
sq.c.timestamp_next.label('reftime'),
sq.c.device_id.label('device_id'),
null().label('takeoff_timestamp'), null().label('takeoff_track'), null().label('takeoff_airport_id'),
@ -197,7 +78,7 @@ def compute_logbook_entries():
.filter(func.date(sq.c.timestamp_next) != func.date(sq.c.timestamp))
# find landings without start
only_landings_query = app.session.query(
only_landings_query = session.query(
sq.c.timestamp.label('reftime'),
sq.c.device_id.label('device_id'),
null().label('takeoff_timestamp'), null().label('takeoff_track'), null().label('takeoff_airport_id'),
@ -209,7 +90,7 @@ def compute_logbook_entries():
sq.c.is_takeoff_prev == null()))
# find starts without landing
only_starts_query = app.session.query(
only_starts_query = session.query(
sq.c.timestamp.label('reftime'),
sq.c.device_id.label('device_id'),
sq.c.timestamp.label('takeoff_timestamp'), sq.c.track.label('takeoff_track'), sq.c.airport_id.label('takeoff_airport_id'),
@ -239,9 +120,9 @@ def compute_logbook_entries():
"landing_airport_id": complete_flights.c.landing_airport_id,
"duration": complete_flights.c.duration})
result = app.session.execute(upd)
result = session.execute(upd)
counter = result.rowcount
app.session.commit()
session.commit()
logger.debug("Updated logbook entries: {}".format(counter))
# unite all computated flights ('incomplete' and 'complete')
@ -253,7 +134,7 @@ def compute_logbook_entries():
.subquery()
# consider only if not already stored
new_logbook_entries = app.session.query(union_query) \
new_logbook_entries = session.query(union_query) \
.filter(~exists().where(
and_(Logbook.reftime == union_query.c.reftime,
Logbook.device_id == union_query.c.device_id,
@ -276,9 +157,9 @@ def compute_logbook_entries():
Logbook.duration),
new_logbook_entries)
result = app.session.execute(ins)
result = session.execute(ins)
counter = result.rowcount
app.session.commit()
session.commit()
logger.debug("New logbook entries: {}".format(counter))
return counter

Wyświetl plik

@ -0,0 +1,135 @@
from datetime import timedelta
from celery.utils.log import get_task_logger
from sqlalchemy import and_, or_, insert, between
from sqlalchemy.sql import func
from sqlalchemy.sql.expression import case
from ogn.collect.celery import app
from ogn.model import AircraftBeacon, TakeoffLanding, Airport
logger = get_task_logger(__name__)
def get_aircraft_beacon_start_id(session):
# returns the last AircraftBeacon used for TakeoffLanding
last_takeoff_landing_query = session.query(func.max(TakeoffLanding.id).label('max_id')) \
.subquery()
last_used_aircraft_beacon_query = session.query(AircraftBeacon.id) \
.filter(TakeoffLanding.id == last_takeoff_landing_query.c.max_id) \
.filter(and_(AircraftBeacon.timestamp == TakeoffLanding.timestamp,
AircraftBeacon.device_id == TakeoffLanding.device_id))
last_used_aircraft_beacon_id = last_used_aircraft_beacon_query.first()
if last_used_aircraft_beacon_id is None:
min_aircraft_beacon_id = session.query(func.min(AircraftBeacon.id)).first()
if min_aircraft_beacon_id is None:
start_id = 0
else:
start_id = min_aircraft_beacon_id[0]
else:
start_id = last_used_aircraft_beacon_id[0] + 1
return start_id
@app.task
def compute_takeoff_and_landing(session=None):
logger.info("Compute takeoffs and landings.")
if session is None:
session = app.session
# takeoff / landing detection is based on 3 consecutive points
takeoff_speed = 55 # takeoff detection: 1st point below, 2nd and 3rd above this limit
landing_speed = 40 # landing detection: 1st point above, 2nd and 3rd below this limit
duration = 100 # the points must not exceed this duration
radius = 0.05 # the points must not exceed this radius (degree!) around the 2nd point
# takeoff / landing has to be near an airport
airport_radius = 0.025 # takeoff / landing must not exceed this radius (degree!) around the airport
airport_delta = 100 # takeoff / landing must not exceed this altitude offset above/below the airport
# AircraftBeacon start id and max id offset
aircraft_beacon_start_id = get_aircraft_beacon_start_id(session)
max_id_offset = 500000
# 'wo' is the window order for the sql window function
wo = and_(AircraftBeacon.device_id, AircraftBeacon.timestamp)
# make a query with current, previous and next position
sq = session.query(
AircraftBeacon.id,
AircraftBeacon.timestamp,
func.lag(AircraftBeacon.timestamp).over(order_by=wo).label('timestamp_prev'),
func.lead(AircraftBeacon.timestamp).over(order_by=wo).label('timestamp_next'),
AircraftBeacon.location_wkt,
func.lag(AircraftBeacon.location_wkt).over(order_by=wo).label('location_wkt_prev'),
func.lead(AircraftBeacon.location_wkt).over(order_by=wo).label('location_wkt_next'),
AircraftBeacon.track,
func.lag(AircraftBeacon.track).over(order_by=wo).label('track_prev'),
func.lead(AircraftBeacon.track).over(order_by=wo).label('track_next'),
AircraftBeacon.ground_speed,
func.lag(AircraftBeacon.ground_speed).over(order_by=wo).label('ground_speed_prev'),
func.lead(AircraftBeacon.ground_speed).over(order_by=wo).label('ground_speed_next'),
AircraftBeacon.altitude,
func.lag(AircraftBeacon.altitude).over(order_by=wo).label('altitude_prev'),
func.lead(AircraftBeacon.altitude).over(order_by=wo).label('altitude_next'),
AircraftBeacon.device_id,
func.lag(AircraftBeacon.device_id).over(order_by=wo).label('device_id_prev'),
func.lead(AircraftBeacon.device_id).over(order_by=wo).label('device_id_next')) \
.filter(between(AircraftBeacon.id, aircraft_beacon_start_id, aircraft_beacon_start_id + max_id_offset)) \
.subquery()
# find possible takeoffs and landings
sq2 = session.query(
sq.c.id,
sq.c.timestamp,
case([(sq.c.ground_speed > takeoff_speed, sq.c.location_wkt_prev), # on takeoff we take the location from the previous fix because it is nearer to the airport
(sq.c.ground_speed < landing_speed, sq.c.location)]).label('location'),
case([(sq.c.ground_speed > takeoff_speed, sq.c.track),
(sq.c.ground_speed < landing_speed, sq.c.track_prev)]).label('track'), # on landing we take the track from the previous fix because gliders tend to leave the runway quickly
sq.c.ground_speed,
sq.c.altitude,
case([(sq.c.ground_speed > takeoff_speed, True),
(sq.c.ground_speed < landing_speed, False)]).label('is_takeoff'),
sq.c.device_id) \
.filter(sq.c.device_id_prev == sq.c.device_id == sq.c.device_id_next) \
.filter(or_(and_(sq.c.ground_speed_prev < takeoff_speed, # takeoff
sq.c.ground_speed > takeoff_speed,
sq.c.ground_speed_next > takeoff_speed),
and_(sq.c.ground_speed_prev > landing_speed, # landing
sq.c.ground_speed < landing_speed,
sq.c.ground_speed_next < landing_speed))) \
.filter(sq.c.timestamp_next - sq.c.timestamp_prev < timedelta(seconds=duration)) \
.filter(and_(func.ST_DFullyWithin(sq.c.location, sq.c.location_wkt_prev, radius),
func.ST_DFullyWithin(sq.c.location, sq.c.location_wkt_next, radius))) \
.subquery()
# consider them if they are near a airport
takeoff_landing_query = session.query(
sq2.c.timestamp,
sq2.c.track,
sq2.c.is_takeoff,
sq2.c.device_id,
Airport.id) \
.filter(and_(func.ST_DFullyWithin(sq2.c.location, Airport.location_wkt, airport_radius),
between(sq2.c.altitude, Airport.altitude - airport_delta, Airport.altitude + airport_delta))) \
.filter(between(Airport.style, 2, 5)) \
.order_by(sq2.c.id)
# ... and save them
ins = insert(TakeoffLanding).from_select((TakeoffLanding.timestamp,
TakeoffLanding.track,
TakeoffLanding.is_takeoff,
TakeoffLanding.device_id,
TakeoffLanding.airport_id),
takeoff_landing_query)
result = session.execute(ins)
counter = result.rowcount
session.commit()
logger.debug("New takeoffs and landings: {}".format(counter))
return counter

Wyświetl plik

@ -9,7 +9,8 @@ from sqlalchemy.orm import aliased
from ogn.model import Device, DeviceInfo, TakeoffLanding, Airport, Logbook
from ogn.commands.dbutils import session
from ogn.collect.logbook import compute_takeoff_and_landing, compute_logbook_entries
from ogn.collect.takeoff_landing import compute_takeoff_and_landing
from ogn.collect.logbook import compute_logbook_entries
from manager import Manager
manager = Manager()