diff --git a/ogn/commands/database.py b/ogn/commands/database.py index f12be0f..971d9c6 100644 --- a/ogn/commands/database.py +++ b/ogn/commands/database.py @@ -101,6 +101,62 @@ def update_devices(): ins = insert(Device).from_select([Device.address], missing_devices_query) res = session.execute(ins) insert_count = res.rowcount + session.commit() + + # For each address in the new beacons: get firstseen, lastseen and last values != NULL + last_valid_values = session.query(distinct(AircraftBeacon.address).label('address'), + func.first_value(AircraftBeacon.timestamp) \ + .over(partition_by=AircraftBeacon.address, order_by=case([(AircraftBeacon.timestamp == null(), None), (AircraftBeacon.timestamp != null(), AircraftBeacon.timestamp)])) \ + .label('firstseen'), + func.last_value(AircraftBeacon.timestamp) \ + .over(partition_by=AircraftBeacon.address, order_by=case([(AircraftBeacon.timestamp == null(), None), (AircraftBeacon.timestamp != null(), AircraftBeacon.timestamp)])) \ + .label('lastseen'), + func.first_value(AircraftBeacon.aircraft_type) \ + .over(partition_by=AircraftBeacon.address, order_by=case([(AircraftBeacon.aircraft_type == null(), None), (AircraftBeacon.aircraft_type != null(), AircraftBeacon.aircraft_type)])) \ + .label('aircraft_type'), + func.first_value(AircraftBeacon.stealth) \ + .over(partition_by=AircraftBeacon.address, order_by=case([(AircraftBeacon.stealth == null(), None), (AircraftBeacon.stealth != null(), AircraftBeacon.stealth)])) \ + .label('stealth'), + func.first_value(AircraftBeacon.software_version) \ + .over(partition_by=AircraftBeacon.address, order_by=case([(AircraftBeacon.software_version == null(), None), (AircraftBeacon.software_version != null(), AircraftBeacon.software_version)])) \ + .label('software_version'), + func.first_value(AircraftBeacon.hardware_version) \ + .over(partition_by=AircraftBeacon.address, order_by=case([(AircraftBeacon.hardware_version == null(), None), (AircraftBeacon.hardware_version != null(), AircraftBeacon.hardware_version)])) \ + .label('hardware_version'), + func.first_value(AircraftBeacon.real_address) \ + .over(partition_by=AircraftBeacon.address, order_by=case([(AircraftBeacon.real_address == null(), None), (AircraftBeacon.real_address != null(), AircraftBeacon.real_address)])) \ + .label('real_address')) \ + .filter(and_(AircraftBeacon.device_id == null(), AircraftBeacon.error_count == 0)) \ + .subquery() + + update_values = session.query(Device.address, + case([(or_(Device.firstseen == null(), Device.firstseen > last_valid_values.c.firstseen), last_valid_values.c.firstseen), + (Device.firstseen <= last_valid_values.c.firstseen, Device.firstseen)]).label('firstseen'), + case([(or_(Device.lastseen == null(), Device.lastseen < last_valid_values.c.lastseen), last_valid_values.c.lastseen), + (Device.lastseen >= last_valid_values.c.lastseen, Device.lastseen)]).label('lastseen'), + case([(or_(Device.aircraft_type == null(), Device.lastseen < last_valid_values.c.lastseen), last_valid_values.c.aircraft_type), + (Device.lastseen >= last_valid_values.c.lastseen, Device.aircraft_type)]).label('aircraft_type'), + case([(or_(Device.stealth == null(), Device.lastseen < last_valid_values.c.lastseen), last_valid_values.c.stealth), + (Device.lastseen >= last_valid_values.c.lastseen, Device.stealth)]).label('stealth'), + case([(or_(Device.software_version == null(), Device.lastseen < last_valid_values.c.lastseen), last_valid_values.c.software_version), + (Device.lastseen >= last_valid_values.c.lastseen, Device.software_version)]).label('software_version'), + case([(or_(Device.hardware_version == null(), Device.lastseen < last_valid_values.c.lastseen), last_valid_values.c.hardware_version), + (Device.lastseen >= last_valid_values.c.lastseen, Device.hardware_version)]).label('hardware_version'), + case([(or_(Device.real_address == null(), Device.lastseen < last_valid_values.c.lastseen), last_valid_values.c.real_address), + (Device.lastseen >= last_valid_values.c.lastseen, Device.real_address)]).label('real_address')) \ + .filter(Device.address == last_valid_values.c.address) \ + .subquery() + + update_receivers = session.query(Device) \ + .filter(Device.address == update_values.c.address) \ + .update({Device.firstseen: update_values.c.firstseen, + Device.lastseen: update_values.c.lastseen, + Device.aircraft_type: update_values.c.aircraft_type, + Device.stealth: update_values.c.stealth, + Device.software_version: update_values.c.software_version, + Device.hardware_version: update_values.c.hardware_version, + Device.real_address: update_values.c.real_address}, + synchronize_session='fetch') # Update relations to aircraft beacons upd = session.query(AircraftBeacon) \ @@ -110,7 +166,7 @@ def update_devices(): synchronize_session='fetch') session.commit() - print("Inserted {} Devices".format(insert_count)) + print("Devices: {} inserted, {} updated".format(insert_count, update_receivers)) print("Updated {} AircraftBeacons".format(upd)) @@ -129,34 +185,28 @@ def update_receivers(): res = session.execute(ins) insert_count = res.rowcount - # Update missing or changed values, update them and set country code to None if location changed - new_values_range = session.query(ReceiverBeacon.name, - func.min(ReceiverBeacon.timestamp).label('firstseen'), - func.max(ReceiverBeacon.timestamp).label('lastseen')) \ - .filter(ReceiverBeacon.receiver_id == null()) \ - .group_by(ReceiverBeacon.name) \ - .subquery() - - last_values = session.query(ReceiverBeacon.name, - func.max(new_values_range.c.firstseen).label('firstseen'), - func.max(new_values_range.c.lastseen).label('lastseen'), - func.max(ReceiverBeacon.location_wkt).label('location_wkt'), - func.max(ReceiverBeacon.altitude).label('altitude'), - func.max(ReceiverBeacon.version).label('version'), - func.max(ReceiverBeacon.platform).label('platform')) \ - .filter(and_(ReceiverBeacon.name == new_values_range.c.name, - ReceiverBeacon.timestamp == new_values_range.c.lastseen)) \ - .group_by(ReceiverBeacon.name) \ - .subquery() - - last_valid_values = session.query(last_values) \ - .filter(and_(last_values.c.firstseen != null(), - last_values.c.lastseen != null(), - last_values.c.location_wkt != null(), - last_values.c.altitude != null(), - last_values.c.version != null(), - last_values.c.platform != null())) \ - .subquery() + # For each name in the new beacons: get firstseen, lastseen and last values != NULL + last_valid_values = session.query(distinct(ReceiverBeacon.name).label('name'), + func.first_value(ReceiverBeacon.timestamp) \ + .over(partition_by=ReceiverBeacon.name, order_by=case([(ReceiverBeacon.timestamp == null(), None), (ReceiverBeacon.timestamp != null(), ReceiverBeacon.timestamp)])) \ + .label('firstseen'), + func.last_value(ReceiverBeacon.timestamp) \ + .over(partition_by=ReceiverBeacon.name, order_by=case([(ReceiverBeacon.timestamp == null(), None), (ReceiverBeacon.timestamp != null(), ReceiverBeacon.timestamp)])) \ + .label('lastseen'), + func.first_value(ReceiverBeacon.location_wkt) \ + .over(partition_by=ReceiverBeacon.name, order_by=case([(ReceiverBeacon.location_wkt == null(), None), (ReceiverBeacon.location_wkt != null(), ReceiverBeacon.location_wkt)])) \ + .label('location_wkt'), + func.first_value(ReceiverBeacon.altitude) \ + .over(partition_by=ReceiverBeacon.name, order_by=case([(ReceiverBeacon.altitude == null(), None), (ReceiverBeacon.altitude != null(), ReceiverBeacon.altitude)])) \ + .label('altitude'), + func.first_value(ReceiverBeacon.version) \ + .over(partition_by=ReceiverBeacon.name, order_by=case([(ReceiverBeacon.version == null(), None), (ReceiverBeacon.version != null(), ReceiverBeacon.version)])) \ + .label('version'), + func.first_value(ReceiverBeacon.platform) \ + .over(partition_by=ReceiverBeacon.name, order_by=case([(ReceiverBeacon.platform == null(), None), (ReceiverBeacon.platform != null(), ReceiverBeacon.platform)])) \ + .label('platform')) \ + .filter(ReceiverBeacon.receiver_id == null()) \ + .subquery() update_values = session.query(Receiver.name, case([(or_(Receiver.firstseen == null(), Receiver.firstseen > last_valid_values.c.firstseen), last_valid_values.c.firstseen), @@ -171,7 +221,7 @@ def update_receivers(): (Receiver.lastseen >= last_valid_values.c.lastseen, Receiver.version)]).label('version'), case([(or_(Receiver.lastseen == null(), Receiver.lastseen < last_valid_values.c.lastseen), last_valid_values.c.platform), (Receiver.lastseen >= last_valid_values.c.lastseen, Receiver.platform)]).label('platform'), - case([(or_(Receiver.location_wkt == null(), not_(func.ST_Equals(Receiver.location_wkt, last_valid_values.c.location_wkt))), None), + case([(or_(Receiver.location_wkt == null(), not_(func.ST_Equals(Receiver.location_wkt, last_valid_values.c.location_wkt))), None), # set country code to None if location changed (func.ST_Equals(Receiver.location_wkt, last_valid_values.c.location_wkt), Receiver.country_code)]).label('country_code')) \ .filter(Receiver.name == last_valid_values.c.name) \ .subquery()