Persist logbook

pull/56/head
Konstantin Gründger 2016-06-29 23:26:30 +02:00
rodzic 998cbf4990
commit f08bf220c8
4 zmienionych plików z 253 dodań i 158 usunięć

Wyświetl plik

@ -11,10 +11,18 @@ CELERYBEAT_SCHEDULE = {
'task': 'ogn.collect.database.import_ddb',
'schedule': timedelta(minutes=15),
},
'update-logbook': {
'update-takeoff-and-landing': {
'task': 'ogn.collect.logbook.compute_takeoff_and_landing',
'schedule': timedelta(minutes=15),
},
'update-logbook': {
'task': 'ogn.collect.logbook.compute_logbook',
'schedule': timedelta(minutes=1),
},
'update-altitudes': {
'task': 'ogn.collect.logbook.compute_altitudes',
'schedule': timedelta(minutes=1),
},
'update-receiver-table': {
'task': 'ogn.collect.receiver.update_receivers',
'schedule': timedelta(minutes=15),

Wyświetl plik

@ -3,11 +3,11 @@ from datetime import timedelta
from celery.utils.log import get_task_logger
from ogn.collect.celery import app
from sqlalchemy.sql import func
from sqlalchemy import and_, or_, insert, between
from sqlalchemy.sql.expression import case
from sqlalchemy.sql import func, null
from sqlalchemy import and_, or_, insert, update, between, exists
from sqlalchemy.sql.expression import case, true, false, label
from ogn.model import AircraftBeacon, TakeoffLanding, Airport
from ogn.model import AircraftBeacon, TakeoffLanding, Airport, Logbook
logger = get_task_logger(__name__)
@ -122,3 +122,162 @@ def compute_takeoff_and_landing():
logger.debug("New takeoffs and landings: {}".format(counter))
return counter
@app.task
def compute_logbook():
logger.info("Compute logbook.")
or_args = [between(TakeoffLanding.timestamp, '2016-06-28 00:00:00', '2016-06-28 23:59:59')]
or_args = []
# 'wo' is the window order for the sql window function
wo = and_(func.date(TakeoffLanding.timestamp),
TakeoffLanding.device_id,
TakeoffLanding.timestamp)
# make a query with current, previous and next "takeoff_landing" event, so we can find complete flights
sq = app.session.query(
TakeoffLanding.device_id,
func.lag(TakeoffLanding.device_id)
.over(order_by=wo).label('device_id_prev'),
func.lead(TakeoffLanding.device_id)
.over(order_by=wo).label('device_id_next'),
TakeoffLanding.timestamp,
func.lag(TakeoffLanding.timestamp)
.over(order_by=wo).label('timestamp_prev'),
func.lead(TakeoffLanding.timestamp)
.over(order_by=wo).label('timestamp_next'),
TakeoffLanding.track,
func.lag(TakeoffLanding.track)
.over(order_by=wo).label('track_prev'),
func.lead(TakeoffLanding.track)
.over(order_by=wo).label('track_next'),
TakeoffLanding.is_takeoff,
func.lag(TakeoffLanding.is_takeoff)
.over(order_by=wo).label('is_takeoff_prev'),
func.lead(TakeoffLanding.is_takeoff)
.over(order_by=wo).label('is_takeoff_next'),
TakeoffLanding.airport_id,
func.lag(TakeoffLanding.airport_id)
.over(order_by=wo).label('airport_id_prev'),
func.lead(TakeoffLanding.airport_id)
.over(order_by=wo).label('airport_id_next')) \
.filter(*or_args) \
.subquery()
# find complete flights (with takeoff and landing on the same day)
complete_flight_query = app.session.query(
sq.c.timestamp.label('reftime'),
sq.c.device_id.label('device_id'),
sq.c.timestamp.label('takeoff'), sq.c.track.label('takeoff_track'), sq.c.airport_id.label('takeoff_airport_id'),
sq.c.timestamp_next.label('landing'), sq.c.track_next.label('landing_track'), sq.c.airport_id_next.label('landing_airport_id'),
label('duration', sq.c.timestamp_next - sq.c.timestamp)) \
.filter(and_(sq.c.is_takeoff == true(), sq.c.is_takeoff_next == false())) \
.filter(sq.c.device_id == sq.c.device_id_next) \
.filter(func.date(sq.c.timestamp_next) == func.date(sq.c.timestamp))
# split complete flights (with takeoff and landing on different days) into one takeoff and one landing
split_start_query = app.session.query(
sq.c.timestamp.label('reftime'),
sq.c.device_id.label('device_id'),
sq.c.timestamp.label('takeoff'), sq.c.track.label('takeoff_track'), sq.c.airport_id.label('takeoff_airport_id'),
null().label('landing'), null().label('landing_track'), null().label('landing_airport_id'),
null().label('duration')) \
.filter(and_(sq.c.is_takeoff == true(), sq.c.is_takeoff_next == false())) \
.filter(sq.c.device_id == sq.c.device_id_next) \
.filter(func.date(sq.c.timestamp_next) != func.date(sq.c.timestamp))
split_landing_query = app.session.query(
sq.c.timestamp_next.label('reftime'),
sq.c.device_id.label('device_id'),
null().label('takeoff'), null().label('takeoff_track'), null().label('takeoff_airport_id'),
sq.c.timestamp_next.label('landing'), sq.c.track_next.label('landing_track'), sq.c.airport_id_next.label('landing_airport_id'),
null().label('duration')) \
.filter(and_(sq.c.is_takeoff == true(), sq.c.is_takeoff_next == false())) \
.filter(sq.c.device_id == sq.c.device_id_next) \
.filter(func.date(sq.c.timestamp_next) != func.date(sq.c.timestamp))
# find landings without start
only_landings_query = app.session.query(
sq.c.timestamp.label('reftime'),
sq.c.device_id.label('device_id'),
null().label('takeoff'), null().label('takeoff_track'), null().label('takeoff_airport_id'),
sq.c.timestamp.label('landing'), sq.c.track_next.label('landing_track'), sq.c.airport_id_next.label('landing_airport_id'),
null().label('duration')) \
.filter(sq.c.is_takeoff == false()) \
.filter(or_(sq.c.device_id != sq.c.device_id_prev,
sq.c.is_takeoff_prev == false()))
# find starts without landing
only_starts_query = app.session.query(
sq.c.timestamp.label('reftime'),
sq.c.device_id.label('device_id'),
sq.c.timestamp.label('takeoff'), sq.c.track.label('takeoff_track'), sq.c.airport_id.label('takeoff_airport_id'),
null().label('landing'), null().label('landing_track'), null().label('landing_airport_id'),
null().label('duration')) \
.filter(sq.c.is_takeoff == true()) \
.filter(or_(sq.c.device_id != sq.c.device_id_next,
sq.c.is_takeoff_next == true()))
# unite all
union_query = complete_flight_query.union(
split_start_query,
split_landing_query,
only_landings_query,
only_starts_query) \
.subquery()
# consider only if not already stored
new_logbook_entries = app.session.query(union_query) \
.filter(~exists().where(
and_(Logbook.reftime == union_query.c.reftime,
Logbook.device_id == union_query.c.device_id,
or_(Logbook.takeoff_airport_id == union_query.c.takeoff_airport_id,
and_(Logbook.takeoff_airport_id == null(),
union_query.c.takeoff_airport_id == null())),
or_(Logbook.landing_airport_id == union_query.c.landing_airport_id,
and_(Logbook.landing_airport_id == null(),
union_query.c.landing_airport_id == null())))))
# ... and save them
ins = insert(Logbook).from_select((Logbook.reftime,
Logbook.device_id,
Logbook.takeoff_timestamp,
Logbook.takeoff_track,
Logbook.takeoff_airport_id,
Logbook.landing_timestamp,
Logbook.landing_track,
Logbook.landing_airport_id,
Logbook.duration),
new_logbook_entries)
result = app.session.execute(ins)
counter = result.rowcount
app.session.commit()
logger.debug("New logbook entries: {}".format(counter))
return counter
@app.task
def compute_altitudes():
logger.info("Compute maximum altitudes.")
altitude_query = app.session.query(Logbook.id, func.max(AircraftBeacon.altitude).label('max_altitude')) \
.filter(and_(Logbook.takeoff_airport_id != null(),
Logbook.landing_airport_id != null())) \
.filter(and_(between(AircraftBeacon.timestamp, Logbook.takeoff_timestamp, Logbook.landing_timestamp))) \
.group_by(Logbook.id) \
.subquery()
upd = update(Logbook) \
.values({'max_altitude': altitude_query.c.max_altitude}) \
.where(Logbook.id == altitude_query.c.id)
result = app.session.execute(upd)
counter = result.rowcount
app.session.commit()
logger.debug("Updated logbook entries: {}".format(counter))
return counter

Wyświetl plik

@ -2,31 +2,47 @@
from datetime import timedelta, datetime
from sqlalchemy.sql import func, null
from sqlalchemy.sql import func
from sqlalchemy import and_, or_
from sqlalchemy.sql.expression import true, false, label
from sqlalchemy.orm import aliased
from ogn.model import Device, DeviceInfo, TakeoffLanding, Airport
from ogn.model import Device, DeviceInfo, TakeoffLanding, Airport, Logbook
from ogn.commands.dbutils import session
from ogn.collect.logbook import compute_takeoff_and_landing
from ogn.collect.logbook import compute_takeoff_and_landing, compute_logbook, compute_altitudes
from manager import Manager
manager = Manager()
@manager.command
def compute():
def compute_takeoff_landing():
"""Compute takeoffs and landings."""
print("Compute takeoffs and landings...")
result = compute_takeoff_and_landing.delay()
counter = result.get()
print("New/recalculated takeoffs/landings: {}".format(counter))
print("New takeoffs/landings: {}".format(counter))
@manager.command
def compute_logbook():
"""Compute logbook."""
print("Compute logbook...")
result = compute_logbook.delay()
counter = result.get()
print("New logbook entries: {}".format(counter))
@manager.command
def compute_altitudes():
"""Compute maximum altitudes."""
print("Compute maximum altitudes...")
result = compute_altitudes.delay()
counter = result.get()
print("Updated logbook entries: {}".format(counter))
@manager.arg('date', help='date (format: yyyy-mm-dd)')
@manager.arg('utc_delta_hours', help='delta hours to utc (for local time logs)')
@manager.command
def show(airport_name, utc_delta_hours=0, date=None):
"""Show a logbook for <airport_name>."""
@ -38,138 +54,13 @@ def show(airport_name, utc_delta_hours=0, date=None):
print('Airport "{}" not found.'.format(airport_name))
return
utc_timedelta = timedelta(hours=utc_delta_hours)
or_args = []
if date is not None:
date = datetime.strptime(date, "%Y-%m-%d")
or_args = [and_(TakeoffLanding.timestamp >= date + utc_timedelta,
TakeoffLanding.timestamp < date + timedelta(hours=24) + utc_timedelta)]
or_args = [and_(TakeoffLanding.timestamp >= date,
TakeoffLanding.timestamp < date + timedelta(hours=24))]
# make a query with current, previous and next "takeoff_landing" event, so we can find complete flights
sq = session.query(
TakeoffLanding.device_id,
func.lag(TakeoffLanding.device_id)
.over(order_by=and_(func.date(TakeoffLanding.timestamp),
TakeoffLanding.device_id,
TakeoffLanding.timestamp))
.label('device_id_prev'),
func.lead(TakeoffLanding.device_id)
.over(order_by=and_(func.date(TakeoffLanding.timestamp),
TakeoffLanding.device_id,
TakeoffLanding.timestamp))
.label('device_id_next'),
(TakeoffLanding.timestamp).label('timestamp'),
func.lag(TakeoffLanding.timestamp)
.over(order_by=and_(func.date(TakeoffLanding.timestamp),
TakeoffLanding.device_id,
TakeoffLanding.timestamp))
.label('timestamp_prev'),
func.lead(TakeoffLanding.timestamp)
.over(order_by=and_(func.date(TakeoffLanding.timestamp),
TakeoffLanding.device_id,
TakeoffLanding.timestamp))
.label('timestamp_next'),
TakeoffLanding.track,
func.lag(TakeoffLanding.track)
.over(order_by=and_(func.date(TakeoffLanding.timestamp),
TakeoffLanding.device_id,
TakeoffLanding.timestamp))
.label('track_prev'),
func.lead(TakeoffLanding.track)
.over(order_by=and_(func.date(TakeoffLanding.timestamp),
TakeoffLanding.device_id,
TakeoffLanding.timestamp))
.label('track_next'),
TakeoffLanding.is_takeoff,
func.lag(TakeoffLanding.is_takeoff)
.over(order_by=and_(func.date(TakeoffLanding.timestamp),
TakeoffLanding.device_id,
TakeoffLanding.timestamp))
.label('is_takeoff_prev'),
func.lead(TakeoffLanding.is_takeoff)
.over(order_by=and_(func.date(TakeoffLanding.timestamp),
TakeoffLanding.device_id,
TakeoffLanding.timestamp))
.label('is_takeoff_next'),
TakeoffLanding.airport_id,
func.lag(TakeoffLanding.airport_id)
.over(order_by=and_(func.date(TakeoffLanding.timestamp),
TakeoffLanding.device_id,
TakeoffLanding.timestamp))
.label('airport_id_prev'),
func.lead(TakeoffLanding.airport_id)
.over(order_by=and_(func.date(TakeoffLanding.timestamp),
TakeoffLanding.device_id,
TakeoffLanding.timestamp))
.label('airport_id_next')) \
.filter(*or_args) \
.subquery()
# find complete flights (with takeoff and landing on the same day)
complete_flight_query = session.query(sq.c.timestamp.label('reftime'),
sq.c.device_id.label('device_id'),
sq.c.timestamp.label('takeoff'), sq.c.track.label('takeoff_track'), sq.c.airport_id.label('takeoff_airport_id'),
sq.c.timestamp_next.label('landing'), sq.c.track_next.label('landing_track'), sq.c.airport_id_next.label('landing_airport_id'),
label('duration', sq.c.timestamp_next - sq.c.timestamp)) \
.filter(and_(sq.c.is_takeoff == true(), sq.c.is_takeoff_next == false())) \
.filter(sq.c.device_id == sq.c.device_id_next) \
.filter(func.date(sq.c.timestamp_next + utc_timedelta) == func.date(sq.c.timestamp + utc_timedelta)) \
.filter(or_(sq.c.airport_id == airport.id,
sq.c.airport_id_next == airport.id))
# split complete flights (with takeoff and landing on different days) into one takeoff and one landing
split_start_query = session.query(sq.c.timestamp.label('reftime'),
sq.c.device_id.label('device_id'),
sq.c.timestamp.label('takeoff'), sq.c.track.label('takeoff_track'), sq.c.airport_id.label('takeoff_airport_id'),
null().label('landing'), null().label('landing_track'), null().label('landing_airport_id'),
null().label('duration')) \
.filter(and_(sq.c.is_takeoff == true(), sq.c.is_takeoff_next == false())) \
.filter(sq.c.device_id == sq.c.device_id_next) \
.filter(func.date(sq.c.timestamp_next + utc_timedelta) != func.date(sq.c.timestamp + utc_timedelta)) \
.filter(and_(sq.c.airport_id == airport.id,
sq.c.airport_id_next == airport.id))
split_landing_query = session.query(sq.c.timestamp_next.label('reftime'),
sq.c.device_id.label('device_id'),
null().label('takeoff'), null().label('takeoff_track'), null().label('takeoff_airport_id'),
sq.c.timestamp_next.label('landing'), sq.c.track_next.label('landing_track'), sq.c.airport_id_next.label('landing_airport_id'),
null().label('duration')) \
.filter(and_(sq.c.is_takeoff == true(), sq.c.is_takeoff_next == false())) \
.filter(sq.c.device_id == sq.c.device_id_next) \
.filter(func.date(sq.c.timestamp_next + utc_timedelta) != func.date(sq.c.timestamp + utc_timedelta)) \
.filter(and_(sq.c.airport_id == airport.id,
sq.c.airport_id_next == airport.id))
# find landings without start
only_landings_query = session.query(sq.c.timestamp.label('reftime'),
sq.c.device_id.label('device_id'),
null().label('takeoff'), null().label('takeoff_track'), null().label('takeoff_airport_id'),
sq.c.timestamp.label('landing'), sq.c.track_next.label('landing_track'), sq.c.airport_id_next.label('landing_airport_id'),
null().label('duration')) \
.filter(sq.c.is_takeoff == false()) \
.filter(or_(sq.c.device_id != sq.c.device_id_prev,
sq.c.is_takeoff_prev == false())) \
.filter(sq.c.airport_id_next == airport.id)
# find starts without landing
only_starts_query = session.query(sq.c.timestamp.label('reftime'),
sq.c.device_id.label('device_id'),
sq.c.timestamp.label('takeoff'), sq.c.track.label('takeoff_track'), sq.c.airport_id.label('takeoff_airport_id'),
null().label('landing'), null().label('landing_track'), null().label('landing_airport_id'),
null().label('duration')) \
.filter(sq.c.is_takeoff == true()) \
.filter(or_(sq.c.device_id != sq.c.device_id_next,
sq.c.is_takeoff_next == true())) \
.filter(sq.c.airport_id == airport.id)
# unite all
union_query = complete_flight_query.union(split_start_query,
split_landing_query,
only_landings_query,
only_starts_query) \
.subquery()
# get aircraft and airport informations and sort all entries by the reference time
# get device info with highes priority
sq2 = session.query(DeviceInfo.address, func.max(DeviceInfo.address_origin).label('address_origin')) \
.group_by(DeviceInfo.address) \
.subquery()
@ -178,24 +69,23 @@ def show(airport_name, utc_delta_hours=0, date=None):
.filter(and_(DeviceInfo.address == sq2.c.address, DeviceInfo.address_origin == sq2.c.address_origin)) \
.subquery()
# get all logbook entries and add device and airport infos
takeoff_airport = aliased(Airport, name='takeoff_airport')
landing_airport = aliased(Airport, name='landing_airport')
logbook_query = session.query(union_query.c.reftime,
union_query.c.takeoff,
union_query.c.takeoff_track,
logbook_query = session.query(Logbook,
takeoff_airport,
union_query.c.landing,
union_query.c.landing_track,
landing_airport,
union_query.c.duration,
Device,
sq3.c.registration,
sq3.c.aircraft) \
.outerjoin(takeoff_airport, union_query.c.takeoff_airport_id == takeoff_airport.id) \
.outerjoin(landing_airport, union_query.c.landing_airport_id == landing_airport.id) \
.outerjoin(Device, union_query.c.device_id == Device.id) \
.filter(or_(Logbook.takeoff_airport_id == airport.id,
Logbook.landing_airport_id == airport.id)) \
.filter(*or_args) \
.outerjoin(takeoff_airport, Logbook.takeoff_airport_id == takeoff_airport.id) \
.outerjoin(landing_airport, Logbook.landing_airport_id == landing_airport.id) \
.outerjoin(Device, Logbook.device_id == Device.id) \
.outerjoin(sq3, sq3.c.address == Device.address) \
.order_by(union_query.c.reftime)
.order_by(Logbook.reftime)
# ... and finally print out the logbook
print('--- Logbook ({}) ---'.format(airport_name))
@ -223,14 +113,18 @@ def show(airport_name, utc_delta_hours=0, date=None):
else:
return ('')
for [reftime, takeoff, takeoff_track, takeoff_airport, landing, landing_track, landing_airport, duration, device, registration, aircraft] in logbook_query.all():
print('%10s %8s (%2s) %8s (%2s) %8s %8s %17s %20s' % (
reftime.date(),
none_datetime_replacer(takeoff),
none_track_replacer(takeoff_track),
none_datetime_replacer(landing),
none_track_replacer(landing_track),
none_timedelta_replacer(duration),
def none_altitude_replacer(altitude_object, airport_object):
return "?" if altitude_object is None else "{:5d}m ({:+5d}m)".format(altitude_object, altitude_object - airport_object.altitude)
for [logbook, takeoff_airport, landing_airport, device, registration, aircraft] in logbook_query.all():
print('%10s %8s (%2s) %8s (%2s) %8s %15s %8s %17s %20s' % (
logbook.reftime.date(),
none_datetime_replacer(logbook.takeoff_timestamp),
none_track_replacer(logbook.takeoff_track),
none_datetime_replacer(logbook.landing_timestamp),
none_track_replacer(logbook.landing_track),
none_timedelta_replacer(logbook.duration),
none_altitude_replacer(logbook.max_altitude, takeoff_airport),
none_registration_replacer(device, registration),
none_aircraft_replacer(device, aircraft),
airport_marker(takeoff_airport, landing_airport)))

Wyświetl plik

@ -0,0 +1,34 @@
from sqlalchemy import Integer, DateTime, Interval, Column, ForeignKey
from sqlalchemy.orm import relationship
from .base import Base
from sqlalchemy.sql.schema import UniqueConstraint
class Logbook(Base):
__tablename__ = 'logbook'
__table_args__ = (UniqueConstraint('reftime',
'takeoff_airport_id',
'landing_airport_id',
'device_id'),
)
id = Column(Integer, primary_key=True)
reftime = Column(DateTime, index=True)
takeoff_timestamp = Column(DateTime)
takeoff_track = Column(Integer)
landing_timestamp = Column(DateTime)
landing_track = Column(Integer)
duration = Column(Interval)
max_altitude = Column(Integer)
# Relations
takeoff_airport_id = Column(Integer, ForeignKey('airport.id', ondelete='CASCADE'), index=True)
takeoff_airport = relationship('Airport', foreign_keys=[takeoff_airport_id])
landing_airport_id = Column(Integer, ForeignKey('airport.id', ondelete='CASCADE'), index=True)
landing_airport = relationship('Airport', foreign_keys=[landing_airport_id])
device_id = Column(Integer, ForeignKey('device.id', ondelete='CASCADE'), index=True)
device = relationship('Device', foreign_keys=[device_id])