diff --git a/config/default.py b/config/default.py index 88d6d7e..9742b01 100644 --- a/config/default.py +++ b/config/default.py @@ -11,10 +11,18 @@ CELERYBEAT_SCHEDULE = { 'task': 'ogn.collect.database.import_ddb', 'schedule': timedelta(minutes=15), }, - 'update-logbook': { + 'update-takeoff-and-landing': { 'task': 'ogn.collect.logbook.compute_takeoff_and_landing', 'schedule': timedelta(minutes=15), }, + 'update-logbook': { + 'task': 'ogn.collect.logbook.compute_logbook', + 'schedule': timedelta(minutes=1), + }, + 'update-altitudes': { + 'task': 'ogn.collect.logbook.compute_altitudes', + 'schedule': timedelta(minutes=1), + }, 'update-receiver-table': { 'task': 'ogn.collect.receiver.update_receivers', 'schedule': timedelta(minutes=15), diff --git a/ogn/collect/logbook.py b/ogn/collect/logbook.py index 1726b5d..23c2c80 100644 --- a/ogn/collect/logbook.py +++ b/ogn/collect/logbook.py @@ -3,11 +3,11 @@ from datetime import timedelta from celery.utils.log import get_task_logger from ogn.collect.celery import app -from sqlalchemy.sql import func -from sqlalchemy import and_, or_, insert, between -from sqlalchemy.sql.expression import case +from sqlalchemy.sql import func, null +from sqlalchemy import and_, or_, insert, update, between, exists +from sqlalchemy.sql.expression import case, true, false, label -from ogn.model import AircraftBeacon, TakeoffLanding, Airport +from ogn.model import AircraftBeacon, TakeoffLanding, Airport, Logbook logger = get_task_logger(__name__) @@ -122,3 +122,162 @@ def compute_takeoff_and_landing(): logger.debug("New takeoffs and landings: {}".format(counter)) return counter + + +@app.task +def compute_logbook(): + logger.info("Compute logbook.") + + or_args = [between(TakeoffLanding.timestamp, '2016-06-28 00:00:00', '2016-06-28 23:59:59')] + or_args = [] + + # 'wo' is the window order for the sql window function + wo = and_(func.date(TakeoffLanding.timestamp), + TakeoffLanding.device_id, + TakeoffLanding.timestamp) + + # make a query with current, previous and next "takeoff_landing" event, so we can find complete flights + sq = app.session.query( + TakeoffLanding.device_id, + func.lag(TakeoffLanding.device_id) + .over(order_by=wo).label('device_id_prev'), + func.lead(TakeoffLanding.device_id) + .over(order_by=wo).label('device_id_next'), + TakeoffLanding.timestamp, + func.lag(TakeoffLanding.timestamp) + .over(order_by=wo).label('timestamp_prev'), + func.lead(TakeoffLanding.timestamp) + .over(order_by=wo).label('timestamp_next'), + TakeoffLanding.track, + func.lag(TakeoffLanding.track) + .over(order_by=wo).label('track_prev'), + func.lead(TakeoffLanding.track) + .over(order_by=wo).label('track_next'), + TakeoffLanding.is_takeoff, + func.lag(TakeoffLanding.is_takeoff) + .over(order_by=wo).label('is_takeoff_prev'), + func.lead(TakeoffLanding.is_takeoff) + .over(order_by=wo).label('is_takeoff_next'), + TakeoffLanding.airport_id, + func.lag(TakeoffLanding.airport_id) + .over(order_by=wo).label('airport_id_prev'), + func.lead(TakeoffLanding.airport_id) + .over(order_by=wo).label('airport_id_next')) \ + .filter(*or_args) \ + .subquery() + + # find complete flights (with takeoff and landing on the same day) + complete_flight_query = app.session.query( + sq.c.timestamp.label('reftime'), + sq.c.device_id.label('device_id'), + sq.c.timestamp.label('takeoff'), sq.c.track.label('takeoff_track'), sq.c.airport_id.label('takeoff_airport_id'), + sq.c.timestamp_next.label('landing'), sq.c.track_next.label('landing_track'), sq.c.airport_id_next.label('landing_airport_id'), + label('duration', sq.c.timestamp_next - sq.c.timestamp)) \ + .filter(and_(sq.c.is_takeoff == true(), sq.c.is_takeoff_next == false())) \ + .filter(sq.c.device_id == sq.c.device_id_next) \ + .filter(func.date(sq.c.timestamp_next) == func.date(sq.c.timestamp)) + + # split complete flights (with takeoff and landing on different days) into one takeoff and one landing + split_start_query = app.session.query( + sq.c.timestamp.label('reftime'), + sq.c.device_id.label('device_id'), + sq.c.timestamp.label('takeoff'), sq.c.track.label('takeoff_track'), sq.c.airport_id.label('takeoff_airport_id'), + null().label('landing'), null().label('landing_track'), null().label('landing_airport_id'), + null().label('duration')) \ + .filter(and_(sq.c.is_takeoff == true(), sq.c.is_takeoff_next == false())) \ + .filter(sq.c.device_id == sq.c.device_id_next) \ + .filter(func.date(sq.c.timestamp_next) != func.date(sq.c.timestamp)) + + split_landing_query = app.session.query( + sq.c.timestamp_next.label('reftime'), + sq.c.device_id.label('device_id'), + null().label('takeoff'), null().label('takeoff_track'), null().label('takeoff_airport_id'), + sq.c.timestamp_next.label('landing'), sq.c.track_next.label('landing_track'), sq.c.airport_id_next.label('landing_airport_id'), + null().label('duration')) \ + .filter(and_(sq.c.is_takeoff == true(), sq.c.is_takeoff_next == false())) \ + .filter(sq.c.device_id == sq.c.device_id_next) \ + .filter(func.date(sq.c.timestamp_next) != func.date(sq.c.timestamp)) + + # find landings without start + only_landings_query = app.session.query( + sq.c.timestamp.label('reftime'), + sq.c.device_id.label('device_id'), + null().label('takeoff'), null().label('takeoff_track'), null().label('takeoff_airport_id'), + sq.c.timestamp.label('landing'), sq.c.track_next.label('landing_track'), sq.c.airport_id_next.label('landing_airport_id'), + null().label('duration')) \ + .filter(sq.c.is_takeoff == false()) \ + .filter(or_(sq.c.device_id != sq.c.device_id_prev, + sq.c.is_takeoff_prev == false())) + + # find starts without landing + only_starts_query = app.session.query( + sq.c.timestamp.label('reftime'), + sq.c.device_id.label('device_id'), + sq.c.timestamp.label('takeoff'), sq.c.track.label('takeoff_track'), sq.c.airport_id.label('takeoff_airport_id'), + null().label('landing'), null().label('landing_track'), null().label('landing_airport_id'), + null().label('duration')) \ + .filter(sq.c.is_takeoff == true()) \ + .filter(or_(sq.c.device_id != sq.c.device_id_next, + sq.c.is_takeoff_next == true())) + + # unite all + union_query = complete_flight_query.union( + split_start_query, + split_landing_query, + only_landings_query, + only_starts_query) \ + .subquery() + + # consider only if not already stored + new_logbook_entries = app.session.query(union_query) \ + .filter(~exists().where( + and_(Logbook.reftime == union_query.c.reftime, + Logbook.device_id == union_query.c.device_id, + or_(Logbook.takeoff_airport_id == union_query.c.takeoff_airport_id, + and_(Logbook.takeoff_airport_id == null(), + union_query.c.takeoff_airport_id == null())), + or_(Logbook.landing_airport_id == union_query.c.landing_airport_id, + and_(Logbook.landing_airport_id == null(), + union_query.c.landing_airport_id == null()))))) + + # ... and save them + ins = insert(Logbook).from_select((Logbook.reftime, + Logbook.device_id, + Logbook.takeoff_timestamp, + Logbook.takeoff_track, + Logbook.takeoff_airport_id, + Logbook.landing_timestamp, + Logbook.landing_track, + Logbook.landing_airport_id, + Logbook.duration), + new_logbook_entries) + + result = app.session.execute(ins) + counter = result.rowcount + app.session.commit() + logger.debug("New logbook entries: {}".format(counter)) + + return counter + + +@app.task +def compute_altitudes(): + logger.info("Compute maximum altitudes.") + + altitude_query = app.session.query(Logbook.id, func.max(AircraftBeacon.altitude).label('max_altitude')) \ + .filter(and_(Logbook.takeoff_airport_id != null(), + Logbook.landing_airport_id != null())) \ + .filter(and_(between(AircraftBeacon.timestamp, Logbook.takeoff_timestamp, Logbook.landing_timestamp))) \ + .group_by(Logbook.id) \ + .subquery() + + upd = update(Logbook) \ + .values({'max_altitude': altitude_query.c.max_altitude}) \ + .where(Logbook.id == altitude_query.c.id) + + result = app.session.execute(upd) + counter = result.rowcount + app.session.commit() + logger.debug("Updated logbook entries: {}".format(counter)) + + return counter \ No newline at end of file diff --git a/ogn/commands/logbook.py b/ogn/commands/logbook.py index fa526ec..ab9450b 100644 --- a/ogn/commands/logbook.py +++ b/ogn/commands/logbook.py @@ -2,31 +2,47 @@ from datetime import timedelta, datetime -from sqlalchemy.sql import func, null +from sqlalchemy.sql import func from sqlalchemy import and_, or_ -from sqlalchemy.sql.expression import true, false, label from sqlalchemy.orm import aliased -from ogn.model import Device, DeviceInfo, TakeoffLanding, Airport +from ogn.model import Device, DeviceInfo, TakeoffLanding, Airport, Logbook from ogn.commands.dbutils import session -from ogn.collect.logbook import compute_takeoff_and_landing +from ogn.collect.logbook import compute_takeoff_and_landing, compute_logbook, compute_altitudes from manager import Manager manager = Manager() @manager.command -def compute(): +def compute_takeoff_landing(): """Compute takeoffs and landings.""" print("Compute takeoffs and landings...") result = compute_takeoff_and_landing.delay() counter = result.get() - print("New/recalculated takeoffs/landings: {}".format(counter)) + print("New takeoffs/landings: {}".format(counter)) + + +@manager.command +def compute_logbook(): + """Compute logbook.""" + print("Compute logbook...") + result = compute_logbook.delay() + counter = result.get() + print("New logbook entries: {}".format(counter)) + + +@manager.command +def compute_altitudes(): + """Compute maximum altitudes.""" + print("Compute maximum altitudes...") + result = compute_altitudes.delay() + counter = result.get() + print("Updated logbook entries: {}".format(counter)) @manager.arg('date', help='date (format: yyyy-mm-dd)') -@manager.arg('utc_delta_hours', help='delta hours to utc (for local time logs)') @manager.command def show(airport_name, utc_delta_hours=0, date=None): """Show a logbook for .""" @@ -38,138 +54,13 @@ def show(airport_name, utc_delta_hours=0, date=None): print('Airport "{}" not found.'.format(airport_name)) return - utc_timedelta = timedelta(hours=utc_delta_hours) or_args = [] if date is not None: date = datetime.strptime(date, "%Y-%m-%d") - or_args = [and_(TakeoffLanding.timestamp >= date + utc_timedelta, - TakeoffLanding.timestamp < date + timedelta(hours=24) + utc_timedelta)] + or_args = [and_(TakeoffLanding.timestamp >= date, + TakeoffLanding.timestamp < date + timedelta(hours=24))] - # make a query with current, previous and next "takeoff_landing" event, so we can find complete flights - sq = session.query( - TakeoffLanding.device_id, - func.lag(TakeoffLanding.device_id) - .over(order_by=and_(func.date(TakeoffLanding.timestamp), - TakeoffLanding.device_id, - TakeoffLanding.timestamp)) - .label('device_id_prev'), - func.lead(TakeoffLanding.device_id) - .over(order_by=and_(func.date(TakeoffLanding.timestamp), - TakeoffLanding.device_id, - TakeoffLanding.timestamp)) - .label('device_id_next'), - (TakeoffLanding.timestamp).label('timestamp'), - func.lag(TakeoffLanding.timestamp) - .over(order_by=and_(func.date(TakeoffLanding.timestamp), - TakeoffLanding.device_id, - TakeoffLanding.timestamp)) - .label('timestamp_prev'), - func.lead(TakeoffLanding.timestamp) - .over(order_by=and_(func.date(TakeoffLanding.timestamp), - TakeoffLanding.device_id, - TakeoffLanding.timestamp)) - .label('timestamp_next'), - TakeoffLanding.track, - func.lag(TakeoffLanding.track) - .over(order_by=and_(func.date(TakeoffLanding.timestamp), - TakeoffLanding.device_id, - TakeoffLanding.timestamp)) - .label('track_prev'), - func.lead(TakeoffLanding.track) - .over(order_by=and_(func.date(TakeoffLanding.timestamp), - TakeoffLanding.device_id, - TakeoffLanding.timestamp)) - .label('track_next'), - TakeoffLanding.is_takeoff, - func.lag(TakeoffLanding.is_takeoff) - .over(order_by=and_(func.date(TakeoffLanding.timestamp), - TakeoffLanding.device_id, - TakeoffLanding.timestamp)) - .label('is_takeoff_prev'), - func.lead(TakeoffLanding.is_takeoff) - .over(order_by=and_(func.date(TakeoffLanding.timestamp), - TakeoffLanding.device_id, - TakeoffLanding.timestamp)) - .label('is_takeoff_next'), - TakeoffLanding.airport_id, - func.lag(TakeoffLanding.airport_id) - .over(order_by=and_(func.date(TakeoffLanding.timestamp), - TakeoffLanding.device_id, - TakeoffLanding.timestamp)) - .label('airport_id_prev'), - func.lead(TakeoffLanding.airport_id) - .over(order_by=and_(func.date(TakeoffLanding.timestamp), - TakeoffLanding.device_id, - TakeoffLanding.timestamp)) - .label('airport_id_next')) \ - .filter(*or_args) \ - .subquery() - - # find complete flights (with takeoff and landing on the same day) - complete_flight_query = session.query(sq.c.timestamp.label('reftime'), - sq.c.device_id.label('device_id'), - sq.c.timestamp.label('takeoff'), sq.c.track.label('takeoff_track'), sq.c.airport_id.label('takeoff_airport_id'), - sq.c.timestamp_next.label('landing'), sq.c.track_next.label('landing_track'), sq.c.airport_id_next.label('landing_airport_id'), - label('duration', sq.c.timestamp_next - sq.c.timestamp)) \ - .filter(and_(sq.c.is_takeoff == true(), sq.c.is_takeoff_next == false())) \ - .filter(sq.c.device_id == sq.c.device_id_next) \ - .filter(func.date(sq.c.timestamp_next + utc_timedelta) == func.date(sq.c.timestamp + utc_timedelta)) \ - .filter(or_(sq.c.airport_id == airport.id, - sq.c.airport_id_next == airport.id)) - - # split complete flights (with takeoff and landing on different days) into one takeoff and one landing - split_start_query = session.query(sq.c.timestamp.label('reftime'), - sq.c.device_id.label('device_id'), - sq.c.timestamp.label('takeoff'), sq.c.track.label('takeoff_track'), sq.c.airport_id.label('takeoff_airport_id'), - null().label('landing'), null().label('landing_track'), null().label('landing_airport_id'), - null().label('duration')) \ - .filter(and_(sq.c.is_takeoff == true(), sq.c.is_takeoff_next == false())) \ - .filter(sq.c.device_id == sq.c.device_id_next) \ - .filter(func.date(sq.c.timestamp_next + utc_timedelta) != func.date(sq.c.timestamp + utc_timedelta)) \ - .filter(and_(sq.c.airport_id == airport.id, - sq.c.airport_id_next == airport.id)) - - split_landing_query = session.query(sq.c.timestamp_next.label('reftime'), - sq.c.device_id.label('device_id'), - null().label('takeoff'), null().label('takeoff_track'), null().label('takeoff_airport_id'), - sq.c.timestamp_next.label('landing'), sq.c.track_next.label('landing_track'), sq.c.airport_id_next.label('landing_airport_id'), - null().label('duration')) \ - .filter(and_(sq.c.is_takeoff == true(), sq.c.is_takeoff_next == false())) \ - .filter(sq.c.device_id == sq.c.device_id_next) \ - .filter(func.date(sq.c.timestamp_next + utc_timedelta) != func.date(sq.c.timestamp + utc_timedelta)) \ - .filter(and_(sq.c.airport_id == airport.id, - sq.c.airport_id_next == airport.id)) - - # find landings without start - only_landings_query = session.query(sq.c.timestamp.label('reftime'), - sq.c.device_id.label('device_id'), - null().label('takeoff'), null().label('takeoff_track'), null().label('takeoff_airport_id'), - sq.c.timestamp.label('landing'), sq.c.track_next.label('landing_track'), sq.c.airport_id_next.label('landing_airport_id'), - null().label('duration')) \ - .filter(sq.c.is_takeoff == false()) \ - .filter(or_(sq.c.device_id != sq.c.device_id_prev, - sq.c.is_takeoff_prev == false())) \ - .filter(sq.c.airport_id_next == airport.id) - - # find starts without landing - only_starts_query = session.query(sq.c.timestamp.label('reftime'), - sq.c.device_id.label('device_id'), - sq.c.timestamp.label('takeoff'), sq.c.track.label('takeoff_track'), sq.c.airport_id.label('takeoff_airport_id'), - null().label('landing'), null().label('landing_track'), null().label('landing_airport_id'), - null().label('duration')) \ - .filter(sq.c.is_takeoff == true()) \ - .filter(or_(sq.c.device_id != sq.c.device_id_next, - sq.c.is_takeoff_next == true())) \ - .filter(sq.c.airport_id == airport.id) - - # unite all - union_query = complete_flight_query.union(split_start_query, - split_landing_query, - only_landings_query, - only_starts_query) \ - .subquery() - - # get aircraft and airport informations and sort all entries by the reference time + # get device info with highes priority sq2 = session.query(DeviceInfo.address, func.max(DeviceInfo.address_origin).label('address_origin')) \ .group_by(DeviceInfo.address) \ .subquery() @@ -178,24 +69,23 @@ def show(airport_name, utc_delta_hours=0, date=None): .filter(and_(DeviceInfo.address == sq2.c.address, DeviceInfo.address_origin == sq2.c.address_origin)) \ .subquery() + # get all logbook entries and add device and airport infos takeoff_airport = aliased(Airport, name='takeoff_airport') landing_airport = aliased(Airport, name='landing_airport') - logbook_query = session.query(union_query.c.reftime, - union_query.c.takeoff, - union_query.c.takeoff_track, + logbook_query = session.query(Logbook, takeoff_airport, - union_query.c.landing, - union_query.c.landing_track, landing_airport, - union_query.c.duration, Device, sq3.c.registration, sq3.c.aircraft) \ - .outerjoin(takeoff_airport, union_query.c.takeoff_airport_id == takeoff_airport.id) \ - .outerjoin(landing_airport, union_query.c.landing_airport_id == landing_airport.id) \ - .outerjoin(Device, union_query.c.device_id == Device.id) \ + .filter(or_(Logbook.takeoff_airport_id == airport.id, + Logbook.landing_airport_id == airport.id)) \ + .filter(*or_args) \ + .outerjoin(takeoff_airport, Logbook.takeoff_airport_id == takeoff_airport.id) \ + .outerjoin(landing_airport, Logbook.landing_airport_id == landing_airport.id) \ + .outerjoin(Device, Logbook.device_id == Device.id) \ .outerjoin(sq3, sq3.c.address == Device.address) \ - .order_by(union_query.c.reftime) + .order_by(Logbook.reftime) # ... and finally print out the logbook print('--- Logbook ({}) ---'.format(airport_name)) @@ -223,14 +113,18 @@ def show(airport_name, utc_delta_hours=0, date=None): else: return ('') - for [reftime, takeoff, takeoff_track, takeoff_airport, landing, landing_track, landing_airport, duration, device, registration, aircraft] in logbook_query.all(): - print('%10s %8s (%2s) %8s (%2s) %8s %8s %17s %20s' % ( - reftime.date(), - none_datetime_replacer(takeoff), - none_track_replacer(takeoff_track), - none_datetime_replacer(landing), - none_track_replacer(landing_track), - none_timedelta_replacer(duration), + def none_altitude_replacer(altitude_object, airport_object): + return "?" if altitude_object is None else "{:5d}m ({:+5d}m)".format(altitude_object, altitude_object - airport_object.altitude) + + for [logbook, takeoff_airport, landing_airport, device, registration, aircraft] in logbook_query.all(): + print('%10s %8s (%2s) %8s (%2s) %8s %15s %8s %17s %20s' % ( + logbook.reftime.date(), + none_datetime_replacer(logbook.takeoff_timestamp), + none_track_replacer(logbook.takeoff_track), + none_datetime_replacer(logbook.landing_timestamp), + none_track_replacer(logbook.landing_track), + none_timedelta_replacer(logbook.duration), + none_altitude_replacer(logbook.max_altitude, takeoff_airport), none_registration_replacer(device, registration), none_aircraft_replacer(device, aircraft), airport_marker(takeoff_airport, landing_airport))) diff --git a/ogn/model/logbook.py b/ogn/model/logbook.py new file mode 100644 index 0000000..c235fc9 --- /dev/null +++ b/ogn/model/logbook.py @@ -0,0 +1,34 @@ +from sqlalchemy import Integer, DateTime, Interval, Column, ForeignKey +from sqlalchemy.orm import relationship + +from .base import Base +from sqlalchemy.sql.schema import UniqueConstraint + + +class Logbook(Base): + __tablename__ = 'logbook' + __table_args__ = (UniqueConstraint('reftime', + 'takeoff_airport_id', + 'landing_airport_id', + 'device_id'), + ) + + id = Column(Integer, primary_key=True) + + reftime = Column(DateTime, index=True) + takeoff_timestamp = Column(DateTime) + takeoff_track = Column(Integer) + landing_timestamp = Column(DateTime) + landing_track = Column(Integer) + duration = Column(Interval) + max_altitude = Column(Integer) + + # Relations + takeoff_airport_id = Column(Integer, ForeignKey('airport.id', ondelete='CASCADE'), index=True) + takeoff_airport = relationship('Airport', foreign_keys=[takeoff_airport_id]) + + landing_airport_id = Column(Integer, ForeignKey('airport.id', ondelete='CASCADE'), index=True) + landing_airport = relationship('Airport', foreign_keys=[landing_airport_id]) + + device_id = Column(Integer, ForeignKey('device.id', ondelete='CASCADE'), index=True) + device = relationship('Device', foreign_keys=[device_id])