Handle tasks (eg. updateddb) with celery.

New dependencies: celery and redis.
pull/1/head
Fabian P. Schmidt 2015-11-15 09:23:57 +01:00
rodzic 6cb9dd7801
commit 371e4564e8
8 zmienionych plików z 85 dodań i 39 usunięć

Wyświetl plik

Wyświetl plik

@ -0,0 +1,30 @@
from __future__ import absolute_import
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from celery import Celery, Task
from celery.signals import worker_init, worker_shutdown
app = Celery('ogn.collect',
broker='redis://localhost:6379/0',
backend='redis://localhost:6379/0',
include=["ogn.collect.fetchddb"])
DB_URI = 'sqlite:///beacons.db'
@worker_init.connect
def connect_db(signal, sender):
# Load settings like DB_URI...
engine = create_engine(DB_URI, echo=False)
Session = sessionmaker(bind=engine)
sender.app.session = Session()
@worker_shutdown.connect
def close_db(signal, sender):
sender.app.session.close()
if __name__ == '__main__':
app.start()

Wyświetl plik

@ -0,0 +1,29 @@
from __future__ import absolute_import
from celery.utils.log import get_task_logger
from ogn.collect.celery import app
from ogn.model import Flarm
from ogn.utils import get_ddb
logger = get_task_logger(__name__)
@app.task
def update_ddb_data():
logger.info("Update ddb data.")
app.session.query(Flarm).delete()
devices = get_ddb()
logger.info("Devices: %s"%str(devices))
app.session.bulk_save_objects(devices)
app.session.commit()
return len(devices)
# TODO: Reimplement.
def import_ddb_data(filename='custom.txt'):
flarms = get_ddb(filename)
db.session.bulk_save_objects(flarms)
session.commit()

Wyświetl plik

@ -1,25 +1,32 @@
from __future__ import absolute_import
from datetime import datetime, timedelta from datetime import datetime, timedelta
from celery.utils.log import get_task_logger
from ogn.collect.celery import app
from sqlalchemy.sql import func, null from sqlalchemy.sql import func, null
from sqlalchemy import and_, or_, insert, between from sqlalchemy import and_, or_, insert, between
from sqlalchemy.sql.expression import case, true, false, label from sqlalchemy.sql.expression import case, true, false, label
from ogn.db import session
from ogn.model import Flarm, AircraftBeacon, TakeoffLanding from ogn.model import Flarm, AircraftBeacon, TakeoffLanding
logger = get_task_logger(__name__)
@app.task
def compute_takeoff_and_landing(): def compute_takeoff_and_landing():
takeoff_speed = 30 takeoff_speed = 30
landing_speed = 30 landing_speed = 30
# get last takeoff_landing time as starting point for the following search # get last takeoff_landing time as starting point for the following search
last_takeoff_landing_query = session.query(func.max(TakeoffLanding.timestamp)) last_takeoff_landing_query = app.session.query(func.max(TakeoffLanding.timestamp))
last_takeoff_landing = last_takeoff_landing_query.one()[0] last_takeoff_landing = last_takeoff_landing_query.one()[0]
if last_takeoff_landing is None: if last_takeoff_landing is None:
last_takeoff_landing = datetime(2015, 1, 1, 0, 0, 0) last_takeoff_landing = datetime(2015, 1, 1, 0, 0, 0)
# make a query with current, previous and next position, so we can detect takeoffs and landings # make a query with current, previous and next position, so we can detect takeoffs and landings
sq = session.query(AircraftBeacon.address, sq = app.session.query(AircraftBeacon.address,
func.lag(AircraftBeacon.address).over(order_by=and_(AircraftBeacon.address, AircraftBeacon.timestamp)).label('address_prev'), func.lag(AircraftBeacon.address).over(order_by=and_(AircraftBeacon.address, AircraftBeacon.timestamp)).label('address_prev'),
func.lead(AircraftBeacon.address).over(order_by=and_(AircraftBeacon.address, AircraftBeacon.timestamp)).label('address_next'), func.lead(AircraftBeacon.address).over(order_by=and_(AircraftBeacon.address, AircraftBeacon.timestamp)).label('address_next'),
AircraftBeacon.timestamp, AircraftBeacon.timestamp,
@ -46,7 +53,7 @@ def compute_takeoff_and_landing():
.subquery() .subquery()
# find takeoffs and landings (look at the trigger_speed) # find takeoffs and landings (look at the trigger_speed)
takeoff_landing_query = session.query(sq.c.address, sq.c.timestamp, sq.c.latitude, sq.c.longitude, sq.c.track, sq.c.ground_speed, sq.c.altitude, case([(sq.c.ground_speed>takeoff_speed, True), (sq.c.ground_speed<landing_speed, False)]).label('is_takeoff')) \ takeoff_landing_query = app.session.query(sq.c.address, sq.c.timestamp, sq.c.latitude, sq.c.longitude, sq.c.track, sq.c.ground_speed, sq.c.altitude, case([(sq.c.ground_speed>takeoff_speed, True), (sq.c.ground_speed<landing_speed, False)]).label('is_takeoff')) \
.filter(sq.c.address_prev == sq.c.address == sq.c.address_next) \ .filter(sq.c.address_prev == sq.c.address == sq.c.address_next) \
.filter(or_(and_(sq.c.ground_speed_prev < takeoff_speed, # takeoff .filter(or_(and_(sq.c.ground_speed_prev < takeoff_speed, # takeoff
sq.c.ground_speed > takeoff_speed, sq.c.ground_speed > takeoff_speed,
@ -58,5 +65,8 @@ def compute_takeoff_and_landing():
# ... and save them # ... and save them
ins = insert(TakeoffLanding).from_select((TakeoffLanding.address, TakeoffLanding.timestamp, TakeoffLanding.latitude, TakeoffLanding.longitude, TakeoffLanding.track, TakeoffLanding.ground_speed, TakeoffLanding.altitude, TakeoffLanding.is_takeoff), takeoff_landing_query) ins = insert(TakeoffLanding).from_select((TakeoffLanding.address, TakeoffLanding.timestamp, TakeoffLanding.latitude, TakeoffLanding.longitude, TakeoffLanding.track, TakeoffLanding.ground_speed, TakeoffLanding.altitude, TakeoffLanding.is_takeoff), takeoff_landing_query)
session.execute(ins) app.session.execute(ins)
session.commit() app.session.commit()

Wyświetl plik

@ -11,3 +11,12 @@ def init():
from dbutils import engine from dbutils import engine
Base.metadata.create_all(engine) Base.metadata.create_all(engine)
print("Done.") print("Done.")
@manager.command
def updateddb():
"""Update the ddb data."""
print("Updating ddb data...")
result = update_ddb_data.delay()
counter = result.get()
print("Imported %i devieces."%counter)

Wyświetl plik

@ -1,15 +0,0 @@
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from .model import Base, AircraftBeacon, ReceiverBeacon, Flarm
# prepare db
#engine = create_engine('sqlite:///:memory:', echo=False)
engine = create_engine('sqlite:///ogn.db', echo=False)
#engine = create_engine('postgresql://postgres:secretpass@localhost:5432/ogn')
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
session = Session()

Wyświetl plik

@ -1,18 +0,0 @@
from ogn.db import session
from ogn.model import Flarm
from ogn.ognutils import get_ddb
def fill_flarm_db():
session.query(Flarm).delete()
flarms = get_ddb()
session.bulk_save_objects(flarms)
flarms = get_ddb('custom.txt')
session.bulk_save_objects(flarms)
session.commit()
if __name__ == '__main__':
fill_flarm_db()

Wyświetl plik

@ -3,3 +3,4 @@ nose==1.3.7
coveralls==0.4.4 coveralls==0.4.4
geopy==1.11.0 geopy==1.11.0
manage.py==0.2.10 manage.py==0.2.10
celery[redis]>=3.1,<3.2