kopia lustrzana https://github.com/glidernet/ogn-python
Update of tasks and their descriptions
rodzic
56ced0b3a5
commit
8180056604
25
README.md
25
README.md
|
@ -162,25 +162,22 @@ Commands:
|
||||||
|
|
||||||
Most commands are command groups, so if you execute this command you will get further (sub)commands.
|
Most commands are command groups, so if you execute this command you will get further (sub)commands.
|
||||||
|
|
||||||
### Available tasks (deprecated - needs rework)
|
### Available tasks
|
||||||
|
|
||||||
- `ogn.collect.database.import_ddb` - Import registered devices from the DDB.
|
- `ogn_python.collect.celery.update_takeoff_landings` - Compute takeoffs and landings.
|
||||||
- `ogn.collect.database.import_file` - Import registered devices from a local file.
|
- `ogn_python.collect.celery.update_logbook_entries` - Add/update logbook entries.
|
||||||
- `ogn.collect.database.update_country_code` - Update country code in receivers table if None.
|
- `ogn_python.collect.celery.update_logbook_max_altitude` - Add max altitudes in logbook when flight is complete (takeoff and landing).
|
||||||
- `ogn.collect.database.update_devices` - Add/update entries in devices table and update foreign keys in aircraft beacons.
|
- `ogn_python.collect.celery.import_ddb` - Import registered devices from the DDB.
|
||||||
- `ogn.collect.database.update_receivers` - Add/update_receivers entries in receiver table and update receivers foreign keys and distance in aircraft beacons and update foreign keys in receiver beacons.
|
- `ogn_python.collect.celery.update_receivers_country_code` - Update country code in receivers table if None.
|
||||||
- `ogn.collect.logbook.update_logbook` - Add/update logbook entries.
|
- `ogn_python.collect.celery.purge_old_data` - Delete AircraftBeacons and ReceiverBeacons older than given 'age'.
|
||||||
- `ogn.collect.logbook.update_max_altitude` - Add max altitudes in logbook when flight is complete (takeoff and landing).
|
- `ogn_python.collect.celery.update_stats` - Create stats and update receivers/devices with stats.
|
||||||
- `ogn.collect.stats.update_device_stats` - Add/update entries in device stats table.
|
|
||||||
- `ogn.collect.stats.update_receiver_stats` - Add/update entries in receiver stats table.
|
|
||||||
- `ogn.collect.takeoff_landing.update_takeoff_landing` - Compute takeoffs and landings.
|
|
||||||
|
|
||||||
If the task server is up and running, tasks could be started manually.
|
If the task server is up and running, tasks could be started manually. Here we compute takeoffs and landings for the past 90 minutes:
|
||||||
|
|
||||||
```
|
```
|
||||||
python3
|
python3
|
||||||
>>>from ogn.collect.database import import_ddb
|
>>>from ogn_python.collect.celery import update_takeoff_landings
|
||||||
>>>import_ddb.delay()
|
>>>update_takeoff_landings.delay(minutes=90)
|
||||||
```
|
```
|
||||||
|
|
||||||
## License
|
## License
|
||||||
|
|
|
@ -14,6 +14,7 @@ from ogn_python.collect.stats import create_device_stats, update_device_stats_ju
|
||||||
|
|
||||||
from ogn_python import db
|
from ogn_python import db
|
||||||
from ogn_python import celery
|
from ogn_python import celery
|
||||||
|
from ogn_python.model import receiver
|
||||||
|
|
||||||
|
|
||||||
logger = get_task_logger(__name__)
|
logger = get_task_logger(__name__)
|
||||||
|
@ -30,19 +31,20 @@ def update_takeoff_landings(last_minutes):
|
||||||
|
|
||||||
|
|
||||||
@celery.task(name='update_logbook_entries')
|
@celery.task(name='update_logbook_entries')
|
||||||
def update_logbook_entries():
|
def update_logbook_entries(day_offset):
|
||||||
"""Add/update logbook entries."""
|
"""Add/update logbook entries."""
|
||||||
|
|
||||||
today = datetime.datetime.today()
|
date = datetime.datetime.today() + datetime.timedelta(days=day_offset)
|
||||||
result = logbook_update_entries(session=db.session, date=today, logger=logger)
|
result = logbook_update_entries(session=db.session, date=date, logger=logger)
|
||||||
return result
|
return result
|
||||||
|
|
||||||
|
|
||||||
@celery.task(name='update_logbook_max_altitude')
|
@celery.task(name='update_logbook_max_altitude')
|
||||||
def update_logbook_max_altitude():
|
def update_logbook_max_altitude(day_offset):
|
||||||
"""Add max altitudes in logbook when flight is complete (takeoff and landing)."""
|
"""Add max altitudes in logbook when flight is complete (takeoff and landing)."""
|
||||||
|
|
||||||
result = logbook_update_max_altitudes(session=db.session, logger=logger)
|
date = datetime.datetime.today() + datetime.timedelta(days=day_offset)
|
||||||
|
result = logbook_update_max_altitudes(session=db.session, date=date, logger=logger)
|
||||||
return result
|
return result
|
||||||
|
|
||||||
|
|
||||||
|
@ -68,16 +70,19 @@ def purge_old_data(max_hours):
|
||||||
|
|
||||||
from ogn_python.model import AircraftBeacon, ReceiverBeacon
|
from ogn_python.model import AircraftBeacon, ReceiverBeacon
|
||||||
min_timestamp = datetime.datetime.utcnow() - datetime.timedelta(hours=max_hours)
|
min_timestamp = datetime.datetime.utcnow() - datetime.timedelta(hours=max_hours)
|
||||||
db.session.query(AircraftBeacon) \
|
aircraft_beacons_deleted = db.session.query(AircraftBeacon) \
|
||||||
.filter(AircraftBeacon.timestamp < min_timestamp) \
|
.filter(AircraftBeacon.timestamp < min_timestamp) \
|
||||||
.delete()
|
.delete()
|
||||||
|
|
||||||
db.session.query(ReceiverBeacon) \
|
receiver_beacons_deleted = db.session.query(ReceiverBeacon) \
|
||||||
.filter(ReceiverBeacon.timestamp < min_timestamp) \
|
.filter(ReceiverBeacon.timestamp < min_timestamp) \
|
||||||
.delete()
|
.delete()
|
||||||
|
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
|
|
||||||
|
result = "{} AircraftBeacons deleted, {} ReceiverBeacons deleted".format(aircraft_beacons_deleted, receiver_beacons_deleted)
|
||||||
|
return result
|
||||||
|
|
||||||
|
|
||||||
@celery.task(name='update_stats')
|
@celery.task(name='update_stats')
|
||||||
def update_stats(day_offset):
|
def update_stats(day_offset):
|
||||||
|
|
|
@ -17,21 +17,11 @@ def update_entries(session, date, logger=None):
|
||||||
logger.info("Compute logbook.")
|
logger.info("Compute logbook.")
|
||||||
|
|
||||||
# limit time range to given date and set window partition and window order
|
# limit time range to given date and set window partition and window order
|
||||||
if date is not None:
|
(start, end) = date_to_timestamps(date)
|
||||||
(start, end) = date_to_timestamps(date)
|
pa = (TakeoffLanding.device_id)
|
||||||
filters = [between(TakeoffLanding.timestamp, start, end)]
|
wo = and_(TakeoffLanding.device_id,
|
||||||
pa = (TakeoffLanding.device_id)
|
TakeoffLanding.airport_id,
|
||||||
wo = and_(TakeoffLanding.device_id,
|
TakeoffLanding.timestamp)
|
||||||
TakeoffLanding.airport_id,
|
|
||||||
TakeoffLanding.timestamp)
|
|
||||||
else:
|
|
||||||
filters = []
|
|
||||||
pa = (func.date(TakeoffLanding.timestamp),
|
|
||||||
TakeoffLanding.device_id)
|
|
||||||
wo = and_(func.date(TakeoffLanding.timestamp),
|
|
||||||
TakeoffLanding.device_id,
|
|
||||||
TakeoffLanding.airport_id,
|
|
||||||
TakeoffLanding.timestamp)
|
|
||||||
|
|
||||||
# make a query with current, previous and next "takeoff_landing" event, so we can find complete flights
|
# make a query with current, previous and next "takeoff_landing" event, so we can find complete flights
|
||||||
sq = session.query(
|
sq = session.query(
|
||||||
|
@ -50,7 +40,7 @@ def update_entries(session, date, logger=None):
|
||||||
TakeoffLanding.airport_id,
|
TakeoffLanding.airport_id,
|
||||||
func.lag(TakeoffLanding.airport_id).over(partition_by=pa, order_by=wo).label('airport_id_prev'),
|
func.lag(TakeoffLanding.airport_id).over(partition_by=pa, order_by=wo).label('airport_id_prev'),
|
||||||
func.lead(TakeoffLanding.airport_id).over(partition_by=pa, order_by=wo).label('airport_id_next')) \
|
func.lead(TakeoffLanding.airport_id).over(partition_by=pa, order_by=wo).label('airport_id_next')) \
|
||||||
.filter(*filters) \
|
.filter(between(TakeoffLanding.timestamp, start, end)) \
|
||||||
.subquery()
|
.subquery()
|
||||||
|
|
||||||
# find complete flights
|
# find complete flights
|
||||||
|
@ -143,7 +133,7 @@ def update_entries(session, date, logger=None):
|
||||||
return finish_message
|
return finish_message
|
||||||
|
|
||||||
|
|
||||||
def update_max_altitudes(session, logger=None):
|
def update_max_altitudes(session, date, logger=None):
|
||||||
"""Add max altitudes in logbook when flight is complete (takeoff and landing)."""
|
"""Add max altitudes in logbook when flight is complete (takeoff and landing)."""
|
||||||
|
|
||||||
if logger is None:
|
if logger is None:
|
||||||
|
@ -154,9 +144,11 @@ def update_max_altitudes(session, logger=None):
|
||||||
if session is None:
|
if session is None:
|
||||||
session = app.session
|
session = app.session
|
||||||
|
|
||||||
|
(start, end) = date_to_timestamps(date)
|
||||||
|
|
||||||
logbook_entries = session.query(Logbook.id) \
|
logbook_entries = session.query(Logbook.id) \
|
||||||
.filter(and_(Logbook.takeoff_timestamp != null(), Logbook.landing_timestamp != null(), Logbook.max_altitude == null())) \
|
.filter(and_(Logbook.takeoff_timestamp != null(), Logbook.landing_timestamp != null(), Logbook.max_altitude == null())) \
|
||||||
.limit(1000) \
|
.filter(between(Logbook.reftime, start, end)) \
|
||||||
.subquery()
|
.subquery()
|
||||||
|
|
||||||
max_altitudes = session.query(Logbook.id, func.max(AircraftBeacon.altitude).label('max_altitude')) \
|
max_altitudes = session.query(Logbook.id, func.max(AircraftBeacon.altitude).label('max_altitude')) \
|
||||||
|
|
|
@ -35,7 +35,7 @@ CELERYBEAT_SCHEDULE = {
|
||||||
},
|
},
|
||||||
'update-max-altitudes': {
|
'update-max-altitudes': {
|
||||||
'task': 'update_logbook_max_altitude',
|
'task': 'update_logbook_max_altitude',
|
||||||
'schedule': timedelta(minutes=1),
|
'schedule': timedelta(hours=1),
|
||||||
},
|
},
|
||||||
'update-stats-hourly': {
|
'update-stats-hourly': {
|
||||||
'task': 'update_stats',
|
'task': 'update_stats',
|
||||||
|
|
Ładowanie…
Reference in New Issue