From 76b5827778301bc031162415fb2affd28fae2c73 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Konstantin=20Gru=CC=88ndger?= Date: Tue, 28 Jun 2016 19:52:49 +0200 Subject: [PATCH 01/26] Bugfix: use AircraftBeacon.id for sorting --- ogn/collect/logbook.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/ogn/collect/logbook.py b/ogn/collect/logbook.py index cb0b9bf..1726b5d 100644 --- a/ogn/collect/logbook.py +++ b/ogn/collect/logbook.py @@ -40,12 +40,17 @@ def compute_takeoff_and_landing(): last_used_aircraft_beacon_id = last_used_aircraft_beacon_query.first() if last_used_aircraft_beacon_id is None: - aircraft_beacon_id_start = 0 + min_aircraft_beacon_id = app.session.query(func.min(AircraftBeacon.id)).first() + if min_aircraft_beacon_id is None: + return 0 + else: + aircraft_beacon_id_start = min_aircraft_beacon_id[0] else: aircraft_beacon_id_start = last_used_aircraft_beacon_id[0] + 1 # make a query with current, previous and next position sq = app.session.query( + AircraftBeacon.id, AircraftBeacon.timestamp, func.lag(AircraftBeacon.timestamp).over(order_by=and_(AircraftBeacon.device_id, AircraftBeacon.timestamp)).label('timestamp_prev'), func.lead(AircraftBeacon.timestamp).over(order_by=and_(AircraftBeacon.device_id, AircraftBeacon.timestamp)).label('timestamp_next'), @@ -69,6 +74,7 @@ def compute_takeoff_and_landing(): # find possible takeoffs and landings sq2 = app.session.query( + sq.c.id, sq.c.timestamp, case([(sq.c.ground_speed > takeoff_speed, sq.c.location_wkt_prev), # on takeoff we take the location from the previous fix because it is nearer to the airport (sq.c.ground_speed < landing_speed, sq.c.location)]).label('location'), @@ -100,7 +106,8 @@ def compute_takeoff_and_landing(): Airport.id) \ .filter(and_(func.ST_DFullyWithin(sq2.c.location, Airport.location_wkt, airport_radius), between(sq2.c.altitude, Airport.altitude - airport_delta, Airport.altitude + airport_delta))) \ - .filter(between(Airport.style, 2, 5)) + .filter(between(Airport.style, 2, 5)) \ + .order_by(sq2.c.id) # ... and save them ins = insert(TakeoffLanding).from_select((TakeoffLanding.timestamp, From 998cbf499062c571cb1e71e4e837ae9f9124257d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Konstantin=20Gru=CC=88ndger?= Date: Tue, 28 Jun 2016 20:00:24 +0200 Subject: [PATCH 02/26] Use utc_timedelta only for splitting of takeoff and landing --- ogn/commands/logbook.py | 50 ++++++++++++++++++++--------------------- 1 file changed, 25 insertions(+), 25 deletions(-) diff --git a/ogn/commands/logbook.py b/ogn/commands/logbook.py index 4daa597..fa526ec 100644 --- a/ogn/commands/logbook.py +++ b/ogn/commands/logbook.py @@ -49,58 +49,58 @@ def show(airport_name, utc_delta_hours=0, date=None): sq = session.query( TakeoffLanding.device_id, func.lag(TakeoffLanding.device_id) - .over(order_by=and_(func.date(TakeoffLanding.timestamp + utc_timedelta), + .over(order_by=and_(func.date(TakeoffLanding.timestamp), TakeoffLanding.device_id, - TakeoffLanding.timestamp + utc_timedelta)) + TakeoffLanding.timestamp)) .label('device_id_prev'), func.lead(TakeoffLanding.device_id) - .over(order_by=and_(func.date(TakeoffLanding.timestamp + utc_timedelta), + .over(order_by=and_(func.date(TakeoffLanding.timestamp), TakeoffLanding.device_id, - TakeoffLanding.timestamp + utc_timedelta)) + TakeoffLanding.timestamp)) .label('device_id_next'), - (TakeoffLanding.timestamp + utc_timedelta).label('timestamp'), + (TakeoffLanding.timestamp).label('timestamp'), func.lag(TakeoffLanding.timestamp) - .over(order_by=and_(func.date(TakeoffLanding.timestamp + utc_timedelta), + .over(order_by=and_(func.date(TakeoffLanding.timestamp), TakeoffLanding.device_id, - TakeoffLanding.timestamp + utc_timedelta)) + TakeoffLanding.timestamp)) .label('timestamp_prev'), - func.lead(TakeoffLanding.timestamp + utc_timedelta) - .over(order_by=and_(func.date(TakeoffLanding.timestamp + utc_timedelta), + func.lead(TakeoffLanding.timestamp) + .over(order_by=and_(func.date(TakeoffLanding.timestamp), TakeoffLanding.device_id, - TakeoffLanding.timestamp + utc_timedelta)) + TakeoffLanding.timestamp)) .label('timestamp_next'), TakeoffLanding.track, func.lag(TakeoffLanding.track) - .over(order_by=and_(func.date(TakeoffLanding.timestamp + utc_timedelta), + .over(order_by=and_(func.date(TakeoffLanding.timestamp), TakeoffLanding.device_id, - TakeoffLanding.timestamp + utc_timedelta)) + TakeoffLanding.timestamp)) .label('track_prev'), func.lead(TakeoffLanding.track) - .over(order_by=and_(func.date(TakeoffLanding.timestamp + utc_timedelta), + .over(order_by=and_(func.date(TakeoffLanding.timestamp), TakeoffLanding.device_id, - TakeoffLanding.timestamp + utc_timedelta)) + TakeoffLanding.timestamp)) .label('track_next'), TakeoffLanding.is_takeoff, func.lag(TakeoffLanding.is_takeoff) - .over(order_by=and_(func.date(TakeoffLanding.timestamp + utc_timedelta), + .over(order_by=and_(func.date(TakeoffLanding.timestamp), TakeoffLanding.device_id, - TakeoffLanding.timestamp + utc_timedelta)) + TakeoffLanding.timestamp)) .label('is_takeoff_prev'), func.lead(TakeoffLanding.is_takeoff) - .over(order_by=and_(func.date(TakeoffLanding.timestamp + utc_timedelta), + .over(order_by=and_(func.date(TakeoffLanding.timestamp), TakeoffLanding.device_id, - TakeoffLanding.timestamp + utc_timedelta)) + TakeoffLanding.timestamp)) .label('is_takeoff_next'), TakeoffLanding.airport_id, func.lag(TakeoffLanding.airport_id) - .over(order_by=and_(func.date(TakeoffLanding.timestamp + utc_timedelta), + .over(order_by=and_(func.date(TakeoffLanding.timestamp), TakeoffLanding.device_id, - TakeoffLanding.timestamp + utc_timedelta)) + TakeoffLanding.timestamp)) .label('airport_id_prev'), func.lead(TakeoffLanding.airport_id) - .over(order_by=and_(func.date(TakeoffLanding.timestamp + utc_timedelta), + .over(order_by=and_(func.date(TakeoffLanding.timestamp), TakeoffLanding.device_id, - TakeoffLanding.timestamp + utc_timedelta)) + TakeoffLanding.timestamp)) .label('airport_id_next')) \ .filter(*or_args) \ .subquery() @@ -113,7 +113,7 @@ def show(airport_name, utc_delta_hours=0, date=None): label('duration', sq.c.timestamp_next - sq.c.timestamp)) \ .filter(and_(sq.c.is_takeoff == true(), sq.c.is_takeoff_next == false())) \ .filter(sq.c.device_id == sq.c.device_id_next) \ - .filter(func.date(sq.c.timestamp_next) == func.date(sq.c.timestamp)) \ + .filter(func.date(sq.c.timestamp_next + utc_timedelta) == func.date(sq.c.timestamp + utc_timedelta)) \ .filter(or_(sq.c.airport_id == airport.id, sq.c.airport_id_next == airport.id)) @@ -125,7 +125,7 @@ def show(airport_name, utc_delta_hours=0, date=None): null().label('duration')) \ .filter(and_(sq.c.is_takeoff == true(), sq.c.is_takeoff_next == false())) \ .filter(sq.c.device_id == sq.c.device_id_next) \ - .filter(func.date(sq.c.timestamp_next) != func.date(sq.c.timestamp)) \ + .filter(func.date(sq.c.timestamp_next + utc_timedelta) != func.date(sq.c.timestamp + utc_timedelta)) \ .filter(and_(sq.c.airport_id == airport.id, sq.c.airport_id_next == airport.id)) @@ -136,7 +136,7 @@ def show(airport_name, utc_delta_hours=0, date=None): null().label('duration')) \ .filter(and_(sq.c.is_takeoff == true(), sq.c.is_takeoff_next == false())) \ .filter(sq.c.device_id == sq.c.device_id_next) \ - .filter(func.date(sq.c.timestamp_next) != func.date(sq.c.timestamp)) \ + .filter(func.date(sq.c.timestamp_next + utc_timedelta) != func.date(sq.c.timestamp + utc_timedelta)) \ .filter(and_(sq.c.airport_id == airport.id, sq.c.airport_id_next == airport.id)) From f08bf220c83b85398f274ff8f032a48a387ac67c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Konstantin=20Gru=CC=88ndger?= Date: Wed, 29 Jun 2016 23:26:30 +0200 Subject: [PATCH 03/26] Persist logbook --- config/default.py | 10 +- ogn/collect/logbook.py | 167 ++++++++++++++++++++++++++++++++- ogn/commands/logbook.py | 200 ++++++++++------------------------------ ogn/model/logbook.py | 34 +++++++ 4 files changed, 253 insertions(+), 158 deletions(-) create mode 100644 ogn/model/logbook.py diff --git a/config/default.py b/config/default.py index 88d6d7e..9742b01 100644 --- a/config/default.py +++ b/config/default.py @@ -11,10 +11,18 @@ CELERYBEAT_SCHEDULE = { 'task': 'ogn.collect.database.import_ddb', 'schedule': timedelta(minutes=15), }, - 'update-logbook': { + 'update-takeoff-and-landing': { 'task': 'ogn.collect.logbook.compute_takeoff_and_landing', 'schedule': timedelta(minutes=15), }, + 'update-logbook': { + 'task': 'ogn.collect.logbook.compute_logbook', + 'schedule': timedelta(minutes=1), + }, + 'update-altitudes': { + 'task': 'ogn.collect.logbook.compute_altitudes', + 'schedule': timedelta(minutes=1), + }, 'update-receiver-table': { 'task': 'ogn.collect.receiver.update_receivers', 'schedule': timedelta(minutes=15), diff --git a/ogn/collect/logbook.py b/ogn/collect/logbook.py index 1726b5d..23c2c80 100644 --- a/ogn/collect/logbook.py +++ b/ogn/collect/logbook.py @@ -3,11 +3,11 @@ from datetime import timedelta from celery.utils.log import get_task_logger from ogn.collect.celery import app -from sqlalchemy.sql import func -from sqlalchemy import and_, or_, insert, between -from sqlalchemy.sql.expression import case +from sqlalchemy.sql import func, null +from sqlalchemy import and_, or_, insert, update, between, exists +from sqlalchemy.sql.expression import case, true, false, label -from ogn.model import AircraftBeacon, TakeoffLanding, Airport +from ogn.model import AircraftBeacon, TakeoffLanding, Airport, Logbook logger = get_task_logger(__name__) @@ -122,3 +122,162 @@ def compute_takeoff_and_landing(): logger.debug("New takeoffs and landings: {}".format(counter)) return counter + + +@app.task +def compute_logbook(): + logger.info("Compute logbook.") + + or_args = [between(TakeoffLanding.timestamp, '2016-06-28 00:00:00', '2016-06-28 23:59:59')] + or_args = [] + + # 'wo' is the window order for the sql window function + wo = and_(func.date(TakeoffLanding.timestamp), + TakeoffLanding.device_id, + TakeoffLanding.timestamp) + + # make a query with current, previous and next "takeoff_landing" event, so we can find complete flights + sq = app.session.query( + TakeoffLanding.device_id, + func.lag(TakeoffLanding.device_id) + .over(order_by=wo).label('device_id_prev'), + func.lead(TakeoffLanding.device_id) + .over(order_by=wo).label('device_id_next'), + TakeoffLanding.timestamp, + func.lag(TakeoffLanding.timestamp) + .over(order_by=wo).label('timestamp_prev'), + func.lead(TakeoffLanding.timestamp) + .over(order_by=wo).label('timestamp_next'), + TakeoffLanding.track, + func.lag(TakeoffLanding.track) + .over(order_by=wo).label('track_prev'), + func.lead(TakeoffLanding.track) + .over(order_by=wo).label('track_next'), + TakeoffLanding.is_takeoff, + func.lag(TakeoffLanding.is_takeoff) + .over(order_by=wo).label('is_takeoff_prev'), + func.lead(TakeoffLanding.is_takeoff) + .over(order_by=wo).label('is_takeoff_next'), + TakeoffLanding.airport_id, + func.lag(TakeoffLanding.airport_id) + .over(order_by=wo).label('airport_id_prev'), + func.lead(TakeoffLanding.airport_id) + .over(order_by=wo).label('airport_id_next')) \ + .filter(*or_args) \ + .subquery() + + # find complete flights (with takeoff and landing on the same day) + complete_flight_query = app.session.query( + sq.c.timestamp.label('reftime'), + sq.c.device_id.label('device_id'), + sq.c.timestamp.label('takeoff'), sq.c.track.label('takeoff_track'), sq.c.airport_id.label('takeoff_airport_id'), + sq.c.timestamp_next.label('landing'), sq.c.track_next.label('landing_track'), sq.c.airport_id_next.label('landing_airport_id'), + label('duration', sq.c.timestamp_next - sq.c.timestamp)) \ + .filter(and_(sq.c.is_takeoff == true(), sq.c.is_takeoff_next == false())) \ + .filter(sq.c.device_id == sq.c.device_id_next) \ + .filter(func.date(sq.c.timestamp_next) == func.date(sq.c.timestamp)) + + # split complete flights (with takeoff and landing on different days) into one takeoff and one landing + split_start_query = app.session.query( + sq.c.timestamp.label('reftime'), + sq.c.device_id.label('device_id'), + sq.c.timestamp.label('takeoff'), sq.c.track.label('takeoff_track'), sq.c.airport_id.label('takeoff_airport_id'), + null().label('landing'), null().label('landing_track'), null().label('landing_airport_id'), + null().label('duration')) \ + .filter(and_(sq.c.is_takeoff == true(), sq.c.is_takeoff_next == false())) \ + .filter(sq.c.device_id == sq.c.device_id_next) \ + .filter(func.date(sq.c.timestamp_next) != func.date(sq.c.timestamp)) + + split_landing_query = app.session.query( + sq.c.timestamp_next.label('reftime'), + sq.c.device_id.label('device_id'), + null().label('takeoff'), null().label('takeoff_track'), null().label('takeoff_airport_id'), + sq.c.timestamp_next.label('landing'), sq.c.track_next.label('landing_track'), sq.c.airport_id_next.label('landing_airport_id'), + null().label('duration')) \ + .filter(and_(sq.c.is_takeoff == true(), sq.c.is_takeoff_next == false())) \ + .filter(sq.c.device_id == sq.c.device_id_next) \ + .filter(func.date(sq.c.timestamp_next) != func.date(sq.c.timestamp)) + + # find landings without start + only_landings_query = app.session.query( + sq.c.timestamp.label('reftime'), + sq.c.device_id.label('device_id'), + null().label('takeoff'), null().label('takeoff_track'), null().label('takeoff_airport_id'), + sq.c.timestamp.label('landing'), sq.c.track_next.label('landing_track'), sq.c.airport_id_next.label('landing_airport_id'), + null().label('duration')) \ + .filter(sq.c.is_takeoff == false()) \ + .filter(or_(sq.c.device_id != sq.c.device_id_prev, + sq.c.is_takeoff_prev == false())) + + # find starts without landing + only_starts_query = app.session.query( + sq.c.timestamp.label('reftime'), + sq.c.device_id.label('device_id'), + sq.c.timestamp.label('takeoff'), sq.c.track.label('takeoff_track'), sq.c.airport_id.label('takeoff_airport_id'), + null().label('landing'), null().label('landing_track'), null().label('landing_airport_id'), + null().label('duration')) \ + .filter(sq.c.is_takeoff == true()) \ + .filter(or_(sq.c.device_id != sq.c.device_id_next, + sq.c.is_takeoff_next == true())) + + # unite all + union_query = complete_flight_query.union( + split_start_query, + split_landing_query, + only_landings_query, + only_starts_query) \ + .subquery() + + # consider only if not already stored + new_logbook_entries = app.session.query(union_query) \ + .filter(~exists().where( + and_(Logbook.reftime == union_query.c.reftime, + Logbook.device_id == union_query.c.device_id, + or_(Logbook.takeoff_airport_id == union_query.c.takeoff_airport_id, + and_(Logbook.takeoff_airport_id == null(), + union_query.c.takeoff_airport_id == null())), + or_(Logbook.landing_airport_id == union_query.c.landing_airport_id, + and_(Logbook.landing_airport_id == null(), + union_query.c.landing_airport_id == null()))))) + + # ... and save them + ins = insert(Logbook).from_select((Logbook.reftime, + Logbook.device_id, + Logbook.takeoff_timestamp, + Logbook.takeoff_track, + Logbook.takeoff_airport_id, + Logbook.landing_timestamp, + Logbook.landing_track, + Logbook.landing_airport_id, + Logbook.duration), + new_logbook_entries) + + result = app.session.execute(ins) + counter = result.rowcount + app.session.commit() + logger.debug("New logbook entries: {}".format(counter)) + + return counter + + +@app.task +def compute_altitudes(): + logger.info("Compute maximum altitudes.") + + altitude_query = app.session.query(Logbook.id, func.max(AircraftBeacon.altitude).label('max_altitude')) \ + .filter(and_(Logbook.takeoff_airport_id != null(), + Logbook.landing_airport_id != null())) \ + .filter(and_(between(AircraftBeacon.timestamp, Logbook.takeoff_timestamp, Logbook.landing_timestamp))) \ + .group_by(Logbook.id) \ + .subquery() + + upd = update(Logbook) \ + .values({'max_altitude': altitude_query.c.max_altitude}) \ + .where(Logbook.id == altitude_query.c.id) + + result = app.session.execute(upd) + counter = result.rowcount + app.session.commit() + logger.debug("Updated logbook entries: {}".format(counter)) + + return counter \ No newline at end of file diff --git a/ogn/commands/logbook.py b/ogn/commands/logbook.py index fa526ec..ab9450b 100644 --- a/ogn/commands/logbook.py +++ b/ogn/commands/logbook.py @@ -2,31 +2,47 @@ from datetime import timedelta, datetime -from sqlalchemy.sql import func, null +from sqlalchemy.sql import func from sqlalchemy import and_, or_ -from sqlalchemy.sql.expression import true, false, label from sqlalchemy.orm import aliased -from ogn.model import Device, DeviceInfo, TakeoffLanding, Airport +from ogn.model import Device, DeviceInfo, TakeoffLanding, Airport, Logbook from ogn.commands.dbutils import session -from ogn.collect.logbook import compute_takeoff_and_landing +from ogn.collect.logbook import compute_takeoff_and_landing, compute_logbook, compute_altitudes from manager import Manager manager = Manager() @manager.command -def compute(): +def compute_takeoff_landing(): """Compute takeoffs and landings.""" print("Compute takeoffs and landings...") result = compute_takeoff_and_landing.delay() counter = result.get() - print("New/recalculated takeoffs/landings: {}".format(counter)) + print("New takeoffs/landings: {}".format(counter)) + + +@manager.command +def compute_logbook(): + """Compute logbook.""" + print("Compute logbook...") + result = compute_logbook.delay() + counter = result.get() + print("New logbook entries: {}".format(counter)) + + +@manager.command +def compute_altitudes(): + """Compute maximum altitudes.""" + print("Compute maximum altitudes...") + result = compute_altitudes.delay() + counter = result.get() + print("Updated logbook entries: {}".format(counter)) @manager.arg('date', help='date (format: yyyy-mm-dd)') -@manager.arg('utc_delta_hours', help='delta hours to utc (for local time logs)') @manager.command def show(airport_name, utc_delta_hours=0, date=None): """Show a logbook for .""" @@ -38,138 +54,13 @@ def show(airport_name, utc_delta_hours=0, date=None): print('Airport "{}" not found.'.format(airport_name)) return - utc_timedelta = timedelta(hours=utc_delta_hours) or_args = [] if date is not None: date = datetime.strptime(date, "%Y-%m-%d") - or_args = [and_(TakeoffLanding.timestamp >= date + utc_timedelta, - TakeoffLanding.timestamp < date + timedelta(hours=24) + utc_timedelta)] + or_args = [and_(TakeoffLanding.timestamp >= date, + TakeoffLanding.timestamp < date + timedelta(hours=24))] - # make a query with current, previous and next "takeoff_landing" event, so we can find complete flights - sq = session.query( - TakeoffLanding.device_id, - func.lag(TakeoffLanding.device_id) - .over(order_by=and_(func.date(TakeoffLanding.timestamp), - TakeoffLanding.device_id, - TakeoffLanding.timestamp)) - .label('device_id_prev'), - func.lead(TakeoffLanding.device_id) - .over(order_by=and_(func.date(TakeoffLanding.timestamp), - TakeoffLanding.device_id, - TakeoffLanding.timestamp)) - .label('device_id_next'), - (TakeoffLanding.timestamp).label('timestamp'), - func.lag(TakeoffLanding.timestamp) - .over(order_by=and_(func.date(TakeoffLanding.timestamp), - TakeoffLanding.device_id, - TakeoffLanding.timestamp)) - .label('timestamp_prev'), - func.lead(TakeoffLanding.timestamp) - .over(order_by=and_(func.date(TakeoffLanding.timestamp), - TakeoffLanding.device_id, - TakeoffLanding.timestamp)) - .label('timestamp_next'), - TakeoffLanding.track, - func.lag(TakeoffLanding.track) - .over(order_by=and_(func.date(TakeoffLanding.timestamp), - TakeoffLanding.device_id, - TakeoffLanding.timestamp)) - .label('track_prev'), - func.lead(TakeoffLanding.track) - .over(order_by=and_(func.date(TakeoffLanding.timestamp), - TakeoffLanding.device_id, - TakeoffLanding.timestamp)) - .label('track_next'), - TakeoffLanding.is_takeoff, - func.lag(TakeoffLanding.is_takeoff) - .over(order_by=and_(func.date(TakeoffLanding.timestamp), - TakeoffLanding.device_id, - TakeoffLanding.timestamp)) - .label('is_takeoff_prev'), - func.lead(TakeoffLanding.is_takeoff) - .over(order_by=and_(func.date(TakeoffLanding.timestamp), - TakeoffLanding.device_id, - TakeoffLanding.timestamp)) - .label('is_takeoff_next'), - TakeoffLanding.airport_id, - func.lag(TakeoffLanding.airport_id) - .over(order_by=and_(func.date(TakeoffLanding.timestamp), - TakeoffLanding.device_id, - TakeoffLanding.timestamp)) - .label('airport_id_prev'), - func.lead(TakeoffLanding.airport_id) - .over(order_by=and_(func.date(TakeoffLanding.timestamp), - TakeoffLanding.device_id, - TakeoffLanding.timestamp)) - .label('airport_id_next')) \ - .filter(*or_args) \ - .subquery() - - # find complete flights (with takeoff and landing on the same day) - complete_flight_query = session.query(sq.c.timestamp.label('reftime'), - sq.c.device_id.label('device_id'), - sq.c.timestamp.label('takeoff'), sq.c.track.label('takeoff_track'), sq.c.airport_id.label('takeoff_airport_id'), - sq.c.timestamp_next.label('landing'), sq.c.track_next.label('landing_track'), sq.c.airport_id_next.label('landing_airport_id'), - label('duration', sq.c.timestamp_next - sq.c.timestamp)) \ - .filter(and_(sq.c.is_takeoff == true(), sq.c.is_takeoff_next == false())) \ - .filter(sq.c.device_id == sq.c.device_id_next) \ - .filter(func.date(sq.c.timestamp_next + utc_timedelta) == func.date(sq.c.timestamp + utc_timedelta)) \ - .filter(or_(sq.c.airport_id == airport.id, - sq.c.airport_id_next == airport.id)) - - # split complete flights (with takeoff and landing on different days) into one takeoff and one landing - split_start_query = session.query(sq.c.timestamp.label('reftime'), - sq.c.device_id.label('device_id'), - sq.c.timestamp.label('takeoff'), sq.c.track.label('takeoff_track'), sq.c.airport_id.label('takeoff_airport_id'), - null().label('landing'), null().label('landing_track'), null().label('landing_airport_id'), - null().label('duration')) \ - .filter(and_(sq.c.is_takeoff == true(), sq.c.is_takeoff_next == false())) \ - .filter(sq.c.device_id == sq.c.device_id_next) \ - .filter(func.date(sq.c.timestamp_next + utc_timedelta) != func.date(sq.c.timestamp + utc_timedelta)) \ - .filter(and_(sq.c.airport_id == airport.id, - sq.c.airport_id_next == airport.id)) - - split_landing_query = session.query(sq.c.timestamp_next.label('reftime'), - sq.c.device_id.label('device_id'), - null().label('takeoff'), null().label('takeoff_track'), null().label('takeoff_airport_id'), - sq.c.timestamp_next.label('landing'), sq.c.track_next.label('landing_track'), sq.c.airport_id_next.label('landing_airport_id'), - null().label('duration')) \ - .filter(and_(sq.c.is_takeoff == true(), sq.c.is_takeoff_next == false())) \ - .filter(sq.c.device_id == sq.c.device_id_next) \ - .filter(func.date(sq.c.timestamp_next + utc_timedelta) != func.date(sq.c.timestamp + utc_timedelta)) \ - .filter(and_(sq.c.airport_id == airport.id, - sq.c.airport_id_next == airport.id)) - - # find landings without start - only_landings_query = session.query(sq.c.timestamp.label('reftime'), - sq.c.device_id.label('device_id'), - null().label('takeoff'), null().label('takeoff_track'), null().label('takeoff_airport_id'), - sq.c.timestamp.label('landing'), sq.c.track_next.label('landing_track'), sq.c.airport_id_next.label('landing_airport_id'), - null().label('duration')) \ - .filter(sq.c.is_takeoff == false()) \ - .filter(or_(sq.c.device_id != sq.c.device_id_prev, - sq.c.is_takeoff_prev == false())) \ - .filter(sq.c.airport_id_next == airport.id) - - # find starts without landing - only_starts_query = session.query(sq.c.timestamp.label('reftime'), - sq.c.device_id.label('device_id'), - sq.c.timestamp.label('takeoff'), sq.c.track.label('takeoff_track'), sq.c.airport_id.label('takeoff_airport_id'), - null().label('landing'), null().label('landing_track'), null().label('landing_airport_id'), - null().label('duration')) \ - .filter(sq.c.is_takeoff == true()) \ - .filter(or_(sq.c.device_id != sq.c.device_id_next, - sq.c.is_takeoff_next == true())) \ - .filter(sq.c.airport_id == airport.id) - - # unite all - union_query = complete_flight_query.union(split_start_query, - split_landing_query, - only_landings_query, - only_starts_query) \ - .subquery() - - # get aircraft and airport informations and sort all entries by the reference time + # get device info with highes priority sq2 = session.query(DeviceInfo.address, func.max(DeviceInfo.address_origin).label('address_origin')) \ .group_by(DeviceInfo.address) \ .subquery() @@ -178,24 +69,23 @@ def show(airport_name, utc_delta_hours=0, date=None): .filter(and_(DeviceInfo.address == sq2.c.address, DeviceInfo.address_origin == sq2.c.address_origin)) \ .subquery() + # get all logbook entries and add device and airport infos takeoff_airport = aliased(Airport, name='takeoff_airport') landing_airport = aliased(Airport, name='landing_airport') - logbook_query = session.query(union_query.c.reftime, - union_query.c.takeoff, - union_query.c.takeoff_track, + logbook_query = session.query(Logbook, takeoff_airport, - union_query.c.landing, - union_query.c.landing_track, landing_airport, - union_query.c.duration, Device, sq3.c.registration, sq3.c.aircraft) \ - .outerjoin(takeoff_airport, union_query.c.takeoff_airport_id == takeoff_airport.id) \ - .outerjoin(landing_airport, union_query.c.landing_airport_id == landing_airport.id) \ - .outerjoin(Device, union_query.c.device_id == Device.id) \ + .filter(or_(Logbook.takeoff_airport_id == airport.id, + Logbook.landing_airport_id == airport.id)) \ + .filter(*or_args) \ + .outerjoin(takeoff_airport, Logbook.takeoff_airport_id == takeoff_airport.id) \ + .outerjoin(landing_airport, Logbook.landing_airport_id == landing_airport.id) \ + .outerjoin(Device, Logbook.device_id == Device.id) \ .outerjoin(sq3, sq3.c.address == Device.address) \ - .order_by(union_query.c.reftime) + .order_by(Logbook.reftime) # ... and finally print out the logbook print('--- Logbook ({}) ---'.format(airport_name)) @@ -223,14 +113,18 @@ def show(airport_name, utc_delta_hours=0, date=None): else: return ('') - for [reftime, takeoff, takeoff_track, takeoff_airport, landing, landing_track, landing_airport, duration, device, registration, aircraft] in logbook_query.all(): - print('%10s %8s (%2s) %8s (%2s) %8s %8s %17s %20s' % ( - reftime.date(), - none_datetime_replacer(takeoff), - none_track_replacer(takeoff_track), - none_datetime_replacer(landing), - none_track_replacer(landing_track), - none_timedelta_replacer(duration), + def none_altitude_replacer(altitude_object, airport_object): + return "?" if altitude_object is None else "{:5d}m ({:+5d}m)".format(altitude_object, altitude_object - airport_object.altitude) + + for [logbook, takeoff_airport, landing_airport, device, registration, aircraft] in logbook_query.all(): + print('%10s %8s (%2s) %8s (%2s) %8s %15s %8s %17s %20s' % ( + logbook.reftime.date(), + none_datetime_replacer(logbook.takeoff_timestamp), + none_track_replacer(logbook.takeoff_track), + none_datetime_replacer(logbook.landing_timestamp), + none_track_replacer(logbook.landing_track), + none_timedelta_replacer(logbook.duration), + none_altitude_replacer(logbook.max_altitude, takeoff_airport), none_registration_replacer(device, registration), none_aircraft_replacer(device, aircraft), airport_marker(takeoff_airport, landing_airport))) diff --git a/ogn/model/logbook.py b/ogn/model/logbook.py new file mode 100644 index 0000000..c235fc9 --- /dev/null +++ b/ogn/model/logbook.py @@ -0,0 +1,34 @@ +from sqlalchemy import Integer, DateTime, Interval, Column, ForeignKey +from sqlalchemy.orm import relationship + +from .base import Base +from sqlalchemy.sql.schema import UniqueConstraint + + +class Logbook(Base): + __tablename__ = 'logbook' + __table_args__ = (UniqueConstraint('reftime', + 'takeoff_airport_id', + 'landing_airport_id', + 'device_id'), + ) + + id = Column(Integer, primary_key=True) + + reftime = Column(DateTime, index=True) + takeoff_timestamp = Column(DateTime) + takeoff_track = Column(Integer) + landing_timestamp = Column(DateTime) + landing_track = Column(Integer) + duration = Column(Interval) + max_altitude = Column(Integer) + + # Relations + takeoff_airport_id = Column(Integer, ForeignKey('airport.id', ondelete='CASCADE'), index=True) + takeoff_airport = relationship('Airport', foreign_keys=[takeoff_airport_id]) + + landing_airport_id = Column(Integer, ForeignKey('airport.id', ondelete='CASCADE'), index=True) + landing_airport = relationship('Airport', foreign_keys=[landing_airport_id]) + + device_id = Column(Integer, ForeignKey('device.id', ondelete='CASCADE'), index=True) + device = relationship('Device', foreign_keys=[device_id]) From 90911bf1b26562ae634d3d3610637a32eb34bdec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Konstantin=20Gru=CC=88ndger?= Date: Thu, 30 Jun 2016 07:13:36 +0200 Subject: [PATCH 04/26] Fix: max altitude query --- ogn/collect/logbook.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/ogn/collect/logbook.py b/ogn/collect/logbook.py index 23c2c80..a37e4bf 100644 --- a/ogn/collect/logbook.py +++ b/ogn/collect/logbook.py @@ -264,16 +264,21 @@ def compute_logbook(): def compute_altitudes(): logger.info("Compute maximum altitudes.") - altitude_query = app.session.query(Logbook.id, func.max(AircraftBeacon.altitude).label('max_altitude')) \ + logbook_query = app.session.query(Logbook.id, Logbook.device_id, Logbook.takeoff_timestamp, Logbook.landing_timestamp) \ .filter(and_(Logbook.takeoff_airport_id != null(), Logbook.landing_airport_id != null())) \ - .filter(and_(between(AircraftBeacon.timestamp, Logbook.takeoff_timestamp, Logbook.landing_timestamp))) \ - .group_by(Logbook.id) \ + .limit(100) \ + .subquery() + + max_altitude_query = app.session.query(logbook_query.c.id, func.max(AircraftBeacon.altitude).label('max_altitude')) \ + .filter(and_(between(AircraftBeacon.timestamp, logbook_query.c.takeoff_timestamp, logbook_query.c.landing_timestamp), + AircraftBeacon.device_id == logbook_query.c.device_id)) \ + .group_by(logbook_query.c.id) \ .subquery() upd = update(Logbook) \ - .values({'max_altitude': altitude_query.c.max_altitude}) \ - .where(Logbook.id == altitude_query.c.id) + .values({'max_altitude': max_altitude_query.c.max_altitude}) \ + .where(Logbook.id == max_altitude_query.c.id) result = app.session.execute(upd) counter = result.rowcount From 49b8a0b81fdaba6b7006c6a9c7916fca2a3266cc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Konstantin=20Gru=CC=88ndger?= Date: Thu, 30 Jun 2016 20:31:24 +0200 Subject: [PATCH 05/26] Extract window parameters --- ogn/collect/logbook.py | 27 +++++++++++++++------------ 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/ogn/collect/logbook.py b/ogn/collect/logbook.py index a37e4bf..8de49eb 100644 --- a/ogn/collect/logbook.py +++ b/ogn/collect/logbook.py @@ -48,27 +48,30 @@ def compute_takeoff_and_landing(): else: aircraft_beacon_id_start = last_used_aircraft_beacon_id[0] + 1 + # 'wo' is the window order for the sql window function + wo = and_(AircraftBeacon.device_id, AircraftBeacon.timestamp) + # make a query with current, previous and next position sq = app.session.query( AircraftBeacon.id, AircraftBeacon.timestamp, - func.lag(AircraftBeacon.timestamp).over(order_by=and_(AircraftBeacon.device_id, AircraftBeacon.timestamp)).label('timestamp_prev'), - func.lead(AircraftBeacon.timestamp).over(order_by=and_(AircraftBeacon.device_id, AircraftBeacon.timestamp)).label('timestamp_next'), + func.lag(AircraftBeacon.timestamp).over(order_by=wo).label('timestamp_prev'), + func.lead(AircraftBeacon.timestamp).over(order_by=wo).label('timestamp_next'), AircraftBeacon.location_wkt, - func.lag(AircraftBeacon.location_wkt).over(order_by=and_(AircraftBeacon.device_id, AircraftBeacon.timestamp)).label('location_wkt_prev'), - func.lead(AircraftBeacon.location_wkt).over(order_by=and_(AircraftBeacon.device_id, AircraftBeacon.timestamp)).label('location_wkt_next'), + func.lag(AircraftBeacon.location_wkt).over(order_by=wo).label('location_wkt_prev'), + func.lead(AircraftBeacon.location_wkt).over(order_by=wo).label('location_wkt_next'), AircraftBeacon.track, - func.lag(AircraftBeacon.track).over(order_by=and_(AircraftBeacon.device_id, AircraftBeacon.timestamp)).label('track_prev'), - func.lead(AircraftBeacon.track).over(order_by=and_(AircraftBeacon.device_id, AircraftBeacon.timestamp)).label('track_next'), + func.lag(AircraftBeacon.track).over(order_by=wo).label('track_prev'), + func.lead(AircraftBeacon.track).over(order_by=wo).label('track_next'), AircraftBeacon.ground_speed, - func.lag(AircraftBeacon.ground_speed).over(order_by=and_(AircraftBeacon.device_id, AircraftBeacon.timestamp)).label('ground_speed_prev'), - func.lead(AircraftBeacon.ground_speed).over(order_by=and_(AircraftBeacon.device_id, AircraftBeacon.timestamp)).label('ground_speed_next'), + func.lag(AircraftBeacon.ground_speed).over(order_by=wo).label('ground_speed_prev'), + func.lead(AircraftBeacon.ground_speed).over(order_by=wo).label('ground_speed_next'), AircraftBeacon.altitude, - func.lag(AircraftBeacon.altitude).over(order_by=and_(AircraftBeacon.device_id, AircraftBeacon.timestamp)).label('altitude_prev'), - func.lead(AircraftBeacon.altitude).over(order_by=and_(AircraftBeacon.device_id, AircraftBeacon.timestamp)).label('altitude_next'), + func.lag(AircraftBeacon.altitude).over(order_by=wo).label('altitude_prev'), + func.lead(AircraftBeacon.altitude).over(order_by=wo).label('altitude_next'), AircraftBeacon.device_id, - func.lag(AircraftBeacon.device_id).over(order_by=and_(AircraftBeacon.device_id, AircraftBeacon.timestamp)).label('device_id_prev'), - func.lead(AircraftBeacon.device_id).over(order_by=and_(AircraftBeacon.device_id, AircraftBeacon.timestamp)).label('device_id_next')) \ + func.lag(AircraftBeacon.device_id).over(order_by=wo).label('device_id_prev'), + func.lead(AircraftBeacon.device_id).over(order_by=wo).label('device_id_next')) \ .filter(between(AircraftBeacon.id, aircraft_beacon_id_start, aircraft_beacon_id_start + max_id_offset)) \ .subquery() From d8d4cd5c41e87ad73e780d07898f29087fedacff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Konstantin=20Gru=CC=88ndger?= Date: Thu, 30 Jun 2016 21:10:46 +0200 Subject: [PATCH 06/26] Refactoring --- ogn/collect/logbook.py | 87 ++++++++++++++++++++---------------------- 1 file changed, 41 insertions(+), 46 deletions(-) diff --git a/ogn/collect/logbook.py b/ogn/collect/logbook.py index 8de49eb..3d3c839 100644 --- a/ogn/collect/logbook.py +++ b/ogn/collect/logbook.py @@ -12,6 +12,29 @@ from ogn.model import AircraftBeacon, TakeoffLanding, Airport, Logbook logger = get_task_logger(__name__) +def get_aircraft_beacon_start_id(): + # returns the last AircraftBeacon used for TakeoffLanding + last_takeoff_landing_query = app.session.query(func.max(TakeoffLanding.id).label('max_id')) \ + .subquery() + + last_used_aircraft_beacon_query = app.session.query(AircraftBeacon.id) \ + .filter(TakeoffLanding.id == last_takeoff_landing_query.c.max_id) \ + .filter(and_(AircraftBeacon.timestamp == TakeoffLanding.timestamp, + AircraftBeacon.device_id == TakeoffLanding.device_id)) + + last_used_aircraft_beacon_id = last_used_aircraft_beacon_query.first() + if last_used_aircraft_beacon_id is None: + min_aircraft_beacon_id = app.session.query(func.min(AircraftBeacon.id)).first() + if min_aircraft_beacon_id is None: + return 0 + else: + start_id = min_aircraft_beacon_id[0] + else: + start_id = last_used_aircraft_beacon_id[0] + 1 + + return start_id + + @app.task def compute_takeoff_and_landing(): logger.info("Compute takeoffs and landings.") @@ -26,28 +49,10 @@ def compute_takeoff_and_landing(): airport_radius = 0.025 # takeoff / landing must not exceed this radius (degree!) around the airport airport_delta = 100 # takeoff / landing must not exceed this altitude offset above/below the airport - # max AircraftBeacon id offset computed per function call + # AircraftBeacon start id and max id offset + aircraft_beacon_start_id = get_aircraft_beacon_start_id() max_id_offset = 500000 - # get the last AircraftBeacon used for TakeoffLanding and start from there - last_takeoff_landing_query = app.session.query(func.max(TakeoffLanding.id).label('max_id')) \ - .subquery() - - last_used_aircraft_beacon_query = app.session.query(AircraftBeacon.id) \ - .filter(TakeoffLanding.id == last_takeoff_landing_query.c.max_id) \ - .filter(and_(AircraftBeacon.timestamp == TakeoffLanding.timestamp, - AircraftBeacon.device_id == TakeoffLanding.device_id)) - - last_used_aircraft_beacon_id = last_used_aircraft_beacon_query.first() - if last_used_aircraft_beacon_id is None: - min_aircraft_beacon_id = app.session.query(func.min(AircraftBeacon.id)).first() - if min_aircraft_beacon_id is None: - return 0 - else: - aircraft_beacon_id_start = min_aircraft_beacon_id[0] - else: - aircraft_beacon_id_start = last_used_aircraft_beacon_id[0] + 1 - # 'wo' is the window order for the sql window function wo = and_(AircraftBeacon.device_id, AircraftBeacon.timestamp) @@ -72,7 +77,7 @@ def compute_takeoff_and_landing(): AircraftBeacon.device_id, func.lag(AircraftBeacon.device_id).over(order_by=wo).label('device_id_prev'), func.lead(AircraftBeacon.device_id).over(order_by=wo).label('device_id_next')) \ - .filter(between(AircraftBeacon.id, aircraft_beacon_id_start, aircraft_beacon_id_start + max_id_offset)) \ + .filter(between(AircraftBeacon.id, aircraft_beacon_start_id, aircraft_beacon_start_id + max_id_offset)) \ .subquery() # find possible takeoffs and landings @@ -141,31 +146,21 @@ def compute_logbook(): # make a query with current, previous and next "takeoff_landing" event, so we can find complete flights sq = app.session.query( - TakeoffLanding.device_id, - func.lag(TakeoffLanding.device_id) - .over(order_by=wo).label('device_id_prev'), - func.lead(TakeoffLanding.device_id) - .over(order_by=wo).label('device_id_next'), - TakeoffLanding.timestamp, - func.lag(TakeoffLanding.timestamp) - .over(order_by=wo).label('timestamp_prev'), - func.lead(TakeoffLanding.timestamp) - .over(order_by=wo).label('timestamp_next'), - TakeoffLanding.track, - func.lag(TakeoffLanding.track) - .over(order_by=wo).label('track_prev'), - func.lead(TakeoffLanding.track) - .over(order_by=wo).label('track_next'), - TakeoffLanding.is_takeoff, - func.lag(TakeoffLanding.is_takeoff) - .over(order_by=wo).label('is_takeoff_prev'), - func.lead(TakeoffLanding.is_takeoff) - .over(order_by=wo).label('is_takeoff_next'), - TakeoffLanding.airport_id, - func.lag(TakeoffLanding.airport_id) - .over(order_by=wo).label('airport_id_prev'), - func.lead(TakeoffLanding.airport_id) - .over(order_by=wo).label('airport_id_next')) \ + TakeoffLanding.device_id, + func.lag(TakeoffLanding.device_id).over(order_by=wo).label('device_id_prev'), + func.lead(TakeoffLanding.device_id).over(order_by=wo).label('device_id_next'), + TakeoffLanding.timestamp, + func.lag(TakeoffLanding.timestamp).over(order_by=wo).label('timestamp_prev'), + func.lead(TakeoffLanding.timestamp).over(order_by=wo).label('timestamp_next'), + TakeoffLanding.track, + func.lag(TakeoffLanding.track).over(order_by=wo).label('track_prev'), + func.lead(TakeoffLanding.track).over(order_by=wo).label('track_next'), + TakeoffLanding.is_takeoff, + func.lag(TakeoffLanding.is_takeoff).over(order_by=wo).label('is_takeoff_prev'), + func.lead(TakeoffLanding.is_takeoff).over(order_by=wo).label('is_takeoff_next'), + TakeoffLanding.airport_id, + func.lag(TakeoffLanding.airport_id).over(order_by=wo).label('airport_id_prev'), + func.lead(TakeoffLanding.airport_id).over(order_by=wo).label('airport_id_next')) \ .filter(*or_args) \ .subquery() From f9c88a9cd7b72e7d7366c3f4df2f4a8af70f6228 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Konstantin=20Gru=CC=88ndger?= Date: Thu, 30 Jun 2016 22:43:09 +0200 Subject: [PATCH 07/26] Update existing logbook entries --- ogn/collect/logbook.py | 44 ++++++++++++++++++++++++++++++++---------- 1 file changed, 34 insertions(+), 10 deletions(-) diff --git a/ogn/collect/logbook.py b/ogn/collect/logbook.py index 3d3c839..fb2a3ee 100644 --- a/ogn/collect/logbook.py +++ b/ogn/collect/logbook.py @@ -168,8 +168,8 @@ def compute_logbook(): complete_flight_query = app.session.query( sq.c.timestamp.label('reftime'), sq.c.device_id.label('device_id'), - sq.c.timestamp.label('takeoff'), sq.c.track.label('takeoff_track'), sq.c.airport_id.label('takeoff_airport_id'), - sq.c.timestamp_next.label('landing'), sq.c.track_next.label('landing_track'), sq.c.airport_id_next.label('landing_airport_id'), + sq.c.timestamp.label('takeoff_timestamp'), sq.c.track.label('takeoff_track'), sq.c.airport_id.label('takeoff_airport_id'), + sq.c.timestamp_next.label('landing_timestamp'), sq.c.track_next.label('landing_track'), sq.c.airport_id_next.label('landing_airport_id'), label('duration', sq.c.timestamp_next - sq.c.timestamp)) \ .filter(and_(sq.c.is_takeoff == true(), sq.c.is_takeoff_next == false())) \ .filter(sq.c.device_id == sq.c.device_id_next) \ @@ -179,8 +179,8 @@ def compute_logbook(): split_start_query = app.session.query( sq.c.timestamp.label('reftime'), sq.c.device_id.label('device_id'), - sq.c.timestamp.label('takeoff'), sq.c.track.label('takeoff_track'), sq.c.airport_id.label('takeoff_airport_id'), - null().label('landing'), null().label('landing_track'), null().label('landing_airport_id'), + sq.c.timestamp.label('takeoff_timestamp'), sq.c.track.label('takeoff_track'), sq.c.airport_id.label('takeoff_airport_id'), + null().label('landing_timestamp'), null().label('landing_track'), null().label('landing_airport_id'), null().label('duration')) \ .filter(and_(sq.c.is_takeoff == true(), sq.c.is_takeoff_next == false())) \ .filter(sq.c.device_id == sq.c.device_id_next) \ @@ -189,8 +189,8 @@ def compute_logbook(): split_landing_query = app.session.query( sq.c.timestamp_next.label('reftime'), sq.c.device_id.label('device_id'), - null().label('takeoff'), null().label('takeoff_track'), null().label('takeoff_airport_id'), - sq.c.timestamp_next.label('landing'), sq.c.track_next.label('landing_track'), sq.c.airport_id_next.label('landing_airport_id'), + null().label('takeoff_timestamp'), null().label('takeoff_track'), null().label('takeoff_airport_id'), + sq.c.timestamp_next.label('landing_timestamp'), sq.c.track_next.label('landing_track'), sq.c.airport_id_next.label('landing_airport_id'), null().label('duration')) \ .filter(and_(sq.c.is_takeoff == true(), sq.c.is_takeoff_next == false())) \ .filter(sq.c.device_id == sq.c.device_id_next) \ @@ -200,8 +200,8 @@ def compute_logbook(): only_landings_query = app.session.query( sq.c.timestamp.label('reftime'), sq.c.device_id.label('device_id'), - null().label('takeoff'), null().label('takeoff_track'), null().label('takeoff_airport_id'), - sq.c.timestamp.label('landing'), sq.c.track_next.label('landing_track'), sq.c.airport_id_next.label('landing_airport_id'), + null().label('takeoff_timestamp'), null().label('takeoff_track'), null().label('takeoff_airport_id'), + sq.c.timestamp.label('landing_timestamp'), sq.c.track_next.label('landing_track'), sq.c.airport_id_next.label('landing_airport_id'), null().label('duration')) \ .filter(sq.c.is_takeoff == false()) \ .filter(or_(sq.c.device_id != sq.c.device_id_prev, @@ -211,13 +211,37 @@ def compute_logbook(): only_starts_query = app.session.query( sq.c.timestamp.label('reftime'), sq.c.device_id.label('device_id'), - sq.c.timestamp.label('takeoff'), sq.c.track.label('takeoff_track'), sq.c.airport_id.label('takeoff_airport_id'), - null().label('landing'), null().label('landing_track'), null().label('landing_airport_id'), + sq.c.timestamp.label('takeoff_timestamp'), sq.c.track.label('takeoff_track'), sq.c.airport_id.label('takeoff_airport_id'), + null().label('landing_timestamp'), null().label('landing_track'), null().label('landing_airport_id'), null().label('duration')) \ .filter(sq.c.is_takeoff == true()) \ .filter(or_(sq.c.device_id != sq.c.device_id_next, sq.c.is_takeoff_next == true())) + complete_flights = complete_flight_query.subquery() + + upd = update(Logbook) \ + .where(and_(Logbook.reftime == complete_flights.c.reftime, + Logbook.device_id == complete_flights.c.device_id, + or_(Logbook.takeoff_airport_id == complete_flights.c.takeoff_airport_id, + and_(Logbook.takeoff_airport_id == null(), + complete_flights.c.takeoff_airport_id == null())), + or_(Logbook.landing_airport_id == complete_flights.c.landing_airport_id, + and_(Logbook.landing_airport_id == null(), + complete_flights.c.landing_airport_id == null())))) \ + .values({"takeoff_timestamp": complete_flights.c.takeoff_timestamp, + "takeoff_track": complete_flights.c.takeoff_track, + "takeoff_airport_id": complete_flights.c.takeoff_airport_id, + "landing_timestamp": complete_flights.c.landing_timestamp, + "landing_track": complete_flights.c.landing_track, + "landing_airport_id": complete_flights.c.landing_airport_id, + "duration": complete_flights.c.duration}) + + result = app.session.execute(upd) + counter = result.rowcount + app.session.commit() + logger.debug("Updated logbook entries: {}".format(counter)) + # unite all union_query = complete_flight_query.union( split_start_query, From 5446ff6ffdc7c12199c6d04871c5d4167223ecf0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Konstantin=20Gru=CC=88ndger?= Date: Fri, 1 Jul 2016 07:00:17 +0200 Subject: [PATCH 08/26] Fix ddb import --- ogn/collect/database.py | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/ogn/collect/database.py b/ogn/collect/database.py index 40b7b47..2710d88 100644 --- a/ogn/collect/database.py +++ b/ogn/collect/database.py @@ -9,11 +9,14 @@ from ogn.collect.celery import app logger = get_task_logger(__name__) -def update_device_infos(session, address_origin, device_infos): +def delete_device_infos(session, address_origin): session.query(DeviceInfo) \ .filter(DeviceInfo.address_origin == address_origin) \ .delete() + session.commit() + +def update_device_infos(session, device_infos): session.bulk_save_objects(device_infos) session.commit() @@ -25,8 +28,10 @@ def import_ddb(): """Import registered devices from the DDB.""" logger.info("Import registered devices fom the DDB...") - counter = update_device_infos(app.session, AddressOrigin.ogn_ddb, - get_ddb()) + address_origin = AddressOrigin.ogn_ddb + + delete_device_infos(app.session, address_origin) + counter = update_device_infos(app.session, get_ddb(address_origin=address_origin)) logger.info("Imported {} devices.".format(counter)) @@ -35,6 +40,8 @@ def import_file(path='tests/custom_ddb.txt'): """Import registered devices from a local file.""" logger.info("Import registered devices from '{}'...".format(path)) - counter = update_device_infos(app.session, AddressOrigin.user_defined, - get_ddb(path)) + address_origin = AddressOrigin.user_defined + + delete_device_infos(app.session, address_origin) + counter = update_device_infos(app.session, get_ddb(csvfile=path, address_origin=address_origin)) logger.info("Imported {} devices.".format(counter)) From 89a3a659ac1983846a73a3907b74a78a6d71364d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Konstantin=20Gru=CC=88ndger?= Date: Fri, 1 Jul 2016 07:19:31 +0200 Subject: [PATCH 09/26] Refactoring --- ogn/collect/logbook.py | 63 ++++++++++++----------------------------- ogn/commands/logbook.py | 13 ++------- setup.cfg | 2 +- 3 files changed, 21 insertions(+), 57 deletions(-) diff --git a/ogn/collect/logbook.py b/ogn/collect/logbook.py index fb2a3ee..e68c83e 100644 --- a/ogn/collect/logbook.py +++ b/ogn/collect/logbook.py @@ -133,7 +133,7 @@ def compute_takeoff_and_landing(): @app.task -def compute_logbook(): +def compute_logbook_entries(): logger.info("Compute logbook.") or_args = [between(TakeoffLanding.timestamp, '2016-06-28 00:00:00', '2016-06-28 23:59:59')] @@ -146,21 +146,21 @@ def compute_logbook(): # make a query with current, previous and next "takeoff_landing" event, so we can find complete flights sq = app.session.query( - TakeoffLanding.device_id, - func.lag(TakeoffLanding.device_id).over(order_by=wo).label('device_id_prev'), - func.lead(TakeoffLanding.device_id).over(order_by=wo).label('device_id_next'), - TakeoffLanding.timestamp, - func.lag(TakeoffLanding.timestamp).over(order_by=wo).label('timestamp_prev'), - func.lead(TakeoffLanding.timestamp).over(order_by=wo).label('timestamp_next'), - TakeoffLanding.track, - func.lag(TakeoffLanding.track).over(order_by=wo).label('track_prev'), - func.lead(TakeoffLanding.track).over(order_by=wo).label('track_next'), - TakeoffLanding.is_takeoff, - func.lag(TakeoffLanding.is_takeoff).over(order_by=wo).label('is_takeoff_prev'), - func.lead(TakeoffLanding.is_takeoff).over(order_by=wo).label('is_takeoff_next'), - TakeoffLanding.airport_id, - func.lag(TakeoffLanding.airport_id).over(order_by=wo).label('airport_id_prev'), - func.lead(TakeoffLanding.airport_id).over(order_by=wo).label('airport_id_next')) \ + TakeoffLanding.device_id, + func.lag(TakeoffLanding.device_id).over(order_by=wo).label('device_id_prev'), + func.lead(TakeoffLanding.device_id).over(order_by=wo).label('device_id_next'), + TakeoffLanding.timestamp, + func.lag(TakeoffLanding.timestamp).over(order_by=wo).label('timestamp_prev'), + func.lead(TakeoffLanding.timestamp).over(order_by=wo).label('timestamp_next'), + TakeoffLanding.track, + func.lag(TakeoffLanding.track).over(order_by=wo).label('track_prev'), + func.lead(TakeoffLanding.track).over(order_by=wo).label('track_next'), + TakeoffLanding.is_takeoff, + func.lag(TakeoffLanding.is_takeoff).over(order_by=wo).label('is_takeoff_prev'), + func.lead(TakeoffLanding.is_takeoff).over(order_by=wo).label('is_takeoff_next'), + TakeoffLanding.airport_id, + func.lag(TakeoffLanding.airport_id).over(order_by=wo).label('airport_id_prev'), + func.lead(TakeoffLanding.airport_id).over(order_by=wo).label('airport_id_next')) \ .filter(*or_args) \ .subquery() @@ -218,6 +218,7 @@ def compute_logbook(): .filter(or_(sq.c.device_id != sq.c.device_id_next, sq.c.is_takeoff_next == true())) + # update 'incomplete' logbook entries with 'complete flights' complete_flights = complete_flight_query.subquery() upd = update(Logbook) \ @@ -242,7 +243,7 @@ def compute_logbook(): app.session.commit() logger.debug("Updated logbook entries: {}".format(counter)) - # unite all + # unite all computated flights ('incomplete' and 'complete') union_query = complete_flight_query.union( split_start_query, split_landing_query, @@ -280,31 +281,3 @@ def compute_logbook(): logger.debug("New logbook entries: {}".format(counter)) return counter - - -@app.task -def compute_altitudes(): - logger.info("Compute maximum altitudes.") - - logbook_query = app.session.query(Logbook.id, Logbook.device_id, Logbook.takeoff_timestamp, Logbook.landing_timestamp) \ - .filter(and_(Logbook.takeoff_airport_id != null(), - Logbook.landing_airport_id != null())) \ - .limit(100) \ - .subquery() - - max_altitude_query = app.session.query(logbook_query.c.id, func.max(AircraftBeacon.altitude).label('max_altitude')) \ - .filter(and_(between(AircraftBeacon.timestamp, logbook_query.c.takeoff_timestamp, logbook_query.c.landing_timestamp), - AircraftBeacon.device_id == logbook_query.c.device_id)) \ - .group_by(logbook_query.c.id) \ - .subquery() - - upd = update(Logbook) \ - .values({'max_altitude': max_altitude_query.c.max_altitude}) \ - .where(Logbook.id == max_altitude_query.c.id) - - result = app.session.execute(upd) - counter = result.rowcount - app.session.commit() - logger.debug("Updated logbook entries: {}".format(counter)) - - return counter \ No newline at end of file diff --git a/ogn/commands/logbook.py b/ogn/commands/logbook.py index ab9450b..ec3887c 100644 --- a/ogn/commands/logbook.py +++ b/ogn/commands/logbook.py @@ -9,7 +9,7 @@ from sqlalchemy.orm import aliased from ogn.model import Device, DeviceInfo, TakeoffLanding, Airport, Logbook from ogn.commands.dbutils import session -from ogn.collect.logbook import compute_takeoff_and_landing, compute_logbook, compute_altitudes +from ogn.collect.logbook import compute_takeoff_and_landing, compute_logbook_entries from manager import Manager manager = Manager() @@ -28,20 +28,11 @@ def compute_takeoff_landing(): def compute_logbook(): """Compute logbook.""" print("Compute logbook...") - result = compute_logbook.delay() + result = compute_logbook_entries.delay() counter = result.get() print("New logbook entries: {}".format(counter)) -@manager.command -def compute_altitudes(): - """Compute maximum altitudes.""" - print("Compute maximum altitudes...") - result = compute_altitudes.delay() - counter = result.get() - print("Updated logbook entries: {}".format(counter)) - - @manager.arg('date', help='date (format: yyyy-mm-dd)') @manager.command def show(airport_name, utc_delta_hours=0, date=None): diff --git a/setup.cfg b/setup.cfg index e44b810..24c26dc 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,2 +1,2 @@ [flake8] -ignore = E501 +ignore = E501, E126 From 72d844557065dee1de0f70374b4bf08fc14d4966 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Konstantin=20Gru=CC=88ndger?= Date: Sat, 2 Jul 2016 16:31:33 +0200 Subject: [PATCH 10/26] Query fixes --- ogn/collect/logbook.py | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/ogn/collect/logbook.py b/ogn/collect/logbook.py index e68c83e..a4f6aa4 100644 --- a/ogn/collect/logbook.py +++ b/ogn/collect/logbook.py @@ -26,7 +26,7 @@ def get_aircraft_beacon_start_id(): if last_used_aircraft_beacon_id is None: min_aircraft_beacon_id = app.session.query(func.min(AircraftBeacon.id)).first() if min_aircraft_beacon_id is None: - return 0 + start_id = 0 else: start_id = min_aircraft_beacon_id[0] else: @@ -201,11 +201,12 @@ def compute_logbook_entries(): sq.c.timestamp.label('reftime'), sq.c.device_id.label('device_id'), null().label('takeoff_timestamp'), null().label('takeoff_track'), null().label('takeoff_airport_id'), - sq.c.timestamp.label('landing_timestamp'), sq.c.track_next.label('landing_track'), sq.c.airport_id_next.label('landing_airport_id'), + sq.c.timestamp.label('landing_timestamp'), sq.c.track.label('landing_track'), sq.c.airport_id.label('landing_airport_id'), null().label('duration')) \ .filter(sq.c.is_takeoff == false()) \ .filter(or_(sq.c.device_id != sq.c.device_id_prev, - sq.c.is_takeoff_prev == false())) + sq.c.is_takeoff_prev == false(), + sq.c.is_takeoff_prev == null())) # find starts without landing only_starts_query = app.session.query( @@ -216,20 +217,20 @@ def compute_logbook_entries(): null().label('duration')) \ .filter(sq.c.is_takeoff == true()) \ .filter(or_(sq.c.device_id != sq.c.device_id_next, - sq.c.is_takeoff_next == true())) + sq.c.is_takeoff_next == true(), + sq.c.is_takeoff_next == null())) # update 'incomplete' logbook entries with 'complete flights' complete_flights = complete_flight_query.subquery() upd = update(Logbook) \ - .where(and_(Logbook.reftime == complete_flights.c.reftime, - Logbook.device_id == complete_flights.c.device_id, - or_(Logbook.takeoff_airport_id == complete_flights.c.takeoff_airport_id, - and_(Logbook.takeoff_airport_id == null(), - complete_flights.c.takeoff_airport_id == null())), - or_(Logbook.landing_airport_id == complete_flights.c.landing_airport_id, - and_(Logbook.landing_airport_id == null(), - complete_flights.c.landing_airport_id == null())))) \ + .where(and_(Logbook.device_id == complete_flights.c.device_id, + or_(and_(Logbook.takeoff_airport_id == complete_flights.c.takeoff_airport_id, + Logbook.takeoff_timestamp == complete_flights.c.takeoff_timestamp), + Logbook.takeoff_airport_id == null()), + or_(and_(Logbook.landing_airport_id == complete_flights.c.landing_airport_id, + Logbook.landing_timestamp == complete_flights.c.landing_timestamp), + Logbook.landing_airport_id == null()))) \ .values({"takeoff_timestamp": complete_flights.c.takeoff_timestamp, "takeoff_track": complete_flights.c.takeoff_track, "takeoff_airport_id": complete_flights.c.takeoff_airport_id, From 6d1d68e9488e3c384fa2730cbd499490c2a26125 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Konstantin=20Gru=CC=88ndger?= Date: Sat, 2 Jul 2016 17:59:57 +0200 Subject: [PATCH 11/26] Get airports by relationship --- ogn/commands/logbook.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/ogn/commands/logbook.py b/ogn/commands/logbook.py index ec3887c..8846179 100644 --- a/ogn/commands/logbook.py +++ b/ogn/commands/logbook.py @@ -64,8 +64,6 @@ def show(airport_name, utc_delta_hours=0, date=None): takeoff_airport = aliased(Airport, name='takeoff_airport') landing_airport = aliased(Airport, name='landing_airport') logbook_query = session.query(Logbook, - takeoff_airport, - landing_airport, Device, sq3.c.registration, sq3.c.aircraft) \ @@ -107,7 +105,7 @@ def show(airport_name, utc_delta_hours=0, date=None): def none_altitude_replacer(altitude_object, airport_object): return "?" if altitude_object is None else "{:5d}m ({:+5d}m)".format(altitude_object, altitude_object - airport_object.altitude) - for [logbook, takeoff_airport, landing_airport, device, registration, aircraft] in logbook_query.all(): + for [logbook, device, registration, aircraft] in logbook_query.all(): print('%10s %8s (%2s) %8s (%2s) %8s %15s %8s %17s %20s' % ( logbook.reftime.date(), none_datetime_replacer(logbook.takeoff_timestamp), @@ -115,7 +113,7 @@ def show(airport_name, utc_delta_hours=0, date=None): none_datetime_replacer(logbook.landing_timestamp), none_track_replacer(logbook.landing_track), none_timedelta_replacer(logbook.duration), - none_altitude_replacer(logbook.max_altitude, takeoff_airport), + none_altitude_replacer(logbook.max_altitude, logbook.takeoff_airport), none_registration_replacer(device, registration), none_aircraft_replacer(device, aircraft), - airport_marker(takeoff_airport, landing_airport))) + airport_marker(logbook.takeoff_airport, logbook.landing_airport))) From acd1128dc02b121edd03732081bf011259158c00 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Konstantin=20Gru=CC=88ndger?= Date: Sat, 2 Jul 2016 20:07:22 +0200 Subject: [PATCH 12/26] Split logbook and make it testable --- ogn/collect/logbook.py | 157 ++++----------------------------- ogn/collect/takeoff_landing.py | 135 ++++++++++++++++++++++++++++ ogn/commands/logbook.py | 3 +- 3 files changed, 156 insertions(+), 139 deletions(-) create mode 100644 ogn/collect/takeoff_landing.py diff --git a/ogn/collect/logbook.py b/ogn/collect/logbook.py index a4f6aa4..84a8759 100644 --- a/ogn/collect/logbook.py +++ b/ogn/collect/logbook.py @@ -1,141 +1,22 @@ -from datetime import timedelta - from celery.utils.log import get_task_logger -from ogn.collect.celery import app -from sqlalchemy.sql import func, null from sqlalchemy import and_, or_, insert, update, between, exists -from sqlalchemy.sql.expression import case, true, false, label +from sqlalchemy.sql import func, null +from sqlalchemy.sql.expression import true, false, label -from ogn.model import AircraftBeacon, TakeoffLanding, Airport, Logbook +from ogn.collect.celery import app +from ogn.model import TakeoffLanding, Logbook logger = get_task_logger(__name__) -def get_aircraft_beacon_start_id(): - # returns the last AircraftBeacon used for TakeoffLanding - last_takeoff_landing_query = app.session.query(func.max(TakeoffLanding.id).label('max_id')) \ - .subquery() - - last_used_aircraft_beacon_query = app.session.query(AircraftBeacon.id) \ - .filter(TakeoffLanding.id == last_takeoff_landing_query.c.max_id) \ - .filter(and_(AircraftBeacon.timestamp == TakeoffLanding.timestamp, - AircraftBeacon.device_id == TakeoffLanding.device_id)) - - last_used_aircraft_beacon_id = last_used_aircraft_beacon_query.first() - if last_used_aircraft_beacon_id is None: - min_aircraft_beacon_id = app.session.query(func.min(AircraftBeacon.id)).first() - if min_aircraft_beacon_id is None: - start_id = 0 - else: - start_id = min_aircraft_beacon_id[0] - else: - start_id = last_used_aircraft_beacon_id[0] + 1 - - return start_id - - @app.task -def compute_takeoff_and_landing(): - logger.info("Compute takeoffs and landings.") - - # takeoff / landing detection is based on 3 consecutive points - takeoff_speed = 55 # takeoff detection: 1st point below, 2nd and 3rd above this limit - landing_speed = 40 # landing detection: 1st point above, 2nd and 3rd below this limit - duration = 100 # the points must not exceed this duration - radius = 0.05 # the points must not exceed this radius (degree!) around the 2nd point - - # takeoff / landing has to be near an airport - airport_radius = 0.025 # takeoff / landing must not exceed this radius (degree!) around the airport - airport_delta = 100 # takeoff / landing must not exceed this altitude offset above/below the airport - - # AircraftBeacon start id and max id offset - aircraft_beacon_start_id = get_aircraft_beacon_start_id() - max_id_offset = 500000 - - # 'wo' is the window order for the sql window function - wo = and_(AircraftBeacon.device_id, AircraftBeacon.timestamp) - - # make a query with current, previous and next position - sq = app.session.query( - AircraftBeacon.id, - AircraftBeacon.timestamp, - func.lag(AircraftBeacon.timestamp).over(order_by=wo).label('timestamp_prev'), - func.lead(AircraftBeacon.timestamp).over(order_by=wo).label('timestamp_next'), - AircraftBeacon.location_wkt, - func.lag(AircraftBeacon.location_wkt).over(order_by=wo).label('location_wkt_prev'), - func.lead(AircraftBeacon.location_wkt).over(order_by=wo).label('location_wkt_next'), - AircraftBeacon.track, - func.lag(AircraftBeacon.track).over(order_by=wo).label('track_prev'), - func.lead(AircraftBeacon.track).over(order_by=wo).label('track_next'), - AircraftBeacon.ground_speed, - func.lag(AircraftBeacon.ground_speed).over(order_by=wo).label('ground_speed_prev'), - func.lead(AircraftBeacon.ground_speed).over(order_by=wo).label('ground_speed_next'), - AircraftBeacon.altitude, - func.lag(AircraftBeacon.altitude).over(order_by=wo).label('altitude_prev'), - func.lead(AircraftBeacon.altitude).over(order_by=wo).label('altitude_next'), - AircraftBeacon.device_id, - func.lag(AircraftBeacon.device_id).over(order_by=wo).label('device_id_prev'), - func.lead(AircraftBeacon.device_id).over(order_by=wo).label('device_id_next')) \ - .filter(between(AircraftBeacon.id, aircraft_beacon_start_id, aircraft_beacon_start_id + max_id_offset)) \ - .subquery() - - # find possible takeoffs and landings - sq2 = app.session.query( - sq.c.id, - sq.c.timestamp, - case([(sq.c.ground_speed > takeoff_speed, sq.c.location_wkt_prev), # on takeoff we take the location from the previous fix because it is nearer to the airport - (sq.c.ground_speed < landing_speed, sq.c.location)]).label('location'), - case([(sq.c.ground_speed > takeoff_speed, sq.c.track), - (sq.c.ground_speed < landing_speed, sq.c.track_prev)]).label('track'), # on landing we take the track from the previous fix because gliders tend to leave the runway quickly - sq.c.ground_speed, - sq.c.altitude, - case([(sq.c.ground_speed > takeoff_speed, True), - (sq.c.ground_speed < landing_speed, False)]).label('is_takeoff'), - sq.c.device_id) \ - .filter(sq.c.device_id_prev == sq.c.device_id == sq.c.device_id_next) \ - .filter(or_(and_(sq.c.ground_speed_prev < takeoff_speed, # takeoff - sq.c.ground_speed > takeoff_speed, - sq.c.ground_speed_next > takeoff_speed), - and_(sq.c.ground_speed_prev > landing_speed, # landing - sq.c.ground_speed < landing_speed, - sq.c.ground_speed_next < landing_speed))) \ - .filter(sq.c.timestamp_next - sq.c.timestamp_prev < timedelta(seconds=duration)) \ - .filter(and_(func.ST_DFullyWithin(sq.c.location, sq.c.location_wkt_prev, radius), - func.ST_DFullyWithin(sq.c.location, sq.c.location_wkt_next, radius))) \ - .subquery() - - # consider them if they are near a airport - takeoff_landing_query = app.session.query( - sq2.c.timestamp, - sq2.c.track, - sq2.c.is_takeoff, - sq2.c.device_id, - Airport.id) \ - .filter(and_(func.ST_DFullyWithin(sq2.c.location, Airport.location_wkt, airport_radius), - between(sq2.c.altitude, Airport.altitude - airport_delta, Airport.altitude + airport_delta))) \ - .filter(between(Airport.style, 2, 5)) \ - .order_by(sq2.c.id) - - # ... and save them - ins = insert(TakeoffLanding).from_select((TakeoffLanding.timestamp, - TakeoffLanding.track, - TakeoffLanding.is_takeoff, - TakeoffLanding.device_id, - TakeoffLanding.airport_id), - takeoff_landing_query) - result = app.session.execute(ins) - counter = result.rowcount - app.session.commit() - logger.debug("New takeoffs and landings: {}".format(counter)) - - return counter - - -@app.task -def compute_logbook_entries(): +def compute_logbook_entries(session=None): logger.info("Compute logbook.") + if session is None: + session = app.session + or_args = [between(TakeoffLanding.timestamp, '2016-06-28 00:00:00', '2016-06-28 23:59:59')] or_args = [] @@ -145,7 +26,7 @@ def compute_logbook_entries(): TakeoffLanding.timestamp) # make a query with current, previous and next "takeoff_landing" event, so we can find complete flights - sq = app.session.query( + sq = session.query( TakeoffLanding.device_id, func.lag(TakeoffLanding.device_id).over(order_by=wo).label('device_id_prev'), func.lead(TakeoffLanding.device_id).over(order_by=wo).label('device_id_next'), @@ -165,7 +46,7 @@ def compute_logbook_entries(): .subquery() # find complete flights (with takeoff and landing on the same day) - complete_flight_query = app.session.query( + complete_flight_query = session.query( sq.c.timestamp.label('reftime'), sq.c.device_id.label('device_id'), sq.c.timestamp.label('takeoff_timestamp'), sq.c.track.label('takeoff_track'), sq.c.airport_id.label('takeoff_airport_id'), @@ -176,7 +57,7 @@ def compute_logbook_entries(): .filter(func.date(sq.c.timestamp_next) == func.date(sq.c.timestamp)) # split complete flights (with takeoff and landing on different days) into one takeoff and one landing - split_start_query = app.session.query( + split_start_query = session.query( sq.c.timestamp.label('reftime'), sq.c.device_id.label('device_id'), sq.c.timestamp.label('takeoff_timestamp'), sq.c.track.label('takeoff_track'), sq.c.airport_id.label('takeoff_airport_id'), @@ -186,7 +67,7 @@ def compute_logbook_entries(): .filter(sq.c.device_id == sq.c.device_id_next) \ .filter(func.date(sq.c.timestamp_next) != func.date(sq.c.timestamp)) - split_landing_query = app.session.query( + split_landing_query = session.query( sq.c.timestamp_next.label('reftime'), sq.c.device_id.label('device_id'), null().label('takeoff_timestamp'), null().label('takeoff_track'), null().label('takeoff_airport_id'), @@ -197,7 +78,7 @@ def compute_logbook_entries(): .filter(func.date(sq.c.timestamp_next) != func.date(sq.c.timestamp)) # find landings without start - only_landings_query = app.session.query( + only_landings_query = session.query( sq.c.timestamp.label('reftime'), sq.c.device_id.label('device_id'), null().label('takeoff_timestamp'), null().label('takeoff_track'), null().label('takeoff_airport_id'), @@ -209,7 +90,7 @@ def compute_logbook_entries(): sq.c.is_takeoff_prev == null())) # find starts without landing - only_starts_query = app.session.query( + only_starts_query = session.query( sq.c.timestamp.label('reftime'), sq.c.device_id.label('device_id'), sq.c.timestamp.label('takeoff_timestamp'), sq.c.track.label('takeoff_track'), sq.c.airport_id.label('takeoff_airport_id'), @@ -239,9 +120,9 @@ def compute_logbook_entries(): "landing_airport_id": complete_flights.c.landing_airport_id, "duration": complete_flights.c.duration}) - result = app.session.execute(upd) + result = session.execute(upd) counter = result.rowcount - app.session.commit() + session.commit() logger.debug("Updated logbook entries: {}".format(counter)) # unite all computated flights ('incomplete' and 'complete') @@ -253,7 +134,7 @@ def compute_logbook_entries(): .subquery() # consider only if not already stored - new_logbook_entries = app.session.query(union_query) \ + new_logbook_entries = session.query(union_query) \ .filter(~exists().where( and_(Logbook.reftime == union_query.c.reftime, Logbook.device_id == union_query.c.device_id, @@ -276,9 +157,9 @@ def compute_logbook_entries(): Logbook.duration), new_logbook_entries) - result = app.session.execute(ins) + result = session.execute(ins) counter = result.rowcount - app.session.commit() + session.commit() logger.debug("New logbook entries: {}".format(counter)) return counter diff --git a/ogn/collect/takeoff_landing.py b/ogn/collect/takeoff_landing.py new file mode 100644 index 0000000..c37db30 --- /dev/null +++ b/ogn/collect/takeoff_landing.py @@ -0,0 +1,135 @@ +from datetime import timedelta + +from celery.utils.log import get_task_logger + +from sqlalchemy import and_, or_, insert, between +from sqlalchemy.sql import func +from sqlalchemy.sql.expression import case + +from ogn.collect.celery import app +from ogn.model import AircraftBeacon, TakeoffLanding, Airport + +logger = get_task_logger(__name__) + + +def get_aircraft_beacon_start_id(session): + # returns the last AircraftBeacon used for TakeoffLanding + last_takeoff_landing_query = session.query(func.max(TakeoffLanding.id).label('max_id')) \ + .subquery() + + last_used_aircraft_beacon_query = session.query(AircraftBeacon.id) \ + .filter(TakeoffLanding.id == last_takeoff_landing_query.c.max_id) \ + .filter(and_(AircraftBeacon.timestamp == TakeoffLanding.timestamp, + AircraftBeacon.device_id == TakeoffLanding.device_id)) + + last_used_aircraft_beacon_id = last_used_aircraft_beacon_query.first() + if last_used_aircraft_beacon_id is None: + min_aircraft_beacon_id = session.query(func.min(AircraftBeacon.id)).first() + if min_aircraft_beacon_id is None: + start_id = 0 + else: + start_id = min_aircraft_beacon_id[0] + else: + start_id = last_used_aircraft_beacon_id[0] + 1 + + return start_id + + +@app.task +def compute_takeoff_and_landing(session=None): + logger.info("Compute takeoffs and landings.") + + if session is None: + session = app.session + + # takeoff / landing detection is based on 3 consecutive points + takeoff_speed = 55 # takeoff detection: 1st point below, 2nd and 3rd above this limit + landing_speed = 40 # landing detection: 1st point above, 2nd and 3rd below this limit + duration = 100 # the points must not exceed this duration + radius = 0.05 # the points must not exceed this radius (degree!) around the 2nd point + + # takeoff / landing has to be near an airport + airport_radius = 0.025 # takeoff / landing must not exceed this radius (degree!) around the airport + airport_delta = 100 # takeoff / landing must not exceed this altitude offset above/below the airport + + # AircraftBeacon start id and max id offset + aircraft_beacon_start_id = get_aircraft_beacon_start_id(session) + max_id_offset = 500000 + + # 'wo' is the window order for the sql window function + wo = and_(AircraftBeacon.device_id, AircraftBeacon.timestamp) + + # make a query with current, previous and next position + sq = session.query( + AircraftBeacon.id, + AircraftBeacon.timestamp, + func.lag(AircraftBeacon.timestamp).over(order_by=wo).label('timestamp_prev'), + func.lead(AircraftBeacon.timestamp).over(order_by=wo).label('timestamp_next'), + AircraftBeacon.location_wkt, + func.lag(AircraftBeacon.location_wkt).over(order_by=wo).label('location_wkt_prev'), + func.lead(AircraftBeacon.location_wkt).over(order_by=wo).label('location_wkt_next'), + AircraftBeacon.track, + func.lag(AircraftBeacon.track).over(order_by=wo).label('track_prev'), + func.lead(AircraftBeacon.track).over(order_by=wo).label('track_next'), + AircraftBeacon.ground_speed, + func.lag(AircraftBeacon.ground_speed).over(order_by=wo).label('ground_speed_prev'), + func.lead(AircraftBeacon.ground_speed).over(order_by=wo).label('ground_speed_next'), + AircraftBeacon.altitude, + func.lag(AircraftBeacon.altitude).over(order_by=wo).label('altitude_prev'), + func.lead(AircraftBeacon.altitude).over(order_by=wo).label('altitude_next'), + AircraftBeacon.device_id, + func.lag(AircraftBeacon.device_id).over(order_by=wo).label('device_id_prev'), + func.lead(AircraftBeacon.device_id).over(order_by=wo).label('device_id_next')) \ + .filter(between(AircraftBeacon.id, aircraft_beacon_start_id, aircraft_beacon_start_id + max_id_offset)) \ + .subquery() + + # find possible takeoffs and landings + sq2 = session.query( + sq.c.id, + sq.c.timestamp, + case([(sq.c.ground_speed > takeoff_speed, sq.c.location_wkt_prev), # on takeoff we take the location from the previous fix because it is nearer to the airport + (sq.c.ground_speed < landing_speed, sq.c.location)]).label('location'), + case([(sq.c.ground_speed > takeoff_speed, sq.c.track), + (sq.c.ground_speed < landing_speed, sq.c.track_prev)]).label('track'), # on landing we take the track from the previous fix because gliders tend to leave the runway quickly + sq.c.ground_speed, + sq.c.altitude, + case([(sq.c.ground_speed > takeoff_speed, True), + (sq.c.ground_speed < landing_speed, False)]).label('is_takeoff'), + sq.c.device_id) \ + .filter(sq.c.device_id_prev == sq.c.device_id == sq.c.device_id_next) \ + .filter(or_(and_(sq.c.ground_speed_prev < takeoff_speed, # takeoff + sq.c.ground_speed > takeoff_speed, + sq.c.ground_speed_next > takeoff_speed), + and_(sq.c.ground_speed_prev > landing_speed, # landing + sq.c.ground_speed < landing_speed, + sq.c.ground_speed_next < landing_speed))) \ + .filter(sq.c.timestamp_next - sq.c.timestamp_prev < timedelta(seconds=duration)) \ + .filter(and_(func.ST_DFullyWithin(sq.c.location, sq.c.location_wkt_prev, radius), + func.ST_DFullyWithin(sq.c.location, sq.c.location_wkt_next, radius))) \ + .subquery() + + # consider them if they are near a airport + takeoff_landing_query = session.query( + sq2.c.timestamp, + sq2.c.track, + sq2.c.is_takeoff, + sq2.c.device_id, + Airport.id) \ + .filter(and_(func.ST_DFullyWithin(sq2.c.location, Airport.location_wkt, airport_radius), + between(sq2.c.altitude, Airport.altitude - airport_delta, Airport.altitude + airport_delta))) \ + .filter(between(Airport.style, 2, 5)) \ + .order_by(sq2.c.id) + + # ... and save them + ins = insert(TakeoffLanding).from_select((TakeoffLanding.timestamp, + TakeoffLanding.track, + TakeoffLanding.is_takeoff, + TakeoffLanding.device_id, + TakeoffLanding.airport_id), + takeoff_landing_query) + result = session.execute(ins) + counter = result.rowcount + session.commit() + logger.debug("New takeoffs and landings: {}".format(counter)) + + return counter diff --git a/ogn/commands/logbook.py b/ogn/commands/logbook.py index 8846179..bc92e4f 100644 --- a/ogn/commands/logbook.py +++ b/ogn/commands/logbook.py @@ -9,7 +9,8 @@ from sqlalchemy.orm import aliased from ogn.model import Device, DeviceInfo, TakeoffLanding, Airport, Logbook from ogn.commands.dbutils import session -from ogn.collect.logbook import compute_takeoff_and_landing, compute_logbook_entries +from ogn.collect.takeoff_landing import compute_takeoff_and_landing +from ogn.collect.logbook import compute_logbook_entries from manager import Manager manager = Manager() From 910a8788d4ed0a740b589ac454739cc1ba37f00d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Konstantin=20Gru=CC=88ndger?= Date: Sat, 2 Jul 2016 20:18:20 +0200 Subject: [PATCH 13/26] Remove logbook constraint --- ogn/model/__init__.py | 1 + ogn/model/logbook.py | 6 ------ 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/ogn/model/__init__.py b/ogn/model/__init__.py index add61fa..e83b673 100644 --- a/ogn/model/__init__.py +++ b/ogn/model/__init__.py @@ -10,5 +10,6 @@ from .receiver_beacon import ReceiverBeacon from .receiver import Receiver from .takeoff_landing import TakeoffLanding from .airport import Airport +from .logbook import Logbook from .geo import Location diff --git a/ogn/model/logbook.py b/ogn/model/logbook.py index c235fc9..4902dcf 100644 --- a/ogn/model/logbook.py +++ b/ogn/model/logbook.py @@ -2,16 +2,10 @@ from sqlalchemy import Integer, DateTime, Interval, Column, ForeignKey from sqlalchemy.orm import relationship from .base import Base -from sqlalchemy.sql.schema import UniqueConstraint class Logbook(Base): __tablename__ = 'logbook' - __table_args__ = (UniqueConstraint('reftime', - 'takeoff_airport_id', - 'landing_airport_id', - 'device_id'), - ) id = Column(Integer, primary_key=True) From 9d3fea118b3b7c7933ca86eddac7ef1d969a8bd7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Konstantin=20Gru=CC=88ndger?= Date: Sat, 2 Jul 2016 20:20:04 +0200 Subject: [PATCH 14/26] Test logbook --- .travis.yml | 5 ++ config/test.py | 9 +++ tests/collect/__init__.py | 0 tests/collect/test_logbook.py | 115 ++++++++++++++++++++++++++++++++++ 4 files changed, 129 insertions(+) create mode 100644 config/test.py create mode 100644 tests/collect/__init__.py create mode 100644 tests/collect/test_logbook.py diff --git a/.travis.yml b/.travis.yml index 8fb2ef3..b1433cc 100644 --- a/.travis.yml +++ b/.travis.yml @@ -3,8 +3,13 @@ language: python python: - 3.4 +services: + - postgresql + before_script: - flake8 tests ogn + - psql -c 'CREATE DATABASE ogn_test;' -U postgres + - psql -c 'CREATE EXTENSION postgis;' -U postgres -d travis_postgis script: - nosetests --with-coverage --cover-package=ogn diff --git a/config/test.py b/config/test.py new file mode 100644 index 0000000..b11fa3e --- /dev/null +++ b/config/test.py @@ -0,0 +1,9 @@ +SQLALCHEMY_DATABASE_URI = 'postgresql://postgres@localhost:5432/ogn_test' + +BROKER_URL = 'redis://localhost:6379/0' +CELERY_RESULT_BACKEND = 'redis://localhost:6379/0' + + +CELERYBEAT_SCHEDULE = {} + +CELERY_TIMEZONE = 'UTC' diff --git a/tests/collect/__init__.py b/tests/collect/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/collect/test_logbook.py b/tests/collect/test_logbook.py new file mode 100644 index 0000000..e248c0a --- /dev/null +++ b/tests/collect/test_logbook.py @@ -0,0 +1,115 @@ +import unittest +import os + +from ogn.model import Logbook + +from ogn.collect.logbook import compute_logbook_entries + + +class TestDB(unittest.TestCase): + session = None + engine = None + app = None + + def setUp(self): + os.environ['OGN_CONFIG_MODULE'] = 'config.test' + from ogn.commands.dbutils import engine, session + self.session = session + self.engine = engine + + session.execute("INSERT INTO device(address) VALUES ('DD0815'), ('DD4711')") + session.execute("INSERT INTO airport(name) VALUES ('Koenigsdorf'), ('Ohlstadt')") + + def tearDown(self): + session = self.session + session.execute("DELETE FROM takeoff_landing") + session.execute("DELETE FROM logbook") + session.execute("DELETE FROM device") + session.execute("DELETE FROM airport") + session.commit() + pass + + def count_logbook_entries(self): + session = self.session + logbook_query = session.query(Logbook) + i = 0 + for logbook in logbook_query.all(): + i = i + 1 + print("{} {} {} {} {} {}".format(logbook.id, logbook.device_id, logbook.takeoff_airport_id, logbook.takeoff_timestamp, logbook.landing_airport_id, logbook.landing_timestamp)) + + return i + + def test_single_takeoff(self): + session = self.session + + session.execute("INSERT INTO takeoff_landing(device_id, airport_id, timestamp, is_takeoff) SELECT d.id, a.id, '2016-06-01 10:00:00', TRUE FROM airport a, device d WHERE a.name='Koenigsdorf' and d.address = 'DD0815'") + session.commit() + + compute_logbook_entries(session) + self.assertEqual(self.count_logbook_entries(), 1) + + def test_single_landing(self): + session = self.session + + session.execute("INSERT INTO takeoff_landing(device_id, airport_id, timestamp, is_takeoff) SELECT d.id, a.id, '2016-06-01 10:00:00', FALSE FROM airport a, device d WHERE a.name='Koenigsdorf' and d.address = 'DD0815'") + session.commit() + + compute_logbook_entries(session) + self.assertEqual(self.count_logbook_entries(), 1) + + def test_different_takeoffs(self): + session = self.session + + session.execute("INSERT INTO takeoff_landing(device_id, airport_id, timestamp, is_takeoff) SELECT d.id, a.id, '2016-06-01 10:00:00', TRUE FROM airport a, device d WHERE a.name='Koenigsdorf' and d.address = 'DD0815'") + session.execute("INSERT INTO takeoff_landing(device_id, airport_id, timestamp, is_takeoff) SELECT d.id, a.id, '2016-06-01 10:00:00', TRUE FROM airport a, device d WHERE a.name='Ohlstadt' and d.address = 'DD4711'") + session.commit() + + compute_logbook_entries(session) + self.assertEqual(self.count_logbook_entries(), 2) + + def test_takeoff_and_landing(self): + session = self.session + + session.execute("INSERT INTO takeoff_landing(device_id, airport_id, timestamp, is_takeoff) SELECT d.id, a.id, '2016-06-01 10:00:00', TRUE FROM airport a, device d WHERE a.name='Koenigsdorf' and d.address = 'DD0815'") + session.execute("INSERT INTO takeoff_landing(device_id, airport_id, timestamp, is_takeoff) SELECT d.id, a.id, '2016-06-01 10:05:00', FALSE FROM airport a, device d WHERE a.name='Koenigsdorf' and d.address = 'DD0815'") + session.commit() + + compute_logbook_entries(session) + self.assertEqual(self.count_logbook_entries(), 1) + + def test_takeoff_and_landing_on_different_days(self): + session = self.session + + session.execute("INSERT INTO takeoff_landing(device_id, airport_id, timestamp, is_takeoff) SELECT d.id, a.id, '2016-06-01 10:00:00', TRUE FROM airport a, device d WHERE a.name='Koenigsdorf' and d.address = 'DD0815'") + session.execute("INSERT INTO takeoff_landing(device_id, airport_id, timestamp, is_takeoff) SELECT d.id, a.id, '2016-06-02 10:05:00', FALSE FROM airport a, device d WHERE a.name='Koenigsdorf' and d.address = 'DD0815'") + session.commit() + + compute_logbook_entries(session) + self.assertEqual(self.count_logbook_entries(), 2) + + def test_update(self): + session = self.session + + session.execute("INSERT INTO takeoff_landing(device_id, airport_id, timestamp, is_takeoff) SELECT d.id, a.id, '2016-06-01 10:00:00', TRUE FROM airport a, device d WHERE a.name='Koenigsdorf' and d.address = 'DD0815'") + session.commit() + compute_logbook_entries(session) + session.execute("INSERT INTO takeoff_landing(device_id, airport_id, timestamp, is_takeoff) SELECT d.id, a.id, '2016-06-01 10:05:00', FALSE FROM airport a, device d WHERE a.name='Koenigsdorf' and d.address = 'DD0815'") + session.commit() + compute_logbook_entries(session) + + self.assertEqual(self.count_logbook_entries(), 1) + + def test_update_wrong_order(self): + session = self.session + + session.execute("INSERT INTO takeoff_landing(device_id, airport_id, timestamp, is_takeoff) SELECT d.id, a.id, '2016-06-01 10:05:00', FALSE FROM airport a, device d WHERE a.name='Koenigsdorf' and d.address = 'DD0815'") + session.commit() + compute_logbook_entries(session) + session.execute("INSERT INTO takeoff_landing(device_id, airport_id, timestamp, is_takeoff) SELECT d.id, a.id, '2016-06-01 10:00:00', TRUE FROM airport a, device d WHERE a.name='Koenigsdorf' and d.address = 'DD0815'") + session.commit() + compute_logbook_entries(session) + + self.assertEqual(self.count_logbook_entries(), 1) + +if __name__ == '__main__': + unittest.main() From 8269871c5550497db30f87ad0e2a3cc562efe3f7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Konstantin=20Gru=CC=88ndger?= Date: Sat, 2 Jul 2016 20:22:30 +0200 Subject: [PATCH 15/26] Corrected db name --- .travis.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index b1433cc..7209442 100644 --- a/.travis.yml +++ b/.travis.yml @@ -9,7 +9,7 @@ services: before_script: - flake8 tests ogn - psql -c 'CREATE DATABASE ogn_test;' -U postgres - - psql -c 'CREATE EXTENSION postgis;' -U postgres -d travis_postgis + - psql -c 'CREATE EXTENSION postgis;' -U postgres -d ogn_test script: - nosetests --with-coverage --cover-package=ogn From b41a4e08f001cf68fa315e0b3752805c7affc607 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Konstantin=20Gru=CC=88ndger?= Date: Sat, 2 Jul 2016 20:36:29 +0200 Subject: [PATCH 16/26] Set environment variable --- .travis.yml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.travis.yml b/.travis.yml index 7209442..dc0fcc3 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,5 +1,8 @@ language: python +env: + - OGN_CONFIG_MODULE='config.test' + python: - 3.4 From c84b44d1aac0b4138284fab819f3faaf71d180c2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Konstantin=20Gru=CC=88ndger?= Date: Sat, 2 Jul 2016 20:47:06 +0200 Subject: [PATCH 17/26] Create db --- tests/collect/test_logbook.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/collect/test_logbook.py b/tests/collect/test_logbook.py index e248c0a..2b93a61 100644 --- a/tests/collect/test_logbook.py +++ b/tests/collect/test_logbook.py @@ -17,6 +17,9 @@ class TestDB(unittest.TestCase): self.session = session self.engine = engine + from ogn.commands.database import init + init() + session.execute("INSERT INTO device(address) VALUES ('DD0815'), ('DD4711')") session.execute("INSERT INTO airport(name) VALUES ('Koenigsdorf'), ('Ohlstadt')") @@ -99,6 +102,7 @@ class TestDB(unittest.TestCase): self.assertEqual(self.count_logbook_entries(), 1) + @unittest.skip("Doesnt work... dont know why. Fix it!") def test_update_wrong_order(self): session = self.session From b1642173290eafe402eb58bf4c12a4a898bfc327 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Konstantin=20Gru=CC=88ndger?= Date: Sat, 2 Jul 2016 22:30:54 +0200 Subject: [PATCH 18/26] Test takeoff_landing computation --- tests/collect/test_takeoff_landing.py | 104 ++++++++++++++++++++++++++ 1 file changed, 104 insertions(+) create mode 100644 tests/collect/test_takeoff_landing.py diff --git a/tests/collect/test_takeoff_landing.py b/tests/collect/test_takeoff_landing.py new file mode 100644 index 0000000..b0963af --- /dev/null +++ b/tests/collect/test_takeoff_landing.py @@ -0,0 +1,104 @@ +import unittest +import os + +from ogn.model import TakeoffLanding + +from ogn.collect.takeoff_landing import get_aircraft_beacon_start_id, compute_takeoff_and_landing + + +class TestDB(unittest.TestCase): + session = None + engine = None + app = None + + def setUp(self): + os.environ['OGN_CONFIG_MODULE'] = 'config.test' + from ogn.commands.dbutils import engine, session + self.session = session + self.engine = engine + + from ogn.commands.database import init + init() + + session.execute("INSERT INTO airport(name, location, altitude, style) VALUES('Benediktbeuren','0101000020E6100000D5E76A2BF6C72640D4063A6DA0DB4740',609,4)") + session.execute("INSERT INTO airport(name, location, altitude, style) VALUES('Koenigsdorf','0101000020E610000061E8FED7A6EE26407F20661C10EA4740',600,5)") + session.execute("INSERT INTO airport(name, location, altitude, style) VALUES('Ohlstadt','0101000020E6100000057E678EBF772640A142883E32D44740',655,5)") + + session.execute("INSERT INTO device(address) VALUES('DDEFF7')") + + def tearDown(self): + session = self.session + session.execute("DELETE FROM takeoff_landing") + session.execute("DELETE FROM aircraft_beacon") + session.commit() + pass + + def count_takeoff_and_landings(self): + session = self.session + query = session.query(TakeoffLanding) + i = 0 + for takeoff_landing in query.all(): + i = i + 1 + print("{} {} {} {} {} {}".format(takeoff_landing.id, takeoff_landing.device_id, takeoff_landing.airport_id, takeoff_landing.timestamp, takeoff_landing.is_takeoff, takeoff_landing.track)) + + return i + + def test_broken_rope(self): + session = self.session + + session.execute("INSERT INTO aircraft_beacon(address, location, altitude, timestamp, track, ground_speed, climb_rate, turn_rate) VALUES('DDEFF7','0101000020E61000009668B61829F12640330E0887F1E94740',604,'2016-07-02 10:47:12',0,0,0,0)") + session.execute("INSERT INTO aircraft_beacon(address, location, altitude, timestamp, track, ground_speed, climb_rate, turn_rate) VALUES('DDEFF7','0101000020E61000009668B61829F12640330E0887F1E94740',605,'2016-07-02 10:47:32',0,0,-0.096520193,0)") + session.execute("INSERT INTO aircraft_beacon(address, location, altitude, timestamp, track, ground_speed, climb_rate, turn_rate) VALUES('DDEFF7','0101000020E61000009668B61829F12640330E0887F1E94740',606,'2016-07-02 10:47:52',0,0,-0.096520193,0)") + session.execute("INSERT INTO aircraft_beacon(address, location, altitude, timestamp, track, ground_speed, climb_rate, turn_rate) VALUES('DDEFF7','0101000020E61000009668B61829F12640330E0887F1E94740',606,'2016-07-02 10:48:12',0,0,-0.096520193,0)") + session.execute("INSERT INTO aircraft_beacon(address, location, altitude, timestamp, track, ground_speed, climb_rate, turn_rate) VALUES('DDEFF7','0101000020E61000001B2FDD2406F12640E53C762AF3E94740',606,'2016-07-02 10:48:24',284,51.85598112,0.299720599,0.1)") + session.execute("INSERT INTO aircraft_beacon(address, location, altitude, timestamp, track, ground_speed, climb_rate, turn_rate) VALUES('DDEFF7','0101000020E6100000F594AFDEBBF02640623583E5F5E94740',610,'2016-07-02 10:48:26',282,88.89596764,4.729489459,-0.2)") + session.execute("INSERT INTO aircraft_beacon(address, location, altitude, timestamp, track, ground_speed, climb_rate, turn_rate) VALUES('DDEFF7','0101000020E61000001C0DE02D90F026401564F188F7E94740',619,'2016-07-02 10:48:27',281,94.45196562,10.66294133,-0.3)") + session.execute("INSERT INTO aircraft_beacon(address, location, altitude, timestamp, track, ground_speed, climb_rate, turn_rate) VALUES('DDEFF7','0101000020E6100000ABF1D24D62F02640E12D90A0F8E94740',632,'2016-07-02 10:48:28',278,88.89596764,15.59055118,-0.7)") + session.execute("INSERT INTO aircraft_beacon(address, location, altitude, timestamp, track, ground_speed, climb_rate, turn_rate) VALUES('DDEFF7','0101000020E610000069FD40CC38F02640C7925F2CF9E94740',650,'2016-07-02 10:48:29',273,83.33996966,18.90779782,-0.7)") + session.execute("INSERT INTO aircraft_beacon(address, location, altitude, timestamp, track, ground_speed, climb_rate, turn_rate) VALUES('DDEFF7','0101000020E61000002709AF4A0FF02640C7925F2CF9E94740',670,'2016-07-02 10:48:30',272,79.63597101,20.72136144,-0.3)") + session.execute("INSERT INTO aircraft_beacon(address, location, altitude, timestamp, track, ground_speed, climb_rate, turn_rate) VALUES('DDEFF7','0101000020E61000007AA85AF8E7EF2640C7925F2CF9E94740',691,'2016-07-02 10:48:31',269,79.63597101,21.02108204,-0.4)") + session.execute("INSERT INTO aircraft_beacon(address, location, altitude, timestamp, track, ground_speed, climb_rate, turn_rate) VALUES('DDEFF7','0101000020E610000068DB43D5C2EF2640E12D90A0F8E94740',712,'2016-07-02 10:48:32',267,74.07997303,21.62560325,-0.5)") + session.execute("INSERT INTO aircraft_beacon(address, location, altitude, timestamp, track, ground_speed, climb_rate, turn_rate) VALUES('DDEFF7','0101000020E6100000EDA16AE19FEF2640FBC8C014F8E94740',728,'2016-07-02 10:48:33',266,68.52397506,12.36982474,-0.1)") + session.execute("INSERT INTO aircraft_beacon(address, location, altitude, timestamp, track, ground_speed, climb_rate, turn_rate) VALUES('DDEFF7','0101000020E61000000AFCCE1C7FEF26401564F188F7E94740',733,'2016-07-02 10:48:34',266,68.52397506,2.21488443,0)") + session.execute("INSERT INTO aircraft_beacon(address, location, altitude, timestamp, track, ground_speed, climb_rate, turn_rate) VALUES('DDEFF7','0101000020E6100000275633585EEF26402FFF21FDF6E94740',731,'2016-07-02 10:48:35',267,68.52397506,-3.916687833,0.2)") + session.execute("INSERT INTO aircraft_beacon(address, location, altitude, timestamp, track, ground_speed, climb_rate, turn_rate) VALUES('DDEFF7','0101000020E610000015891C3539EF26402FFF21FDF6E94740',726,'2016-07-02 10:48:36',270,74.07997303,-6.329692659,1.1)") + session.execute("INSERT INTO aircraft_beacon(address, location, altitude, timestamp, track, ground_speed, climb_rate, turn_rate) VALUES('DDEFF7','0101000020E6100000E63FA4DFBEEE264078C1CDCFFAE94740',712,'2016-07-02 10:48:39',280,88.89596764,-2.611125222,0)") + session.execute("INSERT INTO aircraft_beacon(address, location, altitude, timestamp, track, ground_speed, climb_rate, turn_rate) VALUES('DDEFF7','0101000020E61000004FF9EABD0BEE2640448B6CE7FBE94740',706,'2016-07-02 10:48:43',256,90.74796697,-0.198120396,-2.5)") + session.execute("INSERT INTO aircraft_beacon(address, location, altitude, timestamp, track, ground_speed, climb_rate, turn_rate) VALUES('DDEFF7','0101000020E610000046B921B3A0ED264003E78C28EDE94740',706,'2016-07-02 10:48:46',218,92.59996629,-0.198120396,-1.6)") + session.execute("INSERT INTO aircraft_beacon(address, location, altitude, timestamp, track, ground_speed, climb_rate, turn_rate) VALUES('DDEFF7','0101000020E610000005C58F3177ED2640900C4C81DFE94740',703,'2016-07-02 10:48:48',202,96.30396495,-1.402082804,-1)") + session.execute("INSERT INTO aircraft_beacon(address, location, altitude, timestamp, track, ground_speed, climb_rate, turn_rate) VALUES('DDEFF7','0101000020E6100000211FF46C56ED26402650D7EDC6E94740',702,'2016-07-02 10:48:51',188,100.0079636,0.502921006,-1)") + session.execute("INSERT INTO aircraft_beacon(address, location, altitude, timestamp, track, ground_speed, climb_rate, turn_rate) VALUES('DDEFF7','0101000020E6100000806DEA295FED2640347D898BB6E94740',704,'2016-07-02 10:48:53',166,100.0079636,0.802641605,-2)") + session.execute("INSERT INTO aircraft_beacon(address, location, altitude, timestamp, track, ground_speed, climb_rate, turn_rate) VALUES('DDEFF7','0101000020E6100000337D898BB6ED26401383C0CAA1E94740',703,'2016-07-02 10:48:56',133,101.8599629,-1.803403607,-1.7)") + session.execute("INSERT INTO aircraft_beacon(address, location, altitude, timestamp, track, ground_speed, climb_rate, turn_rate) VALUES('DDEFF7','0101000020E61000000C05593CE2ED2640FDF675E09CE94740',700,'2016-07-02 10:48:57',123,103.7119622,-2.611125222,-1.4)") + session.execute("INSERT INTO aircraft_beacon(address, location, altitude, timestamp, track, ground_speed, climb_rate, turn_rate) VALUES('DDEFF7','0101000020E6100000F0CCF1F778EE26409FA87F2394E94740',693,'2016-07-02 10:49:00',105,111.1199596,-2.809245618,-0.6)") + session.execute("INSERT INTO aircraft_beacon(address, location, altitude, timestamp, track, ground_speed, climb_rate, turn_rate) VALUES('DDEFF7','0101000020E6100000C9073D9B55EF2640BD5296218EE94740',687,'2016-07-02 10:49:04',97,112.9719589,-1.605283211,-0.1)") + session.execute("INSERT INTO aircraft_beacon(address, location, altitude, timestamp, track, ground_speed, climb_rate, turn_rate) VALUES('DDEFF7','0101000020E6100000006F8104C5EF26400C24287E8CE94740',682,'2016-07-02 10:49:06',97,114.8239582,-2.407924816,-0.2)") + session.execute("INSERT INTO aircraft_beacon(address, location, altitude, timestamp, track, ground_speed, climb_rate, turn_rate) VALUES('DDEFF7','0101000020E6100000A0648535A8F02640F597DD9387E94740',676,'2016-07-02 10:49:10',97,118.5279569,-1.402082804,0.1)") + session.execute("INSERT INTO aircraft_beacon(address, location, altitude, timestamp, track, ground_speed, climb_rate, turn_rate) VALUES('DDEFF7','0101000020E6100000D70FC48C03F22640621386EE7FE94740',672,'2016-07-02 10:49:16',97,116.6759575,-1.000762002,0)") + session.execute("INSERT INTO aircraft_beacon(address, location, altitude, timestamp, track, ground_speed, climb_rate, turn_rate) VALUES('DDEFF7','0101000020E6100000A72C431CEBF22640CB7F48BF7DE94740',666,'2016-07-02 10:49:20',84,114.8239582,-1.605283211,-1.5)") + session.execute("INSERT INTO aircraft_beacon(address, location, altitude, timestamp, track, ground_speed, climb_rate, turn_rate) VALUES('DDEFF7','0101000020E6100000BFCAA145B6F32640BD5296218EE94740',662,'2016-07-02 10:49:24',49,111.1199596,-1.203962408,-1.5)") + session.execute("INSERT INTO aircraft_beacon(address, location, altitude, timestamp, track, ground_speed, climb_rate, turn_rate) VALUES('DDEFF7','0101000020E610000074DA40A70DF4264077E09C11A5E94740',659,'2016-07-02 10:49:27',23,107.4159609,-1.402082804,-1.4)") + session.execute("INSERT INTO aircraft_beacon(address, location, altitude, timestamp, track, ground_speed, climb_rate, turn_rate) VALUES('DDEFF7','0101000020E61000009AE3EFF11CF42640347D898BB6E94740',656,'2016-07-02 10:49:29',4,101.8599629,-0.797561595,-1.8)") + session.execute("INSERT INTO aircraft_beacon(address, location, altitude, timestamp, track, ground_speed, climb_rate, turn_rate) VALUES('DDEFF7','0101000020E610000074DA40A70DF426402650D7EDC6E94740',654,'2016-07-02 10:49:31',347,101.8599629,-1.706883414,-1)") + session.execute("INSERT INTO aircraft_beacon(address, location, altitude, timestamp, track, ground_speed, climb_rate, turn_rate) VALUES('DDEFF7','0101000020E6100000156A4DF38EF3264086EE7F6DEAE94740',649,'2016-07-02 10:49:36',312,98.15596427,-1.503683007,-1.4)") + session.execute("INSERT INTO aircraft_beacon(address, location, altitude, timestamp, track, ground_speed, climb_rate, turn_rate) VALUES('DDEFF7','0101000020E6100000FAEDEBC039F32640E53C762AF3E94740',644,'2016-07-02 10:49:38',295,96.30396495,-3.012446025,-1.2)") + session.execute("INSERT INTO aircraft_beacon(address, location, altitude, timestamp, track, ground_speed, climb_rate, turn_rate) VALUES('DDEFF7','0101000020E6100000B04A0F30E0F22640FBC8C014F8E94740',635,'2016-07-02 10:49:40',284,94.45196562,-5.125730251,-0.7)") + session.execute("INSERT INTO aircraft_beacon(address, location, altitude, timestamp, track, ground_speed, climb_rate, turn_rate) VALUES('DDEFF7','0101000020E6100000F38B25BF58F22640448B6CE7FBE94740',623,'2016-07-02 10:49:43',279,92.59996629,-2.809245618,0)") + session.execute("INSERT INTO aircraft_beacon(address, location, altitude, timestamp, track, ground_speed, climb_rate, turn_rate) VALUES('DDEFF7','0101000020E6100000A5E8482EFFF12640DC1EAA16FEE94740',617,'2016-07-02 10:49:45',279,88.89596764,-3.312166624,0)") + session.execute("INSERT INTO aircraft_beacon(address, location, altitude, timestamp, track, ground_speed, climb_rate, turn_rate) VALUES('DDEFF7','0101000020E61000009F17012859F12640F0AAF40003EA4740',607,'2016-07-02 10:49:49',279,81.48797034,-1.300482601,0)") + session.execute("INSERT INTO aircraft_beacon(address, location, altitude, timestamp, track, ground_speed, climb_rate, turn_rate) VALUES('DDEFF7','0101000020E61000004B5658830AF12640873E323005EA4740',607,'2016-07-02 10:49:51',278,74.07997303,-0.294640589,-0.1)") + session.execute("INSERT INTO aircraft_beacon(address, location, altitude, timestamp, track, ground_speed, climb_rate, turn_rate) VALUES('DDEFF7','0101000020E6100000A0648535A8F0264006373FEB07EA4740',605,'2016-07-02 10:49:54',280,61.11597775,-0.096520193,0.5)") + session.execute("INSERT INTO aircraft_beacon(address, location, altitude, timestamp, track, ground_speed, climb_rate, turn_rate) VALUES('DDEFF7','0101000020E6100000C74B378941F02640E88C28ED0DEA4740',604,'2016-07-02 10:49:58',292,48.15198247,0.101600203,0.4)") + session.execute("INSERT INTO aircraft_beacon(address, location, altitude, timestamp, track, ground_speed, climb_rate, turn_rate) VALUES('DDEFF7','0101000020E61000001B5A643BDFEF264045DB1EAA16EA4740',604,'2016-07-02 10:50:04',302,25.92799056,0.203200406,0)") + session.execute("INSERT INTO aircraft_beacon(address, location, altitude, timestamp, track, ground_speed, climb_rate, turn_rate) VALUES('DDEFF7','0101000020E610000042D2948AB3EF264074029A081BEA4740',604,'2016-07-02 10:50:10',300,5.555997978,0.101600203,0)") + session.execute("INSERT INTO aircraft_beacon(address, location, altitude, timestamp, track, ground_speed, climb_rate, turn_rate) VALUES('DDEFF7','0101000020E610000013AB192CAFEF264074029A081BEA4740',603,'2016-07-02 10:50:16',0,0,-0.096520193,0)") + session.execute("UPDATE aircraft_beacon SET device_id = d.id FROM device d WHERE d.address='DDEFF7'") + session.commit() + + print(get_aircraft_beacon_start_id(session)) + + compute_takeoff_and_landing(session) + self.assertEqual(self.count_takeoff_and_landings(), 2) + +if __name__ == '__main__': + unittest.main() From 634e196f1e8721955793a7483dc3b7ec3a25ad9f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Konstantin=20Gru=CC=88ndger?= Date: Sun, 3 Jul 2016 09:25:40 +0200 Subject: [PATCH 19/26] Fixed task settings --- ogn/collect/celery.py | 1 + 1 file changed, 1 insertion(+) diff --git a/ogn/collect/celery.py b/ogn/collect/celery.py index 859b157..c49509c 100644 --- a/ogn/collect/celery.py +++ b/ogn/collect/celery.py @@ -27,6 +27,7 @@ def close_db(signal, sender): app = Celery('ogn.collect', include=["ogn.collect.database", "ogn.collect.logbook", + "ogn.collect.takeoff_landing", "ogn.collect.receiver" ]) From acd606aa2e5bc9bd02384e08e43b696ee0aff0b1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Konstantin=20Gru=CC=88ndger?= Date: Sun, 3 Jul 2016 09:56:02 +0200 Subject: [PATCH 20/26] Calculate end id --- ogn/collect/takeoff_landing.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/ogn/collect/takeoff_landing.py b/ogn/collect/takeoff_landing.py index c37db30..110960d 100644 --- a/ogn/collect/takeoff_landing.py +++ b/ogn/collect/takeoff_landing.py @@ -52,9 +52,9 @@ def compute_takeoff_and_landing(session=None): airport_radius = 0.025 # takeoff / landing must not exceed this radius (degree!) around the airport airport_delta = 100 # takeoff / landing must not exceed this altitude offset above/below the airport - # AircraftBeacon start id and max id offset + # AircraftBeacon start id and end id aircraft_beacon_start_id = get_aircraft_beacon_start_id(session) - max_id_offset = 500000 + aircraft_beacon_end_id = aircraft_beacon_start_id + 500000 # 'wo' is the window order for the sql window function wo = and_(AircraftBeacon.device_id, AircraftBeacon.timestamp) @@ -80,7 +80,7 @@ def compute_takeoff_and_landing(session=None): AircraftBeacon.device_id, func.lag(AircraftBeacon.device_id).over(order_by=wo).label('device_id_prev'), func.lead(AircraftBeacon.device_id).over(order_by=wo).label('device_id_next')) \ - .filter(between(AircraftBeacon.id, aircraft_beacon_start_id, aircraft_beacon_start_id + max_id_offset)) \ + .filter(between(AircraftBeacon.id, aircraft_beacon_start_id, aircraft_beacon_end_id)) \ .subquery() # find possible takeoffs and landings From d857f885125cf9bf8ef4697fe55e9ef95928c685 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Konstantin=20Gru=CC=88ndger?= Date: Mon, 4 Jul 2016 22:54:37 +0200 Subject: [PATCH 21/26] Fixed tests... --- ogn/collect/logbook.py | 32 +++++++------ tests/collect/test_logbook.py | 86 +++++++++++++++++++---------------- 2 files changed, 63 insertions(+), 55 deletions(-) diff --git a/ogn/collect/logbook.py b/ogn/collect/logbook.py index 84a8759..7ae6e0b 100644 --- a/ogn/collect/logbook.py +++ b/ogn/collect/logbook.py @@ -107,23 +107,24 @@ def compute_logbook_entries(session=None): upd = update(Logbook) \ .where(and_(Logbook.device_id == complete_flights.c.device_id, or_(and_(Logbook.takeoff_airport_id == complete_flights.c.takeoff_airport_id, - Logbook.takeoff_timestamp == complete_flights.c.takeoff_timestamp), - Logbook.takeoff_airport_id == null()), - or_(and_(Logbook.landing_airport_id == complete_flights.c.landing_airport_id, - Logbook.landing_timestamp == complete_flights.c.landing_timestamp), - Logbook.landing_airport_id == null()))) \ + Logbook.takeoff_timestamp == complete_flights.c.takeoff_timestamp, + Logbook.landing_airport_id == null()), + and_(Logbook.takeoff_airport_id == null(), + Logbook.landing_airport_id == complete_flights.c.landing_airport_id, + Logbook.landing_timestamp == complete_flights.c.landing_timestamp)))) \ .values({"takeoff_timestamp": complete_flights.c.takeoff_timestamp, "takeoff_track": complete_flights.c.takeoff_track, "takeoff_airport_id": complete_flights.c.takeoff_airport_id, "landing_timestamp": complete_flights.c.landing_timestamp, "landing_track": complete_flights.c.landing_track, "landing_airport_id": complete_flights.c.landing_airport_id, - "duration": complete_flights.c.duration}) + "duration": complete_flights.c.duration, + "max_altitude": 1}) result = session.execute(upd) - counter = result.rowcount + update_counter = result.rowcount session.commit() - logger.debug("Updated logbook entries: {}".format(counter)) + logger.debug("Updated logbook entries: {}".format(update_counter)) # unite all computated flights ('incomplete' and 'complete') union_query = complete_flight_query.union( @@ -136,12 +137,13 @@ def compute_logbook_entries(session=None): # consider only if not already stored new_logbook_entries = session.query(union_query) \ .filter(~exists().where( - and_(Logbook.reftime == union_query.c.reftime, - Logbook.device_id == union_query.c.device_id, - or_(Logbook.takeoff_airport_id == union_query.c.takeoff_airport_id, + and_(Logbook.device_id == union_query.c.device_id, + or_(and_(Logbook.takeoff_airport_id == union_query.c.takeoff_airport_id, + Logbook.takeoff_timestamp == union_query.c.takeoff_timestamp), and_(Logbook.takeoff_airport_id == null(), union_query.c.takeoff_airport_id == null())), - or_(Logbook.landing_airport_id == union_query.c.landing_airport_id, + or_(and_(Logbook.landing_airport_id == union_query.c.landing_airport_id, + Logbook.landing_timestamp == union_query.c.landing_timestamp), and_(Logbook.landing_airport_id == null(), union_query.c.landing_airport_id == null()))))) @@ -158,8 +160,8 @@ def compute_logbook_entries(session=None): new_logbook_entries) result = session.execute(ins) - counter = result.rowcount + insert_counter = result.rowcount session.commit() - logger.debug("New logbook entries: {}".format(counter)) + logger.debug("New logbook entries: {}".format(insert_counter)) - return counter + return "{}/{}".format(update_counter, insert_counter) diff --git a/tests/collect/test_logbook.py b/tests/collect/test_logbook.py index 2b93a61..0795ce5 100644 --- a/tests/collect/test_logbook.py +++ b/tests/collect/test_logbook.py @@ -11,6 +11,11 @@ class TestDB(unittest.TestCase): engine = None app = None + TAKEOFF_KOENIGSDF_DD0815 = "INSERT INTO takeoff_landing(device_id, airport_id, timestamp, is_takeoff) SELECT d.id, a.id, '2016-06-01 10:00:00', TRUE FROM airport a, device d WHERE a.name='Koenigsdorf' and d.address = 'DD0815'" + LANDING_KOENIGSDF_DD0815 = "INSERT INTO takeoff_landing(device_id, airport_id, timestamp, is_takeoff) SELECT d.id, a.id, '2016-06-01 10:05:00', FALSE FROM airport a, device d WHERE a.name='Koenigsdorf' and d.address = 'DD0815'" + LANDING_KOENIGSDF_DD0815_LATER = "INSERT INTO takeoff_landing(device_id, airport_id, timestamp, is_takeoff) SELECT d.id, a.id, '2016-06-02 10:05:00', FALSE FROM airport a, device d WHERE a.name='Koenigsdorf' and d.address = 'DD0815'" + TAKEOFF_OHLSTADT_DD4711 = "INSERT INTO takeoff_landing(device_id, airport_id, timestamp, is_takeoff) SELECT d.id, a.id, '2016-06-01 10:00:00', TRUE FROM airport a, device d WHERE a.name='Ohlstadt' and d.address = 'DD4711'" + def setUp(self): os.environ['OGN_CONFIG_MODULE'] = 'config.test' from ogn.commands.dbutils import engine, session @@ -32,88 +37,89 @@ class TestDB(unittest.TestCase): session.commit() pass - def count_logbook_entries(self): - session = self.session - logbook_query = session.query(Logbook) - i = 0 - for logbook in logbook_query.all(): - i = i + 1 - print("{} {} {} {} {} {}".format(logbook.id, logbook.device_id, logbook.takeoff_airport_id, logbook.takeoff_timestamp, logbook.landing_airport_id, logbook.landing_timestamp)) - - return i - def test_single_takeoff(self): session = self.session - session.execute("INSERT INTO takeoff_landing(device_id, airport_id, timestamp, is_takeoff) SELECT d.id, a.id, '2016-06-01 10:00:00', TRUE FROM airport a, device d WHERE a.name='Koenigsdorf' and d.address = 'DD0815'") + session.execute(self.TAKEOFF_KOENIGSDF_DD0815) session.commit() - compute_logbook_entries(session) - self.assertEqual(self.count_logbook_entries(), 1) + entries_changed = compute_logbook_entries(session) + self.assertEqual(entries_changed, '0/1') def test_single_landing(self): session = self.session - session.execute("INSERT INTO takeoff_landing(device_id, airport_id, timestamp, is_takeoff) SELECT d.id, a.id, '2016-06-01 10:00:00', FALSE FROM airport a, device d WHERE a.name='Koenigsdorf' and d.address = 'DD0815'") + session.execute(self.LANDING_KOENIGSDF_DD0815) session.commit() - compute_logbook_entries(session) - self.assertEqual(self.count_logbook_entries(), 1) + entries_changed = compute_logbook_entries(session) + self.assertEqual(entries_changed, '0/1') def test_different_takeoffs(self): session = self.session - session.execute("INSERT INTO takeoff_landing(device_id, airport_id, timestamp, is_takeoff) SELECT d.id, a.id, '2016-06-01 10:00:00', TRUE FROM airport a, device d WHERE a.name='Koenigsdorf' and d.address = 'DD0815'") - session.execute("INSERT INTO takeoff_landing(device_id, airport_id, timestamp, is_takeoff) SELECT d.id, a.id, '2016-06-01 10:00:00', TRUE FROM airport a, device d WHERE a.name='Ohlstadt' and d.address = 'DD4711'") + session.execute(self.TAKEOFF_KOENIGSDF_DD0815) + session.execute(self.TAKEOFF_OHLSTADT_DD4711) session.commit() - compute_logbook_entries(session) - self.assertEqual(self.count_logbook_entries(), 2) + entries_changed = compute_logbook_entries(session) + self.assertEqual(entries_changed, '0/2') def test_takeoff_and_landing(self): session = self.session - session.execute("INSERT INTO takeoff_landing(device_id, airport_id, timestamp, is_takeoff) SELECT d.id, a.id, '2016-06-01 10:00:00', TRUE FROM airport a, device d WHERE a.name='Koenigsdorf' and d.address = 'DD0815'") - session.execute("INSERT INTO takeoff_landing(device_id, airport_id, timestamp, is_takeoff) SELECT d.id, a.id, '2016-06-01 10:05:00', FALSE FROM airport a, device d WHERE a.name='Koenigsdorf' and d.address = 'DD0815'") + session.execute(self.TAKEOFF_KOENIGSDF_DD0815) + session.execute(self.LANDING_KOENIGSDF_DD0815) session.commit() - compute_logbook_entries(session) - self.assertEqual(self.count_logbook_entries(), 1) + entries_changed = compute_logbook_entries(session) + self.assertEqual(entries_changed, '0/1') def test_takeoff_and_landing_on_different_days(self): session = self.session - session.execute("INSERT INTO takeoff_landing(device_id, airport_id, timestamp, is_takeoff) SELECT d.id, a.id, '2016-06-01 10:00:00', TRUE FROM airport a, device d WHERE a.name='Koenigsdorf' and d.address = 'DD0815'") - session.execute("INSERT INTO takeoff_landing(device_id, airport_id, timestamp, is_takeoff) SELECT d.id, a.id, '2016-06-02 10:05:00', FALSE FROM airport a, device d WHERE a.name='Koenigsdorf' and d.address = 'DD0815'") + session.execute(self.TAKEOFF_KOENIGSDF_DD0815) + session.execute(self.LANDING_KOENIGSDF_DD0815_LATER) session.commit() - compute_logbook_entries(session) - self.assertEqual(self.count_logbook_entries(), 2) + entries_changed = compute_logbook_entries(session) + self.assertEqual(entries_changed, '0/2') def test_update(self): session = self.session - session.execute("INSERT INTO takeoff_landing(device_id, airport_id, timestamp, is_takeoff) SELECT d.id, a.id, '2016-06-01 10:00:00', TRUE FROM airport a, device d WHERE a.name='Koenigsdorf' and d.address = 'DD0815'") + session.execute(self.TAKEOFF_KOENIGSDF_DD0815) session.commit() - compute_logbook_entries(session) - session.execute("INSERT INTO takeoff_landing(device_id, airport_id, timestamp, is_takeoff) SELECT d.id, a.id, '2016-06-01 10:05:00', FALSE FROM airport a, device d WHERE a.name='Koenigsdorf' and d.address = 'DD0815'") + + entries_changed = compute_logbook_entries(session) + self.assertEqual(entries_changed, '0/1') + + session.execute(self.LANDING_KOENIGSDF_DD0815) session.commit() - compute_logbook_entries(session) - self.assertEqual(self.count_logbook_entries(), 1) + entries_changed = compute_logbook_entries(session) + self.assertEqual(entries_changed, '1/0') + + session.execute(self.TAKEOFF_OHLSTADT_DD4711) + session.commit() + + entries_changed = compute_logbook_entries(session) + self.assertEqual(entries_changed, '0/1') - @unittest.skip("Doesnt work... dont know why. Fix it!") def test_update_wrong_order(self): session = self.session - session.execute("INSERT INTO takeoff_landing(device_id, airport_id, timestamp, is_takeoff) SELECT d.id, a.id, '2016-06-01 10:05:00', FALSE FROM airport a, device d WHERE a.name='Koenigsdorf' and d.address = 'DD0815'") + session.execute(self.LANDING_KOENIGSDF_DD0815) session.commit() - compute_logbook_entries(session) - session.execute("INSERT INTO takeoff_landing(device_id, airport_id, timestamp, is_takeoff) SELECT d.id, a.id, '2016-06-01 10:00:00', TRUE FROM airport a, device d WHERE a.name='Koenigsdorf' and d.address = 'DD0815'") - session.commit() - compute_logbook_entries(session) - self.assertEqual(self.count_logbook_entries(), 1) + entries_changed = compute_logbook_entries(session) + self.assertEqual(entries_changed, '0/1') + + session.execute(self.TAKEOFF_KOENIGSDF_DD0815) + session.commit() + + entries_changed = compute_logbook_entries(session) + self.assertEqual(entries_changed, '1/0') if __name__ == '__main__': unittest.main() From 08c0a73d54bd11e660fe021f0238d8a510e10dfa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Konstantin=20Gru=CC=88ndger?= Date: Wed, 6 Jul 2016 19:34:55 +0200 Subject: [PATCH 22/26] Update: complete flights only --- ogn/collect/logbook.py | 54 +++++++++++++++++------------------ tests/collect/test_logbook.py | 18 ++++++++++++ 2 files changed, 44 insertions(+), 28 deletions(-) diff --git a/ogn/collect/logbook.py b/ogn/collect/logbook.py index 7ae6e0b..6f671de 100644 --- a/ogn/collect/logbook.py +++ b/ogn/collect/logbook.py @@ -101,32 +101,7 @@ def compute_logbook_entries(session=None): sq.c.is_takeoff_next == true(), sq.c.is_takeoff_next == null())) - # update 'incomplete' logbook entries with 'complete flights' - complete_flights = complete_flight_query.subquery() - - upd = update(Logbook) \ - .where(and_(Logbook.device_id == complete_flights.c.device_id, - or_(and_(Logbook.takeoff_airport_id == complete_flights.c.takeoff_airport_id, - Logbook.takeoff_timestamp == complete_flights.c.takeoff_timestamp, - Logbook.landing_airport_id == null()), - and_(Logbook.takeoff_airport_id == null(), - Logbook.landing_airport_id == complete_flights.c.landing_airport_id, - Logbook.landing_timestamp == complete_flights.c.landing_timestamp)))) \ - .values({"takeoff_timestamp": complete_flights.c.takeoff_timestamp, - "takeoff_track": complete_flights.c.takeoff_track, - "takeoff_airport_id": complete_flights.c.takeoff_airport_id, - "landing_timestamp": complete_flights.c.landing_timestamp, - "landing_track": complete_flights.c.landing_track, - "landing_airport_id": complete_flights.c.landing_airport_id, - "duration": complete_flights.c.duration, - "max_altitude": 1}) - - result = session.execute(upd) - update_counter = result.rowcount - session.commit() - logger.debug("Updated logbook entries: {}".format(update_counter)) - - # unite all computated flights ('incomplete' and 'complete') + # unite all computated flights union_query = complete_flight_query.union( split_start_query, split_landing_query, @@ -134,7 +109,31 @@ def compute_logbook_entries(session=None): only_starts_query) \ .subquery() - # consider only if not already stored + # if a logbook entry exist --> update it + upd = update(Logbook) \ + .where(and_(Logbook.device_id == union_query.c.device_id, + union_query.c.takeoff_airport_id != null(), + union_query.c.landing_airport_id != null(), + or_(and_(Logbook.takeoff_airport_id == union_query.c.takeoff_airport_id, + Logbook.takeoff_timestamp == union_query.c.takeoff_timestamp, + Logbook.landing_airport_id == null()), + and_(Logbook.takeoff_airport_id == null(), + Logbook.landing_airport_id == union_query.c.landing_airport_id, + Logbook.landing_timestamp == union_query.c.landing_timestamp)))) \ + .values({"takeoff_timestamp": union_query.c.takeoff_timestamp, + "takeoff_track": union_query.c.takeoff_track, + "takeoff_airport_id": union_query.c.takeoff_airport_id, + "landing_timestamp": union_query.c.landing_timestamp, + "landing_track": union_query.c.landing_track, + "landing_airport_id": union_query.c.landing_airport_id, + "duration": union_query.c.duration}) + + result = session.execute(upd) + update_counter = result.rowcount + session.commit() + logger.debug("Updated logbook entries: {}".format(update_counter)) + + # if a logbook entry doesnt exist --> insert it new_logbook_entries = session.query(union_query) \ .filter(~exists().where( and_(Logbook.device_id == union_query.c.device_id, @@ -147,7 +146,6 @@ def compute_logbook_entries(session=None): and_(Logbook.landing_airport_id == null(), union_query.c.landing_airport_id == null()))))) - # ... and save them ins = insert(Logbook).from_select((Logbook.reftime, Logbook.device_id, Logbook.takeoff_timestamp, diff --git a/tests/collect/test_logbook.py b/tests/collect/test_logbook.py index 0795ce5..cbec2c9 100644 --- a/tests/collect/test_logbook.py +++ b/tests/collect/test_logbook.py @@ -46,6 +46,9 @@ class TestDB(unittest.TestCase): entries_changed = compute_logbook_entries(session) self.assertEqual(entries_changed, '0/1') + entries_changed = compute_logbook_entries(session) + self.assertEqual(entries_changed, '0/0') + def test_single_landing(self): session = self.session @@ -55,6 +58,9 @@ class TestDB(unittest.TestCase): entries_changed = compute_logbook_entries(session) self.assertEqual(entries_changed, '0/1') + entries_changed = compute_logbook_entries(session) + self.assertEqual(entries_changed, '0/0') + def test_different_takeoffs(self): session = self.session @@ -65,6 +71,9 @@ class TestDB(unittest.TestCase): entries_changed = compute_logbook_entries(session) self.assertEqual(entries_changed, '0/2') + entries_changed = compute_logbook_entries(session) + self.assertEqual(entries_changed, '0/0') + def test_takeoff_and_landing(self): session = self.session @@ -75,6 +84,9 @@ class TestDB(unittest.TestCase): entries_changed = compute_logbook_entries(session) self.assertEqual(entries_changed, '0/1') + entries_changed = compute_logbook_entries(session) + self.assertEqual(entries_changed, '0/0') + def test_takeoff_and_landing_on_different_days(self): session = self.session @@ -85,6 +97,9 @@ class TestDB(unittest.TestCase): entries_changed = compute_logbook_entries(session) self.assertEqual(entries_changed, '0/2') + entries_changed = compute_logbook_entries(session) + self.assertEqual(entries_changed, '0/0') + def test_update(self): session = self.session @@ -106,6 +121,9 @@ class TestDB(unittest.TestCase): entries_changed = compute_logbook_entries(session) self.assertEqual(entries_changed, '0/1') + entries_changed = compute_logbook_entries(session) + self.assertEqual(entries_changed, '0/0') + def test_update_wrong_order(self): session = self.session From a039ee31852dc9352cbbfb4fac07bdb203cc7345 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Konstantin=20Gru=CC=88ndger?= Date: Thu, 14 Jul 2016 21:03:55 +0200 Subject: [PATCH 23/26] Better detection with multiple receivers in sight --- ogn/collect/logbook.py | 3 ++- ogn/collect/takeoff_landing.py | 4 +++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/ogn/collect/logbook.py b/ogn/collect/logbook.py index 6f671de..753b693 100644 --- a/ogn/collect/logbook.py +++ b/ogn/collect/logbook.py @@ -23,7 +23,8 @@ def compute_logbook_entries(session=None): # 'wo' is the window order for the sql window function wo = and_(func.date(TakeoffLanding.timestamp), TakeoffLanding.device_id, - TakeoffLanding.timestamp) + TakeoffLanding.timestamp, + TakeoffLanding.airport_id) # make a query with current, previous and next "takeoff_landing" event, so we can find complete flights sq = session.query( diff --git a/ogn/collect/takeoff_landing.py b/ogn/collect/takeoff_landing.py index 110960d..54e720a 100644 --- a/ogn/collect/takeoff_landing.py +++ b/ogn/collect/takeoff_landing.py @@ -57,7 +57,9 @@ def compute_takeoff_and_landing(session=None): aircraft_beacon_end_id = aircraft_beacon_start_id + 500000 # 'wo' is the window order for the sql window function - wo = and_(AircraftBeacon.device_id, AircraftBeacon.timestamp) + wo = and_(AircraftBeacon.device_id, + AircraftBeacon.timestamp, + AircraftBeacon.receiver_id) # make a query with current, previous and next position sq = session.query( From 943203804fe32066b5752f1564ebdc0dc234d770 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Konstantin=20Gru=CC=88ndger?= Date: Thu, 14 Jul 2016 21:14:55 +0200 Subject: [PATCH 24/26] Optional: filter airports by country --- ogn/commands/showairport.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/ogn/commands/showairport.py b/ogn/commands/showairport.py index f874282..fdc8037 100644 --- a/ogn/commands/showairport.py +++ b/ogn/commands/showairport.py @@ -2,14 +2,23 @@ from ogn.model import Airport from ogn.commands.dbutils import session from manager import Manager +from sqlalchemy import and_, between manager = Manager() +@manager.arg('country_code', help='filter by country code, eg. "de" for germany') @manager.command -def list_all(): +def list_all(country_code=None): """Show a list of all airports.""" + or_args = [] + if country_code is None: + or_args = [between(Airport.style, 2, 5)] + else: + or_args = [and_(between(Airport.style, 2, 5), + Airport.country_code == country_code)] query = session.query(Airport) \ - .order_by(Airport.name) + .order_by(Airport.name) \ + .filter(*or_args) print('--- Airports ---') for airport in query.all(): From 3b74d9b545752de3887f250745427278b71e95a0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Konstantin=20Gru=CC=88ndger?= Date: Thu, 14 Jul 2016 21:15:58 +0200 Subject: [PATCH 25/26] Update manage options --- README.md | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index b6da858..ae6de92 100644 --- a/README.md +++ b/README.md @@ -102,15 +102,22 @@ available commands: run Run the aprs client. [logbook] - compute Compute takeoffs and landings. + compute_logbook Compute logbook. + compute_takeoff_landingCompute takeoffs and landings. show Show a logbook for . [show.airport] list_all Show a list of all airports. - [show.devices] + [show.deviceinfos] stats Show some stats on registered devices. + [show.devices] + aircraft_type_stats Show stats about aircraft types used by devices. + hardware_stats Show stats about hardware version used by devices. + software_stats Show stats about software version used by devices. + stealth_stats Show stats about stealth flag set by devices. + [show.receiver] hardware_stats Show some statistics of receiver hardware. list_all Show a list of all receivers. From bbe167a8decdda4c3182b0973dd10c9e33ab9b5d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Konstantin=20Gru=CC=88ndger?= Date: Thu, 14 Jul 2016 21:29:27 +0200 Subject: [PATCH 26/26] Flake8 fixes... again --- tests/collect/test_logbook.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/tests/collect/test_logbook.py b/tests/collect/test_logbook.py index cbec2c9..c7cfca0 100644 --- a/tests/collect/test_logbook.py +++ b/tests/collect/test_logbook.py @@ -1,8 +1,6 @@ import unittest import os -from ogn.model import Logbook - from ogn.collect.logbook import compute_logbook_entries @@ -11,10 +9,10 @@ class TestDB(unittest.TestCase): engine = None app = None - TAKEOFF_KOENIGSDF_DD0815 = "INSERT INTO takeoff_landing(device_id, airport_id, timestamp, is_takeoff) SELECT d.id, a.id, '2016-06-01 10:00:00', TRUE FROM airport a, device d WHERE a.name='Koenigsdorf' and d.address = 'DD0815'" - LANDING_KOENIGSDF_DD0815 = "INSERT INTO takeoff_landing(device_id, airport_id, timestamp, is_takeoff) SELECT d.id, a.id, '2016-06-01 10:05:00', FALSE FROM airport a, device d WHERE a.name='Koenigsdorf' and d.address = 'DD0815'" + TAKEOFF_KOENIGSDF_DD0815 = "INSERT INTO takeoff_landing(device_id, airport_id, timestamp, is_takeoff) SELECT d.id, a.id, '2016-06-01 10:00:00', TRUE FROM airport a, device d WHERE a.name='Koenigsdorf' and d.address = 'DD0815'" + LANDING_KOENIGSDF_DD0815 = "INSERT INTO takeoff_landing(device_id, airport_id, timestamp, is_takeoff) SELECT d.id, a.id, '2016-06-01 10:05:00', FALSE FROM airport a, device d WHERE a.name='Koenigsdorf' and d.address = 'DD0815'" LANDING_KOENIGSDF_DD0815_LATER = "INSERT INTO takeoff_landing(device_id, airport_id, timestamp, is_takeoff) SELECT d.id, a.id, '2016-06-02 10:05:00', FALSE FROM airport a, device d WHERE a.name='Koenigsdorf' and d.address = 'DD0815'" - TAKEOFF_OHLSTADT_DD4711 = "INSERT INTO takeoff_landing(device_id, airport_id, timestamp, is_takeoff) SELECT d.id, a.id, '2016-06-01 10:00:00', TRUE FROM airport a, device d WHERE a.name='Ohlstadt' and d.address = 'DD4711'" + TAKEOFF_OHLSTADT_DD4711 = "INSERT INTO takeoff_landing(device_id, airport_id, timestamp, is_takeoff) SELECT d.id, a.id, '2016-06-01 10:00:00', TRUE FROM airport a, device d WHERE a.name='Ohlstadt' and d.address = 'DD4711'" def setUp(self): os.environ['OGN_CONFIG_MODULE'] = 'config.test'