pull/68/head
Konstantin Gründger 2019-03-06 21:11:46 +01:00
rodzic 0c99770a73
commit 3fe5e91e4f
9 zmienionych plików z 288 dodań i 101 usunięć

Wyświetl plik

@ -37,13 +37,13 @@ For best performance you should use [TimescaleDB](https://www.timescale.com), wh
5. Create database
```
./manage.py db.init
./flask database init
```
6. Optional: Prepare tables for TimescaleDB
```
./manage.py db.init_timescaledb
./flask database init_timescaledb
```
7. Optional: Import world border dataset (needed if you want to know the country a receiver belongs to, etc.)
@ -85,7 +85,7 @@ The following scripts run in the foreground and should be deamonized
- Start the aprs client
```
./manage.py gateway.run
./flask gateway run
```
- Start a task server (make sure redis is up and running)
@ -107,66 +107,44 @@ and set the environment variable `OGN_CONFIG_MODULE` accordingly.
```
touch myconfig.py
export OGN_CONFIG_MODULE="myconfig"
./manage.py gateway.run
./flask gateway run
```
### manage.py - CLI options
### Flask - Command Line Interface
```
usage: manage [<namespace>.]<command> [<args>]
Usage: flask [OPTIONS] COMMAND [ARGS]...
positional arguments:
command the command to run
A general utility script for Flask applications.
optional arguments:
-h, --help show this help message and exit
Provides commands from Flask, extensions, and the application. Loads the
application defined in the FLASK_APP environment variable, or from a
wsgi.py file. Setting the FLASK_ENV environment variable to 'development'
will enable debug mode.
available commands:
[bulkimport]
create_flights2d Create complete flight traces from logfile tables.
create_gaps2d Create 'gaps' from logfile tables.
file_export Export separate logfile tables to csv files. They can be used for fast bulk import with sql COPY command.
file_import Import APRS logfiles into separate logfile tables.
transfer Transfer beacons from separate logfile tables to beacon table.
update Update beacons (add foreign keys, compute distance, bearing, ags, etc.) in separate logfile tables.
[db]
drop Drop all tables.
import_airports Import airports from a ".cup" file
import_ddb Import registered devices from the DDB.
import_file Import registered devices from a local file.
import_flarmnet Import registered devices from a local file.
init Initialize the database.
init_timescaledb Initialize TimescaleDB features.
update_country_codes Update country codes of all receivers.
upgrade Upgrade database to the latest version.
[flights]
flights2d Compute flights.
[gateway]
run Run the aprs client.
[export]
cup Export receiver waypoints as '.cup'.
igc Export igc file for <address> at <date>.
[logbook]
compute_logbook Compute logbook.
compute_takeoff_landingCompute takeoffs and landings.
show Show a logbook for <airport_name>.
[stats]
create Create DeviceStats, ReceiverStats and RelationStats.
create_ognrange Create stats for Melissa's ognrange.
update_devices Update devices with data from stats.
update_receivers Update receivers with data from stats.
$ export FLASK_APP=app.py
$ export FLASK_ENV=development
$ flask run
Options:
--version Show the flask version
--help Show this message and exit.
Commands:
database Database creation and handling.
db Perform database migrations.
export Export data in several file formats.
flights Create 2D flight paths from data.
gateway Connection to APRS servers.
logbook Handling of logbook data.
routes Show the routes for the app.
run Runs a development server.
shell Runs a shell in the app context.
stats Handling of statistical data.
```
Only the command `logbook.compute` requires a running task server (celery) at the moment.
Most commands are command groups, so if you execute this command you will get further (sub)commands.
### Available tasks
### Available tasks (deprecated - needs rework)
- `ogn.collect.database.import_ddb` - Import registered devices from the DDB.
- `ogn.collect.database.import_file` - Import registered devices from a local file.

Wyświetl plik

@ -4,7 +4,7 @@ from sqlalchemy import insert, distinct, between, literal
from sqlalchemy.sql import null, and_, func, or_, update
from sqlalchemy.sql.expression import case
from ogn_python.model import AircraftBeacon, DeviceStats, ReceiverStats, RelationStats, Receiver, Device
from ogn_python.model import AircraftBeacon, DeviceStats, Country, CountryStats, ReceiverStats, RelationStats, Receiver, Device
from .celery import app
from ogn_python.model.receiver_beacon import ReceiverBeacon
@ -56,6 +56,9 @@ def create_device_stats(session=None, date=None):
func.count(sq.c.device_id)
.over(partition_by=sq.c.device_id)
.label('aircraft_beacon_count'),
func.first_value(sq.c.name)
.over(partition_by=sq.c.device_id, order_by=case([(sq.c.name == null(), None)], else_=sq.c.timestamp).asc().nullslast())
.label('name'),
func.first_value(sq.c.timestamp)
.over(partition_by=sq.c.device_id, order_by=case([(sq.c.timestamp == null(), None)], else_=sq.c.timestamp).asc().nullslast())
.label('firstseen'),
@ -81,7 +84,8 @@ def create_device_stats(session=None, date=None):
# And insert them
ins = insert(DeviceStats).from_select(
[DeviceStats.device_id, DeviceStats.date, DeviceStats.receiver_count, DeviceStats.max_altitude, DeviceStats.aircraft_beacon_count, DeviceStats.firstseen, DeviceStats.lastseen, DeviceStats.aircraft_type, DeviceStats.stealth,
[DeviceStats.device_id, DeviceStats.date, DeviceStats.receiver_count, DeviceStats.max_altitude, DeviceStats.aircraft_beacon_count, DeviceStats.name,
DeviceStats.firstseen, DeviceStats.lastseen, DeviceStats.aircraft_type, DeviceStats.stealth,
DeviceStats.software_version, DeviceStats.hardware_version, DeviceStats.real_address],
device_stats)
res = session.execute(ins)
@ -175,6 +179,39 @@ def create_receiver_stats(session=None, date=None):
return "ReceiverStats for {}: {} deleted, {} inserted, {} updated".format(date, deleted_counter, insert_counter, update_counter)
@app.task
def create_country_stats(session=None, date=None):
if session is None:
session = app.session
if not date:
logger.warn("A date is needed for calculating stats. Exiting")
return None
else:
(start, end) = date_to_timestamps(date)
# First kill the stats for the selected date
deleted_counter = session.query(CountryStats) \
.filter(CountryStats.date == date) \
.delete()
country_stats = session.query(literal(date), Country.gid,
func.count(AircraftBeacon.timestamp).label('aircraft_beacon_count'), \
func.count(func.distinct(AircraftBeacon.receiver_id)).label('device_count')) \
.filter(between(AircraftBeacon.timestamp, start, end)) \
.filter(func.st_contains(Country.geom, AircraftBeacon.location)) \
.group_by(Country.gid) \
.subquery()
# And insert them
ins = insert(CountryStats).from_select(
[CountryStats.date, CountryStats.country_id, CountryStats.aircraft_beacon_count, CountryStats.device_count],
country_stats)
res = session.execute(ins)
insert_counter = res.rowcount
session.commit()
@app.task
def update_device_stats_jumps(session=None, date=None):
"""Update device stats jumps."""
@ -422,6 +459,9 @@ def update_devices(session=None):
device_stats = session.query(
distinct(DeviceStats.device_id).label('device_id'),
func.first_value(DeviceStats.name)
.over(partition_by=DeviceStats.device_id, order_by=case([(DeviceStats.name == null(), None)], else_=DeviceStats.date).desc().nullslast())
.label('name'),
func.first_value(DeviceStats.firstseen)
.over(partition_by=DeviceStats.device_id, order_by=case([(DeviceStats.firstseen == null(), None)], else_=DeviceStats.date).asc().nullslast())
.label('firstseen'),
@ -448,7 +488,8 @@ def update_devices(session=None):
upd = update(Device) \
.where(and_(Device.id == device_stats.c.device_id)) \
.values({'firstseen': device_stats.c.firstseen,
.values({'name': device_stats.c.name,
'firstseen': device_stats.c.firstseen,
'lastseen': device_stats.c.lastseen,
'aircraft_type': device_stats.c.aircraft_type,
'stealth': device_stats.c.stealth,

Wyświetl plik

@ -10,30 +10,79 @@ from ogn_python import db
user_cli = AppGroup('flights')
user_cli.help = "Create 2D flight paths from data."
NOTHING = ''
CONTEST_RELEVANT = 'AND agl < 1000'
LOW_PASS = 'AND agl < 50 and ground_speed > 250'
def compute_gaps(session, date):
query = """
INSERT INTO flights2d(date, flight_type, device_id, path)
SELECT '{date}' AS date,
3 AS flight_type,
sq3.device_id,
ST_Collect(sq3.path)
FROM (
SELECT sq2.d1 device_id,
ST_MakeLine(sq2.l1, sq2.l2) path
FROM
(
SELECT sq.timestamp t1,
LAG(sq.timestamp) OVER ( PARTITION BY sq.timestamp::DATE, sq.device_id ORDER BY sq.timestamp) t2,
sq.location l1,
LAG(sq.location) OVER ( PARTITION BY sq.timestamp::DATE, sq.device_id ORDER BY sq.timestamp) l2,
sq.device_id d1,
LAG(sq.device_id) OVER ( PARTITION BY sq.timestamp::DATE, sq.device_id ORDER BY sq.timestamp) d2
FROM
(
SELECT DISTINCT ON (device_id, timestamp) timestamp, device_id, location, agl
FROM aircraft_beacons
WHERE timestamp BETWEEN '{date} 00:00:00' AND '{date} 23:59:59' AND agl > 300
ORDER BY device_id, timestamp, error_count
) sq
) sq2
WHERE EXTRACT(epoch FROM sq2.t1 - sq2.t2) > 300
AND ST_DistanceSphere(sq2.l1, sq2.l2) / EXTRACT(epoch FROM sq2.t1 - sq2.t2) BETWEEN 15 AND 50
) sq3
GROUP BY sq3.device_id
ON CONFLICT DO NOTHING;
""".format(date=date.strftime('%Y-%m-%d'))
session.execute(query)
session.commit()
def compute_flights2d(session, date, flight_type):
if flight_type == 0:
filter = NOTHING
elif flight_type == 1:
filter = CONTEST_RELEVANT
elif flight_type == 2:
filter = LOW_PASS
def compute_flights2d(session, date):
query = """
INSERT INTO flights2d
(
date,
flight_type,
device_id,
path,
path_simple
)
SELECT sq5.date,
sq5.device_id,
st_collect(sq5.linestring order BY sq5.part) multilinestring,
st_collect(st_simplify(sq5.linestring ORDER BY sq5.part, 0.0001) simple_multilinestring
SELECT '{date}' AS date,
{flight_type} as flight_type,
sq5.device_id,
st_collect(sq5.linestring order BY sq5.part) multilinestring,
st_collect(st_simplify(sq5.linestring, 0.0001) ORDER BY sq5.part) simple_multilinestring
FROM (
SELECT sq4.timestamp::date AS date,
sq4.device_id,
sq4.part,
st_makeline(sq4.location ORDER BY sq4.timestamp) linestring
SELECT sq4.device_id,
sq4.part,
st_makeline(sq4.location ORDER BY sq4.timestamp) linestring
FROM (
SELECT sq3.timestamp,
sq3.location,
sq3.device_id,
sum(sq3.ping) OVER (partition BY sq3.timestamp::date, sq3.device_id ORDER BY sq3.timestamp) part
SELECT sq3.timestamp,
sq3.location,
sq3.device_id,
sum(sq3.ping) OVER (partition BY sq3.device_id ORDER BY sq3.timestamp) part
FROM (
SELECT sq2.t1 AS timestamp,
sq2.l1 AS location,
@ -52,17 +101,19 @@ def compute_flights2d(session, date):
FROM (
SELECT DISTINCT ON (device_id, timestamp) timestamp, device_id, location
FROM aircraft_beacons
WHERE timestamp BETWEEN '{0} 00:00:00' AND '{0} 23:59:59'
WHERE timestamp BETWEEN '{date} 00:00:00' AND '{date} 23:59:59' {filter}
ORDER BY device_id, timestamp, error_count
) sq
) sq2
) sq3
) sq4
GROUP BY sq4.timestamp::date, sq4.device_id, sq4.part
GROUP BY sq4.device_id, sq4.part
) sq5
GROUP BY sq5.date, sq5.device_id
GROUP BY sq5.device_id
ON CONFLICT DO NOTHING;
""".format(date.strftime('%Y-%m-%d'))
""".format(date=date.strftime('%Y-%m-%d'),
flight_type=flight_type,
filter=filter)
session.execute(query)
session.commit()
@ -70,12 +121,16 @@ def compute_flights2d(session, date):
@user_cli.command('create')
@click.argument('start')
@click.argument('end')
def create(start, end):
"""Compute flights."""
@click.argument('flight_type', type=click.INT)
def create(start, end, flight_type):
"""Compute flights. Flight type: 0: all flights, 1: below 1000m AGL, 2: below 50m AGL + faster than 250 km/h, 3: inverse coverage'"""
days = get_database_days(start, end)
pbar = tqdm(days)
for single_date in pbar:
pbar.set_description(datetime.strftime(single_date, '%Y-%m-%d'))
result = compute_flights2d(session=db.session, date=single_date)
if flight_type <= 2:
result = compute_flights2d(session=db.session, date=single_date, flight_type=flight_type)
else:
result = compute_gaps(session=db.session, date=single_date)

Wyświetl plik

@ -6,7 +6,7 @@ from tqdm import tqdm
from ogn_python.commands.database import get_database_days
from ogn_python.collect.stats import create_device_stats, create_receiver_stats, create_relation_stats,\
from ogn_python.collect.stats import create_device_stats, create_receiver_stats, create_relation_stats, create_country_stats,\
update_qualities, update_receivers as update_receivers_command, update_devices as update_devices_command,\
update_device_stats_jumps
@ -36,6 +36,30 @@ def create(start, end):
result = create_relation_stats(session=db.session, date=single_date)
result = update_qualities(session=db.session, date=single_date)
@user_cli.command('create_country')
@click.argument('start')
@click.argument('end')
def create_country(start, end):
"""Create CountryStats."""
days = get_database_days(start, end)
pbar = tqdm(days)
for single_date in pbar:
pbar.set_description(datetime.strftime(single_date, '%Y-%m-%d'))
result = create_country_stats(session=db.session, date=single_date)
from ogn_python.model import *
@user_cli.command('update_devices_name')
def update_devices_name():
"""Update Devices name."""
device_ids = db.session.query(Device.id).all()
for device_id in tqdm(device_ids):
db.session.execute("update devices d set name = sq.name from ( select * from aircraft_beacons ab where ab.device_id = {} limit 1) sq where d.id = sq.device_id and d.name is null or d.name = 'ICA3D3CC4';".format(device_id[0]))
db.session.commit()
@user_cli.command('update_receivers')
def update_receivers():
@ -53,6 +77,30 @@ def update_devices():
print(result)
@user_cli.command('update_mgrs')
@click.argument('start')
@click.argument('end')
def update_mgrs(start, end):
"""Create location_mgrs_short."""
days = get_database_days(start, end)
pbar = tqdm(days)
for single_date in pbar:
datestr = datetime.strftime(single_date, '%Y-%m-%d')
pbar.set_description(datestr)
for pbar2 in tqdm(["{:02d}:{:02d}".format(hh, mm) for hh in range(0, 24) for mm in range(0, 60)]):
sql = """
UPDATE aircraft_beacons
SET location_mgrs_short = left(location_mgrs, 5) || substring(location_mgrs, 6, 2) || substring(location_mgrs, 11, 2)
WHERE timestamp BETWEEN '{0} {1}:00' and '{0} {1}:59' AND location_mgrs_short IS NULL;
""".format(datestr, pbar2)
#print(sql)
db.session.execute(sql)
db.session.commit()
@user_cli.command('create_ognrange')
@click.argument('start')
@click.argument('end')

Wyświetl plik

@ -83,6 +83,8 @@ class ContinuousDbFeeder:
def __init__(self,):
self.postfix = 'continuous_import'
self.last_flush = datetime.utcnow()
self.last_add_missing = datetime.utcnow()
self.last_transfer = datetime.utcnow()
self.aircraft_buffer = StringIO()
self.receiver_buffer = StringIO()
@ -108,17 +110,25 @@ class ContinuousDbFeeder:
app.logger.error("Ignore beacon_type: {}".format(message['beacon_type']))
return
if datetime.utcnow() - self.last_flush >= timedelta(seconds=1):
if datetime.utcnow() - self.last_flush >= timedelta(seconds=5):
self.flush()
self.prepare()
self.transfer()
self.delete_beacons()
self.aircraft_buffer = StringIO()
self.receiver_buffer = StringIO()
self.last_flush = datetime.utcnow()
if datetime.utcnow() - self.last_add_missing >= timedelta(seconds=60):
self.add_missing()
self.last_add_missing = datetime.utcnow()
if datetime.utcnow() - self.last_transfer >= timedelta(seconds=10):
self.transfer()
self.delete_beacons()
self.last_transfer = datetime.utcnow()
def flush(self):
self.aircraft_buffer.seek(0)
self.receiver_buffer.seek(0)
@ -132,16 +142,16 @@ class ContinuousDbFeeder:
self.aircraft_buffer = StringIO()
self.receiver_buffer = StringIO()
def add_missing(self):
add_missing_receivers(self.postfix)
add_missing_devices(self.postfix)
def prepare(self):
# make receivers complete
add_missing_receivers(self.postfix)
update_receiver_beacons(self.postfix)
update_receiver_location(self.postfix)
# make devices complete
add_missing_devices(self.postfix)
# prepare beacons for transfer
update_aircraft_beacons(self.postfix)
def transfer(self):
@ -155,18 +165,47 @@ class ContinuousDbFeeder:
delete_aircraft_beacons(self.postfix)
def prepare_bigdata(postfix):
# make receivers complete
add_missing_receivers(postfix)
update_receiver_location(postfix)
class FileDbFeeder():
def __init__(self):
self.postfix = 'continuous_import'
self.last_flush = datetime.utcnow()
# make devices complete
add_missing_devices(postfix)
self.aircraft_buffer = StringIO()
self.receiver_buffer = StringIO()
# prepare beacons for transfer
create_indices(postfix)
update_receiver_beacons_bigdata(postfix)
update_aircraft_beacons_bigdata(postfix)
create_tables(self.postfix)
create_indices(self.postfix)
def add(self, raw_string):
message = string_to_message(raw_string, reference_date=datetime.utcnow())
if message is None or ('raw_message' in message and message['raw_message'][0] == '#') or 'beacon_type' not in message:
return
if message['beacon_type'] in AIRCRAFT_BEACON_TYPES:
complete_message = ','.join([str(message[k]) if k in message and message[k] is not None else '\\N' for k in BEACON_KEY_FIELDS + AIRCRAFT_BEACON_FIELDS])
self.aircraft_buffer.write(complete_message)
self.aircraft_buffer.write('\n')
elif message['beacon_type'] in RECEIVER_BEACON_TYPES:
complete_message = ','.join([str(message[k]) if k in message and message[k] is not None else '\\N' for k in BEACON_KEY_FIELDS + RECEIVER_BEACON_FIELDS])
self.receiver_buffer.write(complete_message)
self.receiver_buffer.write('\n')
else:
app.logger.error("Ignore beacon_type: {}".format(message['beacon_type']))
return
def prepare(self):
# make receivers complete
add_missing_receivers(self.postfix)
update_receiver_location(self.postfix)
# make devices complete
add_missing_devices(self.postfix)
# prepare beacons for transfer
create_indices(self.postfix)
update_receiver_beacons_bigdata(self.postfix)
update_aircraft_beacons_bigdata(self.postfix)
def get_aircraft_beacons_postfixes():

Wyświetl plik

@ -35,9 +35,9 @@ def add_missing_devices(postfix):
db.session.execute("""
INSERT INTO devices(address)
SELECT DISTINCT(ab.address)
SELECT DISTINCT (ab.address)
FROM "aircraft_beacons_{0}" AS ab
WHERE NOT EXISTS (SELECT 1 FROM devices AS d WHERE d.address = ab.address)
WHERE ab.address IS NOT NULL AND NOT EXISTS (SELECT 1 FROM devices AS d WHERE d.address = ab.address)
ORDER BY ab.address;
""".format(postfix))
db.session.commit()
@ -48,10 +48,16 @@ def add_missing_receivers(postfix):
db.session.execute("""
INSERT INTO receivers(name)
SELECT DISTINCT(rb.name)
SELECT DISTINCT (rb.name)
FROM "receiver_beacons_{0}" AS rb
WHERE NOT EXISTS (SELECT 1 FROM receivers AS r WHERE r.name = rb.name)
ORDER BY name;
ORDER BY rb.name;
INSERT INTO receivers(name)
SELECT DISTINCT (ab.receiver_name)
FROM "aircraft_beacons_{0}" AS ab
WHERE NOT EXISTS (SELECT 1 FROM receivers AS r WHERE r.name = ab.receiver_name)
ORDER BY ab.receiver_name;
""".format(postfix))
db.session.commit()
@ -82,7 +88,7 @@ def update_receiver_beacons(postfix):
UPDATE receiver_beacons_{0} AS rb
SET receiver_id = r.id
FROM receivers AS r
WHERE rb.name = r.name;
WHERE rb.receiver_id IS NULL AND rb.name = r.name;
""".format(postfix))
db.session.commit()
@ -127,7 +133,7 @@ def update_aircraft_beacons(postfix):
END
FROM devices AS d, receivers AS r
WHERE ab.address = d.address AND ab.receiver_name = r.name;
WHERE ab.device_id IS NULL and ab.receiver_id IS NULL AND ab.address = d.address AND ab.receiver_name = r.name;
""".format(postfix))
db.session.commit()

Wyświetl plik

@ -2,6 +2,7 @@
from .aircraft_type import AircraftType
from .beacon import Beacon
from .country import Country
from .country_stats import CountryStats
from .device import Device
from .device_info import DeviceInfo
from .device_info_origin import DeviceInfoOrigin

Wyświetl plik

@ -0,0 +1,18 @@
from ogn_python import db
class CountryStats(db.Model):
__tablename__ = "country_stats"
id = db.Column(db.Integer, primary_key=True)
date = db.Column(db.Date)
# Static data
aircraft_beacon_count = db.Column(db.Integer)
device_count = db.Column(db.Integer)
# Relations
country_id = db.Column(db.Integer, db.ForeignKey('countries.gid', ondelete='SET NULL'), index=True)
country = db.relationship('Country', foreign_keys=[country_id], backref=db.backref('stats', order_by='CountryStats.date.asc()'))

Wyświetl plik

@ -7,6 +7,7 @@ class Flight2D(db.Model):
__tablename__ = "flights2d"
date = db.Column(db.Date, primary_key=True)
flight_type = db.Column(db.SmallInteger, primary_key=True)
path_wkt = db.Column('path', Geometry('MULTILINESTRING', srid=4326))
path_simple_wkt = db.Column('path_simple', Geometry('MULTILINESTRING', srid=4326)) # this is the path simplified with ST_Simplify(path, 0.0001)