diff --git a/ogn/collect/database.py b/ogn/collect/database.py index cd30823..78e1537 100644 --- a/ogn/collect/database.py +++ b/ogn/collect/database.py @@ -4,10 +4,10 @@ from sqlalchemy import insert, distinct from sqlalchemy.sql import null, and_, or_, func, not_ from sqlalchemy.sql.expression import case +from ogn.collect.celery import app from ogn.model import DeviceInfo, DeviceInfoOrigin, AircraftBeacon, ReceiverBeacon, Device, Receiver from ogn.utils import get_ddb, get_country_code -from .celery import app logger = get_task_logger(__name__) @@ -26,72 +26,81 @@ def update_device_infos(session, address_origin, csvfile=None): @app.task -def import_ddb(): +def import_ddb(session=None): """Import registered devices from the DDB.""" + if session is None: + session = app.session + logger.info("Import registered devices fom the DDB...") address_origin = DeviceInfoOrigin.ogn_ddb - counter = update_device_infos(app.session, address_origin) + counter = update_device_infos(session, address_origin) logger.info("Imported {} devices.".format(counter)) @app.task -def import_file(path='tests/custom_ddb.txt'): +def import_file(session=None, path='tests/custom_ddb.txt'): """Import registered devices from a local file.""" + if session is None: + session = app.session + logger.info("Import registered devices from '{}'...".format(path)) address_origin = DeviceInfoOrigin.user_defined - counter = update_device_infos(app.session, address_origin, csvfile=path) + counter = update_device_infos(session, address_origin, csvfile=path) logger.info("Imported {} devices.".format(counter)) @app.task -def update_devices(): +def update_devices(session=None): """Add/update entries in devices table and update foreign keys in aircraft beacons.""" + if session is None: + session = app.session + # Create missing Device from AircraftBeacon - available_devices = app.session.query(Device.address) \ + available_devices = session.query(Device.address) \ .subquery() - missing_devices_query = app.session.query(distinct(AircraftBeacon.address)) \ + missing_devices_query = session.query(distinct(AircraftBeacon.address)) \ .filter(and_(AircraftBeacon.device_id == null(), not_(AircraftBeacon.address.like('00%')), AircraftBeacon.error_count == 0)) \ .filter(~AircraftBeacon.address.in_(available_devices)) ins = insert(Device).from_select([Device.address], missing_devices_query) - res = app.session.execute(ins) + res = session.execute(ins) insert_count = res.rowcount - app.session.commit() + session.commit() # For each address in the new beacons: get firstseen, lastseen and last values != NULL - last_valid_values = app.session.query( + last_valid_values = session.query( distinct(AircraftBeacon.address).label('address'), func.first_value(AircraftBeacon.timestamp) - .over(partition_by=AircraftBeacon.address, order_by=case([(AircraftBeacon.timestamp == null(), None), (AircraftBeacon.timestamp != null(), AircraftBeacon.timestamp)])) + .over(partition_by=AircraftBeacon.address, order_by=case([(AircraftBeacon.timestamp == null(), None)], else_=AircraftBeacon.timestamp).asc().nullslast()) .label('firstseen'), - func.last_value(AircraftBeacon.timestamp) - .over(partition_by=AircraftBeacon.address, order_by=case([(AircraftBeacon.timestamp == null(), None), (AircraftBeacon.timestamp != null(), AircraftBeacon.timestamp)])) + func.first_value(AircraftBeacon.timestamp) + .over(partition_by=AircraftBeacon.address, order_by=case([(AircraftBeacon.timestamp == null(), None)], else_=AircraftBeacon.timestamp).desc().nullslast()) .label('lastseen'), func.first_value(AircraftBeacon.aircraft_type) - .over(partition_by=AircraftBeacon.address, order_by=case([(AircraftBeacon.aircraft_type == null(), None), (AircraftBeacon.aircraft_type != null(), AircraftBeacon.aircraft_type)])) + .over(partition_by=AircraftBeacon.address, order_by=case([(AircraftBeacon.aircraft_type == null(), None)], else_=AircraftBeacon.timestamp).desc().nullslast()) .label('aircraft_type'), func.first_value(AircraftBeacon.stealth) - .over(partition_by=AircraftBeacon.address, order_by=case([(AircraftBeacon.stealth == null(), None), (AircraftBeacon.stealth != null(), AircraftBeacon.stealth)])) + .over(partition_by=AircraftBeacon.address, order_by=case([(AircraftBeacon.stealth == null(), None)], else_=AircraftBeacon.timestamp).desc().nullslast()) .label('stealth'), func.first_value(AircraftBeacon.software_version) - .over(partition_by=AircraftBeacon.address, order_by=case([(AircraftBeacon.software_version == null(), None), (AircraftBeacon.software_version != null(), AircraftBeacon.software_version)])) + .over(partition_by=AircraftBeacon.address, order_by=case([(AircraftBeacon.software_version == null(), None)], else_=AircraftBeacon.timestamp).desc().nullslast()) .label('software_version'), func.first_value(AircraftBeacon.hardware_version) - .over(partition_by=AircraftBeacon.address, order_by=case([(AircraftBeacon.hardware_version == null(), None), (AircraftBeacon.hardware_version != null(), AircraftBeacon.hardware_version)])) + .over(partition_by=AircraftBeacon.address, order_by=case([(AircraftBeacon.hardware_version == null(), None)], else_=AircraftBeacon.timestamp).desc().nullslast()) .label('hardware_version'), func.first_value(AircraftBeacon.real_address) - .over(partition_by=AircraftBeacon.address, order_by=case([(AircraftBeacon.real_address == null(), None), (AircraftBeacon.real_address != null(), AircraftBeacon.real_address)])) + .over(partition_by=AircraftBeacon.address, order_by=case([(AircraftBeacon.real_address == null(), None)], else_=AircraftBeacon.timestamp).desc().nullslast()) .label('real_address')) \ .filter(and_(AircraftBeacon.device_id == null(), AircraftBeacon.error_count == 0)) \ .subquery() - update_values = app.session.query( + update_values = session.query( Device.address, case([(or_(Device.firstseen == null(), Device.firstseen > last_valid_values.c.firstseen), last_valid_values.c.firstseen), (Device.firstseen <= last_valid_values.c.firstseen, Device.firstseen)]).label('firstseen'), @@ -110,7 +119,7 @@ def update_devices(): .filter(Device.address == last_valid_values.c.address) \ .subquery() - update_receivers = app.session.query(Device) \ + update_receivers = session.query(Device) \ .filter(Device.address == update_values.c.address) \ .update({ Device.firstseen: update_values.c.firstseen, @@ -123,14 +132,14 @@ def update_devices(): synchronize_session='fetch') # Update relations to aircraft beacons - upd = app.session.query(AircraftBeacon) \ + upd = session.query(AircraftBeacon) \ .filter(AircraftBeacon.device_id == null()) \ .filter(AircraftBeacon.address == Device.address) \ .update({ AircraftBeacon.device_id: Device.id}, synchronize_session='fetch') - app.session.commit() + session.commit() logger.info("Devices: {} inserted, {} updated".format(insert_count, update_receivers)) logger.info("Updated {} AircraftBeacons".format(upd)) @@ -139,22 +148,26 @@ def update_devices(): @app.task -def update_receivers(): +def update_receivers(session=None): """Add/update_receivers entries in receiver table and update receivers foreign keys and distance in aircraft beacons and update foreign keys in receiver beacons.""" + + if session is None: + session = app.session + # Create missing Receiver from ReceiverBeacon - available_receivers = app.session.query(Receiver.name) \ + available_receivers = session.query(Receiver.name) \ .subquery() - missing_receiver_query = app.session.query(distinct(ReceiverBeacon.name)) \ + missing_receiver_query = session.query(distinct(ReceiverBeacon.name)) \ .filter(ReceiverBeacon.receiver_id == null()) \ .filter(~ReceiverBeacon.name.in_(available_receivers)) ins = insert(Receiver).from_select([Receiver.name], missing_receiver_query) - res = app.session.execute(ins) + res = session.execute(ins) insert_count = res.rowcount # For each name in the new beacons: get firstseen, lastseen and last values != NULL - last_valid_values = app.session.query( + last_valid_values = session.query( distinct(ReceiverBeacon.name).label('name'), func.first_value(ReceiverBeacon.timestamp) .over(partition_by=ReceiverBeacon.name, order_by=case([(ReceiverBeacon.timestamp == null(), None), (ReceiverBeacon.timestamp != null(), ReceiverBeacon.timestamp)])) @@ -177,7 +190,7 @@ def update_receivers(): .filter(ReceiverBeacon.receiver_id == null()) \ .subquery() - update_values = app.session.query( + update_values = session.query( Receiver.name, case([(or_(Receiver.firstseen == null(), Receiver.firstseen > last_valid_values.c.firstseen), last_valid_values.c.firstseen), (Receiver.firstseen <= last_valid_values.c.firstseen, Receiver.firstseen)]).label('firstseen'), @@ -196,7 +209,7 @@ def update_receivers(): .filter(Receiver.name == last_valid_values.c.name) \ .subquery() - update_receivers = app.session.query(Receiver) \ + update_receivers = session.query(Receiver) \ .filter(Receiver.name == update_values.c.name) \ .update({ Receiver.firstseen: update_values.c.firstseen, @@ -209,19 +222,19 @@ def update_receivers(): synchronize_session='fetch') # Update relations to aircraft beacons - update_aircraft_beacons = app.session.query(AircraftBeacon) \ + update_aircraft_beacons = session.query(AircraftBeacon) \ .filter(and_(AircraftBeacon.receiver_id == null(), AircraftBeacon.receiver_name == Receiver.name)) \ .update({AircraftBeacon.receiver_id: Receiver.id, AircraftBeacon.distance: func.ST_Distance_Sphere(AircraftBeacon.location_wkt, Receiver.location_wkt)}, synchronize_session='fetch') # Update relations to receiver beacons - update_receiver_beacons = app.session.query(ReceiverBeacon) \ + update_receiver_beacons = session.query(ReceiverBeacon) \ .filter(and_(ReceiverBeacon.receiver_id == null(), ReceiverBeacon.name == Receiver.name)) \ .update({ReceiverBeacon.receiver_id: Receiver.id}, synchronize_session='fetch') - app.session.commit() + session.commit() logger.info("Receivers: {} inserted, {} updated.".format(insert_count, update_receivers)) logger.info("Updated relations: {} aircraft beacons, {} receiver beacons".format(update_aircraft_beacons, update_receiver_beacons)) @@ -231,9 +244,13 @@ def update_receivers(): @app.task -def update_country_code(): +def update_country_code(session=None): # update country code in receivers table if None - unknown_country_query = app.session.query(Receiver) \ + + if session is None: + session = app.session + + unknown_country_query = session.query(Receiver) \ .filter(Receiver.country_code == null()) \ .filter(Receiver.location_wkt != null()) \ .order_by(Receiver.name) @@ -247,6 +264,6 @@ def update_country_code(): logger.info("Updated country_code for {} to {}".format(receiver.name, receiver.country_code)) counter += 1 - app.session.commit() + session.commit() return "Updated country_code for {} Receivers".format(counter) diff --git a/ogn/model/aircraft_beacon.py b/ogn/model/aircraft_beacon.py index ef578c2..5894995 100644 --- a/ogn/model/aircraft_beacon.py +++ b/ogn/model/aircraft_beacon.py @@ -52,6 +52,17 @@ class AircraftBeacon(Beacon): Index('ix_aircraft_beacon_receiver_id_receiver_name', 'receiver_id', 'receiver_name') Index('ix_aircraft_beacon_device_id_address', 'device_id', 'address') + def __init__(self, receiver_name, address, timestamp, aircraft_type, stealth, error_count, software_version, hardware_version, real_address): + self.receiver_name + self.address = address + self.timestamp = timestamp + self.aircraft_type = aircraft_type + self.stealth = stealth + self.error_count = error_count + self.software_version = software_version + self.hardware_version = hardware_version + self.real_address = real_address + def __repr__(self): return "" % ( self.address_type, diff --git a/tests/collect/test_database.py b/tests/collect/test_database.py new file mode 100644 index 0000000..1119dfe --- /dev/null +++ b/tests/collect/test_database.py @@ -0,0 +1,90 @@ +import unittest +import os + +from ogn.model import AircraftBeacon, Device +from ogn.collect.database import update_devices + + +class TestDB(unittest.TestCase): + session = None + engine = None + app = None + + def setUp(self): + os.environ['OGN_CONFIG_MODULE'] = 'config.test' + from ogn.commands.dbutils import engine, session + self.session = session + self.engine = engine + + from ogn.commands.database import init + init() + + # Create basic data and insert + self.ab00 = AircraftBeacon(receiver_name='Koenigsdf', address='DD4711', timestamp='2017-12-10 10:00:00', aircraft_type=1, stealth=False, error_count=0, software_version=None, hardware_version=None, real_address=None) + self.ab01 = AircraftBeacon(receiver_name='Koenigsdf', address='DD4711', timestamp='2017-12-10 10:00:01', aircraft_type=1, stealth=False, error_count=0, software_version=0.26, hardware_version=None, real_address=None) + self.ab02 = AircraftBeacon(receiver_name='Koenigsdf', address='DD4711', timestamp='2017-12-10 10:00:02', aircraft_type=1, stealth=False, error_count=1, software_version=0.27, hardware_version=None, real_address=None) + self.ab03 = AircraftBeacon(receiver_name='Koenigsdf', address='DD4711', timestamp='2017-12-10 10:00:03', aircraft_type=1, stealth=False, error_count=0, software_version=None, hardware_version=5, real_address='DD1234') + self.ab04 = AircraftBeacon(receiver_name='Koenigsdf', address='DD4711', timestamp='2017-12-10 10:00:04', aircraft_type=1, stealth=False, error_count=0, software_version=0.25, hardware_version=123, real_address='DDxxxx') + self.ab05 = AircraftBeacon(receiver_name='Koenigsdf', address='DD4711', timestamp='2017-12-10 10:00:05', aircraft_type=1, stealth=False, error_count=0, software_version=None, hardware_version=None, real_address='DD0815') + + def tearDown(self): + session = self.session + session.execute("DELETE FROM device") + session.execute("DELETE FROM receiver") + session.execute("DELETE FROM aircraft_beacon") + session.commit() + + def test_update_devices(self): + session = self.session + + # Compute 1st beacon + session.add(self.ab00) + session.commit() + + update_devices(session) + + devices = session.query(Device).all() + self.assertEqual(len(devices), 1) + self.assertEqual(devices[0].address, 'DD4711') + self.assertEqual(devices[0].software_version, None) + self.assertEqual(self.ab00.device_id, devices[0].id) + + # Compute 2nd beacon: changed software version + session.add(self.ab01) + session.commit() + + update_devices(session) + devices = session.query(Device).all() + self.assertEqual(len(devices), 1) + self.assertEqual(devices[0].address, 'DD4711') + self.assertEqual(devices[0].software_version, 0.26) + + # Compute 3rd beacon: changed software version, but with error_count > 0 + session.add(self.ab02) + session.commit() + + update_devices(session) + devices = session.query(Device).all() + self.assertEqual(len(devices), 1) + self.assertEqual(devices[0].address, 'DD4711') + self.assertEqual(devices[0].software_version, 0.26) + self.assertEqual(devices[0].hardware_version, None) + self.assertEqual(devices[0].real_address, None) + + # Compute 4.-6. beacon + session.add(self.ab03) + session.add(self.ab05) # order is not important + session.add(self.ab04) + session.commit() + + update_devices(session) + devices = session.query(Device).all() + self.assertEqual(len(devices), 1) + self.assertEqual(devices[0].address, 'DD4711') + self.assertEqual(devices[0].software_version, 0.25) + self.assertEqual(devices[0].hardware_version, 123) + self.assertEqual(devices[0].real_address, 'DD0815') + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/collect/test_logbook.py b/tests/collect/test_logbook.py index 088d72c..cc5e91c 100644 --- a/tests/collect/test_logbook.py +++ b/tests/collect/test_logbook.py @@ -1,8 +1,6 @@ import unittest import os -from sqlalchemy.sql import null - from ogn.model import Logbook, Airport, Device, TakeoffLanding from ogn.collect.logbook import update_logbook