kopia lustrzana https://github.com/glidernet/ogn-python
Refactoring
rodzic
d22d9209ac
commit
cb02e35611
|
@ -1,8 +1,7 @@
|
|||
from celery.utils.log import get_task_logger
|
||||
|
||||
from sqlalchemy import insert, distinct
|
||||
from sqlalchemy.sql import null, and_, or_, func, not_
|
||||
from sqlalchemy.sql.expression import case
|
||||
from sqlalchemy.sql import null, and_, func, not_
|
||||
|
||||
from ogn.collect.celery import app
|
||||
from ogn.model import Country, DeviceInfo, DeviceInfoOrigin, AircraftBeacon, ReceiverBeacon, Device, Receiver
|
||||
|
|
|
@ -1,17 +1,18 @@
|
|||
from celery.utils.log import get_task_logger
|
||||
|
||||
from sqlalchemy import and_, or_, insert, update, exists
|
||||
from sqlalchemy import and_, or_, insert, update, exists, between
|
||||
from sqlalchemy.sql import func, null
|
||||
from sqlalchemy.sql.expression import true, false
|
||||
|
||||
from ogn.collect.celery import app
|
||||
from ogn.model import TakeoffLanding, Logbook, AircraftBeacon
|
||||
from ogn.utils import date_to_timestamps
|
||||
|
||||
logger = get_task_logger(__name__)
|
||||
|
||||
|
||||
@app.task
|
||||
def update_logbook(session=None):
|
||||
def update_logbook(session=None, date=None):
|
||||
"""Add/update logbook entries."""
|
||||
|
||||
logger.info("Compute logbook.")
|
||||
|
@ -24,11 +25,13 @@ def update_logbook(session=None):
|
|||
TakeoffLanding.device_id,
|
||||
TakeoffLanding.airport_id,
|
||||
TakeoffLanding.timestamp)
|
||||
|
||||
|
||||
# 'pa' is the window partition for the sql window function
|
||||
pa = (func.date(TakeoffLanding.timestamp),
|
||||
TakeoffLanding.device_id)
|
||||
|
||||
(start, end) = date_to_timestamps(date)
|
||||
|
||||
# make a query with current, previous and next "takeoff_landing" event, so we can find complete flights
|
||||
sq = session.query(
|
||||
TakeoffLanding.device_id,
|
||||
|
@ -46,6 +49,7 @@ def update_logbook(session=None):
|
|||
TakeoffLanding.airport_id,
|
||||
func.lag(TakeoffLanding.airport_id).over(order_by=wo).label('airport_id_prev'),
|
||||
func.lead(TakeoffLanding.airport_id).over(order_by=wo).label('airport_id_next')) \
|
||||
.filter(between(TakeoffLanding.timestamp, start, end)) \
|
||||
.subquery()
|
||||
|
||||
# find complete flights (with takeoff and landing on the same day)
|
||||
|
|
|
@ -1,9 +1,8 @@
|
|||
from celery.utils.log import get_task_logger
|
||||
|
||||
from sqlalchemy import String
|
||||
from sqlalchemy import and_, or_, insert, update, exists
|
||||
from sqlalchemy import and_, insert, update, exists
|
||||
from sqlalchemy.sql import func, null
|
||||
from sqlalchemy.sql.expression import true, false
|
||||
|
||||
from ogn.collect.celery import app
|
||||
from ogn.model import AircraftBeacon, ReceiverCoverage
|
||||
|
|
|
@ -1,15 +1,14 @@
|
|||
from datetime import datetime
|
||||
|
||||
from celery.utils.log import get_task_logger
|
||||
|
||||
from sqlalchemy import insert, distinct
|
||||
from sqlalchemy import insert, distinct, between
|
||||
from sqlalchemy.sql import null, and_, func, or_, update
|
||||
from sqlalchemy.sql.expression import literal_column, case
|
||||
from sqlalchemy.sql.expression import case
|
||||
|
||||
from ogn.model import AircraftBeacon, DeviceStats, ReceiverStats, RelationStats, Receiver, Device
|
||||
|
||||
from .celery import app
|
||||
from ogn.model.receiver_beacon import ReceiverBeacon
|
||||
from ogn.utils import date_to_timestamps
|
||||
|
||||
logger = get_task_logger(__name__)
|
||||
|
||||
|
@ -24,6 +23,8 @@ def create_device_stats(session=None, date=None):
|
|||
if not date:
|
||||
logger.warn("A date is needed for calculating stats. Exiting")
|
||||
return None
|
||||
else:
|
||||
(start, end) = date_to_timestamps(date)
|
||||
|
||||
# First kill the stats for the selected date
|
||||
deleted_counter = session.query(DeviceStats) \
|
||||
|
@ -35,7 +36,7 @@ def create_device_stats(session=None, date=None):
|
|||
func.dense_rank()
|
||||
.over(partition_by=AircraftBeacon.device_id, order_by=AircraftBeacon.receiver_id)
|
||||
.label('dr')) \
|
||||
.filter(and_(func.date(AircraftBeacon.timestamp) == date, AircraftBeacon.device_id != null())) \
|
||||
.filter(and_(between(AircraftBeacon.timestamp, start, end), AircraftBeacon.device_id != null())) \
|
||||
.filter(or_(AircraftBeacon.error_count == 0, AircraftBeacon.error_count == null())) \
|
||||
.subquery()
|
||||
|
||||
|
@ -49,7 +50,7 @@ def create_device_stats(session=None, date=None):
|
|||
func.max(sq.c.altitude)
|
||||
.over(partition_by=sq.c.device_id)
|
||||
.label('max_altitude'),
|
||||
func.count(sq.c.id)
|
||||
func.count(sq.c.device_id)
|
||||
.over(partition_by=sq.c.device_id)
|
||||
.label('aircraft_beacon_count'),
|
||||
func.first_value(sq.c.timestamp)
|
||||
|
@ -98,6 +99,8 @@ def create_receiver_stats(session=None, date=None):
|
|||
if not date:
|
||||
logger.warn("A date is needed for calculating stats. Exiting")
|
||||
return None
|
||||
else:
|
||||
(start, end) = date_to_timestamps(date)
|
||||
|
||||
# First kill the stats for the selected date
|
||||
deleted_counter = session.query(ReceiverStats) \
|
||||
|
@ -106,7 +109,7 @@ def create_receiver_stats(session=None, date=None):
|
|||
|
||||
# Select one day
|
||||
sq = session.query(ReceiverBeacon) \
|
||||
.filter(func.date(ReceiverBeacon.timestamp) == date) \
|
||||
.filter(between(ReceiverBeacon.timestamp, start, end)) \
|
||||
.subquery()
|
||||
|
||||
# Calculate stats, firstseen, lastseen and last values != NULL
|
||||
|
@ -132,7 +135,7 @@ def create_receiver_stats(session=None, date=None):
|
|||
.over(partition_by=sq.c.receiver_id, order_by=case([(sq.c.platform == null(), None)], else_=sq.c.timestamp).desc().nullslast())
|
||||
.label('platform')) \
|
||||
.subquery()
|
||||
|
||||
|
||||
# And insert them
|
||||
ins = insert(ReceiverStats).from_select(
|
||||
[ReceiverStats.receiver_id, ReceiverStats.date, ReceiverStats.firstseen, ReceiverStats.lastseen, ReceiverStats.location_wkt, ReceiverStats.altitude, ReceiverStats.version, ReceiverStats.platform],
|
||||
|
@ -145,27 +148,27 @@ def create_receiver_stats(session=None, date=None):
|
|||
# Update aircraft_beacon_count, aircraft_count and max_distance (without any error and max quality of 36dB@10km which is enough for 640km ... )
|
||||
aircraft_beacon_stats = session.query(func.date(AircraftBeacon.timestamp).label('date'),
|
||||
AircraftBeacon.receiver_id,
|
||||
func.count(AircraftBeacon.id).label('aircraft_beacon_count'),
|
||||
func.count(AircraftBeacon.timestamp).label('aircraft_beacon_count'),
|
||||
func.count(func.distinct(AircraftBeacon.device_id)).label('aircraft_count'),
|
||||
func.max(AircraftBeacon.distance).label('max_distance')) \
|
||||
.filter(and_(func.date(AircraftBeacon.timestamp) == date,
|
||||
.filter(and_(between(AircraftBeacon.timestamp, start, end),
|
||||
AircraftBeacon.error_count == 0,
|
||||
AircraftBeacon.quality <= 40)) \
|
||||
.group_by(func.date(AircraftBeacon.timestamp),
|
||||
AircraftBeacon.receiver_id) \
|
||||
.subquery()
|
||||
|
||||
|
||||
upd = update(ReceiverStats) \
|
||||
.where(and_(ReceiverStats.date == aircraft_beacon_stats.c.date,
|
||||
ReceiverStats.receiver_id == aircraft_beacon_stats.c.receiver_id)) \
|
||||
.values({'aircraft_beacon_count': aircraft_beacon_stats.c.aircraft_beacon_count,
|
||||
'aircraft_count': aircraft_beacon_stats.c.aircraft_count,
|
||||
'max_distance': aircraft_beacon_stats.c.max_distance})
|
||||
|
||||
|
||||
result = session.execute(upd)
|
||||
update_counter = result.rowcount
|
||||
session.commit()
|
||||
logger.warn("Updated {} ReceiverStats".format(update_counter))
|
||||
logger.warn("Updated {} ReceiverStats".format(update_counter))
|
||||
|
||||
return "ReceiverStats for {}: {} deleted, {} inserted, {} updated".format(date, deleted_counter, insert_counter, update_counter)
|
||||
|
||||
|
@ -178,8 +181,10 @@ def update_device_stats_jumps(session=None, date=None):
|
|||
session = app.session
|
||||
|
||||
if not date:
|
||||
logger.warn("A date is needed for calculating device stats jumps. Exiting")
|
||||
logger.warn("A date is needed for calculating stats. Exiting")
|
||||
return None
|
||||
else:
|
||||
(start, end) = date_to_timestamps(date)
|
||||
|
||||
sq = session.query(AircraftBeacon.device_id,
|
||||
AircraftBeacon.timestamp.label('t0'),
|
||||
|
@ -188,10 +193,10 @@ def update_device_stats_jumps(session=None, date=None):
|
|||
func.lead(AircraftBeacon.location_wkt).over(partition_by=AircraftBeacon.device_id, order_by=AircraftBeacon.timestamp).label('l1'),
|
||||
AircraftBeacon.altitude.label('a0'),
|
||||
func.lead(AircraftBeacon.altitude).over(partition_by=AircraftBeacon.device_id, order_by=AircraftBeacon.timestamp).label('a1')) \
|
||||
.filter(and_(func.date(AircraftBeacon.timestamp) == date,
|
||||
.filter(and_(between(AircraftBeacon.timestamp, start, end),
|
||||
AircraftBeacon.error_count == 0)) \
|
||||
.subquery()
|
||||
|
||||
|
||||
sq2 = session.query(sq.c.device_id,
|
||||
(func.st_distancesphere(sq.c.l1, sq.c.l0) / (func.extract('epoch', sq.c.t1) - func.extract('epoch', sq.c.t0))).label('horizontal_speed'),
|
||||
((sq.c.a1 - sq.c.a0) / (func.extract('epoch', sq.c.t1) - func.extract('epoch', sq.c.t0))).label('vertical_speed')) \
|
||||
|
@ -199,22 +204,22 @@ def update_device_stats_jumps(session=None, date=None):
|
|||
sq.c.t1 != null(),
|
||||
sq.c.t0 < sq.c.t1)) \
|
||||
.subquery()
|
||||
|
||||
|
||||
sq3 = session.query(sq2.c.device_id,
|
||||
case([(or_(func.abs(sq2.c.horizontal_speed) > 1000, func.abs(sq2.c.vertical_speed) > 100), 1)], else_=0).label('jump')) \
|
||||
.subquery()
|
||||
|
||||
|
||||
sq4 = session.query(sq3.c.device_id,
|
||||
func.sum(sq3.c.jump).label('jumps')) \
|
||||
.group_by(sq3.c.device_id) \
|
||||
.subquery()
|
||||
|
||||
|
||||
upd = update(DeviceStats) \
|
||||
.where(and_(DeviceStats.date == date,
|
||||
DeviceStats.device_id == sq4.c.device_id)) \
|
||||
.values({'ambiguous': sq4.c.jumps > 10,
|
||||
'jumps': sq4.c.jumps})
|
||||
|
||||
|
||||
result = session.execute(upd)
|
||||
update_counter = result.rowcount
|
||||
session.commit()
|
||||
|
@ -222,6 +227,7 @@ def update_device_stats_jumps(session=None, date=None):
|
|||
|
||||
return "DeviceStats jumps for {}: {} updated".format(date, update_counter)
|
||||
|
||||
|
||||
@app.task
|
||||
def create_relation_stats(session=None, date=None):
|
||||
"""Add/update relation stats."""
|
||||
|
@ -232,6 +238,8 @@ def create_relation_stats(session=None, date=None):
|
|||
if not date:
|
||||
logger.warn("A date is needed for calculating stats. Exiting")
|
||||
return None
|
||||
else:
|
||||
(start, end) = date_to_timestamps(date)
|
||||
|
||||
# First kill the stats for the selected date
|
||||
deleted_counter = session.query(RelationStats) \
|
||||
|
@ -244,9 +252,9 @@ def create_relation_stats(session=None, date=None):
|
|||
AircraftBeacon.device_id,
|
||||
AircraftBeacon.receiver_id,
|
||||
func.max(AircraftBeacon.quality),
|
||||
func.count(AircraftBeacon.id)
|
||||
) \
|
||||
.filter(and_(func.date(AircraftBeacon.timestamp) == date,
|
||||
func.count(AircraftBeacon.timestamp)
|
||||
) \
|
||||
.filter(and_(between(AircraftBeacon.timestamp, start, end),
|
||||
AircraftBeacon.distance > 1000,
|
||||
AircraftBeacon.error_count == 0,
|
||||
AircraftBeacon.quality <= 40,
|
||||
|
@ -265,104 +273,105 @@ def create_relation_stats(session=None, date=None):
|
|||
|
||||
return "RelationStats for {}: {} deleted, {} inserted".format(date, deleted_counter, insert_counter)
|
||||
|
||||
|
||||
@app.task
|
||||
def update_qualities(session=None, date=None):
|
||||
"""Calculate relative qualities of receivers and devices."""
|
||||
|
||||
|
||||
if session is None:
|
||||
session = app.session
|
||||
|
||||
|
||||
if not date:
|
||||
logger.warn("A date is needed for update stats. Exiting")
|
||||
logger.warn("A date is needed for calculating stats. Exiting")
|
||||
return None
|
||||
|
||||
# Calculate avg quality of devices
|
||||
dev_sq = session.query(RelationStats.date,
|
||||
RelationStats.device_id,
|
||||
func.avg(RelationStats.quality).label('quality')) \
|
||||
RelationStats.device_id,
|
||||
func.avg(RelationStats.quality).label('quality')) \
|
||||
.filter(RelationStats.date == date) \
|
||||
.group_by(RelationStats.date,
|
||||
RelationStats.device_id) \
|
||||
.subquery()
|
||||
|
||||
|
||||
dev_upd = update(DeviceStats) \
|
||||
.where(and_(DeviceStats.date == dev_sq.c.date,
|
||||
DeviceStats.device_id == dev_sq.c.device_id)) \
|
||||
.values({'quality': dev_sq.c.quality})
|
||||
|
||||
|
||||
dev_result = session.execute(dev_upd)
|
||||
dev_update_counter = dev_result.rowcount
|
||||
session.commit()
|
||||
logger.warn("Updated {} DeviceStats: quality".format(dev_update_counter))
|
||||
logger.warn("Updated {} DeviceStats: quality".format(dev_update_counter))
|
||||
|
||||
# Calculate avg quality of receivers
|
||||
rec_sq = session.query(RelationStats.date,
|
||||
RelationStats.receiver_id,
|
||||
func.avg(RelationStats.quality).label('quality')) \
|
||||
RelationStats.receiver_id,
|
||||
func.avg(RelationStats.quality).label('quality')) \
|
||||
.filter(RelationStats.date == date) \
|
||||
.group_by(RelationStats.date,
|
||||
RelationStats.receiver_id) \
|
||||
.subquery()
|
||||
|
||||
|
||||
rec_upd = update(ReceiverStats) \
|
||||
.where(and_(ReceiverStats.date == rec_sq.c.date,
|
||||
ReceiverStats.receiver_id == rec_sq.c.receiver_id)) \
|
||||
.values({'quality': rec_sq.c.quality})
|
||||
|
||||
|
||||
rec_result = session.execute(rec_upd)
|
||||
rec_update_counter = rec_result.rowcount
|
||||
session.commit()
|
||||
logger.warn("Updated {} ReceiverStats: quality".format(rec_update_counter))
|
||||
|
||||
logger.warn("Updated {} ReceiverStats: quality".format(rec_update_counter))
|
||||
|
||||
# Calculate quality_offset of devices
|
||||
dev_sq = session.query(RelationStats.date,
|
||||
RelationStats.device_id,
|
||||
(func.sum(RelationStats.beacon_count*(RelationStats.quality - ReceiverStats.quality))/(func.sum(RelationStats.beacon_count))).label('quality_offset')) \
|
||||
(func.sum(RelationStats.beacon_count * (RelationStats.quality - ReceiverStats.quality)) / (func.sum(RelationStats.beacon_count))).label('quality_offset')) \
|
||||
.filter(RelationStats.date == date) \
|
||||
.filter(and_(RelationStats.receiver_id == ReceiverStats.receiver_id,
|
||||
RelationStats.date == ReceiverStats.date)) \
|
||||
.group_by(RelationStats.date,
|
||||
RelationStats.device_id) \
|
||||
.subquery()
|
||||
|
||||
|
||||
dev_upd = update(DeviceStats) \
|
||||
.where(and_(DeviceStats.date == dev_sq.c.date,
|
||||
DeviceStats.device_id == dev_sq.c.device_id)) \
|
||||
.values({'quality_offset': dev_sq.c.quality_offset})
|
||||
|
||||
|
||||
dev_result = session.execute(dev_upd)
|
||||
dev_update_counter = dev_result.rowcount
|
||||
session.commit()
|
||||
logger.warn("Updated {} DeviceStats: quality_offset".format(dev_update_counter))
|
||||
|
||||
logger.warn("Updated {} DeviceStats: quality_offset".format(dev_update_counter))
|
||||
|
||||
# Calculate quality_offset of receivers
|
||||
rec_sq = session.query(RelationStats.date,
|
||||
RelationStats.receiver_id,
|
||||
(func.sum(RelationStats.beacon_count*(RelationStats.quality - DeviceStats.quality))/(func.sum(RelationStats.beacon_count))).label('quality_offset')) \
|
||||
(func.sum(RelationStats.beacon_count * (RelationStats.quality - DeviceStats.quality)) / (func.sum(RelationStats.beacon_count))).label('quality_offset')) \
|
||||
.filter(RelationStats.date == date) \
|
||||
.filter(and_(RelationStats.device_id == DeviceStats.device_id,
|
||||
RelationStats.date == DeviceStats.date)) \
|
||||
.group_by(RelationStats.date,
|
||||
RelationStats.receiver_id) \
|
||||
.subquery()
|
||||
|
||||
|
||||
rec_upd = update(ReceiverStats) \
|
||||
.where(and_(ReceiverStats.date == rec_sq.c.date,
|
||||
ReceiverStats.receiver_id == rec_sq.c.receiver_id)) \
|
||||
.values({'quality_offset': rec_sq.c.quality_offset})
|
||||
|
||||
|
||||
rec_result = session.execute(rec_upd)
|
||||
rec_update_counter = rec_result.rowcount
|
||||
session.commit()
|
||||
logger.warn("Updated {} ReceiverStats: quality_offset".format(rec_update_counter))
|
||||
|
||||
logger.warn("Updated {} ReceiverStats: quality_offset".format(rec_update_counter))
|
||||
|
||||
return "Updated {} DeviceStats and {} ReceiverStats".format(dev_update_counter, rec_update_counter)
|
||||
|
||||
|
||||
@app.task
|
||||
def update_receivers_stats(session=None):
|
||||
def update_receivers(session=None):
|
||||
"""Update receivers with stats."""
|
||||
|
||||
|
||||
if session is None:
|
||||
session = app.session
|
||||
|
||||
|
@ -388,7 +397,7 @@ def update_receivers_stats(session=None):
|
|||
.label('platform')) \
|
||||
.order_by(ReceiverStats.receiver_id) \
|
||||
.subquery()
|
||||
|
||||
|
||||
upd = update(Receiver) \
|
||||
.where(and_(Receiver.id == receiver_stats.c.receiver_id)) \
|
||||
.values({'firstseen': receiver_stats.c.firstseen,
|
||||
|
@ -397,18 +406,19 @@ def update_receivers_stats(session=None):
|
|||
'altitude': receiver_stats.c.altitude,
|
||||
'version': receiver_stats.c.version,
|
||||
'platform': receiver_stats.c.platform})
|
||||
|
||||
|
||||
result = session.execute(upd)
|
||||
update_counter = result.rowcount
|
||||
session.commit()
|
||||
logger.warn("Updated {} Receivers".format(update_counter))
|
||||
|
||||
logger.warn("Updated {} Receivers".format(update_counter))
|
||||
|
||||
return "Updated {} Receivers".format(update_counter)
|
||||
|
||||
|
||||
@app.task
|
||||
def update_devices_stats(session=None):
|
||||
def update_devices(session=None):
|
||||
"""Update devices with stats."""
|
||||
|
||||
|
||||
if session is None:
|
||||
session = app.session
|
||||
|
||||
|
@ -437,7 +447,7 @@ def update_devices_stats(session=None):
|
|||
.label('real_address')) \
|
||||
.order_by(DeviceStats.device_id) \
|
||||
.subquery()
|
||||
|
||||
|
||||
upd = update(Device) \
|
||||
.where(and_(Device.id == device_stats.c.device_id)) \
|
||||
.values({'firstseen': device_stats.c.firstseen,
|
||||
|
@ -447,10 +457,10 @@ def update_devices_stats(session=None):
|
|||
'software_version': device_stats.c.software_version,
|
||||
'hardware_version': device_stats.c.hardware_version,
|
||||
'real_address': device_stats.c.real_address})
|
||||
|
||||
|
||||
result = session.execute(upd)
|
||||
update_counter = result.rowcount
|
||||
session.commit()
|
||||
logger.warn("Updated {} Devices".format(update_counter))
|
||||
|
||||
logger.warn("Updated {} Devices".format(update_counter))
|
||||
|
||||
return "Updated {} Devices".format(update_counter)
|
||||
|
|
|
@ -1,13 +1,14 @@
|
|||
from datetime import datetime, timedelta
|
||||
from datetime import timedelta
|
||||
|
||||
from celery.utils.log import get_task_logger
|
||||
|
||||
from sqlalchemy import and_, or_, insert, update, between, exists
|
||||
from sqlalchemy import and_, or_, insert, between, exists
|
||||
from sqlalchemy.sql import func, null
|
||||
from sqlalchemy.sql.expression import case
|
||||
|
||||
from ogn.collect.celery import app
|
||||
from ogn.model import AircraftBeacon, TakeoffLanding, Airport
|
||||
from ogn.utils import date_to_timestamps
|
||||
|
||||
logger = get_task_logger(__name__)
|
||||
|
||||
|
@ -15,7 +16,7 @@ logger = get_task_logger(__name__)
|
|||
@app.task
|
||||
def update_takeoff_landings(session=None, date=None):
|
||||
"""Compute takeoffs and landings."""
|
||||
|
||||
|
||||
logger.info("Compute takeoffs and landings.")
|
||||
|
||||
if session is None:
|
||||
|
@ -27,104 +28,100 @@ def update_takeoff_landings(session=None, date=None):
|
|||
logger.warn("Cannot calculate takeoff and landings without any airport! Please import airports first.")
|
||||
return
|
||||
|
||||
# takeoff / landing detection is based on 3 consecutive points
|
||||
# takeoff / landing detection is based on 3 consecutive points all below a certain altitude AGL
|
||||
takeoff_speed = 55 # takeoff detection: 1st point below, 2nd and 3rd above this limit
|
||||
landing_speed = 40 # landing detection: 1st point above, 2nd and 3rd below this limit
|
||||
duration = 100 # the points must not exceed this duration
|
||||
radius = 5000 # the points must not exceed this radius around the 2nd point
|
||||
max_agl = 100 # takeoff / landing must not exceed this altitude AGL
|
||||
|
||||
# takeoff / landing has to be near an airport
|
||||
airport_radius = 2500 # takeoff / landing must not exceed this radius around the airport
|
||||
airport_delta = 100 # takeoff / landing must not exceed this altitude offset above/below the airport
|
||||
# calculate from - to timestamps
|
||||
(start, end) = date_to_timestamps(date)
|
||||
|
||||
# 'wo' is the window order for the sql window function
|
||||
wo = and_(func.date(AircraftBeacon.timestamp),
|
||||
AircraftBeacon.device_id,
|
||||
AircraftBeacon.timestamp)
|
||||
|
||||
# get beacons for selected day and filter out duplicates (e.g. from multiple receivers)
|
||||
sq = session.query(AircraftBeacon.id,
|
||||
func.row_number().over(partition_by=(func.date(AircraftBeacon.timestamp),
|
||||
AircraftBeacon.device_id,
|
||||
AircraftBeacon.timestamp),
|
||||
order_by=AircraftBeacon.error_count).label('row')) \
|
||||
.filter(func.date(AircraftBeacon.timestamp) == date) \
|
||||
# get beacons for selected day, one per device_id and timestamp
|
||||
sq = session.query(AircraftBeacon) \
|
||||
.distinct(AircraftBeacon.device_id, AircraftBeacon.timestamp) \
|
||||
.order_by(AircraftBeacon.device_id, AircraftBeacon.timestamp, AircraftBeacon.error_count) \
|
||||
.filter(AircraftBeacon.agl < max_agl) \
|
||||
.filter(between(AircraftBeacon.timestamp, start, end)) \
|
||||
.subquery()
|
||||
|
||||
sq2 = session.query(sq.c.id) \
|
||||
.filter(sq.c.row == 1) \
|
||||
.subquery()
|
||||
|
||||
|
||||
# make a query with current, previous and next position
|
||||
sq3 = session.query(
|
||||
AircraftBeacon.device_id,
|
||||
func.lag(AircraftBeacon.device_id).over(order_by=wo).label('device_id_prev'),
|
||||
func.lead(AircraftBeacon.device_id).over(order_by=wo).label('device_id_next'),
|
||||
AircraftBeacon.timestamp,
|
||||
func.lag(AircraftBeacon.timestamp).over(order_by=wo).label('timestamp_prev'),
|
||||
func.lead(AircraftBeacon.timestamp).over(order_by=wo).label('timestamp_next'),
|
||||
AircraftBeacon.location_wkt,
|
||||
func.lag(AircraftBeacon.location_wkt).over(order_by=wo).label('location_wkt_prev'),
|
||||
func.lead(AircraftBeacon.location_wkt).over(order_by=wo).label('location_wkt_next'),
|
||||
AircraftBeacon.track,
|
||||
func.lag(AircraftBeacon.track).over(order_by=wo).label('track_prev'),
|
||||
func.lead(AircraftBeacon.track).over(order_by=wo).label('track_next'),
|
||||
AircraftBeacon.ground_speed,
|
||||
func.lag(AircraftBeacon.ground_speed).over(order_by=wo).label('ground_speed_prev'),
|
||||
func.lead(AircraftBeacon.ground_speed).over(order_by=wo).label('ground_speed_next'),
|
||||
AircraftBeacon.altitude,
|
||||
func.lag(AircraftBeacon.altitude).over(order_by=wo).label('altitude_prev'),
|
||||
func.lead(AircraftBeacon.altitude).over(order_by=wo).label('altitude_next')) \
|
||||
.filter(AircraftBeacon.id == sq2.c.id) \
|
||||
sq2 = session.query(
|
||||
sq.c.device_id,
|
||||
func.lag(sq.c.device_id).over(partition_by=sq.c.device_id, order_by=sq.c.timestamp).label('device_id_prev'),
|
||||
func.lead(sq.c.device_id).over(partition_by=sq.c.device_id, order_by=sq.c.timestamp).label('device_id_next'),
|
||||
sq.c.timestamp,
|
||||
func.lag(sq.c.timestamp).over(partition_by=sq.c.device_id, order_by=sq.c.timestamp).label('timestamp_prev'),
|
||||
func.lead(sq.c.timestamp).over(partition_by=sq.c.device_id, order_by=sq.c.timestamp).label('timestamp_next'),
|
||||
sq.c.location,
|
||||
func.lag(sq.c.location).over(partition_by=sq.c.device_id, order_by=sq.c.timestamp).label('location_wkt_prev'),
|
||||
func.lead(sq.c.location).over(partition_by=sq.c.device_id, order_by=sq.c.timestamp).label('location_wkt_next'),
|
||||
sq.c.track,
|
||||
func.lag(sq.c.track).over(partition_by=sq.c.device_id, order_by=sq.c.timestamp).label('track_prev'),
|
||||
func.lead(sq.c.track).over(partition_by=sq.c.device_id, order_by=sq.c.timestamp).label('track_next'),
|
||||
sq.c.ground_speed,
|
||||
func.lag(sq.c.ground_speed).over(partition_by=sq.c.device_id, order_by=sq.c.timestamp).label('ground_speed_prev'),
|
||||
func.lead(sq.c.ground_speed).over(partition_by=sq.c.device_id, order_by=sq.c.timestamp).label('ground_speed_next'),
|
||||
sq.c.altitude,
|
||||
func.lag(sq.c.altitude).over(partition_by=sq.c.device_id, order_by=sq.c.timestamp).label('altitude_prev'),
|
||||
func.lead(sq.c.altitude).over(partition_by=sq.c.device_id, order_by=sq.c.timestamp).label('altitude_next')) \
|
||||
.subquery()
|
||||
|
||||
# consider only positions with the same device id
|
||||
sq4 = session.query(sq3) \
|
||||
.filter(sq3.c.device_id_prev == sq3.c.device_id == sq3.c.device_id_next) \
|
||||
.subquery()
|
||||
|
||||
|
||||
# consider only positions with predecessor and successor and limit distance and duration between points
|
||||
sq3 = session.query(sq2) \
|
||||
.filter(and_(sq2.c.device_id_prev != null(),
|
||||
sq2.c.device_id_next != null())) \
|
||||
.filter(and_(func.ST_DistanceSphere(sq2.c.location, sq2.c.location_wkt_prev) < radius,
|
||||
func.ST_DistanceSphere(sq2.c.location, sq2.c.location_wkt_next) < radius)) \
|
||||
.filter(sq2.c.timestamp_next - sq2.c.timestamp_prev < timedelta(seconds=duration)) \
|
||||
.subquery()
|
||||
|
||||
# find possible takeoffs and landings
|
||||
sq4 = session.query(
|
||||
sq3.c.timestamp,
|
||||
case([(sq3.c.ground_speed > takeoff_speed, sq3.c.location_wkt_prev), # on takeoff we take the location from the previous fix because it is nearer to the airport
|
||||
(sq3.c.ground_speed <= takeoff_speed, sq3.c.location)]).label('location'),
|
||||
case([(sq3.c.ground_speed > landing_speed, sq3.c.track),
|
||||
(sq3.c.ground_speed <= landing_speed, sq3.c.track_prev)]).label('track'), # on landing we take the track from the previous fix because gliders tend to leave the runway quickly
|
||||
sq3.c.ground_speed,
|
||||
sq3.c.altitude,
|
||||
case([(sq3.c.ground_speed > takeoff_speed, True),
|
||||
(sq3.c.ground_speed < landing_speed, False)]).label('is_takeoff'),
|
||||
sq3.c.device_id) \
|
||||
.filter(or_(and_(sq3.c.ground_speed_prev < takeoff_speed, # takeoff
|
||||
sq3.c.ground_speed > takeoff_speed,
|
||||
sq3.c.ground_speed_next > takeoff_speed),
|
||||
and_(sq3.c.ground_speed_prev > landing_speed, # landing
|
||||
sq3.c.ground_speed < landing_speed,
|
||||
sq3.c.ground_speed_next < landing_speed))) \
|
||||
.subquery()
|
||||
|
||||
# consider them if the are near airports ...
|
||||
sq5 = session.query(
|
||||
sq4.c.timestamp,
|
||||
case([(sq4.c.ground_speed > takeoff_speed, sq4.c.location_wkt_prev), # on takeoff we take the location from the previous fix because it is nearer to the airport
|
||||
(sq4.c.ground_speed < landing_speed, sq4.c.location)]).label('location'),
|
||||
case([(sq4.c.ground_speed > takeoff_speed, sq4.c.track),
|
||||
(sq4.c.ground_speed < landing_speed, sq4.c.track_prev)]).label('track'), # on landing we take the track from the previous fix because gliders tend to leave the runway quickly
|
||||
sq4.c.ground_speed,
|
||||
sq4.c.altitude,
|
||||
case([(sq4.c.ground_speed > takeoff_speed, True),
|
||||
(sq4.c.ground_speed < landing_speed, False)]).label('is_takeoff'),
|
||||
sq4.c.device_id) \
|
||||
.filter(sq4.c.timestamp_next - sq4.c.timestamp_prev < timedelta(seconds=duration)) \
|
||||
.filter(and_(func.ST_DistanceSphere(sq4.c.location, sq4.c.location_wkt_prev) < radius,
|
||||
func.ST_DistanceSphere(sq4.c.location, sq4.c.location_wkt_next) < radius)) \
|
||||
.filter(or_(and_(sq4.c.ground_speed_prev < takeoff_speed, # takeoff
|
||||
sq4.c.ground_speed > takeoff_speed,
|
||||
sq4.c.ground_speed_next > takeoff_speed),
|
||||
and_(sq4.c.ground_speed_prev > landing_speed, # landing
|
||||
sq4.c.ground_speed < landing_speed,
|
||||
sq4.c.ground_speed_next < landing_speed))) \
|
||||
sq4.c.track,
|
||||
sq4.c.is_takeoff,
|
||||
sq4.c.device_id,
|
||||
Airport.id.label('airport_id'),
|
||||
func.ST_DistanceSphere(sq4.c.location, Airport.location_wkt).label('airport_distance')) \
|
||||
.filter(and_(func.ST_Within(sq4.c.location, Airport.border),
|
||||
between(Airport.style, 2, 5))) \
|
||||
.subquery()
|
||||
|
||||
# consider them if they are near a airport
|
||||
sq6 = session.query(
|
||||
sq5.c.timestamp,
|
||||
sq5.c.track,
|
||||
sq5.c.is_takeoff,
|
||||
sq5.c.device_id,
|
||||
Airport.id.label('airport_id')) \
|
||||
.filter(and_(func.ST_DistanceSphere(sq5.c.location, Airport.location_wkt) < airport_radius,
|
||||
between(sq5.c.altitude, Airport.altitude - airport_delta, Airport.altitude + airport_delta))) \
|
||||
.filter(between(Airport.style, 2, 5)) \
|
||||
|
||||
# ... and take the nearest airport
|
||||
sq6 = session.query(sq5.c.timestamp, sq5.c.track, sq5.c.is_takeoff, sq5.c.device_id, sq5.c.airport_id) \
|
||||
.distinct(sq5.c.timestamp, sq5.c.track, sq5.c.is_takeoff, sq5.c.device_id, sq5.c.airport_id) \
|
||||
.order_by(sq5.c.timestamp, sq5.c.track, sq5.c.is_takeoff, sq5.c.device_id, sq5.c.airport_id, sq5.c.airport_distance) \
|
||||
.subquery()
|
||||
|
||||
|
||||
# consider them only if they are not already existing in db
|
||||
takeoff_landing_query = session.query(sq6) \
|
||||
.filter(~exists().where(
|
||||
and_(TakeoffLanding.timestamp == sq6.c.timestamp,
|
||||
TakeoffLanding.device_id == sq6.c.device_id,
|
||||
TakeoffLanding.airport_id == sq6.c.airport_id)))
|
||||
|
||||
|
||||
# ... and save them
|
||||
ins = insert(TakeoffLanding).from_select((TakeoffLanding.timestamp,
|
||||
TakeoffLanding.track,
|
||||
|
|
|
@ -6,15 +6,17 @@ from io import StringIO
|
|||
|
||||
from ogn.model import AircraftBeacon, ReceiverBeacon
|
||||
from ogn.utils import open_file
|
||||
from ogn.commands.database import get_database_days
|
||||
|
||||
manager = Manager()
|
||||
|
||||
|
||||
class LogfileDbSaver():
|
||||
def __init__(self):
|
||||
"""Establish the database connection."""
|
||||
try:
|
||||
self.conn = psycopg2.connect(database = "ogn", user = "postgres", password = "postgres", host = "localhost", port = "5432")
|
||||
except:
|
||||
self.conn = psycopg2.connect(database="ogn", user="postgres", password="postgres", host="localhost", port="5432")
|
||||
except Exception as e:
|
||||
raise Exception("I am unable to connect to the database")
|
||||
self.cur = self.conn.cursor()
|
||||
|
||||
|
@ -41,10 +43,12 @@ class LogfileDbSaver():
|
|||
|
||||
index_clause = " AND hasindexes = FALSE" if no_index_only == True else ""
|
||||
|
||||
self.cur.execute(("SELECT DISTINCT(RIGHT(tablename, 10))"
|
||||
" FROM pg_catalog.pg_tables"
|
||||
" WHERE schemaname = 'public' AND tablename LIKE 'aircraft_beacons_%'{}"
|
||||
" ORDER BY RIGHT(tablename, 10);".format(index_clause)))
|
||||
self.cur.execute(("""
|
||||
SELECT DISTINCT(RIGHT(tablename, 10))
|
||||
FROM pg_catalog.pg_tables
|
||||
WHERE schemaname = 'public' AND tablename LIKE 'aircraft_beacons_%'{}
|
||||
ORDER BY RIGHT(tablename, 10);
|
||||
""".format(index_clause)))
|
||||
|
||||
return [datestr[0].replace('_', '-') for datestr in self.cur.fetchall()]
|
||||
|
||||
|
@ -124,7 +128,7 @@ class LogfileDbSaver():
|
|||
receiver_id int);
|
||||
""".format(self.receiver_table))
|
||||
self.conn.commit()
|
||||
except:
|
||||
except Exception as e:
|
||||
raise Exception("I can't create the tables")
|
||||
|
||||
def add(self, beacon):
|
||||
|
@ -157,10 +161,11 @@ class LogfileDbSaver():
|
|||
self.cur.copy_expert("COPY ({}) TO STDOUT WITH (DELIMITER ',', FORMAT CSV, HEADER, ENCODING 'UTF-8');".format(self.get_merged_receiver_beacons_subquery()), gzip_file)
|
||||
|
||||
def create_indices(self):
|
||||
"""Creates indices for aircraft- and receiver-beacons. We need them for the beacon merging operation."""
|
||||
"""Creates indices for aircraft- and receiver-beacons."""
|
||||
|
||||
self.cur.execute("""
|
||||
CREATE INDEX IF NOT EXISTS ix_{0}_timestamp_name_receiver_name ON "{0}" (timestamp, name, receiver_name);
|
||||
CREATE INDEX IF NOT EXISTS ix_{0}_device_id_timestamp_error_count ON "{0}" (device_id, timestamp, error_count);
|
||||
CREATE INDEX IF NOT EXISTS ix_{1}_timestamp_name_receiver_name ON "{1}" (timestamp, name, receiver_name);
|
||||
""".format(self.aircraft_table, self.receiver_table))
|
||||
self.conn.commit()
|
||||
|
@ -194,14 +199,15 @@ class LogfileDbSaver():
|
|||
|
||||
self.cur.execute("""
|
||||
UPDATE receivers AS r
|
||||
SET location = sq.location
|
||||
SET location = sq.location,
|
||||
altitude = sq.altitude
|
||||
FROM
|
||||
(SELECT rb.receiver_id,
|
||||
ROW_NUMBER() OVER (PARTITION BY receiver_id),
|
||||
FIRST_VALUE(rb.location) OVER (PARTITION BY receiver_id ORDER BY CASE WHEN location IS NOT NULL THEN timestamp ELSE NULL END NULLS LAST) AS location
|
||||
(SELECT DISTINCT ON (rb.receiver_id) rb.receiver_id, rb.location, rb.altitude
|
||||
FROM "{1}" AS rb
|
||||
WHERE rb.location IS NOT NULL
|
||||
ORDER BY rb.receiver_id, rb.timestamp
|
||||
) AS sq
|
||||
WHERE r.id = sq.receiver_id AND sq.row_number = 1;
|
||||
WHERE r.id = sq.receiver_id;
|
||||
""".format(self.aircraft_table, self.receiver_table))
|
||||
self.conn.commit()
|
||||
|
||||
|
@ -250,7 +256,7 @@ class LogfileDbSaver():
|
|||
ELSE NULL
|
||||
END AS quality,
|
||||
CAST(ab.altitude - ST_Value(e.rast, ab.location) AS REAL) AS agl
|
||||
|
||||
|
||||
INTO "{0}_temp"
|
||||
FROM "{0}" AS ab, devices AS d, receivers AS r, elevation AS e
|
||||
WHERE ab.address = d.address AND receiver_name = r.name AND ST_Intersects(e.rast, ab.location);
|
||||
|
@ -352,7 +358,7 @@ class LogfileDbSaver():
|
|||
self.cur.execute(query)
|
||||
return len(self.cur.fetchall()) == 1
|
||||
|
||||
def transfer(self):
|
||||
def transfer_aircraft_beacons(self):
|
||||
query = """
|
||||
INSERT INTO aircraft_beacons(location, altitude, name, dstcall, relay, receiver_name, timestamp, track, ground_speed,
|
||||
address_type, aircraft_type, stealth, address, climb_rate, turn_rate, signal_quality, error_count, frequency_offset, gps_quality_horizontal, gps_quality_vertical, software_version, hardware_version, real_address, signal_power,
|
||||
|
@ -365,6 +371,22 @@ class LogfileDbSaver():
|
|||
self.cur.execute(query)
|
||||
self.conn.commit()
|
||||
|
||||
def transfer_receiver_beacons(self):
|
||||
query = """
|
||||
INSERT INTO receiver_beacons(location, altitude, name, receiver_name, dstcall, timestamp,
|
||||
|
||||
version, platform, cpu_load, free_ram, total_ram, ntp_error, rt_crystal_correction, voltage,
|
||||
amperage, cpu_temp, senders_visible, senders_total, rec_input_noise, senders_signal,
|
||||
senders_messages, good_senders_signal, good_senders, good_and_bad_senders,
|
||||
|
||||
receiver_id)
|
||||
{}
|
||||
ON CONFLICT DO NOTHING;
|
||||
""".format(self.get_merged_receiver_beacons_subquery())
|
||||
|
||||
self.cur.execute(query)
|
||||
self.conn.commit()
|
||||
|
||||
def create_flights2d(self):
|
||||
query = """
|
||||
INSERT INTO flights2d
|
||||
|
@ -396,20 +418,17 @@ class LogfileDbSaver():
|
|||
ELSE 1
|
||||
END AS ping
|
||||
FROM (
|
||||
SELECT sq.timestamp t1,
|
||||
lag(sq.timestamp) OVER (partition BY sq.timestamp::date, sq.device_id ORDER BY sq.timestamp) t2,
|
||||
sq.location l1,
|
||||
lag(sq.location) OVER (partition BY sq.timestamp::date, sq.device_id ORDER BY sq.timestamp) l2,
|
||||
sq.device_id d1,
|
||||
lag(sq.device_id) OVER (partition BY sq.timestamp::date, sq.device_id ORDER BY sq.timestamp) d2
|
||||
SELECT sq.timestamp t1,
|
||||
lag(sq.timestamp) OVER (partition BY sq.device_id ORDER BY sq.timestamp) t2,
|
||||
sq.location l1,
|
||||
lag(sq.location) OVER (partition BY sq.device_id ORDER BY sq.timestamp) l2,
|
||||
sq.device_id d1,
|
||||
lag(sq.device_id) OVER (partition BY sq.device_id ORDER BY sq.timestamp) d2
|
||||
FROM (
|
||||
SELECT timestamp,
|
||||
device_id,
|
||||
location,
|
||||
row_number() OVER (partition BY timestamp::date, device_id, timestamp ORDER BY error_count) message_number
|
||||
SELECT DISTINCT ON (device_id, timestamp) timestamp, device_id, location
|
||||
FROM {}
|
||||
WHERE device_id IS NOT NULL) sq
|
||||
WHERE sq.message_number = 1 ) sq2 ) sq3 ) sq4
|
||||
WHERE device_id IS NOT NULL AND ground_speed > 250 AND agl < 100
|
||||
ORDER BY device_id, timestamp, error_count) sq) sq2 ) sq3 ) sq4
|
||||
GROUP BY sq4.timestamp::date,
|
||||
sq4.device_id,
|
||||
sq4.part ) sq5
|
||||
|
@ -441,15 +460,14 @@ class LogfileDbSaver():
|
|||
sq.device_id d1,
|
||||
LAG(sq.device_id) OVER ( PARTITION BY sq.timestamp::DATE, sq.device_id ORDER BY sq.timestamp) d2,
|
||||
sq.agl a1,
|
||||
LAG(sq.agl) over ( PARTITION BY sq.timestamp::DATE, sq.device_id ORDER BY sq.timestamp) a2
|
||||
LAG(sq.agl) over ( PARTITION BY sq.timestamp::DATE, sq.device_id ORDER BY sq.timestamp) a2
|
||||
FROM
|
||||
(
|
||||
SELECT timestamp, device_id, location, agl,
|
||||
Row_number() OVER ( PARTITION BY timestamp::DATE, device_id, timestamp ORDER BY error_count) message_number
|
||||
SELECT DISTINCT ON (device_id, timestamp) timestamp, device_id, location, agl
|
||||
FROM {}
|
||||
) sq
|
||||
WHERE sq.message_number = 1
|
||||
) sq2
|
||||
ORDER BY device_id, timestamp, error_count
|
||||
) sq
|
||||
) sq2
|
||||
WHERE EXTRACT(epoch FROM sq2.t1 - sq2.t2) > 300
|
||||
AND ST_DistanceSphere(sq2.l1, sq2.l2) / EXTRACT(epoch FROM sq2.t1 - sq2.t2) BETWEEN 15 AND 50
|
||||
AND sq2.a1 > 300 AND sq2.a2 > 300
|
||||
|
@ -461,6 +479,7 @@ class LogfileDbSaver():
|
|||
self.cur.execute(query)
|
||||
self.conn.commit()
|
||||
|
||||
|
||||
def convert(sourcefile, datestr, saver):
|
||||
from ogn.gateway.process import string_to_message
|
||||
from ogn.gateway.process_tools import AIRCRAFT_TYPES, RECEIVER_TYPES
|
||||
|
@ -490,7 +509,7 @@ def convert(sourcefile, datestr, saver):
|
|||
if message is None:
|
||||
continue
|
||||
|
||||
dictfilt = lambda x, y: dict([ (i,x[i]) for i in x if i in set(y) ])
|
||||
dictfilt = lambda x, y: dict([(i, x[i]) for i in x if i in set(y)])
|
||||
|
||||
try:
|
||||
if message['beacon_type'] in AIRCRAFT_TYPES:
|
||||
|
@ -562,18 +581,25 @@ def update():
|
|||
saver.update_receiver_location()
|
||||
saver.create_indices()
|
||||
|
||||
|
||||
@manager.command
|
||||
def transfer():
|
||||
def transfer(start=None, end=None):
|
||||
"""Transfer beacons from separate logfile tables to beacon table."""
|
||||
|
||||
with LogfileDbSaver() as saver:
|
||||
datestrs = saver.get_datestrs()
|
||||
if start is not None and end is not None:
|
||||
dates = get_database_days(start, end)
|
||||
datestrs = [date.strftime('%Y_%m_%d') for date in dates]
|
||||
else:
|
||||
datestrs = saver.get_datestrs()
|
||||
pbar = tqdm(datestrs)
|
||||
for datestr in pbar:
|
||||
pbar.set_description("Transfer beacons for {}".format(datestr))
|
||||
saver.set_datestr(datestr)
|
||||
if not saver.is_transfered():
|
||||
saver.transfer()
|
||||
saver.transfer_aircraft_beacons()
|
||||
saver.transfer_receiver_beacons()
|
||||
|
||||
|
||||
@manager.command
|
||||
def create_flights2d():
|
||||
|
@ -587,6 +613,7 @@ def create_flights2d():
|
|||
saver.set_datestr(datestr)
|
||||
saver.create_flights2d()
|
||||
|
||||
|
||||
@manager.command
|
||||
def create_gaps2d():
|
||||
"""Create 'gaps' from logfile tables."""
|
||||
|
@ -599,6 +626,7 @@ def create_gaps2d():
|
|||
saver.set_datestr(datestr)
|
||||
saver.create_gaps2d()
|
||||
|
||||
|
||||
@manager.command
|
||||
def file_export(path):
|
||||
"""Export separate logfile tables to csv files. They can be used for fast bulk import with sql COPY command."""
|
||||
|
@ -610,6 +638,7 @@ def file_export(path):
|
|||
|
||||
with LogfileDbSaver() as saver:
|
||||
datestrs = saver.get_datestrs()
|
||||
datestrs = filter(lambda x: x.startswith('2018-12'), datestrs)
|
||||
pbar = tqdm(datestrs)
|
||||
for datestr in pbar:
|
||||
pbar.set_description("Exporting data for {}".format(datestr))
|
||||
|
|
|
@ -1,17 +1,33 @@
|
|||
from datetime import datetime, timedelta
|
||||
|
||||
from manager import Manager
|
||||
from ogn.collect.database import update_device_infos, update_country_code
|
||||
from ogn.commands.dbutils import engine, session
|
||||
from ogn.model import Base, DeviceInfoOrigin
|
||||
from ogn.utils import get_airports
|
||||
from sqlalchemy import distinct
|
||||
from sqlalchemy.sql import null, func
|
||||
|
||||
from ogn.model import Base, DeviceInfoOrigin, AircraftBeacon
|
||||
from ogn.utils import get_airports, get_days
|
||||
from sqlalchemy.sql import func
|
||||
|
||||
manager = Manager()
|
||||
|
||||
ALEMBIC_CONFIG_FILE = "alembic.ini"
|
||||
|
||||
|
||||
def get_database_days(start, end):
|
||||
"""Returns the first and the last day in aircraft_beacons table."""
|
||||
|
||||
if start is None and end is None:
|
||||
days_from_db = session.query(func.min(AircraftBeacon.timestamp).label('first_day'), func.max(AircraftBeacon.timestamp).label('last_day')).one()
|
||||
start = days_from_db[0].date()
|
||||
end = days_from_db[1].date()
|
||||
else:
|
||||
start = datetime.strptime(start, "%Y-%m-%d")
|
||||
end = datetime.strptime(end, "%Y-%m-%d")
|
||||
|
||||
days = get_days(start, end)
|
||||
|
||||
return days
|
||||
|
||||
|
||||
@manager.command
|
||||
def init():
|
||||
"""Initialize the database."""
|
||||
|
@ -24,8 +40,8 @@ def init():
|
|||
session.execute('CREATE EXTENSION IF NOT EXISTS timescaledb CASCADE;')
|
||||
session.commit()
|
||||
Base.metadata.create_all(engine)
|
||||
session.execute("SELECT create_hypertable('aircraft_beacons', 'timestamp', chunk_target_size => '2GB');")
|
||||
session.execute("SELECT create_hypertable('receiver_beacons', 'timestamp', chunk_target_size => '2GB');")
|
||||
session.execute("SELECT create_hypertable('aircraft_beacons', 'timestamp', chunk_target_size => '2GB', if_not_exists => TRUE);")
|
||||
session.execute("SELECT create_hypertable('receiver_beacons', 'timestamp', chunk_target_size => '2GB', if_not_exists => TRUE);")
|
||||
session.commit()
|
||||
#alembic_cfg = Config(ALEMBIC_CONFIG_FILE)
|
||||
#command.stamp(alembic_cfg, "head")
|
||||
|
@ -92,11 +108,13 @@ def import_airports(path='tests/SeeYou.cup'):
|
|||
airports = get_airports(path)
|
||||
session.bulk_save_objects(airports)
|
||||
session.commit()
|
||||
session.execute("UPDATE airports SET border = ST_Expand(location, 0.05)")
|
||||
session.commit()
|
||||
print("Imported {} airports.".format(len(airports)))
|
||||
|
||||
|
||||
@manager.command
|
||||
def update_country_codes():
|
||||
"""Update country codes of all receivers."""
|
||||
|
||||
update_country_code(session=session)
|
||||
|
||||
update_country_code(session=session)
|
||||
|
|
|
@ -1,36 +1,42 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
|
||||
from datetime import timedelta, datetime, date
|
||||
from datetime import datetime
|
||||
|
||||
from manager import Manager
|
||||
from ogn.collect.logbook import update_logbook
|
||||
from ogn.collect.takeoff_landings import update_takeoff_landings
|
||||
from ogn.commands.dbutils import session
|
||||
from ogn.model import Device, DeviceInfo, TakeoffLanding, Airport, Logbook
|
||||
from sqlalchemy import and_, or_
|
||||
from sqlalchemy.orm import aliased
|
||||
from ogn.model import Airport, Logbook
|
||||
from sqlalchemy import or_
|
||||
from sqlalchemy.sql import func
|
||||
|
||||
from tqdm import tqdm
|
||||
from ogn.commands.database import get_database_days
|
||||
|
||||
manager = Manager()
|
||||
|
||||
|
||||
@manager.command
|
||||
def compute_takeoff_landing():
|
||||
def compute_takeoff_landing(start=None, end=None):
|
||||
"""Compute takeoffs and landings."""
|
||||
|
||||
for single_date in (date(2017, 6, 14) + timedelta(days=n) for n in range(800)):
|
||||
print("Berechne für den {}".format(single_date.strftime("%Y-%m-%d")))
|
||||
|
||||
days = get_database_days(start, end)
|
||||
|
||||
pbar = tqdm(days)
|
||||
for single_date in pbar:
|
||||
pbar.set_description(datetime.strftime(single_date, '%Y-%m-%d'))
|
||||
result = update_takeoff_landings(session=session, date=single_date)
|
||||
print(result)
|
||||
|
||||
|
||||
|
||||
@manager.command
|
||||
def compute_logbook():
|
||||
def compute_logbook(start=None, end=None):
|
||||
"""Compute logbook."""
|
||||
print("Compute logbook...")
|
||||
result = update_logbook(session=session)#.delay()
|
||||
#counter = result.get()
|
||||
#print("New logbook entries: {}".format(counter))
|
||||
|
||||
days = get_database_days(start, end)
|
||||
|
||||
pbar = tqdm(days)
|
||||
for single_date in pbar:
|
||||
pbar.set_description(datetime.strftime(single_date, '%Y-%m-%d'))
|
||||
result = update_logbook(session=session, date=single_date)
|
||||
|
||||
|
||||
@manager.arg('date', help='date (format: yyyy-mm-dd)')
|
||||
|
|
|
@ -1,43 +1,42 @@
|
|||
from manager import Manager
|
||||
from ogn.commands.dbutils import session
|
||||
|
||||
from datetime import date, timedelta
|
||||
from ogn.commands.database import get_database_days
|
||||
|
||||
from ogn.collect.stats import create_device_stats, create_receiver_stats, create_relation_stats,\
|
||||
update_qualities, update_receivers_stats, update_devices_stats,\
|
||||
update_qualities, update_receivers, update_devices,\
|
||||
update_device_stats_jumps
|
||||
|
||||
manager = Manager()
|
||||
|
||||
|
||||
@manager.command
|
||||
def create_stats():
|
||||
def create_stats(start=None, end=None):
|
||||
"""Create DeviceStats, ReceiverStats and RelationStats."""
|
||||
|
||||
for single_date in (date(2016, 1, 1) + timedelta(days=n) for n in range(800)):
|
||||
|
||||
days = get_database_days(start, end)
|
||||
|
||||
for single_date in days:
|
||||
result = create_device_stats(session=session, date=single_date)
|
||||
print(result)
|
||||
|
||||
|
||||
result = update_device_stats_jumps(session=session, date=single_date)
|
||||
print(result)
|
||||
|
||||
|
||||
result = create_receiver_stats(session=session, date=single_date)
|
||||
print(result)
|
||||
|
||||
|
||||
result = create_relation_stats(session=session, date=single_date)
|
||||
print(result)
|
||||
|
||||
|
||||
result = update_qualities(session=session, date=single_date)
|
||||
print(result)
|
||||
|
||||
#result = update_device_stats(session=session, date=date)
|
||||
#print(result)
|
||||
|
||||
|
||||
|
||||
@manager.command
|
||||
def add_missing_receivers():
|
||||
"""Update receivers with data from stats."""
|
||||
|
||||
result = update_receivers_stats(session=session)
|
||||
result = update_receivers(session=session)
|
||||
print(result)
|
||||
|
||||
|
||||
|
@ -45,17 +44,18 @@ def add_missing_receivers():
|
|||
def add_missing_devices():
|
||||
"""Update devices with data from stats."""
|
||||
|
||||
result = update_devices_stats(session=session)
|
||||
result = update_devices(session=session)
|
||||
print(result)
|
||||
|
||||
|
||||
@manager.command
|
||||
def create_flights():
|
||||
def create_flights(start=None, end=None):
|
||||
"""Create Flights."""
|
||||
|
||||
for single_date in (date(2016, 8, 10) + timedelta(days=n) for n in range(800)):
|
||||
days = get_database_days(start, end)
|
||||
|
||||
for single_date in days:
|
||||
result = _create_flights2d(session=session, date=single_date)
|
||||
#result = _create_flights3d(session=session, date=single_date)
|
||||
print(result)
|
||||
|
||||
|
||||
|
@ -111,7 +111,7 @@ def _create_flights2d(session=None, date=None):
|
|||
sq5.device_id
|
||||
ON CONFLICT DO NOTHING;
|
||||
"""
|
||||
|
||||
|
||||
result = session.execute(SQL.format(date.strftime("%Y-%m-%d")))
|
||||
insert_counter = result.rowcount
|
||||
session.commit()
|
||||
|
|
|
@ -2,7 +2,7 @@ import logging
|
|||
|
||||
from manager import Manager
|
||||
from ogn.client import AprsClient
|
||||
from ogn.gateway.process import string_to_message
|
||||
from ogn.gateway.process import string_to_message
|
||||
from datetime import datetime
|
||||
from ogn.gateway.process_tools import DummyMerger, Converter, DbSaver
|
||||
from ogn.commands.dbutils import session
|
||||
|
@ -17,6 +17,7 @@ saver = DbSaver(session=session)
|
|||
converter = Converter(callback=saver)
|
||||
merger = DummyMerger(callback=converter)
|
||||
|
||||
|
||||
def asdf(raw_string):
|
||||
message = string_to_message(raw_string, reference_date=datetime.utcnow())
|
||||
if message is not None:
|
||||
|
|
|
@ -3,7 +3,7 @@ import logging
|
|||
from mgrs import MGRS
|
||||
|
||||
from ogn.commands.dbutils import session
|
||||
from ogn.model import AircraftBeacon, ReceiverBeacon, Location
|
||||
from ogn.model import Location
|
||||
from ogn.parser import parse, ParseError
|
||||
from ogn.gateway.process_tools import DbSaver, Converter, DummyMerger, AIRCRAFT_TYPES, RECEIVER_TYPES
|
||||
|
||||
|
|
|
@ -4,6 +4,7 @@ from ogn.model import AircraftBeacon, ReceiverBeacon
|
|||
AIRCRAFT_TYPES = ['aprs_aircraft', 'flarm', 'tracker', 'fanet', 'lt24', 'naviter', 'skylines', 'spider', 'spot']
|
||||
RECEIVER_TYPES = ['aprs_receiver', 'receiver']
|
||||
|
||||
|
||||
class DummyMerger:
|
||||
def __init__(self, callback):
|
||||
self.callback = callback
|
||||
|
@ -14,6 +15,7 @@ class DummyMerger:
|
|||
def flush(self):
|
||||
pass
|
||||
|
||||
|
||||
class Merger:
|
||||
def __init__(self, callback, max_timedelta=None, max_lines=None):
|
||||
self.callback = callback
|
||||
|
@ -27,8 +29,8 @@ class Merger:
|
|||
|
||||
# release old messages
|
||||
if self.max_timedelta is not None:
|
||||
for receiver,v1 in self.message_map.items():
|
||||
for name,v2 in v1.items():
|
||||
for receiver, v1 in self.message_map.items():
|
||||
for name, v2 in v1.items():
|
||||
for timestamp,message in v2.items():
|
||||
if message['timestamp'] - timestamp > self.max_timedelta:
|
||||
self.callback.add_message(message)
|
||||
|
@ -37,30 +39,30 @@ class Merger:
|
|||
# release messages > max_lines
|
||||
if self.max_lines is not None:
|
||||
pass
|
||||
|
||||
|
||||
# merge messages with same timestamp
|
||||
receiver_name = message['receiver_name']
|
||||
name = message['name']
|
||||
timestamp = message['timestamp']
|
||||
|
||||
|
||||
if receiver_name in self.message_map:
|
||||
if name in self.message_map[receiver_name]:
|
||||
timestamps = self.message_map[receiver_name][name]
|
||||
if timestamp in timestamps:
|
||||
other = timestamps[timestamp]
|
||||
params1 = dict( [(k,v) for k,v in message.items() if v is not None])
|
||||
params2 = dict( [(k,v) for k,v in other.items() if v is not None])
|
||||
params1 = dict([(k, v) for k, v in message.items() if v is not None])
|
||||
params2 = dict([(k, v) for k, v in other.items() if v is not None])
|
||||
merged = {**params1, **params2}
|
||||
|
||||
|
||||
# zum debuggen
|
||||
if 'raw_message' in message and 'raw_message' in other:
|
||||
merged['raw_message'] = '"{}","{}"'.format(message['raw_message'], other['raw_message'])
|
||||
|
||||
|
||||
self.callback.add_message(merged)
|
||||
del self.message_map[receiver_name][name][timestamp]
|
||||
else:
|
||||
self.message_map[receiver_name][name][timestamp] = message
|
||||
|
||||
|
||||
# release previous messages
|
||||
for ts in list(timestamps):
|
||||
if ts < timestamp:
|
||||
|
@ -73,18 +75,19 @@ class Merger:
|
|||
self.message_map.update({receiver_name: {name: {timestamp: message}}})
|
||||
|
||||
def flush(self):
|
||||
for receiver,v1 in self.message_map.items():
|
||||
for name,v2 in v1.items():
|
||||
for receiver, v1 in self.message_map.items():
|
||||
for name, v2 in v1.items():
|
||||
for timestamp in v2:
|
||||
self.callback.add_message(self.message_map[receiver][name][timestamp])
|
||||
|
||||
self.callback.flush()
|
||||
self.message_map = dict()
|
||||
|
||||
|
||||
class Converter:
|
||||
def __init__(self, callback):
|
||||
self.callback = callback
|
||||
|
||||
|
||||
def add_message(self, message):
|
||||
if message['aprs_type'] in ['status', 'position']:
|
||||
beacon = self.message_to_beacon(message)
|
||||
|
@ -101,12 +104,13 @@ class Converter:
|
|||
beacon = ReceiverBeacon(**message)
|
||||
else:
|
||||
print("Whoops: what is this: {}".format(message))
|
||||
|
||||
|
||||
return beacon
|
||||
|
||||
|
||||
def flush(self):
|
||||
self.callback.flush()
|
||||
|
||||
|
||||
class DummySaver:
|
||||
def add_message(self, message):
|
||||
print(message)
|
||||
|
@ -114,6 +118,7 @@ class DummySaver:
|
|||
def flush(self):
|
||||
print("========== flush ==========")
|
||||
|
||||
|
||||
class DbSaver:
|
||||
def __init__(self, session):
|
||||
self.session = session
|
||||
|
@ -141,8 +146,10 @@ class DbSaver:
|
|||
print(e)
|
||||
return
|
||||
|
||||
|
||||
import os, gzip, csv
|
||||
|
||||
|
||||
class FileSaver:
|
||||
def __init__(self):
|
||||
self.aircraft_messages = list()
|
||||
|
@ -157,13 +164,13 @@ class FileSaver:
|
|||
self.fout_rb = gzip.open(receiver_beacon_filename, 'wt')
|
||||
else:
|
||||
raise FileExistsError
|
||||
|
||||
|
||||
self.aircraft_writer = csv.writer(self.fout_ab, delimiter=',')
|
||||
self.aircraft_writer.writerow(AircraftBeacon.get_columns())
|
||||
|
||||
self.receiver_writer = csv.writer(self.fout_rb, delimiter=',')
|
||||
self.receiver_writer.writerow(ReceiverBeacon.get_columns())
|
||||
|
||||
|
||||
return 1
|
||||
|
||||
def add_message(self, beacon):
|
||||
|
@ -177,7 +184,7 @@ class FileSaver:
|
|||
self.receiver_writer.writerows(self.receiver_messages)
|
||||
self.aircraft_messages = list()
|
||||
self.receiver_messages = list()
|
||||
|
||||
|
||||
def close(self):
|
||||
self.fout_ab.close()
|
||||
self.fout_rb.close()
|
||||
|
|
|
@ -77,10 +77,10 @@ class AircraftBeacon(Beacon):
|
|||
'timestamp',
|
||||
'track',
|
||||
'ground_speed',
|
||||
|
||||
|
||||
#'raw_message',
|
||||
#'reference_timestamp',
|
||||
|
||||
|
||||
'address_type',
|
||||
'aircraft_type',
|
||||
'stealth',
|
||||
|
@ -96,7 +96,7 @@ class AircraftBeacon(Beacon):
|
|||
'hardware_version',
|
||||
'real_address',
|
||||
'signal_power',
|
||||
|
||||
|
||||
'distance',
|
||||
'radial',
|
||||
'quality',
|
||||
|
@ -113,7 +113,7 @@ class AircraftBeacon(Beacon):
|
|||
self.timestamp,
|
||||
self.track,
|
||||
self.ground_speed,
|
||||
|
||||
|
||||
#self.raw_message,
|
||||
#self.reference_timestamp,
|
||||
|
||||
|
@ -140,4 +140,4 @@ class AircraftBeacon(Beacon):
|
|||
|
||||
|
||||
Index('ix_aircraft_beacons_date_device_id_address', func.date(AircraftBeacon.timestamp), AircraftBeacon.device_id, AircraftBeacon.address)
|
||||
Index('ix_aircraft_beacons_date_receiver_id_distance', func.date(AircraftBeacon.timestamp), AircraftBeacon.receiver_id, AircraftBeacon.distance)
|
||||
Index('ix_aircraft_beacons_date_receiver_id_distance', func.date(AircraftBeacon.timestamp), AircraftBeacon.receiver_id, AircraftBeacon.distance)
|
||||
|
|
|
@ -1,6 +1,5 @@
|
|||
from geoalchemy2.types import Geometry
|
||||
from sqlalchemy import Column, String, Integer, Float, SmallInteger
|
||||
from sqlalchemy.orm import relationship
|
||||
|
||||
from .base import Base
|
||||
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
from geoalchemy2.shape import to_shape
|
||||
from geoalchemy2.types import Geometry
|
||||
from sqlalchemy import Column, String, Integer, SmallInteger, Float, DateTime, BigInteger
|
||||
from sqlalchemy import Column, String, SmallInteger, Float, DateTime
|
||||
from sqlalchemy.ext.declarative import AbstractConcreteBase
|
||||
|
||||
from .base import Base
|
||||
|
|
|
@ -1,6 +1,5 @@
|
|||
from geoalchemy2.types import Geometry
|
||||
from sqlalchemy import Column, String, Integer, Float, SmallInteger, BigInteger
|
||||
from sqlalchemy.orm import relationship
|
||||
|
||||
from .base import Base
|
||||
|
||||
|
|
|
@ -1,5 +1,4 @@
|
|||
from sqlalchemy import Column, Integer, String, Float, Boolean, SmallInteger, DateTime
|
||||
from sqlalchemy.orm import relationship
|
||||
|
||||
from .base import Base
|
||||
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
from geoalchemy2.types import Geometry
|
||||
from sqlalchemy import Column, String, Integer, Float, SmallInteger, Date, Index, ForeignKey
|
||||
from sqlalchemy import Column, Integer, Date, Index, ForeignKey
|
||||
from sqlalchemy.orm import relationship
|
||||
|
||||
from .base import Base
|
||||
|
@ -22,5 +22,6 @@ class Flight2D(Base):
|
|||
self.date,
|
||||
self.path_wkt)
|
||||
|
||||
|
||||
Index('ix_flights2d_date_device_id', Flight2D.date, Flight2D.device_id)
|
||||
#Index('ix_flights2d_date_path', Flight2D.date, Flight2D.path_wkt) --> CREATE INDEX ix_flights2d_date_path ON flights2d USING GIST("date", path)
|
||||
#Index('ix_flights2d_date_path', Flight2D.date, Flight2D.path_wkt) --> CREATE INDEX ix_flights2d_date_path ON flights2d USING GIST("date", path)
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
from sqlalchemy import Column, Float, String, Integer, SmallInteger, ForeignKey, Index
|
||||
from sqlalchemy import Column, Float, String, Integer, ForeignKey, Index
|
||||
from sqlalchemy.orm import relationship
|
||||
from sqlalchemy.sql import func
|
||||
from .beacon import Beacon
|
||||
|
@ -70,10 +70,10 @@ class ReceiverBeacon(Beacon):
|
|||
'dstcall',
|
||||
'receiver_name',
|
||||
'timestamp',
|
||||
|
||||
|
||||
# 'raw_message',
|
||||
# 'reference_timestamp',
|
||||
|
||||
|
||||
'version',
|
||||
'platform',
|
||||
'cpu_load',
|
||||
|
@ -101,7 +101,7 @@ class ReceiverBeacon(Beacon):
|
|||
self.dstcall,
|
||||
self.receiver_name,
|
||||
self.timestamp,
|
||||
|
||||
|
||||
# self.raw_message,
|
||||
# self.reference_timestamp,
|
||||
|
||||
|
@ -124,4 +124,5 @@ class ReceiverBeacon(Beacon):
|
|||
int(self.good_senders) if self.good_senders else None,
|
||||
int(self.good_and_bad_senders) if self.good_and_bad_senders else None]
|
||||
|
||||
Index('ix_receiver_beacons_date_receiver_id', func.date(ReceiverBeacon.timestamp), ReceiverBeacon.receiver_id)
|
||||
|
||||
Index('ix_receiver_beacons_date_receiver_id', func.date(ReceiverBeacon.timestamp), ReceiverBeacon.receiver_id)
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
from sqlalchemy import Column, String, Integer, SmallInteger, Float, Date, ForeignKey, Index
|
||||
from sqlalchemy import Column, String, Integer, SmallInteger, Float, Date, ForeignKey
|
||||
from sqlalchemy.orm import relationship, backref
|
||||
|
||||
|
||||
|
@ -21,4 +21,4 @@ class ReceiverCoverage(Base):
|
|||
|
||||
# Relations
|
||||
receiver_id = Column(Integer, ForeignKey('receivers.id', ondelete='SET NULL'), index=True)
|
||||
receiver = relationship('Receiver', foreign_keys=[receiver_id], backref=backref('receiver_coverages', order_by='ReceiverCoverage.date.asc()'))
|
||||
receiver = relationship('Receiver', foreign_keys=[receiver_id], backref=backref('receiver_coverages', order_by='ReceiverCoverage.date.asc()'))
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
from sqlalchemy import Column, Integer, Date, DateTime, Float, ForeignKey, SmallInteger, Boolean, String, Index
|
||||
from sqlalchemy import Column, Integer, Date, Float, ForeignKey, Index
|
||||
from sqlalchemy.orm import relationship
|
||||
|
||||
from .base import Base
|
||||
|
@ -27,5 +27,6 @@ class RelationStats(Base):
|
|||
self.normalized_signal_quality,
|
||||
self.beacon_count)
|
||||
|
||||
|
||||
Index('ix_relation_stats_date_device_id', RelationStats.date, RelationStats.device_id, RelationStats.receiver_id)
|
||||
Index('ix_relation_stats_date_receiver_id', RelationStats.date, RelationStats.receiver_id, RelationStats.device_id)
|
||||
Index('ix_relation_stats_date_receiver_id', RelationStats.date, RelationStats.receiver_id, RelationStats.device_id)
|
||||
|
|
|
@ -1,6 +1,5 @@
|
|||
from sqlalchemy import Boolean, Column, Integer, SmallInteger, DateTime, ForeignKey, Index
|
||||
from sqlalchemy import Boolean, Column, Integer, SmallInteger, DateTime, ForeignKey
|
||||
from sqlalchemy.orm import relationship
|
||||
from sqlalchemy.sql import func
|
||||
|
||||
from .base import Base
|
||||
|
||||
|
|
21
ogn/utils.py
21
ogn/utils.py
|
@ -1,13 +1,11 @@
|
|||
import csv
|
||||
import gzip
|
||||
from io import StringIO
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
from aerofiles.seeyou import Reader
|
||||
from geopy.exc import GeopyError
|
||||
from geopy.geocoders import Nominatim
|
||||
from ogn.parser.utils import FEETS_TO_METER
|
||||
import requests
|
||||
from urllib.request import Request
|
||||
|
||||
from .model import DeviceInfoOrigin, DeviceInfo, Airport, Location
|
||||
|
||||
|
@ -24,6 +22,17 @@ nm2m = 1852
|
|||
mi2m = 1609.34
|
||||
|
||||
|
||||
def get_days(start, end):
|
||||
days = [start + timedelta(days=x) for x in range(0, (end - start).days + 1)]
|
||||
return days
|
||||
|
||||
|
||||
def date_to_timestamps(date):
|
||||
start = datetime(date.year, date.month, date.day, 0, 0, 0)
|
||||
end = datetime(date.year, date.month, date.day, 23, 59, 59)
|
||||
return (start, end)
|
||||
|
||||
|
||||
def get_ddb(csv_file=None, address_origin=DeviceInfoOrigin.unknown):
|
||||
if csv_file is None:
|
||||
r = requests.get(DDB_URL)
|
||||
|
@ -74,11 +83,11 @@ def get_flarmnet(fln_file=None, address_origin=DeviceInfoOrigin.flarmnet):
|
|||
|
||||
|
||||
def get_trackable(ddb):
|
||||
l = []
|
||||
result = []
|
||||
for i in ddb:
|
||||
if i.tracked and i.address_type in address_prefixes:
|
||||
l.append("{}{}".format(address_prefixes[i.address_type], i.address))
|
||||
return l
|
||||
result.append("{}{}".format(address_prefixes[i.address_type], i.address))
|
||||
return result
|
||||
|
||||
|
||||
def get_airports(cupfile):
|
||||
|
|
|
@ -2,7 +2,7 @@ import unittest
|
|||
import os
|
||||
|
||||
from ogn.model import AircraftBeacon, ReceiverBeacon, Device, Receiver, DeviceInfo
|
||||
from ogn.collect.database import add_missing_devices, add_missing_receivers, import_ddb_file
|
||||
from ogn.collect.database import add_missing_devices, add_missing_receivers, import_ddb
|
||||
|
||||
|
||||
class TestDB(unittest.TestCase):
|
||||
|
@ -31,8 +31,8 @@ class TestDB(unittest.TestCase):
|
|||
def test_update_devices(self):
|
||||
session = self.session
|
||||
|
||||
ab01 = AircraftBeacon(receiver_name='Koenigsdf', address='DD4711', timestamp='2017-12-10 10:00:00')
|
||||
rb01 = ReceiverBeacon(name='Bene', timestamp='2017-12-10 09:59:50')
|
||||
ab01 = AircraftBeacon(name='FLRDD4711', receiver_name='Koenigsdf', address='DD4711', timestamp='2017-12-10 10:00:00')
|
||||
rb01 = ReceiverBeacon(name='Bene', receiver_name='GLIDERN1', timestamp='2017-12-10 09:59:50')
|
||||
d01 = Device(address='DD4711')
|
||||
r01 = Receiver(name='Koenigsdf')
|
||||
session.bulk_save_objects([ab01, rb01, d01, r01])
|
||||
|
@ -54,7 +54,7 @@ class TestDB(unittest.TestCase):
|
|||
def test_import_ddb_file(self):
|
||||
session = self.session
|
||||
|
||||
import_ddb_file(session, path=os.path.dirname(__file__) + '/../custom_ddb.txt')
|
||||
import_ddb(session, path=os.path.dirname(__file__) + '/../custom_ddb.txt')
|
||||
|
||||
device_infos = session.query(DeviceInfo).all()
|
||||
self.assertEqual(len(device_infos), 6)
|
||||
|
|
|
@ -1,10 +1,10 @@
|
|||
import unittest
|
||||
import os
|
||||
from datetime import datetime
|
||||
from datetime import datetime, date
|
||||
|
||||
from ogn.model import AircraftBeacon, ReceiverBeacon, Receiver, Device, DeviceStats
|
||||
|
||||
from ogn.collect.stats import update_device_stats
|
||||
from ogn.collect.stats import update_devices
|
||||
|
||||
|
||||
class TestDB(unittest.TestCase):
|
||||
|
@ -22,16 +22,16 @@ class TestDB(unittest.TestCase):
|
|||
init()
|
||||
|
||||
# Prepare Beacons
|
||||
self.ab01 = AircraftBeacon(timestamp='2017-12-10 10:00:01')
|
||||
self.ab02 = AircraftBeacon(timestamp='2017-12-10 10:00:02')
|
||||
self.ab03 = AircraftBeacon(timestamp='2017-12-10 10:00:03')
|
||||
self.ab04 = AircraftBeacon(timestamp='2017-12-10 10:00:04')
|
||||
self.ab05 = AircraftBeacon(timestamp='2017-12-10 10:00:05')
|
||||
self.ab06 = AircraftBeacon(timestamp='2017-12-10 10:00:05')
|
||||
self.ab01 = AircraftBeacon(name='FLRDD4711', receiver_name='Koenigsdf', timestamp='2017-12-10 10:00:01')
|
||||
self.ab02 = AircraftBeacon(name='FLRDD4711', receiver_name='Koenigsdf', timestamp='2017-12-10 10:00:02')
|
||||
self.ab03 = AircraftBeacon(name='FLRDD4711', receiver_name='Koenigsdf', timestamp='2017-12-10 10:00:03')
|
||||
self.ab04 = AircraftBeacon(name='FLRDD4711', receiver_name='Koenigsdf', timestamp='2017-12-10 10:00:04')
|
||||
self.ab05 = AircraftBeacon(name='FLRDD4711', receiver_name='Koenigsdf', timestamp='2017-12-10 10:00:05')
|
||||
self.ab06 = AircraftBeacon(name='FLRDD4711', receiver_name='Koenigsdf', timestamp='2017-12-10 10:00:05')
|
||||
|
||||
self.rb01 = ReceiverBeacon(timestamp='2017-12-10 09:55:00', altitude=601, version='0.2.5', platform='ARM')
|
||||
self.rb02 = ReceiverBeacon(timestamp='2017-12-10 10:00:00', altitude=601, version='0.2.7', platform='ARM')
|
||||
self.rb03 = ReceiverBeacon(timestamp='2017-12-10 10:05:00', altitude=601, version='0.2.6', platform='ARM')
|
||||
self.rb01 = ReceiverBeacon(name='Koenigsdf', receiver_name='GLIDERN1', timestamp='2017-12-10 09:55:00', altitude=601, version='0.2.5', platform='ARM')
|
||||
self.rb02 = ReceiverBeacon(name='Koenigsdf', receiver_name='GLIDERN1', timestamp='2017-12-10 10:00:00', altitude=601, version='0.2.7', platform='ARM')
|
||||
self.rb03 = ReceiverBeacon(name='Koenigsdf', receiver_name='GLIDERN1', timestamp='2017-12-10 10:05:00', altitude=601, version='0.2.6', platform='ARM')
|
||||
|
||||
self.r01 = Receiver(name='Koenigsdf')
|
||||
self.r02 = Receiver(name='Bene')
|
||||
|
@ -61,7 +61,7 @@ class TestDB(unittest.TestCase):
|
|||
session.add(self.ab01)
|
||||
session.commit()
|
||||
|
||||
update_device_stats(session, date='2017-12-10')
|
||||
update_devices(session)
|
||||
|
||||
devicestats = session.query(DeviceStats).all()
|
||||
self.assertEqual(len(devicestats), 1)
|
||||
|
@ -88,7 +88,7 @@ class TestDB(unittest.TestCase):
|
|||
session.add(self.ab02)
|
||||
session.commit()
|
||||
|
||||
update_device_stats(session, date='2017-12-10')
|
||||
update_devices(session, date='2017-12-10')
|
||||
|
||||
devicestats = session.query(DeviceStats).all()
|
||||
self.assertEqual(len(devicestats), 1)
|
||||
|
@ -114,7 +114,7 @@ class TestDB(unittest.TestCase):
|
|||
session.add(self.ab03)
|
||||
session.commit()
|
||||
|
||||
update_device_stats(session, date='2017-12-10')
|
||||
update_devices(session, date='2017-12-10')
|
||||
|
||||
devicestats = session.query(DeviceStats).all()
|
||||
self.assertEqual(len(devicestats), 1)
|
||||
|
@ -142,7 +142,7 @@ class TestDB(unittest.TestCase):
|
|||
session.add(self.ab04)
|
||||
session.commit()
|
||||
|
||||
update_device_stats(session, date='2017-12-10')
|
||||
update_devices(session, date='2017-12-10')
|
||||
|
||||
devicestats = session.query(DeviceStats).all()
|
||||
self.assertEqual(len(devicestats), 1)
|
||||
|
@ -168,7 +168,7 @@ class TestDB(unittest.TestCase):
|
|||
session.add(self.ab05)
|
||||
session.commit()
|
||||
|
||||
update_device_stats(session, date='2017-12-10')
|
||||
update_devices(session, date='2017-12-10')
|
||||
|
||||
devicestats = session.query(DeviceStats).all()
|
||||
self.assertEqual(len(devicestats), 1)
|
||||
|
@ -195,7 +195,7 @@ class TestDB(unittest.TestCase):
|
|||
session.add(self.ab06)
|
||||
session.commit()
|
||||
|
||||
update_device_stats(session, date='2017-12-10')
|
||||
update_devices(session, date='2017-12-10')
|
||||
|
||||
devicestats = session.query(DeviceStats).all()
|
||||
self.assertEqual(len(devicestats), 1)
|
||||
|
|
|
@ -1,12 +1,19 @@
|
|||
import os
|
||||
import unittest
|
||||
from datetime import date
|
||||
|
||||
from ogn.model import AircraftType
|
||||
from ogn.utils import get_ddb, get_trackable, get_country_code, get_airports
|
||||
from ogn.utils import get_days, get_ddb, get_trackable, get_airports
|
||||
import unittest.mock as mock
|
||||
|
||||
|
||||
class TestStringMethods(unittest.TestCase):
|
||||
def test_get_days(self):
|
||||
start = date(2018, 2, 27)
|
||||
end = date(2018, 3, 2)
|
||||
days = get_days(start, end)
|
||||
self.assertEqual(days, [date(2018, 2, 27), date(2018, 2, 28), date(2018, 3, 1), date(2018, 3, 2)])
|
||||
|
||||
def test_get_devices(self):
|
||||
devices = get_ddb()
|
||||
self.assertGreater(len(devices), 1000)
|
||||
|
|
Ładowanie…
Reference in New Issue