Test and fix update_devices

pull/68/head
Konstantin Gründger 2017-12-16 21:08:01 +01:00
rodzic 1da1d472df
commit 52c40582c4
4 zmienionych plików z 154 dodań i 38 usunięć

Wyświetl plik

@ -4,10 +4,10 @@ from sqlalchemy import insert, distinct
from sqlalchemy.sql import null, and_, or_, func, not_ from sqlalchemy.sql import null, and_, or_, func, not_
from sqlalchemy.sql.expression import case from sqlalchemy.sql.expression import case
from ogn.collect.celery import app
from ogn.model import DeviceInfo, DeviceInfoOrigin, AircraftBeacon, ReceiverBeacon, Device, Receiver from ogn.model import DeviceInfo, DeviceInfoOrigin, AircraftBeacon, ReceiverBeacon, Device, Receiver
from ogn.utils import get_ddb, get_country_code from ogn.utils import get_ddb, get_country_code
from .celery import app
logger = get_task_logger(__name__) logger = get_task_logger(__name__)
@ -26,72 +26,81 @@ def update_device_infos(session, address_origin, csvfile=None):
@app.task @app.task
def import_ddb(): def import_ddb(session=None):
"""Import registered devices from the DDB.""" """Import registered devices from the DDB."""
if session is None:
session = app.session
logger.info("Import registered devices fom the DDB...") logger.info("Import registered devices fom the DDB...")
address_origin = DeviceInfoOrigin.ogn_ddb address_origin = DeviceInfoOrigin.ogn_ddb
counter = update_device_infos(app.session, address_origin) counter = update_device_infos(session, address_origin)
logger.info("Imported {} devices.".format(counter)) logger.info("Imported {} devices.".format(counter))
@app.task @app.task
def import_file(path='tests/custom_ddb.txt'): def import_file(session=None, path='tests/custom_ddb.txt'):
"""Import registered devices from a local file.""" """Import registered devices from a local file."""
if session is None:
session = app.session
logger.info("Import registered devices from '{}'...".format(path)) logger.info("Import registered devices from '{}'...".format(path))
address_origin = DeviceInfoOrigin.user_defined address_origin = DeviceInfoOrigin.user_defined
counter = update_device_infos(app.session, address_origin, csvfile=path) counter = update_device_infos(session, address_origin, csvfile=path)
logger.info("Imported {} devices.".format(counter)) logger.info("Imported {} devices.".format(counter))
@app.task @app.task
def update_devices(): def update_devices(session=None):
"""Add/update entries in devices table and update foreign keys in aircraft beacons.""" """Add/update entries in devices table and update foreign keys in aircraft beacons."""
if session is None:
session = app.session
# Create missing Device from AircraftBeacon # Create missing Device from AircraftBeacon
available_devices = app.session.query(Device.address) \ available_devices = session.query(Device.address) \
.subquery() .subquery()
missing_devices_query = app.session.query(distinct(AircraftBeacon.address)) \ missing_devices_query = session.query(distinct(AircraftBeacon.address)) \
.filter(and_(AircraftBeacon.device_id == null(), not_(AircraftBeacon.address.like('00%')), AircraftBeacon.error_count == 0)) \ .filter(and_(AircraftBeacon.device_id == null(), not_(AircraftBeacon.address.like('00%')), AircraftBeacon.error_count == 0)) \
.filter(~AircraftBeacon.address.in_(available_devices)) .filter(~AircraftBeacon.address.in_(available_devices))
ins = insert(Device).from_select([Device.address], missing_devices_query) ins = insert(Device).from_select([Device.address], missing_devices_query)
res = app.session.execute(ins) res = session.execute(ins)
insert_count = res.rowcount insert_count = res.rowcount
app.session.commit() session.commit()
# For each address in the new beacons: get firstseen, lastseen and last values != NULL # For each address in the new beacons: get firstseen, lastseen and last values != NULL
last_valid_values = app.session.query( last_valid_values = session.query(
distinct(AircraftBeacon.address).label('address'), distinct(AircraftBeacon.address).label('address'),
func.first_value(AircraftBeacon.timestamp) func.first_value(AircraftBeacon.timestamp)
.over(partition_by=AircraftBeacon.address, order_by=case([(AircraftBeacon.timestamp == null(), None), (AircraftBeacon.timestamp != null(), AircraftBeacon.timestamp)])) .over(partition_by=AircraftBeacon.address, order_by=case([(AircraftBeacon.timestamp == null(), None)], else_=AircraftBeacon.timestamp).asc().nullslast())
.label('firstseen'), .label('firstseen'),
func.last_value(AircraftBeacon.timestamp) func.first_value(AircraftBeacon.timestamp)
.over(partition_by=AircraftBeacon.address, order_by=case([(AircraftBeacon.timestamp == null(), None), (AircraftBeacon.timestamp != null(), AircraftBeacon.timestamp)])) .over(partition_by=AircraftBeacon.address, order_by=case([(AircraftBeacon.timestamp == null(), None)], else_=AircraftBeacon.timestamp).desc().nullslast())
.label('lastseen'), .label('lastseen'),
func.first_value(AircraftBeacon.aircraft_type) func.first_value(AircraftBeacon.aircraft_type)
.over(partition_by=AircraftBeacon.address, order_by=case([(AircraftBeacon.aircraft_type == null(), None), (AircraftBeacon.aircraft_type != null(), AircraftBeacon.aircraft_type)])) .over(partition_by=AircraftBeacon.address, order_by=case([(AircraftBeacon.aircraft_type == null(), None)], else_=AircraftBeacon.timestamp).desc().nullslast())
.label('aircraft_type'), .label('aircraft_type'),
func.first_value(AircraftBeacon.stealth) func.first_value(AircraftBeacon.stealth)
.over(partition_by=AircraftBeacon.address, order_by=case([(AircraftBeacon.stealth == null(), None), (AircraftBeacon.stealth != null(), AircraftBeacon.stealth)])) .over(partition_by=AircraftBeacon.address, order_by=case([(AircraftBeacon.stealth == null(), None)], else_=AircraftBeacon.timestamp).desc().nullslast())
.label('stealth'), .label('stealth'),
func.first_value(AircraftBeacon.software_version) func.first_value(AircraftBeacon.software_version)
.over(partition_by=AircraftBeacon.address, order_by=case([(AircraftBeacon.software_version == null(), None), (AircraftBeacon.software_version != null(), AircraftBeacon.software_version)])) .over(partition_by=AircraftBeacon.address, order_by=case([(AircraftBeacon.software_version == null(), None)], else_=AircraftBeacon.timestamp).desc().nullslast())
.label('software_version'), .label('software_version'),
func.first_value(AircraftBeacon.hardware_version) func.first_value(AircraftBeacon.hardware_version)
.over(partition_by=AircraftBeacon.address, order_by=case([(AircraftBeacon.hardware_version == null(), None), (AircraftBeacon.hardware_version != null(), AircraftBeacon.hardware_version)])) .over(partition_by=AircraftBeacon.address, order_by=case([(AircraftBeacon.hardware_version == null(), None)], else_=AircraftBeacon.timestamp).desc().nullslast())
.label('hardware_version'), .label('hardware_version'),
func.first_value(AircraftBeacon.real_address) func.first_value(AircraftBeacon.real_address)
.over(partition_by=AircraftBeacon.address, order_by=case([(AircraftBeacon.real_address == null(), None), (AircraftBeacon.real_address != null(), AircraftBeacon.real_address)])) .over(partition_by=AircraftBeacon.address, order_by=case([(AircraftBeacon.real_address == null(), None)], else_=AircraftBeacon.timestamp).desc().nullslast())
.label('real_address')) \ .label('real_address')) \
.filter(and_(AircraftBeacon.device_id == null(), AircraftBeacon.error_count == 0)) \ .filter(and_(AircraftBeacon.device_id == null(), AircraftBeacon.error_count == 0)) \
.subquery() .subquery()
update_values = app.session.query( update_values = session.query(
Device.address, Device.address,
case([(or_(Device.firstseen == null(), Device.firstseen > last_valid_values.c.firstseen), last_valid_values.c.firstseen), case([(or_(Device.firstseen == null(), Device.firstseen > last_valid_values.c.firstseen), last_valid_values.c.firstseen),
(Device.firstseen <= last_valid_values.c.firstseen, Device.firstseen)]).label('firstseen'), (Device.firstseen <= last_valid_values.c.firstseen, Device.firstseen)]).label('firstseen'),
@ -110,7 +119,7 @@ def update_devices():
.filter(Device.address == last_valid_values.c.address) \ .filter(Device.address == last_valid_values.c.address) \
.subquery() .subquery()
update_receivers = app.session.query(Device) \ update_receivers = session.query(Device) \
.filter(Device.address == update_values.c.address) \ .filter(Device.address == update_values.c.address) \
.update({ .update({
Device.firstseen: update_values.c.firstseen, Device.firstseen: update_values.c.firstseen,
@ -123,14 +132,14 @@ def update_devices():
synchronize_session='fetch') synchronize_session='fetch')
# Update relations to aircraft beacons # Update relations to aircraft beacons
upd = app.session.query(AircraftBeacon) \ upd = session.query(AircraftBeacon) \
.filter(AircraftBeacon.device_id == null()) \ .filter(AircraftBeacon.device_id == null()) \
.filter(AircraftBeacon.address == Device.address) \ .filter(AircraftBeacon.address == Device.address) \
.update({ .update({
AircraftBeacon.device_id: Device.id}, AircraftBeacon.device_id: Device.id},
synchronize_session='fetch') synchronize_session='fetch')
app.session.commit() session.commit()
logger.info("Devices: {} inserted, {} updated".format(insert_count, update_receivers)) logger.info("Devices: {} inserted, {} updated".format(insert_count, update_receivers))
logger.info("Updated {} AircraftBeacons".format(upd)) logger.info("Updated {} AircraftBeacons".format(upd))
@ -139,22 +148,26 @@ def update_devices():
@app.task @app.task
def update_receivers(): def update_receivers(session=None):
"""Add/update_receivers entries in receiver table and update receivers foreign keys and distance in aircraft beacons and update foreign keys in receiver beacons.""" """Add/update_receivers entries in receiver table and update receivers foreign keys and distance in aircraft beacons and update foreign keys in receiver beacons."""
if session is None:
session = app.session
# Create missing Receiver from ReceiverBeacon # Create missing Receiver from ReceiverBeacon
available_receivers = app.session.query(Receiver.name) \ available_receivers = session.query(Receiver.name) \
.subquery() .subquery()
missing_receiver_query = app.session.query(distinct(ReceiverBeacon.name)) \ missing_receiver_query = session.query(distinct(ReceiverBeacon.name)) \
.filter(ReceiverBeacon.receiver_id == null()) \ .filter(ReceiverBeacon.receiver_id == null()) \
.filter(~ReceiverBeacon.name.in_(available_receivers)) .filter(~ReceiverBeacon.name.in_(available_receivers))
ins = insert(Receiver).from_select([Receiver.name], missing_receiver_query) ins = insert(Receiver).from_select([Receiver.name], missing_receiver_query)
res = app.session.execute(ins) res = session.execute(ins)
insert_count = res.rowcount insert_count = res.rowcount
# For each name in the new beacons: get firstseen, lastseen and last values != NULL # For each name in the new beacons: get firstseen, lastseen and last values != NULL
last_valid_values = app.session.query( last_valid_values = session.query(
distinct(ReceiverBeacon.name).label('name'), distinct(ReceiverBeacon.name).label('name'),
func.first_value(ReceiverBeacon.timestamp) func.first_value(ReceiverBeacon.timestamp)
.over(partition_by=ReceiverBeacon.name, order_by=case([(ReceiverBeacon.timestamp == null(), None), (ReceiverBeacon.timestamp != null(), ReceiverBeacon.timestamp)])) .over(partition_by=ReceiverBeacon.name, order_by=case([(ReceiverBeacon.timestamp == null(), None), (ReceiverBeacon.timestamp != null(), ReceiverBeacon.timestamp)]))
@ -177,7 +190,7 @@ def update_receivers():
.filter(ReceiverBeacon.receiver_id == null()) \ .filter(ReceiverBeacon.receiver_id == null()) \
.subquery() .subquery()
update_values = app.session.query( update_values = session.query(
Receiver.name, Receiver.name,
case([(or_(Receiver.firstseen == null(), Receiver.firstseen > last_valid_values.c.firstseen), last_valid_values.c.firstseen), case([(or_(Receiver.firstseen == null(), Receiver.firstseen > last_valid_values.c.firstseen), last_valid_values.c.firstseen),
(Receiver.firstseen <= last_valid_values.c.firstseen, Receiver.firstseen)]).label('firstseen'), (Receiver.firstseen <= last_valid_values.c.firstseen, Receiver.firstseen)]).label('firstseen'),
@ -196,7 +209,7 @@ def update_receivers():
.filter(Receiver.name == last_valid_values.c.name) \ .filter(Receiver.name == last_valid_values.c.name) \
.subquery() .subquery()
update_receivers = app.session.query(Receiver) \ update_receivers = session.query(Receiver) \
.filter(Receiver.name == update_values.c.name) \ .filter(Receiver.name == update_values.c.name) \
.update({ .update({
Receiver.firstseen: update_values.c.firstseen, Receiver.firstseen: update_values.c.firstseen,
@ -209,19 +222,19 @@ def update_receivers():
synchronize_session='fetch') synchronize_session='fetch')
# Update relations to aircraft beacons # Update relations to aircraft beacons
update_aircraft_beacons = app.session.query(AircraftBeacon) \ update_aircraft_beacons = session.query(AircraftBeacon) \
.filter(and_(AircraftBeacon.receiver_id == null(), AircraftBeacon.receiver_name == Receiver.name)) \ .filter(and_(AircraftBeacon.receiver_id == null(), AircraftBeacon.receiver_name == Receiver.name)) \
.update({AircraftBeacon.receiver_id: Receiver.id, .update({AircraftBeacon.receiver_id: Receiver.id,
AircraftBeacon.distance: func.ST_Distance_Sphere(AircraftBeacon.location_wkt, Receiver.location_wkt)}, AircraftBeacon.distance: func.ST_Distance_Sphere(AircraftBeacon.location_wkt, Receiver.location_wkt)},
synchronize_session='fetch') synchronize_session='fetch')
# Update relations to receiver beacons # Update relations to receiver beacons
update_receiver_beacons = app.session.query(ReceiverBeacon) \ update_receiver_beacons = session.query(ReceiverBeacon) \
.filter(and_(ReceiverBeacon.receiver_id == null(), ReceiverBeacon.name == Receiver.name)) \ .filter(and_(ReceiverBeacon.receiver_id == null(), ReceiverBeacon.name == Receiver.name)) \
.update({ReceiverBeacon.receiver_id: Receiver.id}, .update({ReceiverBeacon.receiver_id: Receiver.id},
synchronize_session='fetch') synchronize_session='fetch')
app.session.commit() session.commit()
logger.info("Receivers: {} inserted, {} updated.".format(insert_count, update_receivers)) logger.info("Receivers: {} inserted, {} updated.".format(insert_count, update_receivers))
logger.info("Updated relations: {} aircraft beacons, {} receiver beacons".format(update_aircraft_beacons, update_receiver_beacons)) logger.info("Updated relations: {} aircraft beacons, {} receiver beacons".format(update_aircraft_beacons, update_receiver_beacons))
@ -231,9 +244,13 @@ def update_receivers():
@app.task @app.task
def update_country_code(): def update_country_code(session=None):
# update country code in receivers table if None # update country code in receivers table if None
unknown_country_query = app.session.query(Receiver) \
if session is None:
session = app.session
unknown_country_query = session.query(Receiver) \
.filter(Receiver.country_code == null()) \ .filter(Receiver.country_code == null()) \
.filter(Receiver.location_wkt != null()) \ .filter(Receiver.location_wkt != null()) \
.order_by(Receiver.name) .order_by(Receiver.name)
@ -247,6 +264,6 @@ def update_country_code():
logger.info("Updated country_code for {} to {}".format(receiver.name, receiver.country_code)) logger.info("Updated country_code for {} to {}".format(receiver.name, receiver.country_code))
counter += 1 counter += 1
app.session.commit() session.commit()
return "Updated country_code for {} Receivers".format(counter) return "Updated country_code for {} Receivers".format(counter)

Wyświetl plik

@ -52,6 +52,17 @@ class AircraftBeacon(Beacon):
Index('ix_aircraft_beacon_receiver_id_receiver_name', 'receiver_id', 'receiver_name') Index('ix_aircraft_beacon_receiver_id_receiver_name', 'receiver_id', 'receiver_name')
Index('ix_aircraft_beacon_device_id_address', 'device_id', 'address') Index('ix_aircraft_beacon_device_id_address', 'device_id', 'address')
def __init__(self, receiver_name, address, timestamp, aircraft_type, stealth, error_count, software_version, hardware_version, real_address):
self.receiver_name
self.address = address
self.timestamp = timestamp
self.aircraft_type = aircraft_type
self.stealth = stealth
self.error_count = error_count
self.software_version = software_version
self.hardware_version = hardware_version
self.real_address = real_address
def __repr__(self): def __repr__(self):
return "<AircraftBeacon %s: %s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s>" % ( return "<AircraftBeacon %s: %s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s>" % (
self.address_type, self.address_type,

Wyświetl plik

@ -0,0 +1,90 @@
import unittest
import os
from ogn.model import AircraftBeacon, Device
from ogn.collect.database import update_devices
class TestDB(unittest.TestCase):
session = None
engine = None
app = None
def setUp(self):
os.environ['OGN_CONFIG_MODULE'] = 'config.test'
from ogn.commands.dbutils import engine, session
self.session = session
self.engine = engine
from ogn.commands.database import init
init()
# Create basic data and insert
self.ab00 = AircraftBeacon(receiver_name='Koenigsdf', address='DD4711', timestamp='2017-12-10 10:00:00', aircraft_type=1, stealth=False, error_count=0, software_version=None, hardware_version=None, real_address=None)
self.ab01 = AircraftBeacon(receiver_name='Koenigsdf', address='DD4711', timestamp='2017-12-10 10:00:01', aircraft_type=1, stealth=False, error_count=0, software_version=0.26, hardware_version=None, real_address=None)
self.ab02 = AircraftBeacon(receiver_name='Koenigsdf', address='DD4711', timestamp='2017-12-10 10:00:02', aircraft_type=1, stealth=False, error_count=1, software_version=0.27, hardware_version=None, real_address=None)
self.ab03 = AircraftBeacon(receiver_name='Koenigsdf', address='DD4711', timestamp='2017-12-10 10:00:03', aircraft_type=1, stealth=False, error_count=0, software_version=None, hardware_version=5, real_address='DD1234')
self.ab04 = AircraftBeacon(receiver_name='Koenigsdf', address='DD4711', timestamp='2017-12-10 10:00:04', aircraft_type=1, stealth=False, error_count=0, software_version=0.25, hardware_version=123, real_address='DDxxxx')
self.ab05 = AircraftBeacon(receiver_name='Koenigsdf', address='DD4711', timestamp='2017-12-10 10:00:05', aircraft_type=1, stealth=False, error_count=0, software_version=None, hardware_version=None, real_address='DD0815')
def tearDown(self):
session = self.session
session.execute("DELETE FROM device")
session.execute("DELETE FROM receiver")
session.execute("DELETE FROM aircraft_beacon")
session.commit()
def test_update_devices(self):
session = self.session
# Compute 1st beacon
session.add(self.ab00)
session.commit()
update_devices(session)
devices = session.query(Device).all()
self.assertEqual(len(devices), 1)
self.assertEqual(devices[0].address, 'DD4711')
self.assertEqual(devices[0].software_version, None)
self.assertEqual(self.ab00.device_id, devices[0].id)
# Compute 2nd beacon: changed software version
session.add(self.ab01)
session.commit()
update_devices(session)
devices = session.query(Device).all()
self.assertEqual(len(devices), 1)
self.assertEqual(devices[0].address, 'DD4711')
self.assertEqual(devices[0].software_version, 0.26)
# Compute 3rd beacon: changed software version, but with error_count > 0
session.add(self.ab02)
session.commit()
update_devices(session)
devices = session.query(Device).all()
self.assertEqual(len(devices), 1)
self.assertEqual(devices[0].address, 'DD4711')
self.assertEqual(devices[0].software_version, 0.26)
self.assertEqual(devices[0].hardware_version, None)
self.assertEqual(devices[0].real_address, None)
# Compute 4.-6. beacon
session.add(self.ab03)
session.add(self.ab05) # order is not important
session.add(self.ab04)
session.commit()
update_devices(session)
devices = session.query(Device).all()
self.assertEqual(len(devices), 1)
self.assertEqual(devices[0].address, 'DD4711')
self.assertEqual(devices[0].software_version, 0.25)
self.assertEqual(devices[0].hardware_version, 123)
self.assertEqual(devices[0].real_address, 'DD0815')
if __name__ == '__main__':
unittest.main()

Wyświetl plik

@ -1,8 +1,6 @@
import unittest import unittest
import os import os
from sqlalchemy.sql import null
from ogn.model import Logbook, Airport, Device, TakeoffLanding from ogn.model import Logbook, Airport, Device, TakeoffLanding
from ogn.collect.logbook import update_logbook from ogn.collect.logbook import update_logbook