Merge pull request #21 from kerel-fs/cli/-celery

Model: Fix AddressOrigin; CLI: Remove celery dependency
pull/22/head
Meisterschueler 2015-12-09 13:46:44 +01:00
commit d65e8c6d97
12 zmienionych plików z 141 dodań i 112 usunięć

Wyświetl plik

@ -58,24 +58,27 @@ optional arguments:
available commands:
[db]
import_ddb Import registered devices from the DDB.
import_file Import registered devices from local file.
init Initialize the database.
update_ddb_file Update devices with data from local file.
update_ddb_ogn Update devices with data from ogn.
[gateway]
run Run the aprs client.
[logbook]
compute Compute takeoffs and landings.
show Show a logbook for <airport_name> located at given position.
[show.devices]
stats Show some stats on registered devices.
[show.receiver]
hardware_stats Show some statistics of receiver hardware.
list_all Show a list of all receivers.
software_stats Show some statistics of receiver software.
```
The task server must be running for `db.updateddb`.
Only the command `logbook.compute` requires a running task server (celery) at the moment.
## TODO
- [x] Write celery backend and add task 'fetchddb'
@ -92,9 +95,13 @@ The task server must be running for `db.updateddb`.
- [ ] Introduce scheduled tasks with 'celery beat' (eg. updateddb)
### Scheduled tasks
- ogn.collect.fetchddb (generate Flarm table)
- ogn.collect.receiver (generate Receiver table)
- ogn.collect.logbook (generate TakeoffLanding table)
- ogn.collect.database
- import_ddb - Import registered devices from the ddb
- import_file - Import registered devices from a local file
- ogn.collect.receiver
- populate - generate Receiver table (not implemented)
- ogn.collect.logbook
- compute - generate TakeoffLanding table
## How to use virtualenv
```

Wyświetl plik

@ -1,5 +1,3 @@
from __future__ import absolute_import
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
@ -9,7 +7,7 @@ from celery.signals import worker_init, worker_shutdown
app = Celery('ogn.collect',
broker='redis://localhost:6379/0',
backend='redis://localhost:6379/0',
include=["ogn.collect.fetchddb", "ogn.collect.logbook"])
include=["ogn.collect.database", "ogn.collect.logbook"])
DB_URI = 'sqlite:///beacons.db'

Wyświetl plik

@ -0,0 +1,35 @@
from ogn.model import Device
from celery.utils.log import get_task_logger
from ogn.collect.celery import app
logger = get_task_logger(__name__)
def update_devices(session, origin, devices):
session.query(Device) \
.filter(Device.address_origin == origin) \
.delete()
session.bulk_save_objects(devices)
session.commit()
return len(devices)
@app.task
def import_ddb():
"""Import registered devices from the DDB."""
logger.info("Import registered devices fom the DDB...")
counter = update_devices(app.session, AddressOrigin.ogn_ddb, get_ddb())
logger.info("Imported %i devices." % counter)
@app.task
def import_file(path='tests/custom_ddb.txt'):
"""Import registered devices from a local file."""
logger.info("Import registered devices from '{}'...".format(path))
counter = update_devices(app.session, AddressOrigin.user_defined, get_ddb(path))
logger.info("Imported %i devices." % counter)

Wyświetl plik

@ -1,43 +0,0 @@
from __future__ import absolute_import
from celery.utils.log import get_task_logger
from ogn.collect.celery import app
from ogn.model import AddressOrigin, Device
from ogn.utils import get_ddb
logger = get_task_logger(__name__)
@app.task
def update_ddb_from_ogn():
logger.info("Update ddb data from ogn.")
app.session.query(Device) \
.filter(AddressOrigin(Device.address_origin) is AddressOrigin.ogn_ddb) \
.delete()
devices = get_ddb()
logger.debug("New Devices: %s" % str(devices))
app.session.bulk_save_objects(devices)
app.session.commit()
return len(devices)
@app.task
def update_ddb_from_file():
logger.info("Import ddb data from file.")
app.session.query(Device) \
.filter(AddressOrigin(Device.address_origin) is AddressOrigin.userdefined) \
.delete()
devices = get_ddb('ogn/custom_ddb.txt')
logger.debug("New Devices: %s" % str(devices))
app.session.bulk_save_objects(devices)
app.session.commit()
return len(devices)

Wyświetl plik

@ -1,5 +1,3 @@
from __future__ import absolute_import
from datetime import datetime, timedelta
from celery.utils.log import get_task_logger

Wyświetl plik

@ -1,5 +1,6 @@
from .database import manager as database_manager
from .showreceiver import manager as show_receiver_manager
from .showdevices import manager as show_devices_manager
from .logbook import manager as logbook_manager
from manager import Manager
@ -8,4 +9,5 @@ manager = Manager()
manager.merge(database_manager, namespace='db')
manager.merge(show_receiver_manager, namespace='show.receiver')
manager.merge(show_devices_manager, namespace='show.devices')
manager.merge(logbook_manager, namespace='logbook')

Wyświetl plik

@ -1,8 +1,7 @@
from sqlalchemy import func, and_, true, false
from ogn.model import Base, AddressOrigin, Device
from ogn.collect.fetchddb import update_ddb_from_ogn, update_ddb_from_file
from ogn.commands.dbutils import engine, session
from ogn.model import Base, AddressOrigin
from ogn.utils import get_ddb
from ogn.collect.database import update_devices
from manager import Manager
manager = Manager()
@ -11,54 +10,25 @@ manager = Manager()
@manager.command
def init():
"""Initialize the database."""
Base.metadata.create_all(engine)
print("Done.")
@manager.command
def update_ddb_ogn():
"""Update devices with data from ogn."""
print("Updating ddb data...")
result = update_ddb_from_ogn.delay()
counter = result.get()
def import_ddb():
"""Import registered devices from the DDB."""
print("Import registered devices fom the DDB...")
counter = update_devices(session, AddressOrigin.ogn_ddb, get_ddb())
print("Imported %i devices." % counter)
@manager.command
def update_ddb_file():
"""Update devices with data from local file."""
print("Updating ddb data...")
result = update_ddb_from_file.delay()
counter = result.get()
def import_file(path='tests/custom_ddb.txt'):
"""Import registered devices from a local file."""
# (flushes previously manually imported entries)
print("Import registered devices from '{}'...".format(path))
counter = update_devices(session, AddressOrigin.user_defined, get_ddb(path))
print("Imported %i devices." % counter)
@manager.command
def stats():
"""Show some devices stats."""
sq_nt = session.query(Device.address) \
.filter(and_(Device.tracked == false(), Device.identified == true())) \
.subquery()
sq_ni = session.query(Device.address) \
.filter(and_(Device.tracked == true(), Device.identified == false())) \
.subquery()
sq_ntni = session.query(Device.address) \
.filter(and_(Device.tracked == false(), Device.identified == false())) \
.subquery()
query = session.query(Device.address_origin, func.count(Device.id), func.count(sq_nt.c.address), func.count(sq_ni.c.address), func.count(sq_ntni.c.address)) \
.outerjoin(sq_nt, sq_nt.c.address == Device.address) \
.outerjoin(sq_ni, sq_ni.c.address == Device.address) \
.outerjoin(sq_ntni, sq_ntni.c.address == Device.address) \
.group_by(Device.address_origin)
print('--- Devices ---')
for [address_origin, device_count, nt_count, ni_count, ntni_count] in query.all():
print('{:12s} Total:{:5d} - not tracked:{:3d}, not identified:{:3d}, not tracked & not identified: {:3d}'
.format(AddressOrigin(address_origin).name,
device_count,
nt_count,
ni_count,
ntni_count))

Wyświetl plik

@ -0,0 +1,53 @@
from ogn.commands.dbutils import engine, session
from ogn.model import AddressOrigin, Device
from sqlalchemy import func, and_, true, false
from manager import Manager
manager = Manager()
def get_devices_stats(session):
sq_nt = session.query(Device.address) \
.filter(and_(Device.tracked == false(), Device.identified == true())) \
.subquery()
sq_ni = session.query(Device.address) \
.filter(and_(Device.tracked == true(), Device.identified == false())) \
.subquery()
sq_ntni = session.query(Device.address) \
.filter(and_(Device.tracked == false(), Device.identified == false())) \
.subquery()
query = session.query(Device.address_origin,
func.count(Device.id),
func.count(sq_nt.c.address),
func.count(sq_ni.c.address),
func.count(sq_ntni.c.address)) \
.outerjoin(sq_nt, sq_nt.c.address == Device.address) \
.outerjoin(sq_ni, sq_ni.c.address == Device.address) \
.outerjoin(sq_ntni, sq_ntni.c.address == Device.address) \
.group_by(Device.address_origin)
stats = {}
for [address_origin, device_count, nt_count, ni_count, ntni_count] in query.all():
origin = AddressOrigin(address_origin).name()
stats[origin] = {'device_count': device_count,
'nt_count': nt_count,
'ni_count': ni_count,
'ntni_count': ntni_count}
return stats
@manager.command
def stats():
"""Show some stats on registered devices."""
print('--- Devices ---')
stats = get_devices_stats(session)
for origin in stats:
print('{:12s} Total:{:5d} - not tracked:{:3d}, not identified:{:3d}, not tracked & not identified: {:3d}'
.format(origin,
stats[origin]['device_count'],
stats[origin]['nt_count'],
stats[origin]['ni_count'],
stats[origin]['ntni_count']))

Wyświetl plik

@ -1,2 +0,0 @@
#DEVICE_TYPE,DEVICE_ID,AIRCRAFT_MODEL,REGISTRATION,CN,TRACKED,IDENTIFIED
'F','DD0000','YourAircraft','D-1234','CN','Y','Y'

Wyświetl plik

@ -1,8 +1,19 @@
from enum import Enum, unique
@unique
class AddressOrigin(Enum):
class AddressOrigin:
ogn_ddb = 1
flarmnet = 2
userdefined = 3
user_defined = 3
def __init__(self, origin):
if origin in [1, 2, 3]:
self.origin = origin
else:
raise ValueError('no address origin with id {} known'.format(origin))
def name(self):
if self.origin == self.ogn_ddb:
return 'OGN-DDB'
elif self.origin == self.flarmnet:
return 'FlarmNet'
elif self.origin == self.user_defined:
return 'user-defined'
return ''

Wyświetl plik

@ -25,7 +25,7 @@ def get_ddb(csvfile=None):
else:
r = open(csvfile, 'r')
rows = ''.join(i for i in r.readlines() if i[0] != '#')
address_origin = AddressOrigin.userdefined
address_origin = AddressOrigin.user_defined
data = csv.reader(StringIO(rows), quotechar="'", quoting=csv.QUOTE_ALL)

Wyświetl plik

@ -21,7 +21,7 @@ class TestStringMethods(unittest.TestCase):
self.assertTrue(device.tracked)
self.assertTrue(device.identified)
self.assertEqual(device.address_origin, AddressOrigin.userdefined)
self.assertEqual(device.address_origin, AddressOrigin.user_defined)
def test_get_trackable(self):
devices = get_ddb('tests/custom_ddb.txt')