Added collector for ognrange

pull/68/head
Konstantin Gründger 2018-01-07 14:33:04 +01:00
rodzic 3ba9c9eaf9
commit 6b6f86f1ca
5 zmienionych plików z 179 dodań i 0 usunięć

Wyświetl plik

@ -29,6 +29,7 @@ app = Celery('ogn.collect',
"ogn.collect.logbook",
"ogn.collect.stats",
"ogn.collect.takeoff_landing",
"ogn.collect.ognrange"
])
app.config_from_envvar("OGN_CONFIG_MODULE")

Wyświetl plik

@ -0,0 +1,88 @@
from celery.utils.log import get_task_logger
from sqlalchemy import String
from sqlalchemy import and_, or_, insert, update, exists
from sqlalchemy.sql import func, null
from sqlalchemy.sql.expression import true, false
from ogn.collect.celery import app
from ogn.model import AircraftBeacon, ReceiverCoverage
logger = get_task_logger(__name__)
@app.task
def update_receiver_coverage(session=None):
"""Add/update receiver coverage entries."""
logger.info("Compute receiver coverage.")
if session is None:
session = app.session
# Filter aircraft beacons and shrink precision of MGRS from 1m to 1km resolution: 30UXC 00061 18429 -> 30UXC 00 18
sq = session.query((func.left(AircraftBeacon.location_mgrs, 5, type_=String) + func.substring(AircraftBeacon.location_mgrs, 6, 2, type_=String) + func.substring(AircraftBeacon.location_mgrs, 11, 2, type_=String)).label('reduced_mgrs'),
AircraftBeacon.receiver_id,
func.date(AircraftBeacon.timestamp).label('date'),
AircraftBeacon.signal_quality,
AircraftBeacon.altitude,
AircraftBeacon.device_id) \
.filter(and_(AircraftBeacon.location_mgrs != null(),
AircraftBeacon.receiver_id != null(),
AircraftBeacon.device_id != null())) \
.subquery()
# ... and group them by reduced MGRS, receiver and date
query = session.query(sq.c.reduced_mgrs,
sq.c.receiver_id,
sq.c.date,
func.max(sq.c.signal_quality).label('max_signal_quality'),
func.min(sq.c.altitude).label('min_altitude'),
func.max(sq.c.altitude).label('max_altitude'),
func.count(sq.c.altitude).label('aircraft_beacon_count'),
func.count(func.distinct(sq.c.device_id)).label('device_count')) \
.group_by(sq.c.reduced_mgrs,
sq.c.receiver_id,
sq.c.date) \
.subquery()
# if a receiver coverage entry exist --> update it
upd = update(ReceiverCoverage) \
.where(and_(ReceiverCoverage.location_mgrs == query.c.reduced_mgrs,
ReceiverCoverage.receiver_id == query.c.receiver_id,
ReceiverCoverage.date == query.c.date)) \
.values({"max_signal_quality": query.c.max_signal_quality,
"min_altitude": query.c.min_altitude,
"max_altitude": query.c.max_altitude,
"aircraft_beacon_count": query.c.aircraft_beacon_count,
"device_count": query.c.device_count})
result = session.execute(upd)
update_counter = result.rowcount
session.commit()
logger.debug("Updated receiver coverage entries: {}".format(update_counter))
# if a receiver coverage entry doesnt exist --> insert it
new_coverage_entries = session.query(query) \
.filter(~exists().where(
and_(ReceiverCoverage.location_mgrs == query.c.reduced_mgrs,
ReceiverCoverage.receiver_id == query.c.receiver_id,
ReceiverCoverage.date == query.c.date)))
ins = insert(ReceiverCoverage).from_select((
ReceiverCoverage.location_mgrs,
ReceiverCoverage.receiver_id,
ReceiverCoverage.date,
ReceiverCoverage.max_signal_quality,
ReceiverCoverage.min_altitude,
ReceiverCoverage.max_altitude,
ReceiverCoverage.aircraft_beacon_count,
ReceiverCoverage.device_count),
new_coverage_entries)
result = session.execute(ins)
insert_counter = result.rowcount
session.commit()
logger.debug("New receiver coverage entries: {}".format(insert_counter))
return "Receiver coverage entries: {} inserted, {} updated".format(insert_counter, update_counter)

Wyświetl plik

@ -13,5 +13,6 @@ from .receiver_stats import ReceiverStats
from .takeoff_landing import TakeoffLanding
from .airport import Airport
from .logbook import Logbook
from .receiver_coverage import ReceiverCoverage
from .geo import Location

Wyświetl plik

@ -0,0 +1,23 @@
from sqlalchemy import Column, String, Integer, Float, Date, ForeignKey, Index
from sqlalchemy.orm import relationship
from .base import Base
class ReceiverCoverage(Base):
__tablename__ = "receiver_coverage"
location_mgrs = Column(String(9), primary_key=True)
receiver_id = Column(Integer, ForeignKey('receiver.id', ondelete='SET NULL'), primary_key=True)
date = Column(Date, primary_key=True)
max_signal_quality = Column(Float)
max_altitude = Column(Integer)
min_altitude = Column(Integer)
aircraft_beacon_count = Column(Integer)
device_count = Column(Integer)
# Relations
receiver = relationship('Receiver', foreign_keys=[receiver_id], backref='receiver_coverages')

Wyświetl plik

@ -0,0 +1,66 @@
import unittest
import os
from ogn.model import AircraftBeacon, Receiver, ReceiverCoverage, Device
from ogn.collect.ognrange import update_receiver_coverage
class TestDB(unittest.TestCase):
session = None
engine = None
app = None
def setUp(self):
os.environ['OGN_CONFIG_MODULE'] = 'config.test'
from ogn.commands.dbutils import engine, session
self.session = session
self.engine = engine
from ogn.commands.database import init
init()
# Create basic data and insert
self.dd0815 = Device(address='DD0815')
self.dd4711 = Device(address='DD4711')
self.r01 = Receiver(name='Koenigsdf')
self.r02 = Receiver(name='Bene')
session.add(self.dd0815)
session.add(self.dd4711)
session.add(self.r01)
session.add(self.r02)
session.commit()
# Create beacons and insert
self.ab01 = AircraftBeacon(device_id=self.dd0815.id, receiver_id=self.r01.id, timestamp='2017-12-10 10:00:00', location_mgrs='89ABC1234567890', altitude=800)
self.ab02 = AircraftBeacon(device_id=self.dd0815.id, receiver_id=self.r01.id, timestamp='2017-12-10 10:00:01', location_mgrs='89ABC1299967999', altitude=850)
session.add(self.ab01)
session.add(self.ab02)
session.commit()
def tearDown(self):
session = self.session
session.execute("DELETE FROM aircraft_beacon")
session.execute("DELETE FROM receiver_coverage")
session.execute("DELETE FROM device")
session.execute("DELETE FROM receiver")
session.commit()
def test_update_receiver_coverage(self):
session = self.session
update_receiver_coverage(session)
coverages = session.query(ReceiverCoverage).all()
self.assertEqual(len(coverages), 1)
coverage = coverages[0]
self.assertEqual(coverage.location_mgrs, '89ABC1267')
self.assertEqual(coverage.receiver_id, self.r01.id)
self.assertEqual(coverage.min_altitude, 800)
self.assertEqual(coverage.max_altitude, 850)
if __name__ == '__main__':
unittest.main()