diff --git a/.travis.yml b/.travis.yml index dc0fcc3..ebc54fe 100644 --- a/.travis.yml +++ b/.travis.yml @@ -5,18 +5,22 @@ env: python: - 3.4 + - 3.5 + - 3.6 -services: - - postgresql +addons: + postgresql: "9.5" + apt: + packages: + - postgresql-9.5-postgis-2.3 before_script: - - flake8 tests ogn - - psql -c 'CREATE DATABASE ogn_test;' -U postgres - - psql -c 'CREATE EXTENSION postgis;' -U postgres -d ogn_test + - flake8 tests ogn_test + - psql -U postgres -c 'CREATE DATABASE ogn_test;' + - psql -U postgres -c 'CREATE EXTENSION postgis;' script: - nosetests --with-coverage --cover-package=ogn - - pip install . --upgrade - python -c 'import ogn' diff --git a/ogn/collect/celery.py b/ogn/collect/celery.py index c49509c..6abf880 100644 --- a/ogn/collect/celery.py +++ b/ogn/collect/celery.py @@ -27,8 +27,8 @@ def close_db(signal, sender): app = Celery('ogn.collect', include=["ogn.collect.database", "ogn.collect.logbook", + "ogn.collect.stats", "ogn.collect.takeoff_landing", - "ogn.collect.receiver" ]) app.config_from_envvar("OGN_CONFIG_MODULE") diff --git a/ogn/collect/database.py b/ogn/collect/database.py index 1a36dee..f5c75ed 100644 --- a/ogn/collect/database.py +++ b/ogn/collect/database.py @@ -1,10 +1,13 @@ from celery.utils.log import get_task_logger -from ogn.model import DeviceInfo, AddressOrigin -from ogn.utils import get_ddb +from sqlalchemy import insert, distinct +from sqlalchemy.sql import null, and_, or_, func, not_ +from sqlalchemy.sql.expression import case -from ogn.collect.celery import app +from ogn.model import DeviceInfo, DeviceInfoOrigin, AircraftBeacon, ReceiverBeacon, Device, Receiver +from ogn.utils import get_ddb, get_country_code +from .celery import app logger = get_task_logger(__name__) @@ -27,7 +30,7 @@ def import_ddb(): """Import registered devices from the DDB.""" logger.info("Import registered devices fom the DDB...") - address_origin = AddressOrigin.ogn_ddb + address_origin = DeviceInfoOrigin.ogn_ddb counter = update_device_infos(app.session, address_origin) logger.info("Imported {} devices.".format(counter)) @@ -38,7 +41,212 @@ def import_file(path='tests/custom_ddb.txt'): """Import registered devices from a local file.""" logger.info("Import registered devices from '{}'...".format(path)) - address_origin = AddressOrigin.user_defined + address_origin = DeviceInfoOrigin.user_defined counter = update_device_infos(app.session, address_origin, csvfile=path) logger.info("Imported {} devices.".format(counter)) + + +@app.task +def update_devices(): + """Add/update entries in devices table and update foreign keys in aircraft beacons.""" + + # Create missing Device from AircraftBeacon + available_devices = app.session.query(Device.address) \ + .subquery() + + missing_devices_query = app.session.query(distinct(AircraftBeacon.address)) \ + .filter(and_(AircraftBeacon.device_id == null(), AircraftBeacon.error_count == 0)) \ + .filter(~AircraftBeacon.address.in_(available_devices)) + + ins = insert(Device).from_select([Device.address], missing_devices_query) + res = app.session.execute(ins) + insert_count = res.rowcount + app.session.commit() + + # For each address in the new beacons: get firstseen, lastseen and last values != NULL + last_valid_values = app.session.query( + distinct(AircraftBeacon.address).label('address'), + func.first_value(AircraftBeacon.timestamp) + .over(partition_by=AircraftBeacon.address, order_by=case([(AircraftBeacon.timestamp == null(), None), (AircraftBeacon.timestamp != null(), AircraftBeacon.timestamp)])) + .label('firstseen'), + func.last_value(AircraftBeacon.timestamp) + .over(partition_by=AircraftBeacon.address, order_by=case([(AircraftBeacon.timestamp == null(), None), (AircraftBeacon.timestamp != null(), AircraftBeacon.timestamp)])) + .label('lastseen'), + func.first_value(AircraftBeacon.aircraft_type) + .over(partition_by=AircraftBeacon.address, order_by=case([(AircraftBeacon.aircraft_type == null(), None), (AircraftBeacon.aircraft_type != null(), AircraftBeacon.aircraft_type)])) + .label('aircraft_type'), + func.first_value(AircraftBeacon.stealth) + .over(partition_by=AircraftBeacon.address, order_by=case([(AircraftBeacon.stealth == null(), None), (AircraftBeacon.stealth != null(), AircraftBeacon.stealth)])) + .label('stealth'), + func.first_value(AircraftBeacon.software_version) + .over(partition_by=AircraftBeacon.address, order_by=case([(AircraftBeacon.software_version == null(), None), (AircraftBeacon.software_version != null(), AircraftBeacon.software_version)])) + .label('software_version'), + func.first_value(AircraftBeacon.hardware_version) + .over(partition_by=AircraftBeacon.address, order_by=case([(AircraftBeacon.hardware_version == null(), None), (AircraftBeacon.hardware_version != null(), AircraftBeacon.hardware_version)])) + .label('hardware_version'), + func.first_value(AircraftBeacon.real_address) + .over(partition_by=AircraftBeacon.address, order_by=case([(AircraftBeacon.real_address == null(), None), (AircraftBeacon.real_address != null(), AircraftBeacon.real_address)])) + .label('real_address')) \ + .filter(and_(AircraftBeacon.device_id == null(), AircraftBeacon.error_count == 0)) \ + .subquery() + + update_values = app.session.query( + Device.address, + case([(or_(Device.firstseen == null(), Device.firstseen > last_valid_values.c.firstseen), last_valid_values.c.firstseen), + (Device.firstseen <= last_valid_values.c.firstseen, Device.firstseen)]).label('firstseen'), + case([(or_(Device.lastseen == null(), Device.lastseen < last_valid_values.c.lastseen), last_valid_values.c.lastseen), + (Device.lastseen >= last_valid_values.c.lastseen, Device.lastseen)]).label('lastseen'), + case([(or_(Device.aircraft_type == null(), Device.lastseen < last_valid_values.c.lastseen), last_valid_values.c.aircraft_type), + (Device.lastseen >= last_valid_values.c.lastseen, Device.aircraft_type)]).label('aircraft_type'), + case([(or_(Device.stealth == null(), Device.lastseen < last_valid_values.c.lastseen), last_valid_values.c.stealth), + (Device.lastseen >= last_valid_values.c.lastseen, Device.stealth)]).label('stealth'), + case([(or_(Device.software_version == null(), Device.lastseen < last_valid_values.c.lastseen), last_valid_values.c.software_version), + (Device.lastseen >= last_valid_values.c.lastseen, Device.software_version)]).label('software_version'), + case([(or_(Device.hardware_version == null(), Device.lastseen < last_valid_values.c.lastseen), last_valid_values.c.hardware_version), + (Device.lastseen >= last_valid_values.c.lastseen, Device.hardware_version)]).label('hardware_version'), + case([(or_(Device.real_address == null(), Device.lastseen < last_valid_values.c.lastseen), last_valid_values.c.real_address), + (Device.lastseen >= last_valid_values.c.lastseen, Device.real_address)]).label('real_address')) \ + .filter(Device.address == last_valid_values.c.address) \ + .subquery() + + update_receivers = app.session.query(Device) \ + .filter(Device.address == update_values.c.address) \ + .update({ + Device.firstseen: update_values.c.firstseen, + Device.lastseen: update_values.c.lastseen, + Device.aircraft_type: update_values.c.aircraft_type, + Device.stealth: update_values.c.stealth, + Device.software_version: update_values.c.software_version, + Device.hardware_version: update_values.c.hardware_version, + Device.real_address: update_values.c.real_address}, + synchronize_session='fetch') + + # Update relations to aircraft beacons + upd = app.session.query(AircraftBeacon) \ + .filter(AircraftBeacon.device_id == null()) \ + .filter(AircraftBeacon.address == Device.address) \ + .update({ + AircraftBeacon.device_id: Device.id}, + synchronize_session='fetch') + + app.session.commit() + logger.info("Devices: {} inserted, {} updated".format(insert_count, update_receivers)) + logger.info("Updated {} AircraftBeacons".format(upd)) + + return "{} Devices inserted, {} Devices updated, {} AircraftBeacons updated" \ + .format(insert_count, update_receivers, upd) + + +@app.task +def update_receivers(): + """Add/update_receivers entries in receiver table and update receivers foreign keys and distance in aircraft beacons and update foreign keys in receiver beacons.""" + # Create missing Receiver from ReceiverBeacon + available_receivers = app.session.query(Receiver.name) \ + .subquery() + + missing_receiver_query = app.session.query(distinct(ReceiverBeacon.name)) \ + .filter(ReceiverBeacon.receiver_id == null()) \ + .filter(~ReceiverBeacon.name.in_(available_receivers)) + + ins = insert(Receiver).from_select([Receiver.name], missing_receiver_query) + res = app.session.execute(ins) + insert_count = res.rowcount + + # For each name in the new beacons: get firstseen, lastseen and last values != NULL + last_valid_values = app.session.query( + distinct(ReceiverBeacon.name).label('name'), + func.first_value(ReceiverBeacon.timestamp) + .over(partition_by=ReceiverBeacon.name, order_by=case([(ReceiverBeacon.timestamp == null(), None), (ReceiverBeacon.timestamp != null(), ReceiverBeacon.timestamp)])) + .label('firstseen'), + func.last_value(ReceiverBeacon.timestamp) + .over(partition_by=ReceiverBeacon.name, order_by=case([(ReceiverBeacon.timestamp == null(), None), (ReceiverBeacon.timestamp != null(), ReceiverBeacon.timestamp)])) + .label('lastseen'), + func.first_value(ReceiverBeacon.location_wkt) + .over(partition_by=ReceiverBeacon.name, order_by=case([(ReceiverBeacon.location_wkt == null(), None), (ReceiverBeacon.location_wkt != null(), ReceiverBeacon.location_wkt)])) + .label('location_wkt'), + func.first_value(ReceiverBeacon.altitude) + .over(partition_by=ReceiverBeacon.name, order_by=case([(ReceiverBeacon.altitude == null(), None), (ReceiverBeacon.altitude != null(), ReceiverBeacon.altitude)])) + .label('altitude'), + func.first_value(ReceiverBeacon.version) + .over(partition_by=ReceiverBeacon.name, order_by=case([(ReceiverBeacon.version == null(), None), (ReceiverBeacon.version != null(), ReceiverBeacon.version)])) + .label('version'), + func.first_value(ReceiverBeacon.platform) + .over(partition_by=ReceiverBeacon.name, order_by=case([(ReceiverBeacon.platform == null(), None), (ReceiverBeacon.platform != null(), ReceiverBeacon.platform)])) + .label('platform')) \ + .filter(ReceiverBeacon.receiver_id == null()) \ + .subquery() + + update_values = app.session.query( + Receiver.name, + case([(or_(Receiver.firstseen == null(), Receiver.firstseen > last_valid_values.c.firstseen), last_valid_values.c.firstseen), + (Receiver.firstseen <= last_valid_values.c.firstseen, Receiver.firstseen)]).label('firstseen'), + case([(or_(Receiver.lastseen == null(), Receiver.lastseen < last_valid_values.c.lastseen), last_valid_values.c.lastseen), + (Receiver.firstseen >= last_valid_values.c.firstseen, Receiver.firstseen)]).label('lastseen'), + case([(or_(Receiver.lastseen == null(), Receiver.lastseen < last_valid_values.c.lastseen), func.ST_Transform(last_valid_values.c.location_wkt, 4326)), + (Receiver.lastseen >= last_valid_values.c.lastseen, func.ST_Transform(Receiver.location_wkt, 4326))]).label('location_wkt'), + case([(or_(Receiver.lastseen == null(), Receiver.lastseen < last_valid_values.c.lastseen), last_valid_values.c.altitude), + (Receiver.lastseen >= last_valid_values.c.lastseen, Receiver.altitude)]).label('altitude'), + case([(or_(Receiver.lastseen == null(), Receiver.lastseen < last_valid_values.c.lastseen), last_valid_values.c.version), + (Receiver.lastseen >= last_valid_values.c.lastseen, Receiver.version)]).label('version'), + case([(or_(Receiver.lastseen == null(), Receiver.lastseen < last_valid_values.c.lastseen), last_valid_values.c.platform), + (Receiver.lastseen >= last_valid_values.c.lastseen, Receiver.platform)]).label('platform'), + case([(or_(Receiver.location_wkt == null(), not_(func.ST_Equals(Receiver.location_wkt, last_valid_values.c.location_wkt))), None), # set country code to None if location changed + (func.ST_Equals(Receiver.location_wkt, last_valid_values.c.location_wkt), Receiver.country_code)]).label('country_code')) \ + .filter(Receiver.name == last_valid_values.c.name) \ + .subquery() + + update_receivers = app.session.query(Receiver) \ + .filter(Receiver.name == update_values.c.name) \ + .update({ + Receiver.firstseen: update_values.c.firstseen, + Receiver.lastseen: update_values.c.lastseen, + Receiver.location_wkt: update_values.c.location_wkt, + Receiver.altitude: update_values.c.altitude, + Receiver.version: update_values.c.version, + Receiver.platform: update_values.c.platform, + Receiver.country_code: update_values.c.country_code}, + synchronize_session='fetch') + + # Update relations to aircraft beacons + update_aircraft_beacons = app.session.query(AircraftBeacon) \ + .filter(and_(AircraftBeacon.receiver_id == null(), AircraftBeacon.receiver_name == Receiver.name)) \ + .update({AircraftBeacon.receiver_id: Receiver.id, + AircraftBeacon.distance: func.ST_Distance_Sphere(AircraftBeacon.location_wkt, Receiver.location_wkt)}, + synchronize_session='fetch') + + # Update relations to receiver beacons + update_receiver_beacons = app.session.query(ReceiverBeacon) \ + .filter(and_(ReceiverBeacon.receiver_id == null(), ReceiverBeacon.name == Receiver.name)) \ + .update({ReceiverBeacon.receiver_id: Receiver.id}, + synchronize_session='fetch') + + app.session.commit() + + logger.info("Receivers: {} inserted, {} updated.".format(insert_count, update_receivers)) + logger.info("Updated relations: {} aircraft beacons, {} receiver beacons".format(update_aircraft_beacons, update_receiver_beacons)) + + return "{} Receivers inserted, {} Receivers updated, {} AircraftBeacons updated, {} ReceiverBeacons updated" \ + .format(insert_count, update_receivers, update_aircraft_beacons, update_receiver_beacons) + + +@app.task +def update_country_code(): + # update country code in receivers table if None + unknown_country_query = app.session.query(Receiver) \ + .filter(Receiver.country_code == null()) \ + .filter(Receiver.location_wkt != null()) \ + .order_by(Receiver.name) + + counter = 0 + for receiver in unknown_country_query.all(): + location = receiver.location + country_code = get_country_code(location.latitude, location.longitude) + if country_code is not None: + receiver.country_code = country_code + logger.info("Updated country_code for {} to {}".format(receiver.name, receiver.country_code)) + counter += 1 + + app.session.commit() + + return "Updated country_code for {} Receivers".format(counter) diff --git a/ogn/collect/logbook.py b/ogn/collect/logbook.py index d9af45d..b086459 100644 --- a/ogn/collect/logbook.py +++ b/ogn/collect/logbook.py @@ -1,25 +1,22 @@ from celery.utils.log import get_task_logger -from sqlalchemy import and_, or_, insert, update, between, exists +from sqlalchemy import and_, or_, insert, update, exists from sqlalchemy.sql import func, null from sqlalchemy.sql.expression import true, false from ogn.collect.celery import app -from ogn.model import TakeoffLanding, Logbook +from ogn.model import TakeoffLanding, Logbook, AircraftBeacon logger = get_task_logger(__name__) @app.task -def compute_logbook_entries(session=None): +def update_logbook(session=None): logger.info("Compute logbook.") if session is None: session = app.session - or_args = [between(TakeoffLanding.timestamp, '2016-06-28 00:00:00', '2016-06-28 23:59:59')] - or_args = [] - # 'wo' is the window order for the sql window function wo = and_(func.date(TakeoffLanding.timestamp), TakeoffLanding.device_id, @@ -43,7 +40,6 @@ def compute_logbook_entries(session=None): TakeoffLanding.airport_id, func.lag(TakeoffLanding.airport_id).over(order_by=wo).label('airport_id_prev'), func.lead(TakeoffLanding.airport_id).over(order_by=wo).label('airport_id_next')) \ - .filter(*or_args) \ .subquery() # find complete flights (with takeoff and landing on the same day) @@ -156,4 +152,36 @@ def compute_logbook_entries(session=None): session.commit() logger.debug("New logbook entries: {}".format(insert_counter)) - return "{}/{}".format(update_counter, insert_counter) + return "Logbook entries: {} inserted, {} updated".format(update_counter, insert_counter) + + +@app.task +def update_max_altitude(session=None): + logger.info("Update logbook max altitude.") + + if session is None: + session = app.session + + logbook_entries = session.query(Logbook.id) \ + .filter(and_(Logbook.takeoff_timestamp != null(), Logbook.landing_timestamp != null(), Logbook.max_altitude == null())) \ + .limit(1000) \ + .subquery() + + max_altitudes = session.query(Logbook.id, func.max(AircraftBeacon.altitude).label('max_altitude')) \ + .filter(Logbook.id == logbook_entries.c.id) \ + .filter(and_(AircraftBeacon.device_id == Logbook.device_id, + AircraftBeacon.timestamp >= Logbook.takeoff_timestamp, + AircraftBeacon.timestamp <= Logbook.landing_timestamp)) \ + .group_by(Logbook.id) \ + .subquery() + + update_logbook = app.session.query(Logbook) \ + .filter(Logbook.id == max_altitudes.c.id) \ + .update({ + Logbook.max_altitude: max_altitudes.c.max_altitude}, + synchronize_session='fetch') + + session.commit() + logger.info("Logbook: {} entries updated.".format(update_logbook)) + + return "Logbook: {} entries updated.".format(update_logbook) diff --git a/ogn/collect/receiver.py b/ogn/collect/receiver.py deleted file mode 100644 index 81fdf83..0000000 --- a/ogn/collect/receiver.py +++ /dev/null @@ -1,111 +0,0 @@ -from sqlalchemy.sql import func, null -from sqlalchemy.sql.functions import coalesce -from sqlalchemy import and_, not_, or_ - -from celery.utils.log import get_task_logger - -from ogn.model import Receiver, ReceiverBeacon -from ogn.utils import get_country_code -from ogn.collect.celery import app - -logger = get_task_logger(__name__) - - -@app.task -def update_receivers(): - """Update the receiver table.""" - # get the timestamp of last update - last_update_query = app.session.query(coalesce(func.max(Receiver.lastseen), '2015-01-01 00:00:00').label('last_entry')) - last_update = last_update_query.one().last_entry - - # get last receiver beacons since last update - last_receiver_beacon_sq = app.session.query(ReceiverBeacon.name, - func.max(ReceiverBeacon.timestamp).label('lastseen')) \ - .filter(ReceiverBeacon.timestamp >= last_update) \ - .group_by(ReceiverBeacon.name) \ - .subquery() - - # update receivers - receivers_to_update = app.session.query(ReceiverBeacon.name, - ReceiverBeacon.location_wkt, - ReceiverBeacon.altitude, - last_receiver_beacon_sq.columns.lastseen, - ReceiverBeacon.version, - ReceiverBeacon.platform) \ - .filter(and_(ReceiverBeacon.name == last_receiver_beacon_sq.columns.name, - ReceiverBeacon.timestamp == last_receiver_beacon_sq.columns.lastseen)) \ - .subquery() - - # ... set country code to None if lat or lon changed - changed_count = app.session.query(Receiver) \ - .filter(Receiver.name == receivers_to_update.columns.name) \ - .filter(or_(not_(func.ST_Equals(Receiver.location_wkt, receivers_to_update.columns.location)), - and_(Receiver.location_wkt == null(), - receivers_to_update.columns.location != null()))) \ - .update({"location_wkt": receivers_to_update.columns.location, - "country_code": null()}, - synchronize_session=False) - - # ... and update altitude, lastseen, version and platform - update_count = app.session.query(Receiver) \ - .filter(Receiver.name == receivers_to_update.columns.name) \ - .update({"altitude": receivers_to_update.columns.altitude, - "lastseen": receivers_to_update.columns.lastseen, - "version": receivers_to_update.columns.version, - "platform": receivers_to_update.columns.platform}) - - # add new receivers - empty_sq = app.session.query(ReceiverBeacon.name, - ReceiverBeacon.location_wkt, - ReceiverBeacon.altitude, - last_receiver_beacon_sq.columns.lastseen, - ReceiverBeacon.version, ReceiverBeacon.platform) \ - .filter(and_(ReceiverBeacon.name == last_receiver_beacon_sq.columns.name, - ReceiverBeacon.timestamp == last_receiver_beacon_sq.columns.lastseen)) \ - .outerjoin(Receiver, Receiver.name == ReceiverBeacon.name) \ - .filter(Receiver.name == null()) \ - .order_by(ReceiverBeacon.name) - - for receiver_beacon in empty_sq.all(): - receiver = Receiver() - receiver.name = receiver_beacon.name - receiver.location_wkt = receiver_beacon.location_wkt - receiver.altitude = receiver_beacon.altitude - receiver.firstseen = None - receiver.lastseen = receiver_beacon.lastseen - receiver.version = receiver_beacon.version - receiver.platform = receiver_beacon.platform - - app.session.add(receiver) - logger.info("{} added".format(receiver.name)) - - # update firstseen if None - firstseen_null_query = app.session.query(Receiver.name, - func.min(ReceiverBeacon.timestamp).label('firstseen')) \ - .filter(Receiver.firstseen == null()) \ - .join(ReceiverBeacon, Receiver.name == ReceiverBeacon.name) \ - .group_by(Receiver.name) \ - .subquery() - - added_count = app.session.query(Receiver) \ - .filter(Receiver.name == firstseen_null_query.columns.name) \ - .update({'firstseen': firstseen_null_query.columns.firstseen}) - - # update country code if None - unknown_country_query = app.session.query(Receiver) \ - .filter(Receiver.country_code == null()) \ - .filter(Receiver.location_wkt != null()) \ - .order_by(Receiver.name) - - for receiver in unknown_country_query.all(): - location = receiver.location - country_code = get_country_code(location.latitude, location.longitude) - if country_code is not None: - receiver.country_code = country_code - logger.info("Updated country_code for {} to {}".format(receiver.name, receiver.country_code)) - - logger.info("Added: {}, location changed: {}".format(added_count, changed_count)) - - app.session.commit() - - return update_count diff --git a/ogn/collect/stats.py b/ogn/collect/stats.py new file mode 100644 index 0000000..fe3a069 --- /dev/null +++ b/ogn/collect/stats.py @@ -0,0 +1,85 @@ +from celery.utils.log import get_task_logger + +from sqlalchemy import insert, distinct +from sqlalchemy.sql import null, and_, func +from sqlalchemy.sql.expression import literal_column + +from ogn.model import AircraftBeacon, DeviceStats, ReceiverStats + +from .celery import app + +logger = get_task_logger(__name__) + + +@app.task +def update_device_stats(date=None): + """Add/update entries in device stats table.""" + + if not date: + logger.warn("A date is needed for calculating stats. Exiting") + return None + + # First kill the stats for the selected date + deleted_counter = app.session.query(DeviceStats) \ + .filter(DeviceStats.date == date) \ + .delete() + + # Calculate stats for the selected date + device_stats = app.session.query( + AircraftBeacon.device_id, + func.date(AircraftBeacon.timestamp).label('date'), + func.count(distinct(AircraftBeacon.receiver_id)).label('receiver_count'), + func.count(AircraftBeacon.id).label('aircraft_beacon_count'), + func.max(AircraftBeacon.altitude).label('max_altitude')) \ + .filter(and_(AircraftBeacon.device_id != null(), AircraftBeacon.receiver_id != null())) \ + .filter(func.date(AircraftBeacon.timestamp) == date) \ + .group_by(AircraftBeacon.device_id, func.date(AircraftBeacon.timestamp)) \ + .subquery() + + # And insert them + ins = insert(DeviceStats).from_select( + [DeviceStats.device_id, DeviceStats.date, DeviceStats.receiver_count, DeviceStats.aircraft_beacon_count, DeviceStats.max_altitude], + device_stats) + res = app.session.execute(ins) + insert_counter = res.rowcount + app.session.commit() + logger.debug("DeviceStats entries for {}: {} deleted, {} inserted".format(date, deleted_counter, insert_counter)) + + return "DeviceStats entries for {}: {} deleted, {} inserted".format(date, deleted_counter, insert_counter) + + +@app.task +def update_receiver_stats(date=None): + """Add/update entries in receiver stats table.""" + + if not date: + logger.warn("A date is needed for calculating stats. Exiting") + return None + + # First kill the stats for the selected date + deleted_counter = app.session.query(ReceiverStats) \ + .filter(ReceiverStats.date == date) \ + .delete() + + # Calculate stats for the selected date + receiver_stats = app.session.query( + AircraftBeacon.receiver_id, + literal_column("'{}'".format(date)).label('date'), + func.count(AircraftBeacon.id).label('aircraft_beacon_count'), + func.count(distinct(AircraftBeacon.device_id)).label('aircraft_count'), + func.max(AircraftBeacon.distance).label('max_distance')) \ + .filter(AircraftBeacon.receiver_id != null()) \ + .filter(func.date(AircraftBeacon.timestamp) == date) \ + .group_by(AircraftBeacon.receiver_id) \ + .subquery() + + # And insert them + ins = insert(ReceiverStats).from_select( + [ReceiverStats.receiver_id, ReceiverStats.date, ReceiverStats.aircraft_beacon_count, ReceiverStats.aircraft_count, ReceiverStats.max_distance], + receiver_stats) + res = app.session.execute(ins) + insert_counter = res.rowcount + app.session.commit() + logger.debug("ReceiverStats entries for {}: {} deleted, {} inserted".format(date, deleted_counter, insert_counter)) + + return "ReceiverStats entries for {}: {} deleted, {} inserted".format(date, deleted_counter, insert_counter) diff --git a/ogn/collect/takeoff_landing.py b/ogn/collect/takeoff_landing.py index e4572cd..558a365 100644 --- a/ogn/collect/takeoff_landing.py +++ b/ogn/collect/takeoff_landing.py @@ -13,7 +13,7 @@ logger = get_task_logger(__name__) @app.task -def compute_takeoff_and_landing(session=None): +def update_takeoff_landing(session=None): logger.info("Compute takeoffs and landings.") if session is None: @@ -29,10 +29,10 @@ def compute_takeoff_and_landing(session=None): takeoff_speed = 55 # takeoff detection: 1st point below, 2nd and 3rd above this limit landing_speed = 40 # landing detection: 1st point above, 2nd and 3rd below this limit duration = 100 # the points must not exceed this duration - radius = 0.05 # the points must not exceed this radius (degree!) around the 2nd point + radius = 5000 # the points must not exceed this radius around the 2nd point # takeoff / landing has to be near an airport - airport_radius = 0.025 # takeoff / landing must not exceed this radius (degree!) around the airport + airport_radius = 2500 # takeoff / landing must not exceed this radius around the airport airport_delta = 100 # takeoff / landing must not exceed this altitude offset above/below the airport # 'wo' is the window order for the sql window function @@ -41,8 +41,19 @@ def compute_takeoff_and_landing(session=None): AircraftBeacon.receiver_id) # make a query with current, previous and next position + beacon_selection = session.query(AircraftBeacon.id) \ + .filter(AircraftBeacon.status == null()) \ + .order_by(AircraftBeacon.timestamp) \ + .limit(1000000) \ + .subquery() + sq = session.query( AircraftBeacon.id, + func.lag(AircraftBeacon.id).over(order_by=wo).label('id_prev'), + func.lead(AircraftBeacon.id).over(order_by=wo).label('id_next'), + AircraftBeacon.device_id, + func.lag(AircraftBeacon.device_id).over(order_by=wo).label('device_id_prev'), + func.lead(AircraftBeacon.device_id).over(order_by=wo).label('device_id_next'), AircraftBeacon.timestamp, func.lag(AircraftBeacon.timestamp).over(order_by=wo).label('timestamp_prev'), func.lead(AircraftBeacon.timestamp).over(order_by=wo).label('timestamp_next'), @@ -57,16 +68,14 @@ def compute_takeoff_and_landing(session=None): func.lead(AircraftBeacon.ground_speed).over(order_by=wo).label('ground_speed_next'), AircraftBeacon.altitude, func.lag(AircraftBeacon.altitude).over(order_by=wo).label('altitude_prev'), - func.lead(AircraftBeacon.altitude).over(order_by=wo).label('altitude_next'), - AircraftBeacon.device_id, - func.lag(AircraftBeacon.device_id).over(order_by=wo).label('device_id_prev'), - func.lead(AircraftBeacon.device_id).over(order_by=wo).label('device_id_next')) \ - .filter(AircraftBeacon.status == null()) \ + func.lead(AircraftBeacon.altitude).over(order_by=wo).label('altitude_next')) \ + .filter(AircraftBeacon.id == beacon_selection.c.id) \ .subquery() + # consider only positions with the same device id sq2 = session.query(sq) \ - .filter(sq.c.device_id_prev == sq.c.device_id == sq.c.device_id_next) \ - .subquery() + .filter(sq.c.device_id_prev == sq.c.device_id == sq.c.device_id_next) \ + .subquery() # find possible takeoffs and landings sq3 = session.query( @@ -82,8 +91,8 @@ def compute_takeoff_and_landing(session=None): (sq2.c.ground_speed < landing_speed, False)]).label('is_takeoff'), sq2.c.device_id) \ .filter(sq2.c.timestamp_next - sq2.c.timestamp_prev < timedelta(seconds=duration)) \ - .filter(and_(func.ST_DFullyWithin(sq2.c.location, sq2.c.location_wkt_prev, radius), - func.ST_DFullyWithin(sq2.c.location, sq2.c.location_wkt_next, radius))) \ + .filter(and_(func.ST_Distance_Sphere(sq2.c.location, sq2.c.location_wkt_prev) < radius, + func.ST_Distance_Sphere(sq2.c.location, sq2.c.location_wkt_next) < radius)) \ .filter(or_(and_(sq2.c.ground_speed_prev < takeoff_speed, # takeoff sq2.c.ground_speed > takeoff_speed, sq2.c.ground_speed_next > takeoff_speed), @@ -99,7 +108,7 @@ def compute_takeoff_and_landing(session=None): sq3.c.is_takeoff, sq3.c.device_id, Airport.id.label('airport_id')) \ - .filter(and_(func.ST_DFullyWithin(sq3.c.location, Airport.location_wkt, airport_radius), + .filter(and_(func.ST_Distance_Sphere(sq3.c.location, Airport.location_wkt) < airport_radius, between(sq3.c.altitude, Airport.altitude - airport_delta, Airport.altitude + airport_delta))) \ .filter(between(Airport.style, 2, 5)) \ .subquery() @@ -120,7 +129,14 @@ def compute_takeoff_and_landing(session=None): takeoff_landing_query) result = session.execute(ins) counter = result.rowcount + + # mark the computated AircraftBeacons as 'used' + update_aircraft_beacons = session.query(AircraftBeacon) \ + .filter(AircraftBeacon.id == sq2.c.id) \ + .update({AircraftBeacon.status: 1}, + synchronize_session='fetch') + session.commit() - logger.debug("New takeoffs and landings: {}".format(counter)) + logger.debug("Inserted {} TakeoffLandings, updated {} AircraftBeacons".format(counter, update_aircraft_beacons)) return counter diff --git a/ogn/commands/bulkimport.py b/ogn/commands/bulkimport.py index 9d5c2ae..0a0d3a5 100644 --- a/ogn/commands/bulkimport.py +++ b/ogn/commands/bulkimport.py @@ -1,4 +1,5 @@ import os +import re from manager import Manager from ogn.commands.dbutils import session @@ -9,18 +10,19 @@ from ogn.utils import open_file manager = Manager() +PATTERN = '^.+\.txt\_(\d{4}\-\d{2}\-\d{2})(\.gz)?$' + @manager.command def convert_logfile(path, logfile='main.log', loglevel='INFO'): """Convert ogn logfiles to csv logfiles (one for aircraft beacons and one for receiver beacons) . Logfile name: blablabla.txt_YYYY-MM-DD.""" if os.path.isfile(path): - print("Reading file: {}".format(path)) - convert(path) + head, tail = os.path.split(path) + convert(tail, path=head) print("Finished") elif os.path.isdir(path): for filename in os.listdir(path): - print("Reading file: {}".format(filename)) convert(filename, path=path) print("Finished") else: @@ -28,15 +30,25 @@ def convert_logfile(path, logfile='main.log', loglevel='INFO'): def convert(sourcefile, path=''): - import re import csv import gzip import datetime - match = re.search('^.+\.txt\_(\d{4}\-\d{2}\-\d{2})(\.gz)?$', sourcefile) + match = re.search(PATTERN, sourcefile) if match: reference_date_string = match.group(1) reference_date = datetime.datetime.strptime(reference_date_string, "%Y-%m-%d") + + aircraft_beacon_filename = os.path.join(path, 'aircraft_beacons.csv_' + reference_date_string + '.gz') + receiver_beacon_filename = os.path.join(path, 'receiver_beacons.csv_' + reference_date_string + '.gz') + + if not os.path.exists(aircraft_beacon_filename) and not os.path.exists(receiver_beacon_filename): + print("Reading file: {}".format(sourcefile)) + fout_ab = gzip.open(aircraft_beacon_filename, 'wt') + fout_rb = gzip.open(receiver_beacon_filename, 'wt') + else: + print("Output files for file {} already exists. Skipping".format(sourcefile)) + return else: print("filename '{}' does not match pattern. Skipping".format(sourcefile)) return @@ -49,16 +61,6 @@ def convert(sourcefile, path=''): total += 1 fin.seek(0) - aircraft_beacon_filename = os.path.join(path, 'aircraft_beacons.csv_' + reference_date_string + '.gz') - receiver_beacon_filename = os.path.join(path, 'receiver_beacons.csv_' + reference_date_string + '.gz') - - if not os.path.exists(aircraft_beacon_filename) and not os.path.exists(receiver_beacon_filename): - fout_ab = gzip.open(aircraft_beacon_filename, 'wt') - fout_rb = gzip.open(receiver_beacon_filename, 'wt') - else: - print("Output files already exists. Skipping") - return - aircraft_beacons = list() receiver_beacons = list() @@ -182,7 +184,7 @@ def import_logfile(path): else: print("For {} beacons already exist. Skipping".format(reference_date_string)) else: - print("Unknown file type: {}".format()) + print("Unknown file type: {}".format(tail)) def check_no_beacons(tablename, reference_date_string): diff --git a/ogn/commands/database.py b/ogn/commands/database.py index 9af5717..ca1dd20 100644 --- a/ogn/commands/database.py +++ b/ogn/commands/database.py @@ -1,10 +1,10 @@ from manager import Manager from ogn.collect.database import update_device_infos from ogn.commands.dbutils import engine, session -from ogn.model import Base, AddressOrigin, AircraftBeacon, ReceiverBeacon, Device, Receiver +from ogn.model import Base, DeviceInfoOrigin, AircraftBeacon, ReceiverBeacon from ogn.utils import get_airports -from sqlalchemy import insert, distinct -from sqlalchemy.sql import null +from sqlalchemy import distinct +from sqlalchemy.sql import null, func manager = Manager() @@ -53,7 +53,7 @@ def import_ddb(): """Import registered devices from the DDB.""" print("Import registered devices fom the DDB...") - address_origin = AddressOrigin.ogn_ddb + address_origin = DeviceInfoOrigin.ogn_ddb counter = update_device_infos(session, address_origin) print("Imported %i devices." % counter) @@ -65,7 +65,7 @@ def import_file(path='tests/custom_ddb.txt'): # (flushes previously manually imported entries) print("Import registered devices from '{}'...".format(path)) - address_origin = AddressOrigin.user_defined + address_origin = DeviceInfoOrigin.user_defined counter = update_device_infos(session, address_origin, csvfile=path) @@ -81,53 +81,3 @@ def import_airports(path='tests/SeeYou.cup'): session.bulk_save_objects(airports) session.commit() print("Imported {} airports.".format(len(airports))) - - -@manager.command -def update_relations(): - """Update AircraftBeacon and ReceiverBeacon relations""" - - # Create missing Receiver from ReceiverBeacon - available_receivers = session.query(Receiver.name) \ - .subquery() - - missing_receiver_query = session.query(distinct(ReceiverBeacon.name)) \ - .filter(ReceiverBeacon.receiver_id == null()) \ - .filter(~ReceiverBeacon.name.in_(available_receivers)) - - ins = insert(Receiver).from_select([Receiver.name], missing_receiver_query) - session.execute(ins) - - # Create missing Device from AircraftBeacon - available_addresses = session.query(Device.address) \ - .subquery() - - missing_addresses_query = session.query(distinct(AircraftBeacon.address)) \ - .filter(AircraftBeacon.device_id == null()) \ - .filter(~AircraftBeacon.address.in_(available_addresses)) - - ins2 = insert(Device).from_select([Device.address], missing_addresses_query) - session.execute(ins2) - session.commit() - print("Inserted {} Receivers and {} Devices".format(ins, ins2)) - return - - # Update AircraftBeacons - upd = session.query(AircraftBeacon) \ - .filter(AircraftBeacon.device_id == null()) \ - .filter(AircraftBeacon.receiver_id == null()) \ - .filter(AircraftBeacon.address == Device.address) \ - .filter(AircraftBeacon.receiver_name == Receiver.name) \ - .update({AircraftBeacon.device_id: Device.id, - AircraftBeacon.receiver_id: Receiver.id}, - synchronize_session='fetch') - - upd2 = session.query(ReceiverBeacon) \ - .filter(ReceiverBeacon.receiver_id == null()) \ - .filter(ReceiverBeacon.receiver_name == Receiver.name) \ - .update({Receiver.name: ReceiverBeacon.receiver_name}, - synchronize_session='fetch') - - session.commit() - print("Updated {} AircraftBeacons and {} ReceiverBeacons". - format(upd, upd2)) diff --git a/ogn/commands/logbook.py b/ogn/commands/logbook.py index a603933..a9c712d 100644 --- a/ogn/commands/logbook.py +++ b/ogn/commands/logbook.py @@ -3,8 +3,8 @@ from datetime import timedelta, datetime from manager import Manager -from ogn.collect.logbook import compute_logbook_entries -from ogn.collect.takeoff_landing import compute_takeoff_and_landing +from ogn.collect.logbook import update_logbook +from ogn.collect.takeoff_landing import update_takeoff_landing from ogn.commands.dbutils import session from ogn.model import Device, DeviceInfo, TakeoffLanding, Airport, Logbook from sqlalchemy import and_, or_ @@ -19,7 +19,7 @@ manager = Manager() def compute_takeoff_landing(): """Compute takeoffs and landings.""" print("Compute takeoffs and landings...") - result = compute_takeoff_and_landing.delay() + result = update_takeoff_landing.delay() counter = result.get() print("New takeoffs/landings: {}".format(counter)) @@ -28,7 +28,7 @@ def compute_takeoff_landing(): def compute_logbook(): """Compute logbook.""" print("Compute logbook...") - result = compute_logbook_entries.delay() + result = update_logbook.delay() counter = result.get() print("New logbook entries: {}".format(counter)) @@ -63,7 +63,8 @@ def show(airport_name, utc_delta_hours=0, date=None): # get all logbook entries and add device and airport infos takeoff_airport = aliased(Airport, name='takeoff_airport') landing_airport = aliased(Airport, name='landing_airport') - logbook_query = session.query(Logbook, + logbook_query = session.query(func.row_number().over(order_by=Logbook.reftime).label('row_number'), + Logbook, Device, sq3.c.registration, sq3.c.aircraft) \ @@ -105,8 +106,9 @@ def show(airport_name, utc_delta_hours=0, date=None): def none_altitude_replacer(altitude_object, airport_object): return "?" if altitude_object is None else "{:5d}m ({:+5d}m)".format(altitude_object, altitude_object - airport_object.altitude) - for [logbook, device, registration, aircraft] in logbook_query.all(): - print('%10s %8s (%2s) %8s (%2s) %8s %15s %8s %17s %20s' % ( + for [row_number, logbook, device, registration, aircraft] in logbook_query.all(): + print('%3d. %10s %8s (%2s) %8s (%2s) %8s %15s %8s %17s %20s' % ( + row_number, logbook.reftime.date(), none_datetime_replacer(logbook.takeoff_timestamp), none_track_replacer(logbook.takeoff_track), diff --git a/ogn/commands/showdeviceinfos.py b/ogn/commands/showdeviceinfos.py index b39b856..b1a46aa 100644 --- a/ogn/commands/showdeviceinfos.py +++ b/ogn/commands/showdeviceinfos.py @@ -1,6 +1,6 @@ from manager import Manager from ogn.commands.dbutils import session -from ogn.model import AddressOrigin +from ogn.model import DeviceInfoOrigin from ogn.model.device_info import DeviceInfo from sqlalchemy import func, and_, true, false @@ -39,7 +39,7 @@ def get_devices_stats(session): stats = {} for [address_origin, device_count, default_count, nt_count, ni_count, ntni_count] in query.all(): - origin = AddressOrigin(address_origin).name() + origin = DeviceInfoOrigin(address_origin).name() stats[origin] = {'device_count': device_count, 'default_count': default_count, 'nt_count': nt_count, diff --git a/ogn/gateway/process.py b/ogn/gateway/process.py index 57b0168..0c4b797 100644 --- a/ogn/gateway/process.py +++ b/ogn/gateway/process.py @@ -31,6 +31,9 @@ def message_to_beacon(raw_message, reference_date): beacon = ReceiverBeacon(**message) else: print("Whoops: what is this: {}".format(message)) + except NotImplementedError as e: + logger.error('Received message: {}'.format(raw_message)) + logger.error(e) except ParseError as e: logger.error('Received message: {}'.format(raw_message)) logger.error('Drop packet, {}'.format(e.message)) diff --git a/ogn/model/__init__.py b/ogn/model/__init__.py index e83b673..57c42c4 100644 --- a/ogn/model/__init__.py +++ b/ogn/model/__init__.py @@ -1,13 +1,15 @@ # flake8: noqa -from .address_origin import AddressOrigin from .aircraft_type import AircraftType from .base import Base from .beacon import Beacon from .device import Device from .device_info import DeviceInfo +from .device_info_origin import DeviceInfoOrigin +from .device_stats import DeviceStats from .aircraft_beacon import AircraftBeacon from .receiver_beacon import ReceiverBeacon from .receiver import Receiver +from .receiver_stats import ReceiverStats from .takeoff_landing import TakeoffLanding from .airport import Airport from .logbook import Logbook diff --git a/ogn/model/aircraft_beacon.py b/ogn/model/aircraft_beacon.py index f0eea61..f75360e 100644 --- a/ogn/model/aircraft_beacon.py +++ b/ogn/model/aircraft_beacon.py @@ -38,6 +38,9 @@ class AircraftBeacon(Beacon): status = Column(SmallInteger, index=True) + # Calculated values + distance = Column(Float) + # Relations receiver_id = Column(Integer, ForeignKey('receiver.id', ondelete='SET NULL'), index=True) receiver = relationship('Receiver', foreign_keys=[receiver_id]) diff --git a/ogn/model/device.py b/ogn/model/device.py index 491817c..753e481 100644 --- a/ogn/model/device.py +++ b/ogn/model/device.py @@ -1,4 +1,4 @@ -from sqlalchemy import Column, Integer, String, Float, Boolean, SmallInteger +from sqlalchemy import Column, Integer, String, Float, Boolean, SmallInteger, DateTime from sqlalchemy.orm import relationship from .base import Base @@ -9,11 +9,15 @@ class Device(Base): id = Column(Integer, primary_key=True) address = Column(String(6), index=True) + firstseen = Column(DateTime, index=True) + lastseen = Column(DateTime, index=True) aircraft_type = Column(SmallInteger, index=True) stealth = Column(Boolean) software_version = Column(Float) hardware_version = Column(SmallInteger) real_address = Column(String(6)) + firstseen = Column(DateTime, index=True) + lastseen = Column(DateTime, index=True) # Relations aircraft_beacons = relationship('AircraftBeacon') diff --git a/ogn/model/address_origin.py b/ogn/model/device_info_origin.py similarity index 96% rename from ogn/model/address_origin.py rename to ogn/model/device_info_origin.py index a269973..4105169 100644 --- a/ogn/model/address_origin.py +++ b/ogn/model/device_info_origin.py @@ -1,4 +1,4 @@ -class AddressOrigin: +class DeviceInfoOrigin: unknown = 0 ogn_ddb = 1 flarmnet = 2 diff --git a/ogn/model/device_stats.py b/ogn/model/device_stats.py new file mode 100644 index 0000000..b881ad4 --- /dev/null +++ b/ogn/model/device_stats.py @@ -0,0 +1,19 @@ +from sqlalchemy import Column, Integer, Date, Float, ForeignKey +from sqlalchemy.orm import relationship + +from .base import Base + + +class DeviceStats(Base): + __tablename__ = "device_stats" + + id = Column(Integer, primary_key=True) + + date = Column(Date) + receiver_count = Column(Integer) + aircraft_beacon_count = Column(Integer) + max_altitude = Column(Float) + + # Relations + device_id = Column(Integer, ForeignKey('device.id', ondelete='SET NULL'), index=True) + device = relationship('Device', foreign_keys=[device_id]) diff --git a/ogn/model/receiver_stats.py b/ogn/model/receiver_stats.py new file mode 100644 index 0000000..d24e311 --- /dev/null +++ b/ogn/model/receiver_stats.py @@ -0,0 +1,20 @@ +from sqlalchemy import Column, Integer, Date, Float, ForeignKey +from sqlalchemy.orm import relationship + +from .base import Base + + +class ReceiverStats(Base): + __tablename__ = "receiver_stats" + + id = Column(Integer, primary_key=True) + + date = Column(Date) + aircraft_beacon_count = Column(Integer) + receiver_beacon_count = Column(Integer) + aircraft_count = Column(Integer) + max_distance = Column(Float) + + # Relations + receiver_id = Column(Integer, ForeignKey('receiver.id', ondelete='SET NULL'), index=True) + receiver = relationship('Receiver', foreign_keys=[receiver_id]) diff --git a/ogn/utils.py b/ogn/utils.py index fa7caf2..8cd4cfd 100644 --- a/ogn/utils.py +++ b/ogn/utils.py @@ -8,7 +8,7 @@ from geopy.geocoders import Nominatim from ogn.parser.utils import feet2m import requests -from .model import AddressOrigin, DeviceInfo, Airport, Location +from .model import DeviceInfoOrigin, DeviceInfo, Airport, Location DDB_URL = "http://ddb.glidernet.org/download/?t=1" @@ -22,7 +22,7 @@ nm2m = 1852 mi2m = 1609.34 -def get_ddb(csvfile=None, address_origin=AddressOrigin.unknown): +def get_ddb(csvfile=None, address_origin=DeviceInfoOrigin.unknown): if csvfile is None: r = requests.get(DDB_URL) rows = '\n'.join(i for i in r.text.splitlines() if i[0] != '#') diff --git a/setup.py b/setup.py index 167a3cd..1c6ad32 100644 --- a/setup.py +++ b/setup.py @@ -32,22 +32,22 @@ setup( keywords='gliding ogn', packages=find_packages(exclude=['tests', 'tests.*']), install_requires=[ - 'SQLAlchemy==1.1.10', + 'SQLAlchemy==1.1.15', 'geopy==1.11.0', 'manage.py==0.2.10', 'celery[redis]>=3.1,<3.2', - 'alembic==0.9.2', - 'aerofiles==0.3', + 'alembic==0.9.6', + 'aerofiles==0.4', 'geoalchemy2==0.4.0', - 'shapely==1.5.17.post1', + 'shapely>=1.5.17,<1.6', 'ogn-client==0.8.0', - 'psycopg2==2.7.1' + 'psycopg2==2.7.3.2' ], extras_require={ 'dev': [ 'nose==1.3.7', - 'coveralls==1.1', - 'flake8==3.3.0' + 'coveralls==1.2', + 'flake8==3.5.0' ] }, zip_safe=False diff --git a/tests/collect/test_logbook.py b/tests/collect/test_logbook.py index 3836948..9d979dc 100644 --- a/tests/collect/test_logbook.py +++ b/tests/collect/test_logbook.py @@ -1,7 +1,10 @@ import unittest import os -from ogn.collect.logbook import compute_logbook_entries +from sqlalchemy.sql import null, and_ + +from ogn.model import Logbook, Airport +from ogn.collect.logbook import update_logbook class TestDB(unittest.TestCase): @@ -35,17 +38,43 @@ class TestDB(unittest.TestCase): session.commit() pass + def count_logbook_entries(self): + session = self.session + query = session.query(Logbook) + return len(query.all()) + + def assert_entries(self, koen_to=0, koen_ldg=0, koen_complete=0, ohl_to=0, ohl_ldg=0, ohl_complete=0): + session = self.session + + entries = len(session.query(Logbook).filter(and_(Airport.id == Logbook.takeoff_airport_id, Airport.name == 'Koenigsdorf')).filter(Logbook.landing_airport_id == null()).all()) + self.assertEqual(entries, koen_to) + + entries = len(session.query(Logbook).filter(and_(Airport.id == Logbook.landing_airport_id, Airport.name == 'Koenigsdorf')).filter(Logbook.takeoff_airport_id == null()).all()) + self.assertEqual(entries, koen_ldg) + + entries = len(session.query(Logbook).filter(and_(Airport.id == Logbook.takeoff_airport_id, Airport.name == 'Koenigsdorf')).filter(Logbook.takeoff_airport_id == Logbook.landing_airport_id).all()) + self.assertEqual(entries, koen_complete) + + entries = len(session.query(Logbook).filter(and_(Airport.id == Logbook.takeoff_airport_id, Airport.name == 'Ohlstadt')).filter(Logbook.landing_airport_id == null()).all()) + self.assertEqual(entries, ohl_to) + + entries = len(session.query(Logbook).filter(and_(Airport.id == Logbook.landing_airport_id, Airport.name == 'Ohlstadt')).filter(Logbook.takeoff_airport_id == null()).all()) + self.assertEqual(entries, ohl_ldg) + + entries = len(session.query(Logbook).filter(and_(Airport.id == Logbook.takeoff_airport_id, Airport.name == 'Ohlstadt')).filter(Logbook.takeoff_airport_id == Logbook.landing_airport_id).all()) + self.assertEqual(entries, ohl_complete) + def test_single_takeoff(self): session = self.session session.execute(self.TAKEOFF_KOENIGSDF_DD0815) session.commit() - entries_changed = compute_logbook_entries(session) - self.assertEqual(entries_changed, '0/1') + update_logbook(session) + self.assert_entries(koen_to=1) - entries_changed = compute_logbook_entries(session) - self.assertEqual(entries_changed, '0/0') + update_logbook(session) + self.assert_entries(koen_to=1) def test_single_landing(self): session = self.session @@ -53,11 +82,11 @@ class TestDB(unittest.TestCase): session.execute(self.LANDING_KOENIGSDF_DD0815) session.commit() - entries_changed = compute_logbook_entries(session) - self.assertEqual(entries_changed, '0/1') + update_logbook(session) + self.assert_entries(koen_ldg=1) - entries_changed = compute_logbook_entries(session) - self.assertEqual(entries_changed, '0/0') + update_logbook(session) + self.assert_entries(koen_ldg=1) def test_different_takeoffs(self): session = self.session @@ -66,11 +95,11 @@ class TestDB(unittest.TestCase): session.execute(self.TAKEOFF_OHLSTADT_DD4711) session.commit() - entries_changed = compute_logbook_entries(session) - self.assertEqual(entries_changed, '0/2') + update_logbook(session) + self.assert_entries(koen_to=1, ohl_to=1) - entries_changed = compute_logbook_entries(session) - self.assertEqual(entries_changed, '0/0') + update_logbook(session) + self.assert_entries(koen_to=1, ohl_to=1) def test_takeoff_and_landing(self): session = self.session @@ -79,11 +108,11 @@ class TestDB(unittest.TestCase): session.execute(self.LANDING_KOENIGSDF_DD0815) session.commit() - entries_changed = compute_logbook_entries(session) - self.assertEqual(entries_changed, '0/1') + update_logbook(session) + self.assert_entries(koen_complete=1) - entries_changed = compute_logbook_entries(session) - self.assertEqual(entries_changed, '0/0') + update_logbook(session) + self.assert_entries(koen_complete=1) def test_takeoff_and_landing_on_different_days(self): session = self.session @@ -92,11 +121,11 @@ class TestDB(unittest.TestCase): session.execute(self.LANDING_KOENIGSDF_DD0815_LATER) session.commit() - entries_changed = compute_logbook_entries(session) - self.assertEqual(entries_changed, '0/2') + update_logbook(session) + self.assert_entries(koen_to=1, koen_ldg=1) - entries_changed = compute_logbook_entries(session) - self.assertEqual(entries_changed, '0/0') + update_logbook(session) + self.assert_entries(koen_to=1, koen_ldg=1) def test_update(self): session = self.session @@ -104,23 +133,23 @@ class TestDB(unittest.TestCase): session.execute(self.TAKEOFF_KOENIGSDF_DD0815) session.commit() - entries_changed = compute_logbook_entries(session) - self.assertEqual(entries_changed, '0/1') + update_logbook(session) + self.assert_entries(koen_to=1) session.execute(self.LANDING_KOENIGSDF_DD0815) session.commit() - entries_changed = compute_logbook_entries(session) - self.assertEqual(entries_changed, '1/0') + update_logbook(session) + self.assert_entries(koen_complete=1) session.execute(self.TAKEOFF_OHLSTADT_DD4711) session.commit() - entries_changed = compute_logbook_entries(session) - self.assertEqual(entries_changed, '0/1') + update_logbook(session) + self.assert_entries(koen_complete=1, ohl_to=1) - entries_changed = compute_logbook_entries(session) - self.assertEqual(entries_changed, '0/0') + update_logbook(session) + self.assert_entries(koen_complete=1, ohl_to=1) def test_update_wrong_order(self): session = self.session @@ -128,14 +157,14 @@ class TestDB(unittest.TestCase): session.execute(self.LANDING_KOENIGSDF_DD0815) session.commit() - entries_changed = compute_logbook_entries(session) - self.assertEqual(entries_changed, '0/1') + update_logbook(session) + self.assert_entries(koen_ldg=1) session.execute(self.TAKEOFF_KOENIGSDF_DD0815) session.commit() - entries_changed = compute_logbook_entries(session) - self.assertEqual(entries_changed, '1/0') + update_logbook(session) + self.assert_entries(koen_complete=1) if __name__ == '__main__': diff --git a/tests/collect/test_takeoff_landing.py b/tests/collect/test_takeoff_landing.py index 4957621..d212ca6 100644 --- a/tests/collect/test_takeoff_landing.py +++ b/tests/collect/test_takeoff_landing.py @@ -3,7 +3,7 @@ import os from ogn.model import TakeoffLanding -from ogn.collect.takeoff_landing import compute_takeoff_and_landing +from ogn.collect.takeoff_landing import update_takeoff_landing class TestDB(unittest.TestCase): @@ -30,6 +30,7 @@ class TestDB(unittest.TestCase): session = self.session session.execute("DELETE FROM takeoff_landing") session.execute("DELETE FROM aircraft_beacon") + session.execute("DELETE FROM airport") session.commit() pass @@ -44,6 +45,7 @@ class TestDB(unittest.TestCase): return i def test_broken_rope(self): + """Fill the db with a winch launch where the rope breaks. The algorithm should detect one takeoff and one landing.""" session = self.session session.execute("INSERT INTO aircraft_beacon(address, location, altitude, timestamp, track, ground_speed, climb_rate, turn_rate) VALUES('DDEFF7','0101000020E61000009668B61829F12640330E0887F1E94740',604,'2016-07-02 10:47:12',0,0,0,0)") @@ -95,7 +97,12 @@ class TestDB(unittest.TestCase): session.execute("UPDATE aircraft_beacon SET device_id = d.id FROM device d WHERE d.address='DDEFF7'") session.commit() - compute_takeoff_and_landing(session) + # find the takeoff and the landing + update_takeoff_landing(session) + self.assertEqual(self.count_takeoff_and_landings(), 2) + + # we should not find the takeoff and the landing again + update_takeoff_landing(session) self.assertEqual(self.count_takeoff_and_landings(), 2) diff --git a/tests/gateway/test_manage.py b/tests/gateway/test_manage.py deleted file mode 100644 index b4a952d..0000000 --- a/tests/gateway/test_manage.py +++ /dev/null @@ -1,23 +0,0 @@ -import unittest -import unittest.mock as mock - -from ogn.gateway.manage import run -# from ogn.gateway.manage import import_logfile - - -class GatewayManagerTest(unittest.TestCase): - # try simple user interrupt - @mock.patch('ogn.gateway.manage.AprsClient') - def test_run_user_interruption(self, mock_aprs_client): - instance = mock_aprs_client.return_value - instance.run.side_effect = KeyboardInterrupt() - - run(aprs_user="testuser") - - instance.connect.assert_called_once_with() - self.assertEqual(instance.run.call_count, 1) - instance.disconnect.assert_called_once_with() - - -if __name__ == '__main__': - unittest.main()