From 371e4564e8e222415b3411e714e4a070e1be1948 Mon Sep 17 00:00:00 2001 From: "Fabian P. Schmidt" Date: Sun, 15 Nov 2015 09:23:57 +0100 Subject: [PATCH] Handle tasks (eg. updateddb) with celery. New dependencies: celery and redis. --- ogn/collect/__init__.py | 0 ogn/collect/celery.py | 30 ++++++++++++++++++++++++++++++ ogn/collect/fetchddb.py | 29 +++++++++++++++++++++++++++++ ogn/{ => collect}/logbook.py | 22 ++++++++++++++++------ ogn/commands/database.py | 9 +++++++++ ogn/db.py | 15 --------------- ogn/db_utils.py | 18 ------------------ requirements.txt | 1 + 8 files changed, 85 insertions(+), 39 deletions(-) create mode 100644 ogn/collect/__init__.py create mode 100644 ogn/collect/celery.py create mode 100644 ogn/collect/fetchddb.py rename ogn/{ => collect}/logbook.py (87%) delete mode 100644 ogn/db.py delete mode 100644 ogn/db_utils.py diff --git a/ogn/collect/__init__.py b/ogn/collect/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/ogn/collect/celery.py b/ogn/collect/celery.py new file mode 100644 index 0000000..f6892dd --- /dev/null +++ b/ogn/collect/celery.py @@ -0,0 +1,30 @@ +from __future__ import absolute_import + +from sqlalchemy import create_engine +from sqlalchemy.orm import sessionmaker + +from celery import Celery, Task +from celery.signals import worker_init, worker_shutdown + +app = Celery('ogn.collect', + broker='redis://localhost:6379/0', + backend='redis://localhost:6379/0', + include=["ogn.collect.fetchddb"]) + +DB_URI = 'sqlite:///beacons.db' + +@worker_init.connect +def connect_db(signal, sender): + # Load settings like DB_URI... + engine = create_engine(DB_URI, echo=False) + + Session = sessionmaker(bind=engine) + sender.app.session = Session() + +@worker_shutdown.connect +def close_db(signal, sender): + sender.app.session.close() + + +if __name__ == '__main__': + app.start() diff --git a/ogn/collect/fetchddb.py b/ogn/collect/fetchddb.py new file mode 100644 index 0000000..65d6728 --- /dev/null +++ b/ogn/collect/fetchddb.py @@ -0,0 +1,29 @@ +from __future__ import absolute_import + +from celery.utils.log import get_task_logger +from ogn.collect.celery import app + +from ogn.model import Flarm +from ogn.utils import get_ddb + +logger = get_task_logger(__name__) + + +@app.task +def update_ddb_data(): + logger.info("Update ddb data.") + + app.session.query(Flarm).delete() + + devices = get_ddb() + logger.info("Devices: %s"%str(devices)) + app.session.bulk_save_objects(devices) + + app.session.commit() + return len(devices) + +# TODO: Reimplement. +def import_ddb_data(filename='custom.txt'): + flarms = get_ddb(filename) + db.session.bulk_save_objects(flarms) + session.commit() diff --git a/ogn/logbook.py b/ogn/collect/logbook.py similarity index 87% rename from ogn/logbook.py rename to ogn/collect/logbook.py index a77d9ef..20c6a3f 100644 --- a/ogn/logbook.py +++ b/ogn/collect/logbook.py @@ -1,25 +1,32 @@ +from __future__ import absolute_import + from datetime import datetime, timedelta +from celery.utils.log import get_task_logger +from ogn.collect.celery import app + from sqlalchemy.sql import func, null from sqlalchemy import and_, or_, insert, between from sqlalchemy.sql.expression import case, true, false, label -from ogn.db import session from ogn.model import Flarm, AircraftBeacon, TakeoffLanding +logger = get_task_logger(__name__) + +@app.task def compute_takeoff_and_landing(): takeoff_speed = 30 landing_speed = 30 # get last takeoff_landing time as starting point for the following search - last_takeoff_landing_query = session.query(func.max(TakeoffLanding.timestamp)) + last_takeoff_landing_query = app.session.query(func.max(TakeoffLanding.timestamp)) last_takeoff_landing = last_takeoff_landing_query.one()[0] if last_takeoff_landing is None: last_takeoff_landing = datetime(2015, 1, 1, 0, 0, 0) # make a query with current, previous and next position, so we can detect takeoffs and landings - sq = session.query(AircraftBeacon.address, + sq = app.session.query(AircraftBeacon.address, func.lag(AircraftBeacon.address).over(order_by=and_(AircraftBeacon.address, AircraftBeacon.timestamp)).label('address_prev'), func.lead(AircraftBeacon.address).over(order_by=and_(AircraftBeacon.address, AircraftBeacon.timestamp)).label('address_next'), AircraftBeacon.timestamp, @@ -46,7 +53,7 @@ def compute_takeoff_and_landing(): .subquery() # find takeoffs and landings (look at the trigger_speed) - takeoff_landing_query = session.query(sq.c.address, sq.c.timestamp, sq.c.latitude, sq.c.longitude, sq.c.track, sq.c.ground_speed, sq.c.altitude, case([(sq.c.ground_speed>takeoff_speed, True), (sq.c.ground_speedtakeoff_speed, True), (sq.c.ground_speed takeoff_speed, @@ -58,5 +65,8 @@ def compute_takeoff_and_landing(): # ... and save them ins = insert(TakeoffLanding).from_select((TakeoffLanding.address, TakeoffLanding.timestamp, TakeoffLanding.latitude, TakeoffLanding.longitude, TakeoffLanding.track, TakeoffLanding.ground_speed, TakeoffLanding.altitude, TakeoffLanding.is_takeoff), takeoff_landing_query) - session.execute(ins) - session.commit() + app.session.execute(ins) + app.session.commit() + + + diff --git a/ogn/commands/database.py b/ogn/commands/database.py index 532e5c0..cd2f6a3 100755 --- a/ogn/commands/database.py +++ b/ogn/commands/database.py @@ -11,3 +11,12 @@ def init(): from dbutils import engine Base.metadata.create_all(engine) print("Done.") + + +@manager.command +def updateddb(): + """Update the ddb data.""" + print("Updating ddb data...") + result = update_ddb_data.delay() + counter = result.get() + print("Imported %i devieces."%counter) diff --git a/ogn/db.py b/ogn/db.py deleted file mode 100644 index 69fd212..0000000 --- a/ogn/db.py +++ /dev/null @@ -1,15 +0,0 @@ -from sqlalchemy import create_engine -from sqlalchemy.orm import sessionmaker - -from .model import Base, AircraftBeacon, ReceiverBeacon, Flarm - - -# prepare db -#engine = create_engine('sqlite:///:memory:', echo=False) -engine = create_engine('sqlite:///ogn.db', echo=False) -#engine = create_engine('postgresql://postgres:secretpass@localhost:5432/ogn') - -Base.metadata.create_all(engine) - -Session = sessionmaker(bind=engine) -session = Session() diff --git a/ogn/db_utils.py b/ogn/db_utils.py deleted file mode 100644 index ac1170a..0000000 --- a/ogn/db_utils.py +++ /dev/null @@ -1,18 +0,0 @@ -from ogn.db import session -from ogn.model import Flarm -from ogn.ognutils import get_ddb - - -def fill_flarm_db(): - session.query(Flarm).delete() - - flarms = get_ddb() - session.bulk_save_objects(flarms) - - flarms = get_ddb('custom.txt') - session.bulk_save_objects(flarms) - - session.commit() - -if __name__ == '__main__': - fill_flarm_db() diff --git a/requirements.txt b/requirements.txt index d4a9b90..2e74710 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,3 +3,4 @@ nose==1.3.7 coveralls==0.4.4 geopy==1.11.0 manage.py==0.2.10 +celery[redis]>=3.1,<3.2