From cb02e35611e97c7e39d64604e94e0342c880f848 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Konstantin=20Gru=CC=88ndger?= Date: Tue, 1 Jan 2019 20:13:08 +0100 Subject: [PATCH] Refactoring --- ogn/collect/database.py | 3 +- ogn/collect/logbook.py | 10 +- ogn/collect/ognrange.py | 3 +- ogn/collect/stats.py | 128 +++++++++++++------------ ogn/collect/takeoff_landings.py | 159 ++++++++++++++++---------------- ogn/commands/bulkimport.py | 103 +++++++++++++-------- ogn/commands/database.py | 36 ++++++-- ogn/commands/logbook.py | 38 ++++---- ogn/commands/stats.py | 40 ++++---- ogn/gateway/manage.py | 3 +- ogn/gateway/process.py | 2 +- ogn/gateway/process_tools.py | 41 ++++---- ogn/model/aircraft_beacon.py | 10 +- ogn/model/airport.py | 1 - ogn/model/beacon.py | 2 +- ogn/model/country.py | 1 - ogn/model/device.py | 1 - ogn/model/flights2d.py | 5 +- ogn/model/receiver_beacon.py | 11 ++- ogn/model/receiver_coverage.py | 4 +- ogn/model/relation_stats.py | 5 +- ogn/model/takeoff_landing.py | 3 +- ogn/utils.py | 21 +++-- tests/collect/test_database.py | 8 +- tests/collect/test_stats.py | 34 +++---- tests/test_utils.py | 9 +- 26 files changed, 383 insertions(+), 298 deletions(-) diff --git a/ogn/collect/database.py b/ogn/collect/database.py index 479d1f8..30cdafc 100644 --- a/ogn/collect/database.py +++ b/ogn/collect/database.py @@ -1,8 +1,7 @@ from celery.utils.log import get_task_logger from sqlalchemy import insert, distinct -from sqlalchemy.sql import null, and_, or_, func, not_ -from sqlalchemy.sql.expression import case +from sqlalchemy.sql import null, and_, func, not_ from ogn.collect.celery import app from ogn.model import Country, DeviceInfo, DeviceInfoOrigin, AircraftBeacon, ReceiverBeacon, Device, Receiver diff --git a/ogn/collect/logbook.py b/ogn/collect/logbook.py index 07faf42..7040c6b 100644 --- a/ogn/collect/logbook.py +++ b/ogn/collect/logbook.py @@ -1,17 +1,18 @@ from celery.utils.log import get_task_logger -from sqlalchemy import and_, or_, insert, update, exists +from sqlalchemy import and_, or_, insert, update, exists, between from sqlalchemy.sql import func, null from sqlalchemy.sql.expression import true, false from ogn.collect.celery import app from ogn.model import TakeoffLanding, Logbook, AircraftBeacon +from ogn.utils import date_to_timestamps logger = get_task_logger(__name__) @app.task -def update_logbook(session=None): +def update_logbook(session=None, date=None): """Add/update logbook entries.""" logger.info("Compute logbook.") @@ -24,11 +25,13 @@ def update_logbook(session=None): TakeoffLanding.device_id, TakeoffLanding.airport_id, TakeoffLanding.timestamp) - + # 'pa' is the window partition for the sql window function pa = (func.date(TakeoffLanding.timestamp), TakeoffLanding.device_id) + (start, end) = date_to_timestamps(date) + # make a query with current, previous and next "takeoff_landing" event, so we can find complete flights sq = session.query( TakeoffLanding.device_id, @@ -46,6 +49,7 @@ def update_logbook(session=None): TakeoffLanding.airport_id, func.lag(TakeoffLanding.airport_id).over(order_by=wo).label('airport_id_prev'), func.lead(TakeoffLanding.airport_id).over(order_by=wo).label('airport_id_next')) \ + .filter(between(TakeoffLanding.timestamp, start, end)) \ .subquery() # find complete flights (with takeoff and landing on the same day) diff --git a/ogn/collect/ognrange.py b/ogn/collect/ognrange.py index ccb2396..7d8b7ac 100644 --- a/ogn/collect/ognrange.py +++ b/ogn/collect/ognrange.py @@ -1,9 +1,8 @@ from celery.utils.log import get_task_logger from sqlalchemy import String -from sqlalchemy import and_, or_, insert, update, exists +from sqlalchemy import and_, insert, update, exists from sqlalchemy.sql import func, null -from sqlalchemy.sql.expression import true, false from ogn.collect.celery import app from ogn.model import AircraftBeacon, ReceiverCoverage diff --git a/ogn/collect/stats.py b/ogn/collect/stats.py index ff2796c..1574c3f 100644 --- a/ogn/collect/stats.py +++ b/ogn/collect/stats.py @@ -1,15 +1,14 @@ -from datetime import datetime - from celery.utils.log import get_task_logger -from sqlalchemy import insert, distinct +from sqlalchemy import insert, distinct, between from sqlalchemy.sql import null, and_, func, or_, update -from sqlalchemy.sql.expression import literal_column, case +from sqlalchemy.sql.expression import case from ogn.model import AircraftBeacon, DeviceStats, ReceiverStats, RelationStats, Receiver, Device from .celery import app from ogn.model.receiver_beacon import ReceiverBeacon +from ogn.utils import date_to_timestamps logger = get_task_logger(__name__) @@ -24,6 +23,8 @@ def create_device_stats(session=None, date=None): if not date: logger.warn("A date is needed for calculating stats. Exiting") return None + else: + (start, end) = date_to_timestamps(date) # First kill the stats for the selected date deleted_counter = session.query(DeviceStats) \ @@ -35,7 +36,7 @@ def create_device_stats(session=None, date=None): func.dense_rank() .over(partition_by=AircraftBeacon.device_id, order_by=AircraftBeacon.receiver_id) .label('dr')) \ - .filter(and_(func.date(AircraftBeacon.timestamp) == date, AircraftBeacon.device_id != null())) \ + .filter(and_(between(AircraftBeacon.timestamp, start, end), AircraftBeacon.device_id != null())) \ .filter(or_(AircraftBeacon.error_count == 0, AircraftBeacon.error_count == null())) \ .subquery() @@ -49,7 +50,7 @@ def create_device_stats(session=None, date=None): func.max(sq.c.altitude) .over(partition_by=sq.c.device_id) .label('max_altitude'), - func.count(sq.c.id) + func.count(sq.c.device_id) .over(partition_by=sq.c.device_id) .label('aircraft_beacon_count'), func.first_value(sq.c.timestamp) @@ -98,6 +99,8 @@ def create_receiver_stats(session=None, date=None): if not date: logger.warn("A date is needed for calculating stats. Exiting") return None + else: + (start, end) = date_to_timestamps(date) # First kill the stats for the selected date deleted_counter = session.query(ReceiverStats) \ @@ -106,7 +109,7 @@ def create_receiver_stats(session=None, date=None): # Select one day sq = session.query(ReceiverBeacon) \ - .filter(func.date(ReceiverBeacon.timestamp) == date) \ + .filter(between(ReceiverBeacon.timestamp, start, end)) \ .subquery() # Calculate stats, firstseen, lastseen and last values != NULL @@ -132,7 +135,7 @@ def create_receiver_stats(session=None, date=None): .over(partition_by=sq.c.receiver_id, order_by=case([(sq.c.platform == null(), None)], else_=sq.c.timestamp).desc().nullslast()) .label('platform')) \ .subquery() - + # And insert them ins = insert(ReceiverStats).from_select( [ReceiverStats.receiver_id, ReceiverStats.date, ReceiverStats.firstseen, ReceiverStats.lastseen, ReceiverStats.location_wkt, ReceiverStats.altitude, ReceiverStats.version, ReceiverStats.platform], @@ -145,27 +148,27 @@ def create_receiver_stats(session=None, date=None): # Update aircraft_beacon_count, aircraft_count and max_distance (without any error and max quality of 36dB@10km which is enough for 640km ... ) aircraft_beacon_stats = session.query(func.date(AircraftBeacon.timestamp).label('date'), AircraftBeacon.receiver_id, - func.count(AircraftBeacon.id).label('aircraft_beacon_count'), + func.count(AircraftBeacon.timestamp).label('aircraft_beacon_count'), func.count(func.distinct(AircraftBeacon.device_id)).label('aircraft_count'), func.max(AircraftBeacon.distance).label('max_distance')) \ - .filter(and_(func.date(AircraftBeacon.timestamp) == date, + .filter(and_(between(AircraftBeacon.timestamp, start, end), AircraftBeacon.error_count == 0, AircraftBeacon.quality <= 40)) \ .group_by(func.date(AircraftBeacon.timestamp), AircraftBeacon.receiver_id) \ .subquery() - + upd = update(ReceiverStats) \ .where(and_(ReceiverStats.date == aircraft_beacon_stats.c.date, ReceiverStats.receiver_id == aircraft_beacon_stats.c.receiver_id)) \ .values({'aircraft_beacon_count': aircraft_beacon_stats.c.aircraft_beacon_count, 'aircraft_count': aircraft_beacon_stats.c.aircraft_count, 'max_distance': aircraft_beacon_stats.c.max_distance}) - + result = session.execute(upd) update_counter = result.rowcount session.commit() - logger.warn("Updated {} ReceiverStats".format(update_counter)) + logger.warn("Updated {} ReceiverStats".format(update_counter)) return "ReceiverStats for {}: {} deleted, {} inserted, {} updated".format(date, deleted_counter, insert_counter, update_counter) @@ -178,8 +181,10 @@ def update_device_stats_jumps(session=None, date=None): session = app.session if not date: - logger.warn("A date is needed for calculating device stats jumps. Exiting") + logger.warn("A date is needed for calculating stats. Exiting") return None + else: + (start, end) = date_to_timestamps(date) sq = session.query(AircraftBeacon.device_id, AircraftBeacon.timestamp.label('t0'), @@ -188,10 +193,10 @@ def update_device_stats_jumps(session=None, date=None): func.lead(AircraftBeacon.location_wkt).over(partition_by=AircraftBeacon.device_id, order_by=AircraftBeacon.timestamp).label('l1'), AircraftBeacon.altitude.label('a0'), func.lead(AircraftBeacon.altitude).over(partition_by=AircraftBeacon.device_id, order_by=AircraftBeacon.timestamp).label('a1')) \ - .filter(and_(func.date(AircraftBeacon.timestamp) == date, + .filter(and_(between(AircraftBeacon.timestamp, start, end), AircraftBeacon.error_count == 0)) \ .subquery() - + sq2 = session.query(sq.c.device_id, (func.st_distancesphere(sq.c.l1, sq.c.l0) / (func.extract('epoch', sq.c.t1) - func.extract('epoch', sq.c.t0))).label('horizontal_speed'), ((sq.c.a1 - sq.c.a0) / (func.extract('epoch', sq.c.t1) - func.extract('epoch', sq.c.t0))).label('vertical_speed')) \ @@ -199,22 +204,22 @@ def update_device_stats_jumps(session=None, date=None): sq.c.t1 != null(), sq.c.t0 < sq.c.t1)) \ .subquery() - + sq3 = session.query(sq2.c.device_id, case([(or_(func.abs(sq2.c.horizontal_speed) > 1000, func.abs(sq2.c.vertical_speed) > 100), 1)], else_=0).label('jump')) \ .subquery() - + sq4 = session.query(sq3.c.device_id, func.sum(sq3.c.jump).label('jumps')) \ .group_by(sq3.c.device_id) \ .subquery() - + upd = update(DeviceStats) \ .where(and_(DeviceStats.date == date, DeviceStats.device_id == sq4.c.device_id)) \ .values({'ambiguous': sq4.c.jumps > 10, 'jumps': sq4.c.jumps}) - + result = session.execute(upd) update_counter = result.rowcount session.commit() @@ -222,6 +227,7 @@ def update_device_stats_jumps(session=None, date=None): return "DeviceStats jumps for {}: {} updated".format(date, update_counter) + @app.task def create_relation_stats(session=None, date=None): """Add/update relation stats.""" @@ -232,6 +238,8 @@ def create_relation_stats(session=None, date=None): if not date: logger.warn("A date is needed for calculating stats. Exiting") return None + else: + (start, end) = date_to_timestamps(date) # First kill the stats for the selected date deleted_counter = session.query(RelationStats) \ @@ -244,9 +252,9 @@ def create_relation_stats(session=None, date=None): AircraftBeacon.device_id, AircraftBeacon.receiver_id, func.max(AircraftBeacon.quality), - func.count(AircraftBeacon.id) - ) \ - .filter(and_(func.date(AircraftBeacon.timestamp) == date, + func.count(AircraftBeacon.timestamp) + ) \ + .filter(and_(between(AircraftBeacon.timestamp, start, end), AircraftBeacon.distance > 1000, AircraftBeacon.error_count == 0, AircraftBeacon.quality <= 40, @@ -265,104 +273,105 @@ def create_relation_stats(session=None, date=None): return "RelationStats for {}: {} deleted, {} inserted".format(date, deleted_counter, insert_counter) + @app.task def update_qualities(session=None, date=None): """Calculate relative qualities of receivers and devices.""" - + if session is None: session = app.session - + if not date: - logger.warn("A date is needed for update stats. Exiting") + logger.warn("A date is needed for calculating stats. Exiting") return None # Calculate avg quality of devices dev_sq = session.query(RelationStats.date, - RelationStats.device_id, - func.avg(RelationStats.quality).label('quality')) \ + RelationStats.device_id, + func.avg(RelationStats.quality).label('quality')) \ .filter(RelationStats.date == date) \ .group_by(RelationStats.date, RelationStats.device_id) \ .subquery() - + dev_upd = update(DeviceStats) \ .where(and_(DeviceStats.date == dev_sq.c.date, DeviceStats.device_id == dev_sq.c.device_id)) \ .values({'quality': dev_sq.c.quality}) - + dev_result = session.execute(dev_upd) dev_update_counter = dev_result.rowcount session.commit() - logger.warn("Updated {} DeviceStats: quality".format(dev_update_counter)) + logger.warn("Updated {} DeviceStats: quality".format(dev_update_counter)) # Calculate avg quality of receivers rec_sq = session.query(RelationStats.date, - RelationStats.receiver_id, - func.avg(RelationStats.quality).label('quality')) \ + RelationStats.receiver_id, + func.avg(RelationStats.quality).label('quality')) \ .filter(RelationStats.date == date) \ .group_by(RelationStats.date, RelationStats.receiver_id) \ .subquery() - + rec_upd = update(ReceiverStats) \ .where(and_(ReceiverStats.date == rec_sq.c.date, ReceiverStats.receiver_id == rec_sq.c.receiver_id)) \ .values({'quality': rec_sq.c.quality}) - + rec_result = session.execute(rec_upd) rec_update_counter = rec_result.rowcount session.commit() - logger.warn("Updated {} ReceiverStats: quality".format(rec_update_counter)) - + logger.warn("Updated {} ReceiverStats: quality".format(rec_update_counter)) + # Calculate quality_offset of devices dev_sq = session.query(RelationStats.date, RelationStats.device_id, - (func.sum(RelationStats.beacon_count*(RelationStats.quality - ReceiverStats.quality))/(func.sum(RelationStats.beacon_count))).label('quality_offset')) \ + (func.sum(RelationStats.beacon_count * (RelationStats.quality - ReceiverStats.quality)) / (func.sum(RelationStats.beacon_count))).label('quality_offset')) \ .filter(RelationStats.date == date) \ .filter(and_(RelationStats.receiver_id == ReceiverStats.receiver_id, RelationStats.date == ReceiverStats.date)) \ .group_by(RelationStats.date, RelationStats.device_id) \ .subquery() - + dev_upd = update(DeviceStats) \ .where(and_(DeviceStats.date == dev_sq.c.date, DeviceStats.device_id == dev_sq.c.device_id)) \ .values({'quality_offset': dev_sq.c.quality_offset}) - + dev_result = session.execute(dev_upd) dev_update_counter = dev_result.rowcount session.commit() - logger.warn("Updated {} DeviceStats: quality_offset".format(dev_update_counter)) - + logger.warn("Updated {} DeviceStats: quality_offset".format(dev_update_counter)) + # Calculate quality_offset of receivers rec_sq = session.query(RelationStats.date, RelationStats.receiver_id, - (func.sum(RelationStats.beacon_count*(RelationStats.quality - DeviceStats.quality))/(func.sum(RelationStats.beacon_count))).label('quality_offset')) \ + (func.sum(RelationStats.beacon_count * (RelationStats.quality - DeviceStats.quality)) / (func.sum(RelationStats.beacon_count))).label('quality_offset')) \ .filter(RelationStats.date == date) \ .filter(and_(RelationStats.device_id == DeviceStats.device_id, RelationStats.date == DeviceStats.date)) \ .group_by(RelationStats.date, RelationStats.receiver_id) \ .subquery() - + rec_upd = update(ReceiverStats) \ .where(and_(ReceiverStats.date == rec_sq.c.date, ReceiverStats.receiver_id == rec_sq.c.receiver_id)) \ .values({'quality_offset': rec_sq.c.quality_offset}) - + rec_result = session.execute(rec_upd) rec_update_counter = rec_result.rowcount session.commit() - logger.warn("Updated {} ReceiverStats: quality_offset".format(rec_update_counter)) - + logger.warn("Updated {} ReceiverStats: quality_offset".format(rec_update_counter)) + return "Updated {} DeviceStats and {} ReceiverStats".format(dev_update_counter, rec_update_counter) @app.task -def update_receivers_stats(session=None): +def update_receivers(session=None): """Update receivers with stats.""" - + if session is None: session = app.session @@ -388,7 +397,7 @@ def update_receivers_stats(session=None): .label('platform')) \ .order_by(ReceiverStats.receiver_id) \ .subquery() - + upd = update(Receiver) \ .where(and_(Receiver.id == receiver_stats.c.receiver_id)) \ .values({'firstseen': receiver_stats.c.firstseen, @@ -397,18 +406,19 @@ def update_receivers_stats(session=None): 'altitude': receiver_stats.c.altitude, 'version': receiver_stats.c.version, 'platform': receiver_stats.c.platform}) - + result = session.execute(upd) update_counter = result.rowcount session.commit() - logger.warn("Updated {} Receivers".format(update_counter)) - + logger.warn("Updated {} Receivers".format(update_counter)) + return "Updated {} Receivers".format(update_counter) + @app.task -def update_devices_stats(session=None): +def update_devices(session=None): """Update devices with stats.""" - + if session is None: session = app.session @@ -437,7 +447,7 @@ def update_devices_stats(session=None): .label('real_address')) \ .order_by(DeviceStats.device_id) \ .subquery() - + upd = update(Device) \ .where(and_(Device.id == device_stats.c.device_id)) \ .values({'firstseen': device_stats.c.firstseen, @@ -447,10 +457,10 @@ def update_devices_stats(session=None): 'software_version': device_stats.c.software_version, 'hardware_version': device_stats.c.hardware_version, 'real_address': device_stats.c.real_address}) - + result = session.execute(upd) update_counter = result.rowcount session.commit() - logger.warn("Updated {} Devices".format(update_counter)) - + logger.warn("Updated {} Devices".format(update_counter)) + return "Updated {} Devices".format(update_counter) diff --git a/ogn/collect/takeoff_landings.py b/ogn/collect/takeoff_landings.py index 11d2bd2..36e20ed 100644 --- a/ogn/collect/takeoff_landings.py +++ b/ogn/collect/takeoff_landings.py @@ -1,13 +1,14 @@ -from datetime import datetime, timedelta +from datetime import timedelta from celery.utils.log import get_task_logger -from sqlalchemy import and_, or_, insert, update, between, exists +from sqlalchemy import and_, or_, insert, between, exists from sqlalchemy.sql import func, null from sqlalchemy.sql.expression import case from ogn.collect.celery import app from ogn.model import AircraftBeacon, TakeoffLanding, Airport +from ogn.utils import date_to_timestamps logger = get_task_logger(__name__) @@ -15,7 +16,7 @@ logger = get_task_logger(__name__) @app.task def update_takeoff_landings(session=None, date=None): """Compute takeoffs and landings.""" - + logger.info("Compute takeoffs and landings.") if session is None: @@ -27,104 +28,100 @@ def update_takeoff_landings(session=None, date=None): logger.warn("Cannot calculate takeoff and landings without any airport! Please import airports first.") return - # takeoff / landing detection is based on 3 consecutive points + # takeoff / landing detection is based on 3 consecutive points all below a certain altitude AGL takeoff_speed = 55 # takeoff detection: 1st point below, 2nd and 3rd above this limit landing_speed = 40 # landing detection: 1st point above, 2nd and 3rd below this limit duration = 100 # the points must not exceed this duration radius = 5000 # the points must not exceed this radius around the 2nd point + max_agl = 100 # takeoff / landing must not exceed this altitude AGL - # takeoff / landing has to be near an airport - airport_radius = 2500 # takeoff / landing must not exceed this radius around the airport - airport_delta = 100 # takeoff / landing must not exceed this altitude offset above/below the airport + # calculate from - to timestamps + (start, end) = date_to_timestamps(date) - # 'wo' is the window order for the sql window function - wo = and_(func.date(AircraftBeacon.timestamp), - AircraftBeacon.device_id, - AircraftBeacon.timestamp) - - # get beacons for selected day and filter out duplicates (e.g. from multiple receivers) - sq = session.query(AircraftBeacon.id, - func.row_number().over(partition_by=(func.date(AircraftBeacon.timestamp), - AircraftBeacon.device_id, - AircraftBeacon.timestamp), - order_by=AircraftBeacon.error_count).label('row')) \ - .filter(func.date(AircraftBeacon.timestamp) == date) \ + # get beacons for selected day, one per device_id and timestamp + sq = session.query(AircraftBeacon) \ + .distinct(AircraftBeacon.device_id, AircraftBeacon.timestamp) \ + .order_by(AircraftBeacon.device_id, AircraftBeacon.timestamp, AircraftBeacon.error_count) \ + .filter(AircraftBeacon.agl < max_agl) \ + .filter(between(AircraftBeacon.timestamp, start, end)) \ .subquery() - - sq2 = session.query(sq.c.id) \ - .filter(sq.c.row == 1) \ - .subquery() - + # make a query with current, previous and next position - sq3 = session.query( - AircraftBeacon.device_id, - func.lag(AircraftBeacon.device_id).over(order_by=wo).label('device_id_prev'), - func.lead(AircraftBeacon.device_id).over(order_by=wo).label('device_id_next'), - AircraftBeacon.timestamp, - func.lag(AircraftBeacon.timestamp).over(order_by=wo).label('timestamp_prev'), - func.lead(AircraftBeacon.timestamp).over(order_by=wo).label('timestamp_next'), - AircraftBeacon.location_wkt, - func.lag(AircraftBeacon.location_wkt).over(order_by=wo).label('location_wkt_prev'), - func.lead(AircraftBeacon.location_wkt).over(order_by=wo).label('location_wkt_next'), - AircraftBeacon.track, - func.lag(AircraftBeacon.track).over(order_by=wo).label('track_prev'), - func.lead(AircraftBeacon.track).over(order_by=wo).label('track_next'), - AircraftBeacon.ground_speed, - func.lag(AircraftBeacon.ground_speed).over(order_by=wo).label('ground_speed_prev'), - func.lead(AircraftBeacon.ground_speed).over(order_by=wo).label('ground_speed_next'), - AircraftBeacon.altitude, - func.lag(AircraftBeacon.altitude).over(order_by=wo).label('altitude_prev'), - func.lead(AircraftBeacon.altitude).over(order_by=wo).label('altitude_next')) \ - .filter(AircraftBeacon.id == sq2.c.id) \ + sq2 = session.query( + sq.c.device_id, + func.lag(sq.c.device_id).over(partition_by=sq.c.device_id, order_by=sq.c.timestamp).label('device_id_prev'), + func.lead(sq.c.device_id).over(partition_by=sq.c.device_id, order_by=sq.c.timestamp).label('device_id_next'), + sq.c.timestamp, + func.lag(sq.c.timestamp).over(partition_by=sq.c.device_id, order_by=sq.c.timestamp).label('timestamp_prev'), + func.lead(sq.c.timestamp).over(partition_by=sq.c.device_id, order_by=sq.c.timestamp).label('timestamp_next'), + sq.c.location, + func.lag(sq.c.location).over(partition_by=sq.c.device_id, order_by=sq.c.timestamp).label('location_wkt_prev'), + func.lead(sq.c.location).over(partition_by=sq.c.device_id, order_by=sq.c.timestamp).label('location_wkt_next'), + sq.c.track, + func.lag(sq.c.track).over(partition_by=sq.c.device_id, order_by=sq.c.timestamp).label('track_prev'), + func.lead(sq.c.track).over(partition_by=sq.c.device_id, order_by=sq.c.timestamp).label('track_next'), + sq.c.ground_speed, + func.lag(sq.c.ground_speed).over(partition_by=sq.c.device_id, order_by=sq.c.timestamp).label('ground_speed_prev'), + func.lead(sq.c.ground_speed).over(partition_by=sq.c.device_id, order_by=sq.c.timestamp).label('ground_speed_next'), + sq.c.altitude, + func.lag(sq.c.altitude).over(partition_by=sq.c.device_id, order_by=sq.c.timestamp).label('altitude_prev'), + func.lead(sq.c.altitude).over(partition_by=sq.c.device_id, order_by=sq.c.timestamp).label('altitude_next')) \ .subquery() - - # consider only positions with the same device id - sq4 = session.query(sq3) \ - .filter(sq3.c.device_id_prev == sq3.c.device_id == sq3.c.device_id_next) \ - .subquery() - + + # consider only positions with predecessor and successor and limit distance and duration between points + sq3 = session.query(sq2) \ + .filter(and_(sq2.c.device_id_prev != null(), + sq2.c.device_id_next != null())) \ + .filter(and_(func.ST_DistanceSphere(sq2.c.location, sq2.c.location_wkt_prev) < radius, + func.ST_DistanceSphere(sq2.c.location, sq2.c.location_wkt_next) < radius)) \ + .filter(sq2.c.timestamp_next - sq2.c.timestamp_prev < timedelta(seconds=duration)) \ + .subquery() + # find possible takeoffs and landings + sq4 = session.query( + sq3.c.timestamp, + case([(sq3.c.ground_speed > takeoff_speed, sq3.c.location_wkt_prev), # on takeoff we take the location from the previous fix because it is nearer to the airport + (sq3.c.ground_speed <= takeoff_speed, sq3.c.location)]).label('location'), + case([(sq3.c.ground_speed > landing_speed, sq3.c.track), + (sq3.c.ground_speed <= landing_speed, sq3.c.track_prev)]).label('track'), # on landing we take the track from the previous fix because gliders tend to leave the runway quickly + sq3.c.ground_speed, + sq3.c.altitude, + case([(sq3.c.ground_speed > takeoff_speed, True), + (sq3.c.ground_speed < landing_speed, False)]).label('is_takeoff'), + sq3.c.device_id) \ + .filter(or_(and_(sq3.c.ground_speed_prev < takeoff_speed, # takeoff + sq3.c.ground_speed > takeoff_speed, + sq3.c.ground_speed_next > takeoff_speed), + and_(sq3.c.ground_speed_prev > landing_speed, # landing + sq3.c.ground_speed < landing_speed, + sq3.c.ground_speed_next < landing_speed))) \ + .subquery() + + # consider them if the are near airports ... sq5 = session.query( sq4.c.timestamp, - case([(sq4.c.ground_speed > takeoff_speed, sq4.c.location_wkt_prev), # on takeoff we take the location from the previous fix because it is nearer to the airport - (sq4.c.ground_speed < landing_speed, sq4.c.location)]).label('location'), - case([(sq4.c.ground_speed > takeoff_speed, sq4.c.track), - (sq4.c.ground_speed < landing_speed, sq4.c.track_prev)]).label('track'), # on landing we take the track from the previous fix because gliders tend to leave the runway quickly - sq4.c.ground_speed, - sq4.c.altitude, - case([(sq4.c.ground_speed > takeoff_speed, True), - (sq4.c.ground_speed < landing_speed, False)]).label('is_takeoff'), - sq4.c.device_id) \ - .filter(sq4.c.timestamp_next - sq4.c.timestamp_prev < timedelta(seconds=duration)) \ - .filter(and_(func.ST_DistanceSphere(sq4.c.location, sq4.c.location_wkt_prev) < radius, - func.ST_DistanceSphere(sq4.c.location, sq4.c.location_wkt_next) < radius)) \ - .filter(or_(and_(sq4.c.ground_speed_prev < takeoff_speed, # takeoff - sq4.c.ground_speed > takeoff_speed, - sq4.c.ground_speed_next > takeoff_speed), - and_(sq4.c.ground_speed_prev > landing_speed, # landing - sq4.c.ground_speed < landing_speed, - sq4.c.ground_speed_next < landing_speed))) \ + sq4.c.track, + sq4.c.is_takeoff, + sq4.c.device_id, + Airport.id.label('airport_id'), + func.ST_DistanceSphere(sq4.c.location, Airport.location_wkt).label('airport_distance')) \ + .filter(and_(func.ST_Within(sq4.c.location, Airport.border), + between(Airport.style, 2, 5))) \ .subquery() - - # consider them if they are near a airport - sq6 = session.query( - sq5.c.timestamp, - sq5.c.track, - sq5.c.is_takeoff, - sq5.c.device_id, - Airport.id.label('airport_id')) \ - .filter(and_(func.ST_DistanceSphere(sq5.c.location, Airport.location_wkt) < airport_radius, - between(sq5.c.altitude, Airport.altitude - airport_delta, Airport.altitude + airport_delta))) \ - .filter(between(Airport.style, 2, 5)) \ + + # ... and take the nearest airport + sq6 = session.query(sq5.c.timestamp, sq5.c.track, sq5.c.is_takeoff, sq5.c.device_id, sq5.c.airport_id) \ + .distinct(sq5.c.timestamp, sq5.c.track, sq5.c.is_takeoff, sq5.c.device_id, sq5.c.airport_id) \ + .order_by(sq5.c.timestamp, sq5.c.track, sq5.c.is_takeoff, sq5.c.device_id, sq5.c.airport_id, sq5.c.airport_distance) \ .subquery() - + # consider them only if they are not already existing in db takeoff_landing_query = session.query(sq6) \ .filter(~exists().where( and_(TakeoffLanding.timestamp == sq6.c.timestamp, TakeoffLanding.device_id == sq6.c.device_id, TakeoffLanding.airport_id == sq6.c.airport_id))) - + # ... and save them ins = insert(TakeoffLanding).from_select((TakeoffLanding.timestamp, TakeoffLanding.track, diff --git a/ogn/commands/bulkimport.py b/ogn/commands/bulkimport.py index ee2ec09..088d313 100644 --- a/ogn/commands/bulkimport.py +++ b/ogn/commands/bulkimport.py @@ -6,15 +6,17 @@ from io import StringIO from ogn.model import AircraftBeacon, ReceiverBeacon from ogn.utils import open_file +from ogn.commands.database import get_database_days manager = Manager() + class LogfileDbSaver(): def __init__(self): """Establish the database connection.""" try: - self.conn = psycopg2.connect(database = "ogn", user = "postgres", password = "postgres", host = "localhost", port = "5432") - except: + self.conn = psycopg2.connect(database="ogn", user="postgres", password="postgres", host="localhost", port="5432") + except Exception as e: raise Exception("I am unable to connect to the database") self.cur = self.conn.cursor() @@ -41,10 +43,12 @@ class LogfileDbSaver(): index_clause = " AND hasindexes = FALSE" if no_index_only == True else "" - self.cur.execute(("SELECT DISTINCT(RIGHT(tablename, 10))" - " FROM pg_catalog.pg_tables" - " WHERE schemaname = 'public' AND tablename LIKE 'aircraft_beacons_%'{}" - " ORDER BY RIGHT(tablename, 10);".format(index_clause))) + self.cur.execute((""" + SELECT DISTINCT(RIGHT(tablename, 10)) + FROM pg_catalog.pg_tables + WHERE schemaname = 'public' AND tablename LIKE 'aircraft_beacons_%'{} + ORDER BY RIGHT(tablename, 10); + """.format(index_clause))) return [datestr[0].replace('_', '-') for datestr in self.cur.fetchall()] @@ -124,7 +128,7 @@ class LogfileDbSaver(): receiver_id int); """.format(self.receiver_table)) self.conn.commit() - except: + except Exception as e: raise Exception("I can't create the tables") def add(self, beacon): @@ -157,10 +161,11 @@ class LogfileDbSaver(): self.cur.copy_expert("COPY ({}) TO STDOUT WITH (DELIMITER ',', FORMAT CSV, HEADER, ENCODING 'UTF-8');".format(self.get_merged_receiver_beacons_subquery()), gzip_file) def create_indices(self): - """Creates indices for aircraft- and receiver-beacons. We need them for the beacon merging operation.""" + """Creates indices for aircraft- and receiver-beacons.""" self.cur.execute(""" CREATE INDEX IF NOT EXISTS ix_{0}_timestamp_name_receiver_name ON "{0}" (timestamp, name, receiver_name); + CREATE INDEX IF NOT EXISTS ix_{0}_device_id_timestamp_error_count ON "{0}" (device_id, timestamp, error_count); CREATE INDEX IF NOT EXISTS ix_{1}_timestamp_name_receiver_name ON "{1}" (timestamp, name, receiver_name); """.format(self.aircraft_table, self.receiver_table)) self.conn.commit() @@ -194,14 +199,15 @@ class LogfileDbSaver(): self.cur.execute(""" UPDATE receivers AS r - SET location = sq.location + SET location = sq.location, + altitude = sq.altitude FROM - (SELECT rb.receiver_id, - ROW_NUMBER() OVER (PARTITION BY receiver_id), - FIRST_VALUE(rb.location) OVER (PARTITION BY receiver_id ORDER BY CASE WHEN location IS NOT NULL THEN timestamp ELSE NULL END NULLS LAST) AS location + (SELECT DISTINCT ON (rb.receiver_id) rb.receiver_id, rb.location, rb.altitude FROM "{1}" AS rb + WHERE rb.location IS NOT NULL + ORDER BY rb.receiver_id, rb.timestamp ) AS sq - WHERE r.id = sq.receiver_id AND sq.row_number = 1; + WHERE r.id = sq.receiver_id; """.format(self.aircraft_table, self.receiver_table)) self.conn.commit() @@ -250,7 +256,7 @@ class LogfileDbSaver(): ELSE NULL END AS quality, CAST(ab.altitude - ST_Value(e.rast, ab.location) AS REAL) AS agl - + INTO "{0}_temp" FROM "{0}" AS ab, devices AS d, receivers AS r, elevation AS e WHERE ab.address = d.address AND receiver_name = r.name AND ST_Intersects(e.rast, ab.location); @@ -352,7 +358,7 @@ class LogfileDbSaver(): self.cur.execute(query) return len(self.cur.fetchall()) == 1 - def transfer(self): + def transfer_aircraft_beacons(self): query = """ INSERT INTO aircraft_beacons(location, altitude, name, dstcall, relay, receiver_name, timestamp, track, ground_speed, address_type, aircraft_type, stealth, address, climb_rate, turn_rate, signal_quality, error_count, frequency_offset, gps_quality_horizontal, gps_quality_vertical, software_version, hardware_version, real_address, signal_power, @@ -365,6 +371,22 @@ class LogfileDbSaver(): self.cur.execute(query) self.conn.commit() + def transfer_receiver_beacons(self): + query = """ + INSERT INTO receiver_beacons(location, altitude, name, receiver_name, dstcall, timestamp, + + version, platform, cpu_load, free_ram, total_ram, ntp_error, rt_crystal_correction, voltage, + amperage, cpu_temp, senders_visible, senders_total, rec_input_noise, senders_signal, + senders_messages, good_senders_signal, good_senders, good_and_bad_senders, + + receiver_id) + {} + ON CONFLICT DO NOTHING; + """.format(self.get_merged_receiver_beacons_subquery()) + + self.cur.execute(query) + self.conn.commit() + def create_flights2d(self): query = """ INSERT INTO flights2d @@ -396,20 +418,17 @@ class LogfileDbSaver(): ELSE 1 END AS ping FROM ( - SELECT sq.timestamp t1, - lag(sq.timestamp) OVER (partition BY sq.timestamp::date, sq.device_id ORDER BY sq.timestamp) t2, - sq.location l1, - lag(sq.location) OVER (partition BY sq.timestamp::date, sq.device_id ORDER BY sq.timestamp) l2, - sq.device_id d1, - lag(sq.device_id) OVER (partition BY sq.timestamp::date, sq.device_id ORDER BY sq.timestamp) d2 + SELECT sq.timestamp t1, + lag(sq.timestamp) OVER (partition BY sq.device_id ORDER BY sq.timestamp) t2, + sq.location l1, + lag(sq.location) OVER (partition BY sq.device_id ORDER BY sq.timestamp) l2, + sq.device_id d1, + lag(sq.device_id) OVER (partition BY sq.device_id ORDER BY sq.timestamp) d2 FROM ( - SELECT timestamp, - device_id, - location, - row_number() OVER (partition BY timestamp::date, device_id, timestamp ORDER BY error_count) message_number + SELECT DISTINCT ON (device_id, timestamp) timestamp, device_id, location FROM {} - WHERE device_id IS NOT NULL) sq - WHERE sq.message_number = 1 ) sq2 ) sq3 ) sq4 + WHERE device_id IS NOT NULL AND ground_speed > 250 AND agl < 100 + ORDER BY device_id, timestamp, error_count) sq) sq2 ) sq3 ) sq4 GROUP BY sq4.timestamp::date, sq4.device_id, sq4.part ) sq5 @@ -441,15 +460,14 @@ class LogfileDbSaver(): sq.device_id d1, LAG(sq.device_id) OVER ( PARTITION BY sq.timestamp::DATE, sq.device_id ORDER BY sq.timestamp) d2, sq.agl a1, - LAG(sq.agl) over ( PARTITION BY sq.timestamp::DATE, sq.device_id ORDER BY sq.timestamp) a2 + LAG(sq.agl) over ( PARTITION BY sq.timestamp::DATE, sq.device_id ORDER BY sq.timestamp) a2 FROM ( - SELECT timestamp, device_id, location, agl, - Row_number() OVER ( PARTITION BY timestamp::DATE, device_id, timestamp ORDER BY error_count) message_number + SELECT DISTINCT ON (device_id, timestamp) timestamp, device_id, location, agl FROM {} - ) sq - WHERE sq.message_number = 1 - ) sq2 + ORDER BY device_id, timestamp, error_count + ) sq + ) sq2 WHERE EXTRACT(epoch FROM sq2.t1 - sq2.t2) > 300 AND ST_DistanceSphere(sq2.l1, sq2.l2) / EXTRACT(epoch FROM sq2.t1 - sq2.t2) BETWEEN 15 AND 50 AND sq2.a1 > 300 AND sq2.a2 > 300 @@ -461,6 +479,7 @@ class LogfileDbSaver(): self.cur.execute(query) self.conn.commit() + def convert(sourcefile, datestr, saver): from ogn.gateway.process import string_to_message from ogn.gateway.process_tools import AIRCRAFT_TYPES, RECEIVER_TYPES @@ -490,7 +509,7 @@ def convert(sourcefile, datestr, saver): if message is None: continue - dictfilt = lambda x, y: dict([ (i,x[i]) for i in x if i in set(y) ]) + dictfilt = lambda x, y: dict([(i, x[i]) for i in x if i in set(y)]) try: if message['beacon_type'] in AIRCRAFT_TYPES: @@ -562,18 +581,25 @@ def update(): saver.update_receiver_location() saver.create_indices() + @manager.command -def transfer(): +def transfer(start=None, end=None): """Transfer beacons from separate logfile tables to beacon table.""" with LogfileDbSaver() as saver: - datestrs = saver.get_datestrs() + if start is not None and end is not None: + dates = get_database_days(start, end) + datestrs = [date.strftime('%Y_%m_%d') for date in dates] + else: + datestrs = saver.get_datestrs() pbar = tqdm(datestrs) for datestr in pbar: pbar.set_description("Transfer beacons for {}".format(datestr)) saver.set_datestr(datestr) if not saver.is_transfered(): - saver.transfer() + saver.transfer_aircraft_beacons() + saver.transfer_receiver_beacons() + @manager.command def create_flights2d(): @@ -587,6 +613,7 @@ def create_flights2d(): saver.set_datestr(datestr) saver.create_flights2d() + @manager.command def create_gaps2d(): """Create 'gaps' from logfile tables.""" @@ -599,6 +626,7 @@ def create_gaps2d(): saver.set_datestr(datestr) saver.create_gaps2d() + @manager.command def file_export(path): """Export separate logfile tables to csv files. They can be used for fast bulk import with sql COPY command.""" @@ -610,6 +638,7 @@ def file_export(path): with LogfileDbSaver() as saver: datestrs = saver.get_datestrs() + datestrs = filter(lambda x: x.startswith('2018-12'), datestrs) pbar = tqdm(datestrs) for datestr in pbar: pbar.set_description("Exporting data for {}".format(datestr)) diff --git a/ogn/commands/database.py b/ogn/commands/database.py index 4cb9aab..28220bb 100644 --- a/ogn/commands/database.py +++ b/ogn/commands/database.py @@ -1,17 +1,33 @@ +from datetime import datetime, timedelta + from manager import Manager from ogn.collect.database import update_device_infos, update_country_code from ogn.commands.dbutils import engine, session -from ogn.model import Base, DeviceInfoOrigin -from ogn.utils import get_airports -from sqlalchemy import distinct -from sqlalchemy.sql import null, func - +from ogn.model import Base, DeviceInfoOrigin, AircraftBeacon +from ogn.utils import get_airports, get_days +from sqlalchemy.sql import func manager = Manager() ALEMBIC_CONFIG_FILE = "alembic.ini" +def get_database_days(start, end): + """Returns the first and the last day in aircraft_beacons table.""" + + if start is None and end is None: + days_from_db = session.query(func.min(AircraftBeacon.timestamp).label('first_day'), func.max(AircraftBeacon.timestamp).label('last_day')).one() + start = days_from_db[0].date() + end = days_from_db[1].date() + else: + start = datetime.strptime(start, "%Y-%m-%d") + end = datetime.strptime(end, "%Y-%m-%d") + + days = get_days(start, end) + + return days + + @manager.command def init(): """Initialize the database.""" @@ -24,8 +40,8 @@ def init(): session.execute('CREATE EXTENSION IF NOT EXISTS timescaledb CASCADE;') session.commit() Base.metadata.create_all(engine) - session.execute("SELECT create_hypertable('aircraft_beacons', 'timestamp', chunk_target_size => '2GB');") - session.execute("SELECT create_hypertable('receiver_beacons', 'timestamp', chunk_target_size => '2GB');") + session.execute("SELECT create_hypertable('aircraft_beacons', 'timestamp', chunk_target_size => '2GB', if_not_exists => TRUE);") + session.execute("SELECT create_hypertable('receiver_beacons', 'timestamp', chunk_target_size => '2GB', if_not_exists => TRUE);") session.commit() #alembic_cfg = Config(ALEMBIC_CONFIG_FILE) #command.stamp(alembic_cfg, "head") @@ -92,11 +108,13 @@ def import_airports(path='tests/SeeYou.cup'): airports = get_airports(path) session.bulk_save_objects(airports) session.commit() + session.execute("UPDATE airports SET border = ST_Expand(location, 0.05)") + session.commit() print("Imported {} airports.".format(len(airports))) @manager.command def update_country_codes(): """Update country codes of all receivers.""" - - update_country_code(session=session) \ No newline at end of file + + update_country_code(session=session) diff --git a/ogn/commands/logbook.py b/ogn/commands/logbook.py index 7e27692..499a6c3 100644 --- a/ogn/commands/logbook.py +++ b/ogn/commands/logbook.py @@ -1,36 +1,42 @@ # -*- coding: utf-8 -*- -from datetime import timedelta, datetime, date +from datetime import datetime from manager import Manager from ogn.collect.logbook import update_logbook from ogn.collect.takeoff_landings import update_takeoff_landings from ogn.commands.dbutils import session -from ogn.model import Device, DeviceInfo, TakeoffLanding, Airport, Logbook -from sqlalchemy import and_, or_ -from sqlalchemy.orm import aliased +from ogn.model import Airport, Logbook +from sqlalchemy import or_ from sqlalchemy.sql import func - +from tqdm import tqdm +from ogn.commands.database import get_database_days manager = Manager() @manager.command -def compute_takeoff_landing(): +def compute_takeoff_landing(start=None, end=None): """Compute takeoffs and landings.""" - - for single_date in (date(2017, 6, 14) + timedelta(days=n) for n in range(800)): - print("Berechne für den {}".format(single_date.strftime("%Y-%m-%d"))) + + days = get_database_days(start, end) + + pbar = tqdm(days) + for single_date in pbar: + pbar.set_description(datetime.strftime(single_date, '%Y-%m-%d')) result = update_takeoff_landings(session=session, date=single_date) - print(result) - + + @manager.command -def compute_logbook(): +def compute_logbook(start=None, end=None): """Compute logbook.""" - print("Compute logbook...") - result = update_logbook(session=session)#.delay() - #counter = result.get() - #print("New logbook entries: {}".format(counter)) + + days = get_database_days(start, end) + + pbar = tqdm(days) + for single_date in pbar: + pbar.set_description(datetime.strftime(single_date, '%Y-%m-%d')) + result = update_logbook(session=session, date=single_date) @manager.arg('date', help='date (format: yyyy-mm-dd)') diff --git a/ogn/commands/stats.py b/ogn/commands/stats.py index ac6e775..825446b 100644 --- a/ogn/commands/stats.py +++ b/ogn/commands/stats.py @@ -1,43 +1,42 @@ from manager import Manager from ogn.commands.dbutils import session - -from datetime import date, timedelta +from ogn.commands.database import get_database_days from ogn.collect.stats import create_device_stats, create_receiver_stats, create_relation_stats,\ - update_qualities, update_receivers_stats, update_devices_stats,\ + update_qualities, update_receivers, update_devices,\ update_device_stats_jumps manager = Manager() + @manager.command -def create_stats(): +def create_stats(start=None, end=None): """Create DeviceStats, ReceiverStats and RelationStats.""" - - for single_date in (date(2016, 1, 1) + timedelta(days=n) for n in range(800)): + + days = get_database_days(start, end) + + for single_date in days: result = create_device_stats(session=session, date=single_date) print(result) - + result = update_device_stats_jumps(session=session, date=single_date) print(result) - + result = create_receiver_stats(session=session, date=single_date) print(result) - + result = create_relation_stats(session=session, date=single_date) print(result) - + result = update_qualities(session=session, date=single_date) print(result) - - #result = update_device_stats(session=session, date=date) - #print(result) - + @manager.command def add_missing_receivers(): """Update receivers with data from stats.""" - result = update_receivers_stats(session=session) + result = update_receivers(session=session) print(result) @@ -45,17 +44,18 @@ def add_missing_receivers(): def add_missing_devices(): """Update devices with data from stats.""" - result = update_devices_stats(session=session) + result = update_devices(session=session) print(result) @manager.command -def create_flights(): +def create_flights(start=None, end=None): """Create Flights.""" - for single_date in (date(2016, 8, 10) + timedelta(days=n) for n in range(800)): + days = get_database_days(start, end) + + for single_date in days: result = _create_flights2d(session=session, date=single_date) - #result = _create_flights3d(session=session, date=single_date) print(result) @@ -111,7 +111,7 @@ def _create_flights2d(session=None, date=None): sq5.device_id ON CONFLICT DO NOTHING; """ - + result = session.execute(SQL.format(date.strftime("%Y-%m-%d"))) insert_counter = result.rowcount session.commit() diff --git a/ogn/gateway/manage.py b/ogn/gateway/manage.py index 44b793c..813661e 100644 --- a/ogn/gateway/manage.py +++ b/ogn/gateway/manage.py @@ -2,7 +2,7 @@ import logging from manager import Manager from ogn.client import AprsClient -from ogn.gateway.process import string_to_message +from ogn.gateway.process import string_to_message from datetime import datetime from ogn.gateway.process_tools import DummyMerger, Converter, DbSaver from ogn.commands.dbutils import session @@ -17,6 +17,7 @@ saver = DbSaver(session=session) converter = Converter(callback=saver) merger = DummyMerger(callback=converter) + def asdf(raw_string): message = string_to_message(raw_string, reference_date=datetime.utcnow()) if message is not None: diff --git a/ogn/gateway/process.py b/ogn/gateway/process.py index 0b21410..8986efa 100644 --- a/ogn/gateway/process.py +++ b/ogn/gateway/process.py @@ -3,7 +3,7 @@ import logging from mgrs import MGRS from ogn.commands.dbutils import session -from ogn.model import AircraftBeacon, ReceiverBeacon, Location +from ogn.model import Location from ogn.parser import parse, ParseError from ogn.gateway.process_tools import DbSaver, Converter, DummyMerger, AIRCRAFT_TYPES, RECEIVER_TYPES diff --git a/ogn/gateway/process_tools.py b/ogn/gateway/process_tools.py index 6cbb1c2..c17c3a4 100644 --- a/ogn/gateway/process_tools.py +++ b/ogn/gateway/process_tools.py @@ -4,6 +4,7 @@ from ogn.model import AircraftBeacon, ReceiverBeacon AIRCRAFT_TYPES = ['aprs_aircraft', 'flarm', 'tracker', 'fanet', 'lt24', 'naviter', 'skylines', 'spider', 'spot'] RECEIVER_TYPES = ['aprs_receiver', 'receiver'] + class DummyMerger: def __init__(self, callback): self.callback = callback @@ -14,6 +15,7 @@ class DummyMerger: def flush(self): pass + class Merger: def __init__(self, callback, max_timedelta=None, max_lines=None): self.callback = callback @@ -27,8 +29,8 @@ class Merger: # release old messages if self.max_timedelta is not None: - for receiver,v1 in self.message_map.items(): - for name,v2 in v1.items(): + for receiver, v1 in self.message_map.items(): + for name, v2 in v1.items(): for timestamp,message in v2.items(): if message['timestamp'] - timestamp > self.max_timedelta: self.callback.add_message(message) @@ -37,30 +39,30 @@ class Merger: # release messages > max_lines if self.max_lines is not None: pass - + # merge messages with same timestamp receiver_name = message['receiver_name'] name = message['name'] timestamp = message['timestamp'] - + if receiver_name in self.message_map: if name in self.message_map[receiver_name]: timestamps = self.message_map[receiver_name][name] if timestamp in timestamps: other = timestamps[timestamp] - params1 = dict( [(k,v) for k,v in message.items() if v is not None]) - params2 = dict( [(k,v) for k,v in other.items() if v is not None]) + params1 = dict([(k, v) for k, v in message.items() if v is not None]) + params2 = dict([(k, v) for k, v in other.items() if v is not None]) merged = {**params1, **params2} - + # zum debuggen if 'raw_message' in message and 'raw_message' in other: merged['raw_message'] = '"{}","{}"'.format(message['raw_message'], other['raw_message']) - + self.callback.add_message(merged) del self.message_map[receiver_name][name][timestamp] else: self.message_map[receiver_name][name][timestamp] = message - + # release previous messages for ts in list(timestamps): if ts < timestamp: @@ -73,18 +75,19 @@ class Merger: self.message_map.update({receiver_name: {name: {timestamp: message}}}) def flush(self): - for receiver,v1 in self.message_map.items(): - for name,v2 in v1.items(): + for receiver, v1 in self.message_map.items(): + for name, v2 in v1.items(): for timestamp in v2: self.callback.add_message(self.message_map[receiver][name][timestamp]) self.callback.flush() self.message_map = dict() + class Converter: def __init__(self, callback): self.callback = callback - + def add_message(self, message): if message['aprs_type'] in ['status', 'position']: beacon = self.message_to_beacon(message) @@ -101,12 +104,13 @@ class Converter: beacon = ReceiverBeacon(**message) else: print("Whoops: what is this: {}".format(message)) - + return beacon - + def flush(self): self.callback.flush() + class DummySaver: def add_message(self, message): print(message) @@ -114,6 +118,7 @@ class DummySaver: def flush(self): print("========== flush ==========") + class DbSaver: def __init__(self, session): self.session = session @@ -141,8 +146,10 @@ class DbSaver: print(e) return + import os, gzip, csv + class FileSaver: def __init__(self): self.aircraft_messages = list() @@ -157,13 +164,13 @@ class FileSaver: self.fout_rb = gzip.open(receiver_beacon_filename, 'wt') else: raise FileExistsError - + self.aircraft_writer = csv.writer(self.fout_ab, delimiter=',') self.aircraft_writer.writerow(AircraftBeacon.get_columns()) self.receiver_writer = csv.writer(self.fout_rb, delimiter=',') self.receiver_writer.writerow(ReceiverBeacon.get_columns()) - + return 1 def add_message(self, beacon): @@ -177,7 +184,7 @@ class FileSaver: self.receiver_writer.writerows(self.receiver_messages) self.aircraft_messages = list() self.receiver_messages = list() - + def close(self): self.fout_ab.close() self.fout_rb.close() diff --git a/ogn/model/aircraft_beacon.py b/ogn/model/aircraft_beacon.py index 414e139..a52f2e7 100644 --- a/ogn/model/aircraft_beacon.py +++ b/ogn/model/aircraft_beacon.py @@ -77,10 +77,10 @@ class AircraftBeacon(Beacon): 'timestamp', 'track', 'ground_speed', - + #'raw_message', #'reference_timestamp', - + 'address_type', 'aircraft_type', 'stealth', @@ -96,7 +96,7 @@ class AircraftBeacon(Beacon): 'hardware_version', 'real_address', 'signal_power', - + 'distance', 'radial', 'quality', @@ -113,7 +113,7 @@ class AircraftBeacon(Beacon): self.timestamp, self.track, self.ground_speed, - + #self.raw_message, #self.reference_timestamp, @@ -140,4 +140,4 @@ class AircraftBeacon(Beacon): Index('ix_aircraft_beacons_date_device_id_address', func.date(AircraftBeacon.timestamp), AircraftBeacon.device_id, AircraftBeacon.address) -Index('ix_aircraft_beacons_date_receiver_id_distance', func.date(AircraftBeacon.timestamp), AircraftBeacon.receiver_id, AircraftBeacon.distance) \ No newline at end of file +Index('ix_aircraft_beacons_date_receiver_id_distance', func.date(AircraftBeacon.timestamp), AircraftBeacon.receiver_id, AircraftBeacon.distance) diff --git a/ogn/model/airport.py b/ogn/model/airport.py index b69d265..0dd305a 100644 --- a/ogn/model/airport.py +++ b/ogn/model/airport.py @@ -1,6 +1,5 @@ from geoalchemy2.types import Geometry from sqlalchemy import Column, String, Integer, Float, SmallInteger -from sqlalchemy.orm import relationship from .base import Base diff --git a/ogn/model/beacon.py b/ogn/model/beacon.py index 3392ae2..742ff38 100644 --- a/ogn/model/beacon.py +++ b/ogn/model/beacon.py @@ -1,6 +1,6 @@ from geoalchemy2.shape import to_shape from geoalchemy2.types import Geometry -from sqlalchemy import Column, String, Integer, SmallInteger, Float, DateTime, BigInteger +from sqlalchemy import Column, String, SmallInteger, Float, DateTime from sqlalchemy.ext.declarative import AbstractConcreteBase from .base import Base diff --git a/ogn/model/country.py b/ogn/model/country.py index 062f0e5..a786869 100644 --- a/ogn/model/country.py +++ b/ogn/model/country.py @@ -1,6 +1,5 @@ from geoalchemy2.types import Geometry from sqlalchemy import Column, String, Integer, Float, SmallInteger, BigInteger -from sqlalchemy.orm import relationship from .base import Base diff --git a/ogn/model/device.py b/ogn/model/device.py index 35c70e4..74d89b6 100644 --- a/ogn/model/device.py +++ b/ogn/model/device.py @@ -1,5 +1,4 @@ from sqlalchemy import Column, Integer, String, Float, Boolean, SmallInteger, DateTime -from sqlalchemy.orm import relationship from .base import Base diff --git a/ogn/model/flights2d.py b/ogn/model/flights2d.py index fd595e1..69b1cf7 100644 --- a/ogn/model/flights2d.py +++ b/ogn/model/flights2d.py @@ -1,5 +1,5 @@ from geoalchemy2.types import Geometry -from sqlalchemy import Column, String, Integer, Float, SmallInteger, Date, Index, ForeignKey +from sqlalchemy import Column, Integer, Date, Index, ForeignKey from sqlalchemy.orm import relationship from .base import Base @@ -22,5 +22,6 @@ class Flight2D(Base): self.date, self.path_wkt) + Index('ix_flights2d_date_device_id', Flight2D.date, Flight2D.device_id) -#Index('ix_flights2d_date_path', Flight2D.date, Flight2D.path_wkt) --> CREATE INDEX ix_flights2d_date_path ON flights2d USING GIST("date", path) \ No newline at end of file +#Index('ix_flights2d_date_path', Flight2D.date, Flight2D.path_wkt) --> CREATE INDEX ix_flights2d_date_path ON flights2d USING GIST("date", path) diff --git a/ogn/model/receiver_beacon.py b/ogn/model/receiver_beacon.py index 2cafda9..a342bb8 100644 --- a/ogn/model/receiver_beacon.py +++ b/ogn/model/receiver_beacon.py @@ -1,4 +1,4 @@ -from sqlalchemy import Column, Float, String, Integer, SmallInteger, ForeignKey, Index +from sqlalchemy import Column, Float, String, Integer, ForeignKey, Index from sqlalchemy.orm import relationship from sqlalchemy.sql import func from .beacon import Beacon @@ -70,10 +70,10 @@ class ReceiverBeacon(Beacon): 'dstcall', 'receiver_name', 'timestamp', - + # 'raw_message', # 'reference_timestamp', - + 'version', 'platform', 'cpu_load', @@ -101,7 +101,7 @@ class ReceiverBeacon(Beacon): self.dstcall, self.receiver_name, self.timestamp, - + # self.raw_message, # self.reference_timestamp, @@ -124,4 +124,5 @@ class ReceiverBeacon(Beacon): int(self.good_senders) if self.good_senders else None, int(self.good_and_bad_senders) if self.good_and_bad_senders else None] -Index('ix_receiver_beacons_date_receiver_id', func.date(ReceiverBeacon.timestamp), ReceiverBeacon.receiver_id) \ No newline at end of file + +Index('ix_receiver_beacons_date_receiver_id', func.date(ReceiverBeacon.timestamp), ReceiverBeacon.receiver_id) diff --git a/ogn/model/receiver_coverage.py b/ogn/model/receiver_coverage.py index 95449d0..b98c90e 100644 --- a/ogn/model/receiver_coverage.py +++ b/ogn/model/receiver_coverage.py @@ -1,4 +1,4 @@ -from sqlalchemy import Column, String, Integer, SmallInteger, Float, Date, ForeignKey, Index +from sqlalchemy import Column, String, Integer, SmallInteger, Float, Date, ForeignKey from sqlalchemy.orm import relationship, backref @@ -21,4 +21,4 @@ class ReceiverCoverage(Base): # Relations receiver_id = Column(Integer, ForeignKey('receivers.id', ondelete='SET NULL'), index=True) - receiver = relationship('Receiver', foreign_keys=[receiver_id], backref=backref('receiver_coverages', order_by='ReceiverCoverage.date.asc()')) \ No newline at end of file + receiver = relationship('Receiver', foreign_keys=[receiver_id], backref=backref('receiver_coverages', order_by='ReceiverCoverage.date.asc()')) diff --git a/ogn/model/relation_stats.py b/ogn/model/relation_stats.py index aeba950..df76be7 100644 --- a/ogn/model/relation_stats.py +++ b/ogn/model/relation_stats.py @@ -1,4 +1,4 @@ -from sqlalchemy import Column, Integer, Date, DateTime, Float, ForeignKey, SmallInteger, Boolean, String, Index +from sqlalchemy import Column, Integer, Date, Float, ForeignKey, Index from sqlalchemy.orm import relationship from .base import Base @@ -27,5 +27,6 @@ class RelationStats(Base): self.normalized_signal_quality, self.beacon_count) + Index('ix_relation_stats_date_device_id', RelationStats.date, RelationStats.device_id, RelationStats.receiver_id) -Index('ix_relation_stats_date_receiver_id', RelationStats.date, RelationStats.receiver_id, RelationStats.device_id) \ No newline at end of file +Index('ix_relation_stats_date_receiver_id', RelationStats.date, RelationStats.receiver_id, RelationStats.device_id) diff --git a/ogn/model/takeoff_landing.py b/ogn/model/takeoff_landing.py index 88ef14c..c66ca45 100644 --- a/ogn/model/takeoff_landing.py +++ b/ogn/model/takeoff_landing.py @@ -1,6 +1,5 @@ -from sqlalchemy import Boolean, Column, Integer, SmallInteger, DateTime, ForeignKey, Index +from sqlalchemy import Boolean, Column, Integer, SmallInteger, DateTime, ForeignKey from sqlalchemy.orm import relationship -from sqlalchemy.sql import func from .base import Base diff --git a/ogn/utils.py b/ogn/utils.py index 3a39b4c..bd6f7e6 100644 --- a/ogn/utils.py +++ b/ogn/utils.py @@ -1,13 +1,11 @@ import csv import gzip from io import StringIO +from datetime import datetime, timedelta from aerofiles.seeyou import Reader -from geopy.exc import GeopyError -from geopy.geocoders import Nominatim from ogn.parser.utils import FEETS_TO_METER import requests -from urllib.request import Request from .model import DeviceInfoOrigin, DeviceInfo, Airport, Location @@ -24,6 +22,17 @@ nm2m = 1852 mi2m = 1609.34 +def get_days(start, end): + days = [start + timedelta(days=x) for x in range(0, (end - start).days + 1)] + return days + + +def date_to_timestamps(date): + start = datetime(date.year, date.month, date.day, 0, 0, 0) + end = datetime(date.year, date.month, date.day, 23, 59, 59) + return (start, end) + + def get_ddb(csv_file=None, address_origin=DeviceInfoOrigin.unknown): if csv_file is None: r = requests.get(DDB_URL) @@ -74,11 +83,11 @@ def get_flarmnet(fln_file=None, address_origin=DeviceInfoOrigin.flarmnet): def get_trackable(ddb): - l = [] + result = [] for i in ddb: if i.tracked and i.address_type in address_prefixes: - l.append("{}{}".format(address_prefixes[i.address_type], i.address)) - return l + result.append("{}{}".format(address_prefixes[i.address_type], i.address)) + return result def get_airports(cupfile): diff --git a/tests/collect/test_database.py b/tests/collect/test_database.py index f625d67..b84a002 100644 --- a/tests/collect/test_database.py +++ b/tests/collect/test_database.py @@ -2,7 +2,7 @@ import unittest import os from ogn.model import AircraftBeacon, ReceiverBeacon, Device, Receiver, DeviceInfo -from ogn.collect.database import add_missing_devices, add_missing_receivers, import_ddb_file +from ogn.collect.database import add_missing_devices, add_missing_receivers, import_ddb class TestDB(unittest.TestCase): @@ -31,8 +31,8 @@ class TestDB(unittest.TestCase): def test_update_devices(self): session = self.session - ab01 = AircraftBeacon(receiver_name='Koenigsdf', address='DD4711', timestamp='2017-12-10 10:00:00') - rb01 = ReceiverBeacon(name='Bene', timestamp='2017-12-10 09:59:50') + ab01 = AircraftBeacon(name='FLRDD4711', receiver_name='Koenigsdf', address='DD4711', timestamp='2017-12-10 10:00:00') + rb01 = ReceiverBeacon(name='Bene', receiver_name='GLIDERN1', timestamp='2017-12-10 09:59:50') d01 = Device(address='DD4711') r01 = Receiver(name='Koenigsdf') session.bulk_save_objects([ab01, rb01, d01, r01]) @@ -54,7 +54,7 @@ class TestDB(unittest.TestCase): def test_import_ddb_file(self): session = self.session - import_ddb_file(session, path=os.path.dirname(__file__) + '/../custom_ddb.txt') + import_ddb(session, path=os.path.dirname(__file__) + '/../custom_ddb.txt') device_infos = session.query(DeviceInfo).all() self.assertEqual(len(device_infos), 6) diff --git a/tests/collect/test_stats.py b/tests/collect/test_stats.py index 7dc1ba1..1f8040e 100644 --- a/tests/collect/test_stats.py +++ b/tests/collect/test_stats.py @@ -1,10 +1,10 @@ import unittest import os -from datetime import datetime +from datetime import datetime, date from ogn.model import AircraftBeacon, ReceiverBeacon, Receiver, Device, DeviceStats -from ogn.collect.stats import update_device_stats +from ogn.collect.stats import update_devices class TestDB(unittest.TestCase): @@ -22,16 +22,16 @@ class TestDB(unittest.TestCase): init() # Prepare Beacons - self.ab01 = AircraftBeacon(timestamp='2017-12-10 10:00:01') - self.ab02 = AircraftBeacon(timestamp='2017-12-10 10:00:02') - self.ab03 = AircraftBeacon(timestamp='2017-12-10 10:00:03') - self.ab04 = AircraftBeacon(timestamp='2017-12-10 10:00:04') - self.ab05 = AircraftBeacon(timestamp='2017-12-10 10:00:05') - self.ab06 = AircraftBeacon(timestamp='2017-12-10 10:00:05') + self.ab01 = AircraftBeacon(name='FLRDD4711', receiver_name='Koenigsdf', timestamp='2017-12-10 10:00:01') + self.ab02 = AircraftBeacon(name='FLRDD4711', receiver_name='Koenigsdf', timestamp='2017-12-10 10:00:02') + self.ab03 = AircraftBeacon(name='FLRDD4711', receiver_name='Koenigsdf', timestamp='2017-12-10 10:00:03') + self.ab04 = AircraftBeacon(name='FLRDD4711', receiver_name='Koenigsdf', timestamp='2017-12-10 10:00:04') + self.ab05 = AircraftBeacon(name='FLRDD4711', receiver_name='Koenigsdf', timestamp='2017-12-10 10:00:05') + self.ab06 = AircraftBeacon(name='FLRDD4711', receiver_name='Koenigsdf', timestamp='2017-12-10 10:00:05') - self.rb01 = ReceiverBeacon(timestamp='2017-12-10 09:55:00', altitude=601, version='0.2.5', platform='ARM') - self.rb02 = ReceiverBeacon(timestamp='2017-12-10 10:00:00', altitude=601, version='0.2.7', platform='ARM') - self.rb03 = ReceiverBeacon(timestamp='2017-12-10 10:05:00', altitude=601, version='0.2.6', platform='ARM') + self.rb01 = ReceiverBeacon(name='Koenigsdf', receiver_name='GLIDERN1', timestamp='2017-12-10 09:55:00', altitude=601, version='0.2.5', platform='ARM') + self.rb02 = ReceiverBeacon(name='Koenigsdf', receiver_name='GLIDERN1', timestamp='2017-12-10 10:00:00', altitude=601, version='0.2.7', platform='ARM') + self.rb03 = ReceiverBeacon(name='Koenigsdf', receiver_name='GLIDERN1', timestamp='2017-12-10 10:05:00', altitude=601, version='0.2.6', platform='ARM') self.r01 = Receiver(name='Koenigsdf') self.r02 = Receiver(name='Bene') @@ -61,7 +61,7 @@ class TestDB(unittest.TestCase): session.add(self.ab01) session.commit() - update_device_stats(session, date='2017-12-10') + update_devices(session) devicestats = session.query(DeviceStats).all() self.assertEqual(len(devicestats), 1) @@ -88,7 +88,7 @@ class TestDB(unittest.TestCase): session.add(self.ab02) session.commit() - update_device_stats(session, date='2017-12-10') + update_devices(session, date='2017-12-10') devicestats = session.query(DeviceStats).all() self.assertEqual(len(devicestats), 1) @@ -114,7 +114,7 @@ class TestDB(unittest.TestCase): session.add(self.ab03) session.commit() - update_device_stats(session, date='2017-12-10') + update_devices(session, date='2017-12-10') devicestats = session.query(DeviceStats).all() self.assertEqual(len(devicestats), 1) @@ -142,7 +142,7 @@ class TestDB(unittest.TestCase): session.add(self.ab04) session.commit() - update_device_stats(session, date='2017-12-10') + update_devices(session, date='2017-12-10') devicestats = session.query(DeviceStats).all() self.assertEqual(len(devicestats), 1) @@ -168,7 +168,7 @@ class TestDB(unittest.TestCase): session.add(self.ab05) session.commit() - update_device_stats(session, date='2017-12-10') + update_devices(session, date='2017-12-10') devicestats = session.query(DeviceStats).all() self.assertEqual(len(devicestats), 1) @@ -195,7 +195,7 @@ class TestDB(unittest.TestCase): session.add(self.ab06) session.commit() - update_device_stats(session, date='2017-12-10') + update_devices(session, date='2017-12-10') devicestats = session.query(DeviceStats).all() self.assertEqual(len(devicestats), 1) diff --git a/tests/test_utils.py b/tests/test_utils.py index a5fa8be..b2e7d35 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -1,12 +1,19 @@ import os import unittest +from datetime import date from ogn.model import AircraftType -from ogn.utils import get_ddb, get_trackable, get_country_code, get_airports +from ogn.utils import get_days, get_ddb, get_trackable, get_airports import unittest.mock as mock class TestStringMethods(unittest.TestCase): + def test_get_days(self): + start = date(2018, 2, 27) + end = date(2018, 3, 2) + days = get_days(start, end) + self.assertEqual(days, [date(2018, 2, 27), date(2018, 2, 28), date(2018, 3, 1), date(2018, 3, 2)]) + def test_get_devices(self): devices = get_ddb() self.assertGreater(len(devices), 1000)