diff --git a/README.md b/README.md index ebc0600..9972b53 100644 --- a/README.md +++ b/README.md @@ -118,13 +118,13 @@ The following scripts run in the foreground and should be deamonized - Start a task server (make sure redis is up and running) ``` - celery -A app.collect worker -l info + celery -A celery_app worker -l info ``` - Start the task scheduler (make sure a task server is up and running) ``` - celery -A app.collect beat -l info + celery -A celery_app beat -l info ``` ### Flask - Command Line Interface @@ -163,14 +163,10 @@ Most commands are command groups, so if you execute this command you will get fu ### Available tasks -- `app.collect.celery.update_takeoff_landings` - Compute takeoffs and landings. -- `app.collect.celery.update_logbook_entries` - Add/update logbook entries. -- `app.collect.celery.update_logbook_max_altitude` - Add max altitudes in logbook when flight is complete (takeoff and landing). -- `app.collect.celery.import_ddb` - Import registered devices from the DDB. -- `app.collect.celery.update_receivers_country_code` - Update country code in receivers table if None. -- `app.collect.celery.purge_old_data` - Delete AircraftBeacons and ReceiverBeacons older than given 'age'. -- `app.collect.celery.update_stats` - Create stats and update receivers/devices with stats. -- `app.collect.celery.update_ognrange` - Create receiver coverage stats for Melissas ognrange. +- `app.tasks.update_takeoff_landings` - Compute takeoffs and landings. +- `app.tasks.celery.update_logbook_entries` - Add/update logbook entries. +- `app.tasks.celery.update_logbook_max_altitude` - Add max altitudes in logbook when flight is complete (takeoff and landing). +- `app.tasks.celery.import_ddb` - Import registered devices from the DDB. If the task server is up and running, tasks could be started manually. Here we compute takeoffs and landings for the past 90 minutes: diff --git a/app/tasks/__init__.py b/app/tasks/__init__.py new file mode 100644 index 0000000..31f918a --- /dev/null +++ b/app/tasks/__init__.py @@ -0,0 +1,5 @@ +from .sql_tasks import update_statistics, update_sender_direction_statistics + +from .orm_tasks import transfer_to_database +from .orm_tasks import update_takeoff_landings, update_logbook, update_logbook_max_altitude +from .orm_tasks import import_ddb diff --git a/app/tasks/orm_tasks.py b/app/tasks/orm_tasks.py new file mode 100644 index 0000000..b308f0c --- /dev/null +++ b/app/tasks/orm_tasks.py @@ -0,0 +1,51 @@ +from datetime import datetime, timedelta + +from app.collect.logbook import update_takeoff_landings as logbook_update_takeoff_landings, update_logbook as logbook_update +from app.collect.logbook import update_max_altitudes as logbook_update_max_altitudes + +from app.collect.database import import_ddb as device_infos_import_ddb + +from app.collect.gateway import transfer_from_redis_to_database + +from app import db, celery + +@celery.task(name="transfer_to_database") +def transfer_to_database(): + """Transfer APRS data from Redis to database.""" + + result = transfer_from_redis_to_database() + return result + + +@celery.task(name="update_takeoff_landings") +def update_takeoff_landings(last_minutes): + """Compute takeoffs and landings.""" + + end = datetime.utcnow() + start = end - timedelta(minutes=last_minutes) + result = logbook_update_takeoff_landings(start=start, end=end) + return result + + +@celery.task(name="update_logbook") +def update_logbook(offset_days=None): + """Add/update logbook entries.""" + + result = logbook_update(offset_days=offset_days) + return result + + +@celery.task(name="update_logbook_max_altitude") +def update_logbook_max_altitude(offset_days=0): + """Add max altitudes in logbook when flight is complete (takeoff and landing).""" + + result = logbook_update_max_altitudes(offset_days=offset_days) + return result + + +@celery.task(name="import_ddb") +def import_ddb(): + """Import registered devices from the DDB.""" + + result = device_infos_import_ddb() + return result diff --git a/app/tasks/sql_tasks.py b/app/tasks/sql_tasks.py new file mode 100644 index 0000000..1cb2d2d --- /dev/null +++ b/app/tasks/sql_tasks.py @@ -0,0 +1,126 @@ +from datetime import datetime, timedelta + +from app import db, celery + + +@celery.task(name="update_statistics") +def update_statistics(date_str=None): + """ Update relation_statistics, sender_statistics, receiver_statistics (all depend on coverage_statistics).""" + + if date_str is None: + date_str = datetime.utcnow().strftime("%Y-%m-%d") + + # Update relation statistics + db.session.execute(f""" + DELETE FROM relation_statistics + WHERE date = '{date_str}'; + + INSERT INTO relation_statistics AS rs (date, sender_id, receiver_id, is_trustworthy, max_distance, max_normalized_quality, messages_count, coverages_count) + SELECT + tmp.date, + tmp.sender_id, + tmp.receiver_id, + + is_trustworthy, + + MAX(tmp.max_distance) AS max_distance, + MAX(tmp.max_normalized_quality) AS max_normalized_quality, + SUM(tmp.messages_count) AS messages_count, + COUNT(DISTINCT tmp.location_mgrs_short) AS coverages_count + FROM coverage_statistics AS tmp + WHERE tmp.date = '{date_str}' + GROUP BY date, sender_id, receiver_id, is_trustworthy; + """) + + # Update sender statistics + db.session.execute(f""" + DELETE FROM sender_statistics + WHERE date = '{date_str}'; + + INSERT INTO sender_statistics AS rs (date, sender_id, is_trustworthy, max_distance, max_normalized_quality, messages_count, coverages_count, receivers_count) + SELECT + tmp.date, + tmp.sender_id, + + is_trustworthy, + + MAX(tmp.max_distance) AS max_distance, + MAX(tmp.max_normalized_quality) AS max_normalized_quality, + SUM(tmp.messages_count) AS messages_count, + COUNT(DISTINCT tmp.location_mgrs_short) AS coverages_count, + COUNT(DISTINCT tmp.receiver_id) AS receivers_count + FROM coverage_statistics AS tmp + WHERE tmp.date = '{date_str}' + GROUP BY date, sender_id, is_trustworthy; + """) + + # Update receiver statistics + db.session.execute(f""" + DELETE FROM receiver_statistics + WHERE date = '{date_str}'; + + INSERT INTO receiver_statistics AS rs (date, receiver_id, is_trustworthy, max_distance, max_normalized_quality, messages_count, coverages_count, senders_count) + SELECT + tmp.date, + tmp.receiver_id, + + is_trustworthy, + + MAX(tmp.max_distance) AS max_distance, + MAX(tmp.max_normalized_quality) AS max_normalized_quality, + SUM(tmp.messages_count) AS messages_count, + COUNT(DISTINCT tmp.location_mgrs_short) AS coverages_count, + COUNT(DISTINCT tmp.sender_id) AS senders_count + FROM coverage_statistics AS tmp + WHERE tmp.date = '{date_str}' + GROUP BY date, receiver_id, is_trustworthy; + """) + + db.session.commit() + + +@celery.task(name="update_sender_direction_statistics") +def update_sender_direction_statistics(): + """ Update sender_direction_statistics.""" + + db.session.execute(""" + DELETE FROM sender_direction_statistics; + + INSERT INTO sender_direction_statistics(sender_id, receiver_id, directions_count, messages_count, direction_data) + SELECT + sq2.sender_id, + sq2.receiver_id, + COUNT(sq2.*) AS directions_count, + SUM(sq2.messages_count) AS messages_count, + json_agg(json_build_object('direction', direction, 'messages_count', messages_count, 'max_range', max_range)) AS direction_data + FROM ( + SELECT + sq.sender_id, + sq.receiver_id, + sq.direction, + COUNT(sq.*) AS messages_count, + MAX(sq.max_range) AS max_range + FROM ( + SELECT + s.id AS sender_id, + r.id AS receiver_id, + 10000 * 10^(sp.normalized_quality/20.0) AS max_range, + CASE + WHEN sp.bearing-sp.track < 0 + THEN CAST((sp.bearing-sp.track+360)/10 AS INTEGER)*10 + ELSE CAST((sp.bearing-sp.track)/10 AS INTEGER)*10 + END AS direction + FROM sender_positions AS sp + INNER JOIN senders s ON sp.name = s.name + INNER JOIN receivers r ON sp.receiver_name = r.name + WHERE + sp.track IS NOT NULL AND sp.bearing IS NOT NULL AND sp.normalized_quality IS NOT NULL + AND sp.agl >= 200 + AND turn_rate BETWEEN -10.0 AND 10.0 + AND climb_rate BETWEEN -3.0 AND 3.0 + ) AS sq + GROUP BY sq.sender_id, sq.receiver_id, sq.direction + ORDER BY sq.sender_id, sq.receiver_id, sq.direction + ) AS sq2 + GROUP BY sq2.sender_id, sq2.receiver_id; + """) diff --git a/app/templates/airports.html b/app/templates/airports.html index 912b46f..939c535 100644 --- a/app/templates/airports.html +++ b/app/templates/airports.html @@ -33,7 +33,7 @@ {{ loop.index }} {{ sel_country }} {{ airport|to_html_link|safe }} - Logbook + Logbook {% endfor %} diff --git a/app/templates/sender_detail.html b/app/templates/sender_detail.html index e1c8c27..b0c8352 100644 --- a/app/templates/sender_detail.html +++ b/app/templates/sender_detail.html @@ -74,7 +74,7 @@ {% for entry in sender.logbook_entries %} {{ loop.index }} - {% if ns.mydate != entry.reference.strftime('%Y-%m-%d') %}{% set ns.mydate = entry.reference.strftime('%Y-%m-%d') %}{{ ns.mydate }}{% endif %} + {% if ns.mydate != entry.reference_timestamp.strftime('%Y-%m-%d') %}{% set ns.mydate = entry.reference_timestamp.strftime('%Y-%m-%d') %}{{ ns.mydate }}{% endif %} {% if entry.takeoff_airport is not none %}{{ entry.takeoff_airport.name }}{% endif %} {% if entry.landing_airport is not none %}{{ entry.landing_airport.name }}{% endif %} {% if entry.takeoff_timestamp is not none %} {{ entry.takeoff_timestamp.strftime('%H:%M') }} {% endif %} diff --git a/celery_app.py b/celery_app.py new file mode 100644 index 0000000..3aeaa4c --- /dev/null +++ b/celery_app.py @@ -0,0 +1,6 @@ +#!/usr/bin/env python + +from app import init_celery + +app = init_celery() +app.conf.imports = app.conf.imports + ("app.tasks",) diff --git a/celery_worker.py b/celery_worker.py deleted file mode 100644 index 155364e..0000000 --- a/celery_worker.py +++ /dev/null @@ -1,7 +0,0 @@ -#!/usr/bin/env python -import os -from app import celery, create_app -from app.collect.celery import * - -app = create_app(os.getenv('FLASK_CONFIG') or 'default') -app.app_context().push() diff --git a/setup.py b/setup.py index 0737b9b..48ce806 100644 --- a/setup.py +++ b/setup.py @@ -42,7 +42,7 @@ setup( 'Flask-WTF==0.14.3', 'Flask-Caching==1.9.0', 'geopy==2.0.0', - 'celery==5.0.2', + 'celery==4.4.7', 'Flask-Redis==0.4.0', 'redis==3.5.3', 'aerofiles==1.0.0', diff --git a/tests/collect/test_logbook.py b/tests/collect/test_logbook.py index 0630d4f..ec202b7 100644 --- a/tests/collect/test_logbook.py +++ b/tests/collect/test_logbook.py @@ -32,13 +32,13 @@ class TestLogbook(TestBaseDB): self.takeoff_ohlstadt_dd4711 = TakeoffLanding(is_takeoff=True, timestamp="2016-06-01 10:00:00", airport_id=self.ohlstadt.id, sender_id=self.dd4711.id) def get_logbook_entries(self): - return db.session.query(Logbook).order_by(Logbook.takeoff_airport_id, Logbook.reference).all() + return db.session.query(Logbook).order_by(Logbook.takeoff_airport_id, Logbook.reference_timestamp).all() def test_single_takeoff(self): db.session.add(self.takeoff_koenigsdorf_dd0815) db.session.commit() - update_logbook(date=datetime.date(2016, 6, 1)) + update_logbook() entries = self.get_logbook_entries() self.assertEqual(len(entries), 1) self.assertEqual(entries[0].takeoff_airport_id, self.koenigsdorf.id) @@ -48,7 +48,7 @@ class TestLogbook(TestBaseDB): db.session.add(self.landing_koenigsdorf_dd0815) db.session.commit() - update_logbook(date=datetime.date(2016, 6, 1)) + update_logbook() entries = self.get_logbook_entries() self.assertEqual(len(entries), 1) self.assertEqual(entries[0].takeoff_airport_id, None) @@ -59,7 +59,7 @@ class TestLogbook(TestBaseDB): db.session.add(self.takeoff_ohlstadt_dd4711) db.session.commit() - update_logbook(date=datetime.date(2016, 6, 1)) + update_logbook() entries = self.get_logbook_entries() self.assertEqual(len(entries), 2) self.assertEqual(entries[0].takeoff_airport_id, self.koenigsdorf.id) @@ -70,19 +70,19 @@ class TestLogbook(TestBaseDB): db.session.add(self.landing_koenigsdorf_dd0815) db.session.commit() - update_logbook(date=datetime.date(2016, 6, 1)) + update_logbook() entries = self.get_logbook_entries() self.assertEqual(len(entries), 1) self.assertEqual(entries[0].takeoff_airport_id, self.koenigsdorf.id) self.assertEqual(entries[0].landing_airport_id, self.koenigsdorf.id) + def test_takeoff_and_landing_on_different_days(self): db.session.add(self.takeoff_koenigsdorf_dd0815) db.session.add(self.landing_koenigsdorf_dd0815_later) db.session.commit() - update_logbook(date=datetime.date(2016, 6, 1)) - update_logbook(date=datetime.date(2016, 6, 2)) + update_logbook() entries = self.get_logbook_entries() self.assertEqual(len(entries), 2) self.assertEqual(entries[0].takeoff_airport_id, self.koenigsdorf.id) @@ -94,7 +94,7 @@ class TestLogbook(TestBaseDB): db.session.add(self.takeoff_koenigsdorf_dd0815) db.session.commit() - update_logbook(date=datetime.date(2016, 6, 1)) + update_logbook(0) entries = self.get_logbook_entries() self.assertEqual(len(entries), 1) self.assertEqual(entries[0].takeoff_airport_id, self.koenigsdorf.id) @@ -102,7 +102,7 @@ class TestLogbook(TestBaseDB): db.session.add(self.landing_koenigsdorf_dd0815) db.session.commit() - update_logbook(date=datetime.date(2016, 6, 1)) + update_logbook() entries = self.get_logbook_entries() self.assertEqual(len(entries), 1) self.assertEqual(entries[0].takeoff_airport_id, self.koenigsdorf.id) @@ -111,7 +111,7 @@ class TestLogbook(TestBaseDB): db.session.add(self.takeoff_ohlstadt_dd4711) db.session.commit() - update_logbook(date=datetime.date(2016, 6, 1)) + update_logbook(0) entries = self.get_logbook_entries() self.assertEqual(len(entries), 2) self.assertEqual(entries[0].takeoff_airport_id, self.koenigsdorf.id) @@ -121,22 +121,22 @@ class TestLogbook(TestBaseDB): db.session.add(self.landing_koenigsdorf_dd0815) db.session.commit() - update_logbook(date=datetime.date(2016, 6, 1)) + update_logbook() entries = self.get_logbook_entries() self.assertEqual(len(entries), 1) self.assertEqual(entries[0].takeoff_airport_id, None) self.assertEqual(entries[0].landing_airport_id, self.koenigsdorf.id) - self.assertEqual(entries[0].reftime, self.landing_koenigsdorf_dd0815.timestamp) + self.assertEqual(entries[0].reference_timestamp, self.landing_koenigsdorf_dd0815.timestamp) db.session.add(self.takeoff_koenigsdorf_dd0815) db.session.commit() - update_logbook(date=datetime.date(2016, 6, 1)) + update_logbook() entries = self.get_logbook_entries() self.assertEqual(len(entries), 1) self.assertEqual(entries[0].takeoff_airport_id, self.koenigsdorf.id) self.assertEqual(entries[0].landing_airport_id, self.koenigsdorf.id) - self.assertEqual(entries[0].reftime, self.takeoff_koenigsdorf_dd0815.timestamp) + self.assertEqual(entries[0].reference_timestamp, self.takeoff_koenigsdorf_dd0815.timestamp) if __name__ == "__main__":