kopia lustrzana https://github.com/glidernet/ogn-python
Renamed celery to celery_tasks
rodzic
c3c4f029bd
commit
b924fa369f
|
@ -1,106 +0,0 @@
|
||||||
import datetime
|
|
||||||
|
|
||||||
from celery.utils.log import get_task_logger
|
|
||||||
|
|
||||||
from app.collect.takeoff_landings import update_entries as takeoff_update_entries
|
|
||||||
|
|
||||||
from app.collect.logbook import update_entries as logbook_update_entries
|
|
||||||
from app.collect.logbook import update_max_altitudes as logbook_update_max_altitudes
|
|
||||||
|
|
||||||
from app.collect.database import import_ddb as device_infos_import_ddb
|
|
||||||
from app.collect.database import update_country_code as receivers_update_country_code
|
|
||||||
|
|
||||||
from app.collect.stats import create_device_stats, update_device_stats_jumps, create_receiver_stats, create_relation_stats, update_qualities, update_receivers, update_devices
|
|
||||||
|
|
||||||
from app.collect.ognrange import update_entries as receiver_coverage_update_entries
|
|
||||||
|
|
||||||
from app import db
|
|
||||||
from app import celery
|
|
||||||
|
|
||||||
|
|
||||||
logger = get_task_logger(__name__)
|
|
||||||
|
|
||||||
|
|
||||||
@celery.task(name="update_takeoff_landings")
|
|
||||||
def update_takeoff_landings(last_minutes):
|
|
||||||
"""Compute takeoffs and landings."""
|
|
||||||
|
|
||||||
end = datetime.datetime.utcnow()
|
|
||||||
start = end - datetime.timedelta(minutes=last_minutes)
|
|
||||||
result = takeoff_update_entries(session=db.session, start=start, end=end, logger=logger)
|
|
||||||
return result
|
|
||||||
|
|
||||||
|
|
||||||
@celery.task(name="update_logbook_entries")
|
|
||||||
def update_logbook_entries(day_offset):
|
|
||||||
"""Add/update logbook entries."""
|
|
||||||
|
|
||||||
date = datetime.datetime.today() + datetime.timedelta(days=day_offset)
|
|
||||||
result = logbook_update_entries(session=db.session, date=date, logger=logger)
|
|
||||||
return result
|
|
||||||
|
|
||||||
|
|
||||||
@celery.task(name="update_logbook_max_altitude")
|
|
||||||
def update_logbook_max_altitude(day_offset):
|
|
||||||
"""Add max altitudes in logbook when flight is complete (takeoff and landing)."""
|
|
||||||
|
|
||||||
date = datetime.datetime.today() + datetime.timedelta(days=day_offset)
|
|
||||||
result = logbook_update_max_altitudes(session=db.session, date=date, logger=logger)
|
|
||||||
return result
|
|
||||||
|
|
||||||
|
|
||||||
@celery.task(name="import_ddb")
|
|
||||||
def import_ddb():
|
|
||||||
"""Import registered devices from the DDB."""
|
|
||||||
|
|
||||||
result = device_infos_import_ddb(session=db.session, logger=logger)
|
|
||||||
return result
|
|
||||||
|
|
||||||
|
|
||||||
@celery.task(name="update_receivers_country_code")
|
|
||||||
def update_receivers_country_code():
|
|
||||||
"""Update country code in receivers table if None."""
|
|
||||||
|
|
||||||
result = receivers_update_country_code(session=db.session, logger=logger)
|
|
||||||
return result
|
|
||||||
|
|
||||||
|
|
||||||
@celery.task(name="purge_old_data")
|
|
||||||
def purge_old_data(max_hours):
|
|
||||||
"""Delete AircraftBeacons and ReceiverBeacons older than given 'age'."""
|
|
||||||
|
|
||||||
from app.model import AircraftBeacon, ReceiverBeacon
|
|
||||||
|
|
||||||
min_timestamp = datetime.datetime.utcnow() - datetime.timedelta(hours=max_hours)
|
|
||||||
aircraft_beacons_deleted = db.session.query(AircraftBeacon).filter(AircraftBeacon.timestamp < min_timestamp).delete()
|
|
||||||
|
|
||||||
receiver_beacons_deleted = db.session.query(ReceiverBeacon).filter(ReceiverBeacon.timestamp < min_timestamp).delete()
|
|
||||||
|
|
||||||
db.session.commit()
|
|
||||||
|
|
||||||
result = "{} AircraftBeacons deleted, {} ReceiverBeacons deleted".format(aircraft_beacons_deleted, receiver_beacons_deleted)
|
|
||||||
return result
|
|
||||||
|
|
||||||
|
|
||||||
@celery.task(name="update_stats")
|
|
||||||
def update_stats(day_offset):
|
|
||||||
"""Create stats and update receivers/devices with stats."""
|
|
||||||
|
|
||||||
date = datetime.datetime.today() + datetime.timedelta(days=day_offset)
|
|
||||||
|
|
||||||
create_device_stats(session=db.session, date=date)
|
|
||||||
update_device_stats_jumps(session=db.session, date=date)
|
|
||||||
create_receiver_stats(session=db.session, date=date)
|
|
||||||
create_relation_stats(session=db.session, date=date)
|
|
||||||
update_qualities(session=db.session, date=date)
|
|
||||||
update_receivers(session=db.session)
|
|
||||||
update_devices(session=db.session)
|
|
||||||
|
|
||||||
|
|
||||||
@celery.task(name="update_ognrange")
|
|
||||||
def update_ognrange(day_offset):
|
|
||||||
"""Create receiver coverage stats for Melissas ognrange."""
|
|
||||||
|
|
||||||
date = datetime.datetime.today() + datetime.timedelta(days=day_offset)
|
|
||||||
|
|
||||||
receiver_coverage_update_entries(session=db.session, date=date)
|
|
|
@ -1,12 +1,93 @@
|
||||||
from datetime import datetime
|
import datetime
|
||||||
|
|
||||||
from flask import current_app
|
from celery.utils.log import get_task_logger
|
||||||
|
|
||||||
from app import create_app
|
from app.collect.takeoff_landings import update_entries as takeoff_update_entries
|
||||||
from app import redis_client, celery
|
|
||||||
|
from app.collect.logbook import update_entries as logbook_update_entries
|
||||||
|
from app.collect.logbook import update_max_altitudes as logbook_update_max_altitudes
|
||||||
|
|
||||||
|
from app.collect.database import import_ddb as device_infos_import_ddb
|
||||||
|
from app.collect.database import update_country_code as receivers_update_country_code
|
||||||
|
|
||||||
|
from app.collect.ognrange import update_entries as receiver_coverage_update_entries
|
||||||
|
|
||||||
from app.gateway.bulkimport import DbFeeder
|
from app.gateway.bulkimport import DbFeeder
|
||||||
|
|
||||||
|
from app import db
|
||||||
|
from app import redis_client, celery
|
||||||
|
|
||||||
|
logger = get_task_logger(__name__)
|
||||||
|
|
||||||
|
@celery.task(name="update_takeoff_landings")
|
||||||
|
def update_takeoff_landings(last_minutes):
|
||||||
|
"""Compute takeoffs and landings."""
|
||||||
|
|
||||||
|
end = datetime.datetime.utcnow()
|
||||||
|
start = end - datetime.timedelta(minutes=last_minutes)
|
||||||
|
result = takeoff_update_entries(session=db.session, start=start, end=end, logger=logger)
|
||||||
|
return result
|
||||||
|
|
||||||
|
|
||||||
|
@celery.task(name="update_logbook_entries")
|
||||||
|
def update_logbook_entries(day_offset):
|
||||||
|
"""Add/update logbook entries."""
|
||||||
|
|
||||||
|
date = datetime.datetime.today() + datetime.timedelta(days=day_offset)
|
||||||
|
result = logbook_update_entries(session=db.session, date=date, logger=logger)
|
||||||
|
return result
|
||||||
|
|
||||||
|
|
||||||
|
@celery.task(name="update_logbook_max_altitude")
|
||||||
|
def update_logbook_max_altitude(day_offset):
|
||||||
|
"""Add max altitudes in logbook when flight is complete (takeoff and landing)."""
|
||||||
|
|
||||||
|
date = datetime.datetime.today() + datetime.timedelta(days=day_offset)
|
||||||
|
result = logbook_update_max_altitudes(session=db.session, date=date, logger=logger)
|
||||||
|
return result
|
||||||
|
|
||||||
|
|
||||||
|
@celery.task(name="import_ddb")
|
||||||
|
def import_ddb():
|
||||||
|
"""Import registered devices from the DDB."""
|
||||||
|
|
||||||
|
result = device_infos_import_ddb(session=db.session, logger=logger)
|
||||||
|
return result
|
||||||
|
|
||||||
|
|
||||||
|
@celery.task(name="update_receivers_country_code")
|
||||||
|
def update_receivers_country_code():
|
||||||
|
"""Update country code in receivers table if None."""
|
||||||
|
|
||||||
|
result = receivers_update_country_code(session=db.session, logger=logger)
|
||||||
|
return result
|
||||||
|
|
||||||
|
|
||||||
|
@celery.task(name="purge_old_data")
|
||||||
|
def purge_old_data(max_hours):
|
||||||
|
"""Delete AircraftBeacons and ReceiverBeacons older than given 'age'."""
|
||||||
|
|
||||||
|
from app.model import AircraftBeacon, ReceiverBeacon
|
||||||
|
|
||||||
|
min_timestamp = datetime.datetime.utcnow() - datetime.timedelta(hours=max_hours)
|
||||||
|
aircraft_beacons_deleted = db.session.query(AircraftBeacon).filter(AircraftBeacon.timestamp < min_timestamp).delete()
|
||||||
|
|
||||||
|
receiver_beacons_deleted = db.session.query(ReceiverBeacon).filter(ReceiverBeacon.timestamp < min_timestamp).delete()
|
||||||
|
|
||||||
|
db.session.commit()
|
||||||
|
|
||||||
|
result = "{} AircraftBeacons deleted, {} ReceiverBeacons deleted".format(aircraft_beacons_deleted, receiver_beacons_deleted)
|
||||||
|
return result
|
||||||
|
|
||||||
|
|
||||||
|
@celery.task(name="update_ognrange")
|
||||||
|
def update_ognrange(day_offset):
|
||||||
|
"""Create receiver coverage stats for Melissas ognrange."""
|
||||||
|
|
||||||
|
date = datetime.datetime.today() + datetime.timedelta(days=day_offset)
|
||||||
|
|
||||||
|
receiver_coverage_update_entries(session=db.session, date=date)
|
||||||
|
|
||||||
|
|
||||||
@celery.task(name="transfer_beacons_to_database")
|
@celery.task(name="transfer_beacons_to_database")
|
||||||
def transfer_beacons_to_database():
|
def transfer_beacons_to_database():
|
||||||
|
@ -20,7 +101,7 @@ def transfer_beacons_to_database():
|
||||||
redis_client.delete(key)
|
redis_client.delete(key)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
reference_timestamp = datetime.strptime(key[11:].decode('utf-8'), "%Y-%m-%d %H:%M:%S.%f")
|
reference_timestamp = datetime.datetime.strptime(key[11:].decode('utf-8'), "%Y-%m-%d %H:%M:%S.%f")
|
||||||
aprs_string = value.decode('utf-8')
|
aprs_string = value.decode('utf-8')
|
||||||
redis_client.delete(key)
|
redis_client.delete(key)
|
||||||
|
|
||||||
|
@ -28,9 +109,3 @@ def transfer_beacons_to_database():
|
||||||
counter += 1
|
counter += 1
|
||||||
|
|
||||||
return f"Beacons transfered from redis to TimescaleDB: {counter}"
|
return f"Beacons transfered from redis to TimescaleDB: {counter}"
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
app = create_app()
|
|
||||||
with app.app_context():
|
|
||||||
result = transfer_beacons_to_database.delay()
|
|
||||||
print(result)
|
|
||||||
|
|
Ładowanie…
Reference in New Issue