kopia lustrzana https://github.com/glidernet/ogn-python
Update to TimescaleDB
rodzic
ee88621806
commit
85efb49c93
23
README.md
23
README.md
|
@ -6,7 +6,7 @@
|
|||
A database backend for the [Open Glider Network](http://wiki.glidernet.org/).
|
||||
The ogn-python module saves all received beacons into a database with [SQLAlchemy](http://www.sqlalchemy.org/).
|
||||
It connects to the OGN aprs servers with [python-ogn-client](https://github.com/glidernet/python-ogn-client).
|
||||
It requires [PostgreSQL](http://www.postgresql.org/) and [PostGIS](http://www.postgis.net/).
|
||||
It requires [TimescaleDB](https://www.timescale.com, based on PostgreSQL) and [PostGIS](http://www.postgis.net/).
|
||||
|
||||
[Examples](https://github.com/glidernet/ogn-python/wiki/Examples)
|
||||
|
||||
|
@ -91,17 +91,21 @@ optional arguments:
|
|||
available commands:
|
||||
|
||||
[bulkimport]
|
||||
convert_logfile Convert ogn logfiles to csv logfiles (one for aircraft beacons and one for receiver beacons) <arg: path>. Logfile name: blablabla.txt_YYYY-MM-DD.
|
||||
create_indices Create indices for AircraftBeacon.
|
||||
drop_indices Drop indices of AircraftBeacon.
|
||||
import_csv_logfile Import csv logfile <arg: csv logfile>.
|
||||
create_flights2d Create complete flight traces from logfile tables.
|
||||
create_gaps2d Create 'gaps' from logfile tables.
|
||||
file_export Export separate logfile tables to csv files. They can be used for fast bulk import with sql COPY command.
|
||||
file_import Import APRS logfiles into separate logfile tables.
|
||||
transfer Transfer beacons from separate logfile tables to beacon table.
|
||||
update Update beacons (add foreign keys, compute distance, bearing, ags, etc.) in separate logfile tables.
|
||||
|
||||
[db]
|
||||
drop Drop all tables.
|
||||
import_airports Import airports from a ".cup" file
|
||||
import_ddb Import registered devices from the DDB.
|
||||
import_ddb_file Import registered devices from a local file.
|
||||
import_file Import registered devices from a local file.
|
||||
import_flarmnet Import registered devices from a local file.
|
||||
init Initialize the database.
|
||||
update_country_codes Update country codes of all receivers.
|
||||
upgrade Upgrade database to the latest version.
|
||||
|
||||
[gateway]
|
||||
|
@ -116,9 +120,10 @@ available commands:
|
|||
show Show a logbook for <airport_name>.
|
||||
|
||||
[stats]
|
||||
airports Compute airport statistics.
|
||||
devices Compute device statistics
|
||||
receivers Compute receiver statistics.
|
||||
add_missing_devices Update devices with data from stats.
|
||||
add_missing_receivers Update receivers with data from stats.
|
||||
create_flights Create Flights.
|
||||
create_stats Create DeviceStats, ReceiverStats and RelationStats.
|
||||
|
||||
[show.airport]
|
||||
list_all Show a list of all airports.
|
||||
|
|
|
@ -47,7 +47,7 @@ def import_ddb(session=None):
|
|||
|
||||
|
||||
@app.task
|
||||
def update_devices(session=None):
|
||||
def add_missing_devices(session=None):
|
||||
"""Add/update entries in devices table and update foreign keys in aircraft beacons."""
|
||||
|
||||
if session is None:
|
||||
|
@ -75,16 +75,16 @@ def update_devices(session=None):
|
|||
synchronize_session='fetch')
|
||||
|
||||
session.commit()
|
||||
logger.info("Devices: {} inserted, {} updated".format(insert_count, update_receivers))
|
||||
logger.info("Devices: {} inserted, {} updated".format(insert_count, add_missing_receivers))
|
||||
logger.info("Updated {} AircraftBeacons".format(upd))
|
||||
|
||||
return "{} Devices inserted, {} Devices updated, {} AircraftBeacons updated" \
|
||||
.format(insert_count, update_receivers, upd)
|
||||
.format(insert_count, add_missing_receivers, upd)
|
||||
|
||||
|
||||
@app.task
|
||||
def update_receivers(session=None):
|
||||
"""Add/update_receivers entries in receiver table and update receivers foreign keys and distance in aircraft beacons and update foreign keys in receiver beacons."""
|
||||
def add_missing_receivers(session=None):
|
||||
"""Add/add_missing_receivers entries in receiver table and update receivers foreign keys and distance in aircraft beacons and update foreign keys in receiver beacons."""
|
||||
|
||||
if session is None:
|
||||
session = app.session
|
||||
|
@ -116,11 +116,11 @@ def update_receivers(session=None):
|
|||
|
||||
session.commit()
|
||||
|
||||
logger.info("Receivers: {} inserted, {} updated.".format(insert_count, update_receivers))
|
||||
logger.info("Receivers: {} inserted, {} updated.".format(insert_count, add_missing_receivers))
|
||||
logger.info("Updated relations: {} aircraft beacons, {} receiver beacons".format(update_aircraft_beacons, update_receiver_beacons))
|
||||
|
||||
return "{} Receivers inserted, {} Receivers updated, {} AircraftBeacons updated, {} ReceiverBeacons updated" \
|
||||
.format(insert_count, update_receivers, update_aircraft_beacons, update_receiver_beacons)
|
||||
.format(insert_count, add_missing_receivers, update_aircraft_beacons, update_receiver_beacons)
|
||||
|
||||
|
||||
@app.task
|
||||
|
|
|
@ -1,381 +1,617 @@
|
|||
import os
|
||||
import re
|
||||
import logging
|
||||
|
||||
from manager import Manager
|
||||
from ogn.commands.dbutils import session
|
||||
|
||||
import psycopg2
|
||||
from tqdm import tqdm
|
||||
from io import StringIO
|
||||
|
||||
from ogn.model import AircraftBeacon, ReceiverBeacon
|
||||
from ogn.utils import open_file
|
||||
from ogn.gateway.process_tools import FileSaver, Converter, Merger
|
||||
|
||||
|
||||
manager = Manager()
|
||||
|
||||
PATTERN = '^.+\.txt\_(\d{4}\-\d{2}\-\d{2})(\.gz)?$'
|
||||
class LogfileDbSaver():
|
||||
def __init__(self):
|
||||
"""Establish the database connection."""
|
||||
try:
|
||||
self.conn = psycopg2.connect(database = "ogn", user = "postgres", password = "postgres", host = "localhost", port = "5432")
|
||||
except:
|
||||
raise Exception("I am unable to connect to the database")
|
||||
self.cur = self.conn.cursor()
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
@manager.command
|
||||
def convert_logfile(path):
|
||||
"""Convert ogn logfiles to csv logfiles (one for aircraft beacons and one for receiver beacons) <arg: path>. Logfile name: blablabla.txt_YYYY-MM-DD."""
|
||||
def __exit__(self, type, value, traceback):
|
||||
"""Closes the database connection."""
|
||||
|
||||
logging.basicConfig(filename='convert.log', level=logging.DEBUG)
|
||||
self.cur.close()
|
||||
self.conn.close()
|
||||
|
||||
if os.path.isfile(path):
|
||||
head, tail = os.path.split(path)
|
||||
convert(tail, path=head)
|
||||
logging.info("Finished converting single file {}".format(head))
|
||||
elif os.path.isdir(path):
|
||||
for filename in sorted(os.listdir(path)):
|
||||
convert(filename, path=path)
|
||||
logging.info("Finished converting file path {}".format(path))
|
||||
else:
|
||||
logging.warning("Not a file nor a path: {}".format(path))
|
||||
def set_datestr(self, datestr):
|
||||
"""Sets the datestr of the current tables."""
|
||||
|
||||
self.prefix = datestr.replace('-', '_')
|
||||
self.aircraft_table = 'aircraft_beacons_{}'.format(self.prefix)
|
||||
self.receiver_table = 'receiver_beacons_{}'.format(self.prefix)
|
||||
self.aircraft_buffer = StringIO()
|
||||
self.receiver_buffer = StringIO()
|
||||
|
||||
def convert(sourcefile, path=''):
|
||||
logging.info("convert: {} {}".format(sourcefile, path))
|
||||
import datetime
|
||||
def get_datestrs(self, no_index_only=False):
|
||||
"""Get the date strings from imported log files."""
|
||||
|
||||
from ogn.gateway.process import string_to_message
|
||||
index_clause = " AND hasindexes = FALSE" if no_index_only == True else ""
|
||||
|
||||
match = re.search(PATTERN, sourcefile)
|
||||
if match:
|
||||
reference_date_string = match.group(1)
|
||||
reference_date = datetime.datetime.strptime(reference_date_string, "%Y-%m-%d")
|
||||
self.cur.execute(("SELECT DISTINCT(RIGHT(tablename, 10))"
|
||||
" FROM pg_catalog.pg_tables"
|
||||
" WHERE schemaname = 'public' AND tablename LIKE 'aircraft_beacons_%'{}"
|
||||
" ORDER BY RIGHT(tablename, 10);".format(index_clause)))
|
||||
|
||||
# Build the processing pipeline
|
||||
saver = FileSaver()
|
||||
converter = Converter(callback=saver)
|
||||
merger = Merger(callback=converter)
|
||||
return [datestr[0].replace('_', '-') for datestr in self.cur.fetchall()]
|
||||
|
||||
def create_tables(self):
|
||||
"""Create date dependant tables for log file import."""
|
||||
|
||||
try:
|
||||
saver.open(path, reference_date_string)
|
||||
except FileExistsError:
|
||||
logging.warning("Output files already exists. Skipping")
|
||||
return
|
||||
else:
|
||||
logging.warning("filename '{}' does not match pattern. Skipping".format(sourcefile))
|
||||
return
|
||||
self.cur.execute('CREATE EXTENSION IF NOT EXISTS postgis;')
|
||||
self.cur.execute('CREATE EXTENSION IF NOT EXISTS btree_gist;')
|
||||
self.cur.execute('DROP TABLE IF EXISTS "{}";'.format(self.aircraft_table))
|
||||
self.cur.execute('DROP TABLE IF EXISTS "{}";'.format(self.receiver_table))
|
||||
self.cur.execute("""
|
||||
CREATE TABLE "{0}" (
|
||||
location geometry,
|
||||
altitude real,
|
||||
name character varying,
|
||||
dstcall character varying,
|
||||
relay character varying,
|
||||
receiver_name character varying(9),
|
||||
"timestamp" timestamp without time zone,
|
||||
track smallint,
|
||||
ground_speed real,
|
||||
|
||||
fin = open_file(os.path.join(path, sourcefile))
|
||||
address_type smallint,
|
||||
aircraft_type smallint,
|
||||
stealth boolean,
|
||||
address character varying,
|
||||
climb_rate real,
|
||||
turn_rate real,
|
||||
signal_quality real,
|
||||
error_count smallint,
|
||||
frequency_offset real,
|
||||
gps_quality_horizontal smallint,
|
||||
gps_quality_vertical smallint,
|
||||
software_version real,
|
||||
hardware_version smallint,
|
||||
real_address character varying(6),
|
||||
signal_power real,
|
||||
|
||||
distance real,
|
||||
radial smallint,
|
||||
quality real,
|
||||
location_mgrs character varying(15),
|
||||
|
||||
receiver_id int,
|
||||
device_id int);
|
||||
""".format(self.aircraft_table))
|
||||
|
||||
self.cur.execute("""
|
||||
CREATE TABLE "{0}" (
|
||||
location geometry,
|
||||
altitude real,
|
||||
name character varying,
|
||||
receiver_name character varying(9),
|
||||
dstcall character varying,
|
||||
"timestamp" timestamp without time zone,
|
||||
|
||||
version character varying,
|
||||
platform character varying,
|
||||
cpu_load real,
|
||||
free_ram real,
|
||||
total_ram real,
|
||||
ntp_error real,
|
||||
rt_crystal_correction real,
|
||||
voltage real,
|
||||
amperage real,
|
||||
cpu_temp real,
|
||||
senders_visible integer,
|
||||
senders_total integer,
|
||||
rec_input_noise real,
|
||||
senders_signal real,
|
||||
senders_messages integer,
|
||||
good_senders_signal real,
|
||||
good_senders integer,
|
||||
good_and_bad_senders integer,
|
||||
|
||||
receiver_id int);
|
||||
""".format(self.receiver_table))
|
||||
self.conn.commit()
|
||||
except:
|
||||
raise Exception("I can't create the tables")
|
||||
|
||||
def add(self, beacon):
|
||||
"""Adds the values of the beacon to the buffer."""
|
||||
|
||||
value_string = ','.join([str(value) for value in beacon.get_values()]) + '\n'
|
||||
if isinstance(beacon, AircraftBeacon):
|
||||
self.aircraft_buffer.write(value_string)
|
||||
elif isinstance(beacon, ReceiverBeacon):
|
||||
self.receiver_buffer.write(value_string)
|
||||
|
||||
def flush(self):
|
||||
"""Writes the buffer into the tables and reset the buffer."""
|
||||
|
||||
self.aircraft_buffer.seek(0)
|
||||
self.receiver_buffer.seek(0)
|
||||
self.cur.copy_from(self.aircraft_buffer, self.aircraft_table, sep=',', null='None', columns=AircraftBeacon.get_columns())
|
||||
self.cur.copy_from(self.receiver_buffer, self.receiver_table, sep=',', null='None', columns=ReceiverBeacon.get_columns())
|
||||
self.conn.commit()
|
||||
self.aircraft_buffer = StringIO()
|
||||
self.receiver_buffer = StringIO()
|
||||
|
||||
def export_to_path(self, path):
|
||||
import os, gzip
|
||||
aircraft_beacons_file = os.path.join(path, self.aircraft_table + '.csv.gz')
|
||||
with gzip.open(aircraft_beacons_file, 'wb') as gzip_file:
|
||||
self.cur.copy_expert("COPY ({}) TO STDOUT WITH CSV HEADER;".format(self.get_merged_aircraft_beacons_subquery()), gzip_file)
|
||||
receiver_beacons_file = os.path.join(path, self.receiver_table + '.csv.gz')
|
||||
with gzip.open(receiver_beacons_file, 'wb') as gzip_file:
|
||||
self.cur.copy_expert("COPY ({}) TO STDOUT WITH CSV HEADER;".format(self.get_merged_receiver_beacons_subquery()), gzip_file)
|
||||
|
||||
def create_indices(self):
|
||||
"""Creates indices for aircraft- and receiver-beacons. We need them for the beacon merging operation."""
|
||||
|
||||
self.cur.execute("""
|
||||
CREATE INDEX IF NOT EXISTS ix_{0}_timestamp_name_receiver_name ON "{0}" (timestamp, name, receiver_name);
|
||||
CREATE INDEX IF NOT EXISTS ix_{1}_timestamp_name_receiver_name ON "{1}" (timestamp, name, receiver_name);
|
||||
""".format(self.aircraft_table, self.receiver_table))
|
||||
self.conn.commit()
|
||||
|
||||
def add_missing_devices(self):
|
||||
"""Add missing devices."""
|
||||
|
||||
self.cur.execute("""
|
||||
INSERT INTO devices(address)
|
||||
SELECT DISTINCT(ab.address)
|
||||
FROM "{}" AS ab
|
||||
WHERE NOT EXISTS (SELECT 1 FROM devices AS d WHERE d.address = ab.address)
|
||||
ORDER BY ab.address;
|
||||
""".format(self.aircraft_table))
|
||||
self.conn.commit()
|
||||
|
||||
def add_missing_receivers(self):
|
||||
"""Add missing receivers."""
|
||||
|
||||
self.cur.execute("""
|
||||
INSERT INTO receivers(name)
|
||||
SELECT DISTINCT(rb.name)
|
||||
FROM "{0}" AS rb
|
||||
WHERE NOT EXISTS (SELECT 1 FROM receivers AS r WHERE r.name = rb.name)
|
||||
ORDER BY name;
|
||||
""".format(self.receiver_table))
|
||||
self.conn.commit()
|
||||
|
||||
def update_receiver_location(self):
|
||||
"""Updates the receiver location. We need this because we want the actual location for distance calculations."""
|
||||
|
||||
self.cur.execute("""
|
||||
UPDATE receivers AS r
|
||||
SET location = sq.location
|
||||
FROM
|
||||
(SELECT rb.receiver_id,
|
||||
ROW_NUMBER() OVER (PARTITION BY receiver_id),
|
||||
FIRST_VALUE(rb.location) OVER (PARTITION BY receiver_id ORDER BY CASE WHEN location IS NOT NULL THEN timestamp ELSE NULL END NULLS LAST) AS location
|
||||
FROM "{1}" AS rb
|
||||
) AS sq
|
||||
WHERE r.id = sq.receiver_id AND sq.row_number = 1;
|
||||
""".format(self.aircraft_table, self.receiver_table))
|
||||
self.conn.commit()
|
||||
|
||||
def update_receiver_beacons(self):
|
||||
"""Updates the foreign keys. Due to performance reasons we use a new table instead of updating the old."""
|
||||
|
||||
self.cur.execute("""
|
||||
SELECT
|
||||
|
||||
rb.location, rb.altitude, rb.name, rb.receiver_name, rb.dstcall, rb.timestamp,
|
||||
|
||||
rb.version, rb.platform, rb.cpu_load, rb.free_ram, rb.total_ram, rb.ntp_error, rb.rt_crystal_correction, rb.voltage, rb.amperage real,
|
||||
rb.cpu_temp, rb.senders_visible, rb.senders_total, rb.rec_input_noise, rb.senders_signal, rb.senders_messages, rb.good_senders_signal real,
|
||||
rb.good_senders, rb.good_and_bad_senders,
|
||||
|
||||
r.id AS receiver_id
|
||||
INTO "{0}_temp"
|
||||
FROM "{0}" AS rb, receivers AS r
|
||||
WHERE rb.name = r.name;
|
||||
|
||||
DROP TABLE IF EXISTS "{0}";
|
||||
ALTER TABLE "{0}_temp" RENAME TO "{0}";
|
||||
""".format(self.receiver_table))
|
||||
self.conn.commit()
|
||||
|
||||
def update_aircraft_beacons(self):
|
||||
"""Updates the foreign keys and calculates distance/radial and quality and computes the altitude above ground level.
|
||||
Elevation data has to be in the table 'elevation' with srid 4326.
|
||||
Due to performance reasons we use a new table instead of updating the old."""
|
||||
|
||||
self.cur.execute("""
|
||||
SELECT
|
||||
ab.location, ab.altitude, ab.name, ab.dstcall, ab.relay, ab.receiver_name, ab.timestamp, ab.track, ab.ground_speed,
|
||||
|
||||
ab.address_type, ab.aircraft_type, ab.stealth, ab.address, ab.climb_rate, ab.turn_rate, ab.signal_quality, ab.error_count,
|
||||
ab.frequency_offset, ab.gps_quality_horizontal, ab.gps_quality_vertical, ab.software_version, ab.hardware_version, ab.real_address, ab.signal_power,
|
||||
|
||||
ab.location_mgrs,
|
||||
|
||||
d.id AS device_id,
|
||||
r.id AS receiver_id,
|
||||
CASE WHEN ab.location IS NOT NULL AND r.location IS NOT NULL THEN CAST(ST_DistanceSphere(ab.location, r.location) AS REAL) ELSE NULL END AS distance,
|
||||
CASE WHEN ab.location IS NOT NULL AND r.location IS NOT NULL THEN CAST(degrees(ST_Azimuth(ab.location, r.location)) AS SMALLINT) ELSE NULL END AS radial,
|
||||
CASE WHEN ab.location IS NOT NULL AND r.location IS NOT NULL AND ST_DistanceSphere(ab.location, r.location) > 0 AND ab.signal_quality IS NOT NULL
|
||||
THEN CAST(signal_quality + 20*log(ST_DistanceSphere(ab.location, r.location)/10000) AS REAL)
|
||||
ELSE NULL
|
||||
END AS quality,
|
||||
CAST(ab.altitude - ST_Value(e.rast, ab.location) AS REAL) AS agl
|
||||
|
||||
INTO "{0}_temp"
|
||||
FROM "{0}" AS ab, devices AS d, receivers AS r, elevation AS e
|
||||
WHERE ab.address = d.address AND receiver_name = r.name AND ST_Intersects(e.rast, ab.location);
|
||||
|
||||
DROP TABLE IF EXISTS "{0}";
|
||||
ALTER TABLE "{0}_temp" RENAME TO "{0}";
|
||||
""".format(self.aircraft_table))
|
||||
self.conn.commit()
|
||||
|
||||
def get_merged_aircraft_beacons_subquery(self):
|
||||
"""Some beacons are split into position and status beacon. With this query we merge them into one beacon."""
|
||||
|
||||
return """
|
||||
SELECT
|
||||
MAX(location) AS location,
|
||||
MAX(altitude) AS altitude,
|
||||
name,
|
||||
MAX(dstcall) AS dstcall,
|
||||
MAX(relay) AS relay,
|
||||
receiver_name,
|
||||
timestamp,
|
||||
MAX(track) AS track,
|
||||
MAX(ground_speed) AS ground_speed,
|
||||
|
||||
MAX(address_type) AS address_type,
|
||||
MAX(aircraft_type) AS aircraft_type,
|
||||
CAST(MAX(CAST(stealth AS int)) AS boolean) AS stealth,
|
||||
MAX(address) AS address,
|
||||
MAX(climb_rate) AS climb_rate,
|
||||
MAX(turn_rate) AS turn_rate,
|
||||
MAX(signal_quality) AS signal_quality,
|
||||
MAX(error_count) AS error_count,
|
||||
MAX(frequency_offset) AS frequency_offset,
|
||||
MAX(gps_quality_horizontal) AS gps_quality_horizontal,
|
||||
MAX(gps_quality_vertical) AS gps_quality_vertical,
|
||||
MAX(software_version) AS software_version,
|
||||
MAX(hardware_version) AS hardware_version,
|
||||
MAX(real_address) AS real_address,
|
||||
MAX(signal_power) AS signal_power,
|
||||
|
||||
CAST(MAX(distance) AS REAL) AS distance,
|
||||
CAST(MAX(radial) AS REAL) AS radial,
|
||||
CAST(MAX(quality) AS REAL) AS quality,
|
||||
CAST(MAX(agl) AS REAL) AS agl,
|
||||
MAX(location_mgrs) AS location_mgrs,
|
||||
|
||||
MAX(receiver_id) AS receiver_id,
|
||||
MAX(device_id) AS device_id
|
||||
FROM "{0}" AS ab
|
||||
GROUP BY timestamp, name, receiver_name
|
||||
ORDER BY timestamp, name, receiver_name
|
||||
""".format(self.aircraft_table)
|
||||
|
||||
def get_merged_receiver_beacons_subquery(self):
|
||||
"""Some beacons are split into position and status beacon. With this query we merge them into one beacon."""
|
||||
|
||||
return """
|
||||
SELECT
|
||||
MAX(location) AS location,
|
||||
MAX(altitude) AS altitude,
|
||||
name,
|
||||
receiver_name,
|
||||
MAX(dstcall) AS dstcall,
|
||||
timestamp,
|
||||
|
||||
MAX(version) AS version,
|
||||
MAX(platform) AS platform,
|
||||
MAX(cpu_load) AS cpu_load,
|
||||
MAX(free_ram) AS free_ram,
|
||||
MAX(total_ram) AS total_ram,
|
||||
MAX(ntp_error) AS ntp_error,
|
||||
MAX(rt_crystal_correction) AS rt_crystal_correction,
|
||||
MAX(voltage) AS voltage,
|
||||
MAX(amperage) AS amperage,
|
||||
MAX(cpu_temp) AS cpu_temp,
|
||||
MAX(senders_visible) AS senders_visible,
|
||||
MAX(senders_total) AS senders_total,
|
||||
MAX(rec_input_noise) AS rec_input_noise,
|
||||
MAX(senders_signal) AS senders_signal,
|
||||
MAX(senders_messages) AS senders_messages,
|
||||
MAX(good_senders_signal) AS good_senders_signal,
|
||||
MAX(good_senders) AS good_senders,
|
||||
MAX(good_and_bad_senders) AS good_and_bad_senders,
|
||||
|
||||
MAX(receiver_id) AS receiver_id
|
||||
FROM "{0}" AS rb
|
||||
GROUP BY timestamp, name, receiver_name
|
||||
ORDER BY timestamp, name, receiver_name
|
||||
""".format(self.receiver_table)
|
||||
|
||||
def is_transfered(self):
|
||||
query = """
|
||||
SELECT
|
||||
1
|
||||
FROM ({} LIMIT 1) AS sq, aircraft_beacons AS ab
|
||||
WHERE ab.timestamp = sq.timestamp AND ab.name = sq.name AND ab.receiver_name = sq.receiver_name;
|
||||
""".format(self.get_merged_aircraft_beacons_subquery())
|
||||
|
||||
self.cur.execute(query)
|
||||
return len(self.cur.fetchall()) == 1
|
||||
|
||||
def transfer(self):
|
||||
query = """
|
||||
INSERT INTO aircraft_beacons(location, altitude, name, dstcall, relay, receiver_name, timestamp, track, ground_speed,
|
||||
address_type, aircraft_type, stealth, address, climb_rate, turn_rate, signal_quality, error_count, frequency_offset, gps_quality_horizontal, gps_quality_vertical, software_version, hardware_version, real_address, signal_power,
|
||||
distance, radial, quality, agl, location_mgrs,
|
||||
receiver_id, device_id)
|
||||
{}
|
||||
ON CONFLICT DO NOTHING;
|
||||
""".format(self.get_merged_aircraft_beacons_subquery())
|
||||
|
||||
self.cur.execute(query)
|
||||
self.conn.commit()
|
||||
|
||||
def create_flights2d(self):
|
||||
query = """
|
||||
INSERT INTO flights2d
|
||||
(
|
||||
date,
|
||||
device_id,
|
||||
path
|
||||
)
|
||||
SELECT sq5.date,
|
||||
sq5.device_id,
|
||||
st_collect(sq5.linestring order BY sq5.part) multilinestring
|
||||
FROM (
|
||||
SELECT sq4.timestamp::date AS date,
|
||||
sq4.device_id,
|
||||
sq4.part,
|
||||
st_makeline(sq4.location ORDER BY sq4.timestamp) linestring
|
||||
FROM (
|
||||
SELECT sq3.timestamp,
|
||||
sq3.location,
|
||||
sq3.device_id,
|
||||
sum(sq3.ping) OVER (partition BY sq3.timestamp::date, sq3.device_id ORDER BY sq3.timestamp) part
|
||||
FROM (
|
||||
SELECT sq2.t1 AS timestamp,
|
||||
sq2.l1 AS location,
|
||||
sq2.d1 device_id,
|
||||
CASE
|
||||
WHEN sq2.t1 - sq2.t2 < interval'100s'
|
||||
AND st_distancesphere(sq2.l1, sq2.l2) < 1000 THEN 0
|
||||
ELSE 1
|
||||
END AS ping
|
||||
FROM (
|
||||
SELECT sq.timestamp t1,
|
||||
lag(sq.timestamp) OVER (partition BY sq.timestamp::date, sq.device_id ORDER BY sq.timestamp) t2,
|
||||
sq.location l1,
|
||||
lag(sq.location) OVER (partition BY sq.timestamp::date, sq.device_id ORDER BY sq.timestamp) l2,
|
||||
sq.device_id d1,
|
||||
lag(sq.device_id) OVER (partition BY sq.timestamp::date, sq.device_id ORDER BY sq.timestamp) d2
|
||||
FROM (
|
||||
SELECT timestamp,
|
||||
device_id,
|
||||
location,
|
||||
row_number() OVER (partition BY timestamp::date, device_id, timestamp ORDER BY error_count) message_number
|
||||
FROM {}
|
||||
WHERE device_id IS NOT NULL) sq
|
||||
WHERE sq.message_number = 1 ) sq2 ) sq3 ) sq4
|
||||
GROUP BY sq4.timestamp::date,
|
||||
sq4.device_id,
|
||||
sq4.part ) sq5
|
||||
GROUP BY sq5.date,
|
||||
sq5.device_id
|
||||
ON CONFLICT DO NOTHING;
|
||||
""".format(self.aircraft_table)
|
||||
|
||||
self.cur.execute(query)
|
||||
self.conn.commit()
|
||||
|
||||
def create_gaps2d(self):
|
||||
query = """
|
||||
INSERT INTO gaps2d(date, device_id, path)
|
||||
SELECT sq3.date,
|
||||
sq3.device_id,
|
||||
ST_Collect(sq3.path)
|
||||
FROM (
|
||||
SELECT
|
||||
sq2.t1::DATE AS date,
|
||||
sq2.d1 device_id,
|
||||
st_makeline(sq2.l1, sq2.l2) path
|
||||
FROM
|
||||
(
|
||||
SELECT sq.timestamp t1,
|
||||
lag(sq.timestamp) over ( PARTITION BY sq.timestamp::DATE, sq.device_id ORDER BY sq.timestamp) t2,
|
||||
sq.location l1,
|
||||
lag(sq.location) over ( PARTITION BY sq.timestamp::DATE, sq.device_id ORDER BY sq.timestamp) l2,
|
||||
sq.device_id d1,
|
||||
lag(sq.device_id) over ( PARTITION BY sq.timestamp::DATE, sq.device_id ORDER BY sq.timestamp) d2
|
||||
FROM
|
||||
(
|
||||
SELECT timestamp, device_id, location,
|
||||
Row_number() over ( PARTITION BY timestamp::DATE, device_id, timestamp ORDER BY error_count) message_number
|
||||
FROM {}
|
||||
) sq
|
||||
WHERE sq.message_number = 1
|
||||
) sq2
|
||||
WHERE Extract(epoch FROM sq2.t1 - sq2.t2) > 300 AND st_distancesphere(sq2.l1, sq2.l2) / Extract(epoch FROM sq2.t1 - sq2.t2) BETWEEN 15 AND 50
|
||||
) sq3
|
||||
GROUP BY sq3.date, sq3.device_id
|
||||
ON CONFLICT DO NOTHING;
|
||||
""".format(self.aircraft_table)
|
||||
|
||||
self.cur.execute(query)
|
||||
self.conn.commit()
|
||||
|
||||
def convert(sourcefile, datestr, saver):
|
||||
from ogn.gateway.process import string_to_message
|
||||
from ogn.gateway.process_tools import AIRCRAFT_TYPES, RECEIVER_TYPES
|
||||
from datetime import datetime
|
||||
|
||||
fin = open_file(sourcefile)
|
||||
|
||||
# get total lines of the input file
|
||||
try:
|
||||
total_lines = 0
|
||||
for line in fin: # Der Log vom 3.4.2018 und 24.6.2018 und 25.06.2018 und 24.07.2018 geht hier krachen
|
||||
total_lines += 1
|
||||
fin.seek(0)
|
||||
except Exception as e:
|
||||
print(e)
|
||||
return
|
||||
|
||||
progress = -1
|
||||
current_line = 0
|
||||
|
||||
print('Start importing ogn-logfile')
|
||||
total_lines = 0
|
||||
for line in fin:
|
||||
total_lines += 1
|
||||
fin.seek(0)
|
||||
|
||||
current_line = 0
|
||||
steps = 100000
|
||||
reference_date = datetime.strptime(datestr + ' 12:00:00', '%Y-%m-%d %H:%M:%S')
|
||||
|
||||
pbar = tqdm(fin, total=total_lines)
|
||||
for line in pbar:
|
||||
pbar.set_description('Importing {}'.format(sourcefile))
|
||||
|
||||
current_line += 1
|
||||
if int(1000 * current_line / total_lines) != progress:
|
||||
progress = round(1000 * current_line / total_lines)
|
||||
print("\rReading line {} ({}%)".format(current_line, progress / 10), end='')
|
||||
if current_line % steps == 0:
|
||||
saver.flush()
|
||||
|
||||
message = string_to_message(line.strip(), reference_date=reference_date)
|
||||
if message is None:
|
||||
print("=====")
|
||||
print(line.strip())
|
||||
continue
|
||||
|
||||
dictfilt = lambda x, y: dict([ (i,x[i]) for i in x if i in set(y) ])
|
||||
|
||||
try:
|
||||
merger.add_message(message)
|
||||
if message['beacon_type'] in AIRCRAFT_TYPES:
|
||||
message = dictfilt(message, ('beacon_type', 'aprs_type', 'location_wkt', 'altitude', 'name', 'dstcall', 'relay', 'receiver_name', 'timestamp', 'track', 'ground_speed',
|
||||
'address_type', 'aircraft_type', 'stealth', 'address', 'climb_rate', 'turn_rate', 'signal_quality', 'error_count', 'frequency_offset', 'gps_quality_horizontal', 'gps_quality_vertical', 'software_version', 'hardware_version', 'real_address', 'signal_power',
|
||||
'distance', 'radial', 'quality', 'agl', 'location_mgrs',
|
||||
'receiver_id', 'device_id'))
|
||||
|
||||
beacon = AircraftBeacon(**message)
|
||||
elif message['beacon_type'] in RECEIVER_TYPES:
|
||||
if 'rec_crystal_correction' in message:
|
||||
del message['rec_crystal_correction']
|
||||
del message['rec_crystal_correction_fine']
|
||||
beacon = ReceiverBeacon(**message)
|
||||
saver.add(beacon)
|
||||
except Exception as e:
|
||||
print(e)
|
||||
|
||||
merger.flush()
|
||||
saver.close()
|
||||
|
||||
saver.flush()
|
||||
fin.close()
|
||||
|
||||
|
||||
@manager.command
|
||||
def drop_indices():
|
||||
"""Drop indices of AircraftBeacon."""
|
||||
session.execute("""
|
||||
DROP INDEX IF EXISTS idx_aircraft_beacons_location;
|
||||
DROP INDEX IF EXISTS ix_aircraft_beacons_date_device_id_address;
|
||||
DROP INDEX IF EXISTS ix_aircraft_beacons_date_receiver_id_distance;
|
||||
DROP INDEX IF EXISTS ix_aircraft_beacons_timestamp;
|
||||
|
||||
DROP INDEX IF EXISTS idx_receiver_beacons_location;
|
||||
DROP INDEX IF EXISTS ix_receiver_beacons_date_receiver_id;
|
||||
DROP INDEX IF EXISTS ix_receiver_beacons_timestamp;
|
||||
""")
|
||||
print("Dropped indices of AircraftBeacon and ReceiverBeacon")
|
||||
def file_import(path):
|
||||
"""Import APRS logfiles into separate logfile tables."""
|
||||
|
||||
# disable constraint trigger
|
||||
session.execute("""
|
||||
ALTER TABLE aircraft_beacons DISABLE TRIGGER ALL;
|
||||
ALTER TABLE receiver_beacons DISABLE TRIGGER ALL;
|
||||
""")
|
||||
session.commit()
|
||||
print("Disabled constraint triggers")
|
||||
|
||||
|
||||
@manager.command
|
||||
def create_indices():
|
||||
"""Create indices for AircraftBeacon."""
|
||||
session.execute("""
|
||||
CREATE INDEX idx_aircraft_beacons_location ON aircraft_beacons USING GIST(location);
|
||||
CREATE INDEX ix_aircraft_beacons_date_device_id_address ON aircraft_beacons USING BTREE((timestamp::date), device_id, address);
|
||||
CREATE INDEX ix_aircraft_beacons_date_receiver_id_distance ON aircraft_beacons USING BTREE((timestamp::date), receiver_id, distance);
|
||||
CREATE INDEX ix_aircraft_beacons_timestamp ON aircraft_beacons USING BTREE(timestamp);
|
||||
|
||||
CREATE INDEX idx_receiver_beacons_location ON receiver_beacons USING GIST(location);
|
||||
CREATE INDEX ix_receiver_beacons_date_receiver_id ON receiver_beacons USING BTREE((timestamp::date), receiver_id);
|
||||
CREATE INDEX ix_receiver_beacons_timestamp ON receiver_beacons USING BTREE(timestamp);
|
||||
""")
|
||||
print("Created indices for AircraftBeacon and ReceiverBeacon")
|
||||
|
||||
session.execute("""
|
||||
ALTER TABLE aircraft_beacons ENABLE TRIGGER ALL;
|
||||
ALTER TABLE receiver_beacons ENABLE TRIGGER ALL;
|
||||
""")
|
||||
session.commit()
|
||||
print("Enabled constraint triggers")
|
||||
|
||||
|
||||
@manager.command
|
||||
def import_csv_logfile(path, logfile='main.log', loglevel='INFO'):
|
||||
"""Import csv logfile <arg: csv logfile>."""
|
||||
|
||||
import datetime
|
||||
|
||||
import os
|
||||
if os.path.isfile(path):
|
||||
print("{}: Importing file: {}".format(datetime.datetime.now(), path))
|
||||
import_logfile(path)
|
||||
elif os.path.isdir(path):
|
||||
print("{}: Scanning path: {}".format(datetime.datetime.now(), path))
|
||||
for filename in sorted(os.listdir(path)):
|
||||
print("{}: Importing file: {}".format(datetime.datetime.now(), filename))
|
||||
import_logfile(os.path.join(path, filename))
|
||||
else:
|
||||
print("{}: Path {} not found.".format(datetime.datetime.now(), path))
|
||||
|
||||
print("{}: Finished.".format(datetime.datetime.now()))
|
||||
|
||||
|
||||
def import_logfile(path):
|
||||
import os
|
||||
import re
|
||||
|
||||
head, tail = os.path.split(path)
|
||||
match = re.search('^.+\.csv\_(\d{4}\-\d{2}\-\d{2}).+?$', tail)
|
||||
if match:
|
||||
reference_date_string = match.group(1)
|
||||
else:
|
||||
print("filename '{}' does not match pattern. Skipping".format(path))
|
||||
# Get Filepaths and dates to import
|
||||
results = list()
|
||||
for (root, dirs, files) in os.walk(path):
|
||||
for file in sorted(files):
|
||||
match = re.match('OGN_log\.txt_([0-9]{4}\-[0-9]{2}\-[0-9]{2})\.gz$', file)
|
||||
if match:
|
||||
results.append({'filepath': os.path.join(root, file),
|
||||
'datestr': match.group(1)})
|
||||
|
||||
with LogfileDbSaver() as saver:
|
||||
already_imported = saver.get_datestrs()
|
||||
|
||||
results = list(filter(lambda x: x['datestr'] not in already_imported, results))
|
||||
|
||||
pbar = tqdm(results)
|
||||
for result in pbar:
|
||||
filepath = result['filepath']
|
||||
datestr = result['datestr']
|
||||
pbar.set_description("Importing data for {}".format(datestr))
|
||||
|
||||
saver.set_datestr(datestr)
|
||||
saver.create_tables()
|
||||
convert(filepath, datestr, saver)
|
||||
saver.add_missing_devices()
|
||||
saver.add_missing_receivers()
|
||||
|
||||
|
||||
@manager.command
|
||||
def update():
|
||||
"""Update beacons (add foreign keys, compute distance, bearing, ags, etc.) in separate logfile tables."""
|
||||
|
||||
with LogfileDbSaver() as saver:
|
||||
datestrs = saver.get_datestrs(no_index_only=True)
|
||||
pbar = tqdm(datestrs)
|
||||
for datestr in pbar:
|
||||
pbar.set_description("Updating relations for {}".format(datestr))
|
||||
saver.set_datestr(datestr)
|
||||
saver.update_receiver_location()
|
||||
saver.update_aircraft_beacons()
|
||||
saver.update_receiver_location()
|
||||
saver.create_indices()
|
||||
|
||||
@manager.command
|
||||
def transfer():
|
||||
"""Transfer beacons from separate logfile tables to beacon table."""
|
||||
|
||||
with LogfileDbSaver() as saver:
|
||||
datestrs = saver.get_datestrs()
|
||||
pbar = tqdm(datestrs)
|
||||
for datestr in pbar:
|
||||
pbar.set_description("Transfer beacons for {}".format(datestr))
|
||||
saver.set_datestr(datestr)
|
||||
if not saver.is_transfered():
|
||||
saver.transfer()
|
||||
|
||||
@manager.command
|
||||
def create_flights2d():
|
||||
"""Create complete flight traces from logfile tables."""
|
||||
|
||||
with LogfileDbSaver() as saver:
|
||||
datestrs = saver.get_datestrs()
|
||||
pbar = tqdm(datestrs)
|
||||
for datestr in pbar:
|
||||
pbar.set_description("Create Flights2D for {}".format(datestr))
|
||||
saver.set_datestr(datestr)
|
||||
saver.create_flights2d()
|
||||
|
||||
@manager.command
|
||||
def create_gaps2d():
|
||||
"""Create 'gaps' from logfile tables."""
|
||||
|
||||
with LogfileDbSaver() as saver:
|
||||
datestrs = saver.get_datestrs()
|
||||
pbar = tqdm(datestrs)
|
||||
for datestr in pbar:
|
||||
pbar.set_description("Create Gaps2D for {}".format(datestr))
|
||||
saver.set_datestr(datestr)
|
||||
saver.create_gaps2d()
|
||||
|
||||
@manager.command
|
||||
def file_export(path):
|
||||
"""Export separate logfile tables to csv files. They can be used for fast bulk import with sql COPY command."""
|
||||
|
||||
import os
|
||||
if not os.path.isdir(path):
|
||||
print("'{}' is not a path. Exiting")
|
||||
return
|
||||
|
||||
f = open_file(path)
|
||||
header = f.readline().strip()
|
||||
f.close()
|
||||
|
||||
aircraft_beacon_header = ','.join(AircraftBeacon.get_csv_columns())
|
||||
receiver_beacon_header = ','.join(ReceiverBeacon.get_csv_columns())
|
||||
|
||||
if header == aircraft_beacon_header:
|
||||
import_aircraft_beacon_logfile(path)
|
||||
elif header == receiver_beacon_header:
|
||||
import_receiver_beacon_logfile(path)
|
||||
else:
|
||||
s1 = header
|
||||
s2 = ','.join(AircraftBeacon.get_csv_columns())
|
||||
print(s1)
|
||||
print(s2)
|
||||
print([i for i in range(len(s1)) if s1[i] != s2[i]])
|
||||
print("Unknown file type: {}".format(tail))
|
||||
with LogfileDbSaver() as saver:
|
||||
datestrs = saver.get_datestrs()
|
||||
pbar = tqdm(datestrs)
|
||||
for datestr in pbar:
|
||||
pbar.set_description("Exporting data for {}".format(datestr))
|
||||
saver.set_datestr(datestr)
|
||||
saver.export_to_path(path)
|
||||
|
||||
|
||||
def import_aircraft_beacon_logfile(csv_logfile):
|
||||
SQL_TEMPTABLE_STATEMENT = """
|
||||
DROP TABLE IF EXISTS aircraft_beacons_temp;
|
||||
CREATE TABLE aircraft_beacons_temp(
|
||||
location geometry,
|
||||
altitude real,
|
||||
name character varying,
|
||||
dstcall character varying,
|
||||
relay character varying,
|
||||
receiver_name character varying(9),
|
||||
"timestamp" timestamp without time zone,
|
||||
track smallint,
|
||||
ground_speed real,
|
||||
|
||||
address_type smallint,
|
||||
aircraft_type smallint,
|
||||
stealth boolean,
|
||||
address character varying,
|
||||
climb_rate real,
|
||||
turn_rate real,
|
||||
signal_quality real,
|
||||
error_count smallint,
|
||||
frequency_offset real,
|
||||
gps_quality_horizontal smallint,
|
||||
gps_quality_vertical smallint,
|
||||
software_version real,
|
||||
hardware_version smallint,
|
||||
real_address character varying(6),
|
||||
signal_power real,
|
||||
|
||||
distance real,
|
||||
radial smallint,
|
||||
quality real,
|
||||
location_mgrs character varying(15)
|
||||
);
|
||||
"""
|
||||
|
||||
session.execute(SQL_TEMPTABLE_STATEMENT)
|
||||
|
||||
SQL_COPY_STATEMENT = """
|
||||
COPY aircraft_beacons_temp(%s) FROM STDIN WITH
|
||||
CSV
|
||||
HEADER
|
||||
DELIMITER AS ','
|
||||
"""
|
||||
|
||||
file = open_file(csv_logfile)
|
||||
column_names = ','.join(AircraftBeacon.get_csv_columns())
|
||||
sql = SQL_COPY_STATEMENT % column_names
|
||||
|
||||
print("Start importing logfile: {}".format(csv_logfile))
|
||||
|
||||
conn = session.connection().connection
|
||||
cursor = conn.cursor()
|
||||
cursor.copy_expert(sql=sql, file=file)
|
||||
conn.commit()
|
||||
cursor.close()
|
||||
file.close()
|
||||
print("Read logfile into temporary table")
|
||||
|
||||
# create device if not exist
|
||||
session.execute("""
|
||||
INSERT INTO devices(address)
|
||||
SELECT DISTINCT(t.address)
|
||||
FROM aircraft_beacons_temp t
|
||||
WHERE NOT EXISTS (SELECT 1 FROM devices d WHERE d.address = t.address)
|
||||
""")
|
||||
print("Inserted missing Devices")
|
||||
|
||||
# create receiver if not exist
|
||||
session.execute("""
|
||||
INSERT INTO receivers(name)
|
||||
SELECT DISTINCT(t.receiver_name)
|
||||
FROM aircraft_beacons_temp t
|
||||
WHERE NOT EXISTS (SELECT 1 FROM receivers r WHERE r.name = t.receiver_name)
|
||||
""")
|
||||
print("Inserted missing Receivers")
|
||||
|
||||
session.execute("""
|
||||
INSERT INTO aircraft_beacons(location, altitude, name, dstcall, relay, receiver_name, timestamp, track, ground_speed,
|
||||
address_type, aircraft_type, stealth, address, climb_rate, turn_rate, signal_quality, error_count, frequency_offset, gps_quality_horizontal, gps_quality_vertical, software_version, hardware_version, real_address, signal_power,
|
||||
distance, radial, quality, location_mgrs,
|
||||
receiver_id, device_id)
|
||||
SELECT t.location, t.altitude, t.name, t.dstcall, t.relay, t.receiver_name, t.timestamp, t.track, t.ground_speed,
|
||||
t.address_type, t.aircraft_type, t.stealth, t.address, t.climb_rate, t.turn_rate, t.signal_quality, t.error_count, t.frequency_offset, t.gps_quality_horizontal, t.gps_quality_vertical, t.software_version, t.hardware_version, t.real_address, t.signal_power,
|
||||
t.distance, t.radial, t.quality, t.location_mgrs,
|
||||
r.id, d.id
|
||||
FROM aircraft_beacons_temp t, receivers r, devices d
|
||||
WHERE t.receiver_name = r.name AND t.address = d.address
|
||||
""")
|
||||
print("Wrote AircraftBeacons from temporary table into final table")
|
||||
|
||||
session.execute("""DROP TABLE aircraft_beacons_temp""")
|
||||
print("Dropped temporary table")
|
||||
|
||||
session.commit()
|
||||
print("Finished")
|
||||
|
||||
|
||||
def import_receiver_beacon_logfile(csv_logfile):
|
||||
"""Import csv logfile <arg: csv logfile>."""
|
||||
|
||||
SQL_TEMPTABLE_STATEMENT = """
|
||||
DROP TABLE IF EXISTS receiver_beacons_temp;
|
||||
CREATE TABLE receiver_beacons_temp(
|
||||
location geometry,
|
||||
altitude real,
|
||||
name character varying,
|
||||
receiver_name character varying(9),
|
||||
dstcall character varying,
|
||||
"timestamp" timestamp without time zone,
|
||||
|
||||
version character varying,
|
||||
platform character varying,
|
||||
cpu_load real,
|
||||
free_ram real,
|
||||
total_ram real,
|
||||
ntp_error real,
|
||||
rt_crystal_correction real,
|
||||
voltage real,
|
||||
amperage real,
|
||||
cpu_temp real,
|
||||
senders_visible integer,
|
||||
senders_total integer,
|
||||
rec_input_noise real,
|
||||
senders_signal real,
|
||||
senders_messages integer,
|
||||
good_senders_signal real,
|
||||
good_senders integer,
|
||||
good_and_bad_senders integer
|
||||
);
|
||||
"""
|
||||
|
||||
session.execute(SQL_TEMPTABLE_STATEMENT)
|
||||
|
||||
SQL_COPY_STATEMENT = """
|
||||
COPY receiver_beacons_temp(%s) FROM STDIN WITH
|
||||
CSV
|
||||
HEADER
|
||||
DELIMITER AS ','
|
||||
"""
|
||||
|
||||
file = open_file(csv_logfile)
|
||||
column_names = ','.join(ReceiverBeacon.get_csv_columns())
|
||||
sql = SQL_COPY_STATEMENT % column_names
|
||||
|
||||
print("Start importing logfile: {}".format(csv_logfile))
|
||||
|
||||
conn = session.connection().connection
|
||||
cursor = conn.cursor()
|
||||
cursor.copy_expert(sql=sql, file=file)
|
||||
conn.commit()
|
||||
cursor.close()
|
||||
file.close()
|
||||
print("Read logfile into temporary table")
|
||||
|
||||
# create receiver if not exist
|
||||
session.execute("""
|
||||
INSERT INTO receivers(name)
|
||||
SELECT DISTINCT(t.name)
|
||||
FROM receiver_beacons_temp t
|
||||
WHERE NOT EXISTS (SELECT 1 FROM receivers r WHERE r.name = t.name)
|
||||
""")
|
||||
print("Inserted missing Receivers")
|
||||
|
||||
session.execute("""
|
||||
INSERT INTO receiver_beacons(location, altitude, name, dstcall, receiver_name, timestamp,
|
||||
version, platform, cpu_load, free_ram, total_ram, ntp_error, rt_crystal_correction, voltage,amperage, cpu_temp, senders_visible, senders_total, rec_input_noise, senders_signal, senders_messages, good_senders_signal, good_senders, good_and_bad_senders,
|
||||
receiver_id)
|
||||
SELECT t.location, t.altitude, t.name, t.dstcall, t.receiver_name, t.timestamp,
|
||||
t.version, t.platform, t.cpu_load, t.free_ram, t.total_ram, t.ntp_error, t.rt_crystal_correction, t.voltage,amperage, t.cpu_temp, t.senders_visible, t.senders_total, t.rec_input_noise, t.senders_signal, t.senders_messages, t.good_senders_signal, t.good_senders, t.good_and_bad_senders,
|
||||
r.id
|
||||
FROM receiver_beacons_temp t, receivers r
|
||||
WHERE t.name = r.name
|
||||
""")
|
||||
print("Wrote ReceiverBeacons from temporary table into final table")
|
||||
|
||||
session.execute("""DROP TABLE receiver_beacons_temp""")
|
||||
print("Dropped temporary table")
|
||||
|
||||
session.commit()
|
||||
print("Finished")
|
||||
if __name__ == '__main__':
|
||||
file_export()
|
||||
|
|
|
@ -21,8 +21,12 @@ def init():
|
|||
|
||||
session.execute('CREATE EXTENSION IF NOT EXISTS postgis;')
|
||||
session.execute('CREATE EXTENSION IF NOT EXISTS btree_gist;')
|
||||
session.execute('CREATE EXTENSION IF NOT EXISTS timescaledb CASCADE;')
|
||||
session.commit()
|
||||
Base.metadata.create_all(engine)
|
||||
session.execute("SELECT create_hypertable('aircraft_beacons', 'timestamp', chunk_target_size => '2GB');")
|
||||
session.execute("SELECT create_hypertable('receiver_beacons', 'timestamp', chunk_target_size => '2GB');")
|
||||
session.commit()
|
||||
#alembic_cfg = Config(ALEMBIC_CONFIG_FILE)
|
||||
#command.stamp(alembic_cfg, "head")
|
||||
print("Done.")
|
||||
|
|
|
@ -34,28 +34,31 @@ def create_stats():
|
|||
|
||||
|
||||
@manager.command
|
||||
def update_receivers():
|
||||
def add_missing_receivers():
|
||||
"""Update receivers with data from stats."""
|
||||
|
||||
|
||||
result = update_receivers_stats(session=session)
|
||||
print(result)
|
||||
|
||||
|
||||
|
||||
@manager.command
|
||||
def update_devices():
|
||||
def add_missing_devices():
|
||||
"""Update devices with data from stats."""
|
||||
|
||||
|
||||
result = update_devices_stats(session=session)
|
||||
print(result)
|
||||
|
||||
|
||||
@manager.command
|
||||
def create_flights():
|
||||
"""Create Flights."""
|
||||
|
||||
|
||||
for single_date in (date(2016, 8, 10) + timedelta(days=n) for n in range(800)):
|
||||
result = _create_flights2d(session=session, date=single_date)
|
||||
#result = _create_flights3d(session=session, date=single_date)
|
||||
print(result)
|
||||
|
||||
|
||||
def _create_flights2d(session=None, date=None):
|
||||
SQL = """
|
||||
INSERT INTO flights2d
|
||||
|
@ -66,7 +69,7 @@ def _create_flights2d(session=None, date=None):
|
|||
)
|
||||
SELECT sq5.date,
|
||||
sq5.device_id,
|
||||
st_collect(sq5.linestring order BY sq5.part) multilinestring
|
||||
st_collect(sq5.linestring ORDER BY sq5.part) multilinestring
|
||||
FROM (
|
||||
SELECT sq4.timestamp::date AS date,
|
||||
sq4.device_id,
|
||||
|
@ -106,6 +109,7 @@ def _create_flights2d(session=None, date=None):
|
|||
sq4.part ) sq5
|
||||
GROUP BY sq5.date,
|
||||
sq5.device_id
|
||||
ON CONFLICT DO NOTHING;
|
||||
"""
|
||||
|
||||
result = session.execute(SQL.format(date.strftime("%Y-%m-%d")))
|
||||
|
|
|
@ -1,9 +1,7 @@
|
|||
import logging
|
||||
from math import log10
|
||||
|
||||
from mgrs import MGRS
|
||||
|
||||
from ogn.utils import haversine
|
||||
from ogn.commands.dbutils import session
|
||||
from ogn.model import AircraftBeacon, ReceiverBeacon, Location
|
||||
from ogn.parser import parse, ParseError
|
||||
|
@ -14,18 +12,10 @@ logger = logging.getLogger(__name__)
|
|||
myMGRS = MGRS()
|
||||
|
||||
|
||||
|
||||
def _replace_lonlat_with_wkt(message, reference_receiver=None):
|
||||
def _replace_lonlat_with_wkt(message):
|
||||
latitude = message['latitude']
|
||||
longitude = message['longitude']
|
||||
|
||||
if reference_receiver is not None:
|
||||
distance,bearing = haversine(reference_receiver['latitude'], reference_receiver['longitude'], latitude, longitude)
|
||||
message['distance'] = distance
|
||||
message['radial'] = round(bearing)
|
||||
if 'signal_quality' in message and message['signal_quality'] is not None and distance >= 1:
|
||||
message['quality'] = message['signal_quality'] + 20 * log10(message['distance'] / 10000) # normalized to 10km
|
||||
|
||||
location = Location(longitude, latitude)
|
||||
message['location_wkt'] = location.to_wkt()
|
||||
message['location_mgrs'] = myMGRS.toMGRS(latitude, longitude).decode('utf-8')
|
||||
|
@ -34,38 +24,36 @@ def _replace_lonlat_with_wkt(message, reference_receiver=None):
|
|||
return message
|
||||
|
||||
|
||||
receivers = dict()
|
||||
|
||||
def string_to_message(raw_string, reference_date):
|
||||
global receivers
|
||||
|
||||
try:
|
||||
message = parse(raw_string, reference_date)
|
||||
except NotImplementedError as e:
|
||||
logger.error('No parser implemented for message: {}'.format(raw_string))
|
||||
#logger.w('No parser implemented for message: {}'.format(raw_string))
|
||||
return None
|
||||
except ParseError as e:
|
||||
logger.error('Parsing error with message: {}'.format(raw_string))
|
||||
#logger.error('Parsing error with message: {}'.format(raw_string))
|
||||
return None
|
||||
except TypeError as e:
|
||||
logger.error('TypeError with message: {}'.format(raw_string))
|
||||
#logger.error('TypeError with message: {}'.format(raw_string))
|
||||
return None
|
||||
except Exception as e:
|
||||
logger.error(raw_string)
|
||||
logger.error(e)
|
||||
#logger.error(raw_string)
|
||||
#logger.error(e)
|
||||
return None
|
||||
|
||||
# update reference receivers and distance to the receiver
|
||||
if message['aprs_type'] == 'position':
|
||||
if message['beacon_type'] in RECEIVER_TYPES:
|
||||
receivers.update({message['name']: {'latitude': message['latitude'], 'longitude': message['longitude']}})
|
||||
message = _replace_lonlat_with_wkt(message)
|
||||
elif message['beacon_type'] in AIRCRAFT_TYPES:
|
||||
reference_receiver = receivers.get(message['receiver_name'])
|
||||
message = _replace_lonlat_with_wkt(message, reference_receiver=reference_receiver)
|
||||
if 'gps_quality' in message and message['gps_quality'] is not None and 'horizontal' in message['gps_quality']:
|
||||
message['gps_quality_horizontal'] = message['gps_quality']['horizontal']
|
||||
message['gps_quality_vertical'] = message['gps_quality']['vertical']
|
||||
message = _replace_lonlat_with_wkt(message)
|
||||
if 'gps_quality' in message:
|
||||
if message['gps_quality'] is not None and 'horizontal' in message['gps_quality']:
|
||||
message['gps_quality_horizontal'] = message['gps_quality']['horizontal']
|
||||
message['gps_quality_vertical'] = message['gps_quality']['vertical']
|
||||
del message['gps_quality']
|
||||
|
||||
# update raw_message
|
||||
message['raw_message'] = raw_string
|
||||
|
|
|
@ -159,18 +159,18 @@ class FileSaver:
|
|||
raise FileExistsError
|
||||
|
||||
self.aircraft_writer = csv.writer(self.fout_ab, delimiter=',')
|
||||
self.aircraft_writer.writerow(AircraftBeacon.get_csv_columns())
|
||||
self.aircraft_writer.writerow(AircraftBeacon.get_columns())
|
||||
|
||||
self.receiver_writer = csv.writer(self.fout_rb, delimiter=',')
|
||||
self.receiver_writer.writerow(ReceiverBeacon.get_csv_columns())
|
||||
self.receiver_writer.writerow(ReceiverBeacon.get_columns())
|
||||
|
||||
return 1
|
||||
|
||||
def add_message(self, beacon):
|
||||
if isinstance(beacon, AircraftBeacon):
|
||||
self.aircraft_messages.append(beacon.get_csv_values())
|
||||
self.aircraft_messages.append(beacon.get_values())
|
||||
elif isinstance(beacon, ReceiverBeacon):
|
||||
self.receiver_messages.append(beacon.get_csv_values())
|
||||
self.receiver_messages.append(beacon.get_values())
|
||||
|
||||
def flush(self):
|
||||
self.aircraft_writer.writerows(self.aircraft_messages)
|
||||
|
|
|
@ -24,36 +24,13 @@ class AircraftBeacon(Beacon):
|
|||
real_address = Column(String(6))
|
||||
signal_power = Column(Float(precision=2))
|
||||
proximity = None
|
||||
|
||||
# Tracker stuff (position message)
|
||||
flightlevel = None
|
||||
|
||||
# Tracker stuff (status message)
|
||||
gps_satellites = None
|
||||
gps_quality = None
|
||||
gps_altitude = None
|
||||
pressure = None
|
||||
temperature = None
|
||||
humidity = None
|
||||
voltage = None
|
||||
transmitter_power = None
|
||||
noise_level = None
|
||||
relays = None
|
||||
|
||||
# Spider stuff
|
||||
spider_id = None
|
||||
model = None
|
||||
status = None
|
||||
|
||||
# Naviter stuff
|
||||
do_not_track = None
|
||||
reserved = None
|
||||
|
||||
# Calculated values
|
||||
distance = Column(Float(precision=2))
|
||||
radial = Column(SmallInteger)
|
||||
quality = Column(Float(precision=2)) # signal quality normalized to 10km
|
||||
location_mgrs = Column(String(15))
|
||||
agl = Column(Float(precision=2))
|
||||
|
||||
# Relations
|
||||
receiver_id = Column(Integer, ForeignKey('receivers.id', ondelete='SET NULL'))
|
||||
|
@ -63,8 +40,7 @@ class AircraftBeacon(Beacon):
|
|||
device = relationship('Device', foreign_keys=[device_id], backref='aircraft_beacons')
|
||||
|
||||
# Multi-column indices
|
||||
Index('ix_aircraft_beacons_receiver_id_receiver_name', 'receiver_id', 'receiver_name')
|
||||
Index('ix_aircraft_beacons_device_id_address', 'device_id', 'address')
|
||||
Index('ix_aircraft_beacons_receiver_id_distance', 'receiver_id', 'distance')
|
||||
Index('ix_aircraft_beacons_device_id_timestamp', 'device_id', 'timestamp')
|
||||
|
||||
def __repr__(self):
|
||||
|
@ -91,42 +67,42 @@ class AircraftBeacon(Beacon):
|
|||
self.location_mgrs)
|
||||
|
||||
@classmethod
|
||||
def get_csv_columns(self):
|
||||
return['location',
|
||||
'altitude',
|
||||
'name',
|
||||
'dstcall',
|
||||
'relay',
|
||||
'receiver_name',
|
||||
'timestamp',
|
||||
'track',
|
||||
'ground_speed',
|
||||
|
||||
#'raw_message',
|
||||
#'reference_timestamp',
|
||||
def get_columns(self):
|
||||
return ['location',
|
||||
'altitude',
|
||||
'name',
|
||||
'dstcall',
|
||||
'relay',
|
||||
'receiver_name',
|
||||
'timestamp',
|
||||
'track',
|
||||
'ground_speed',
|
||||
|
||||
#'raw_message',
|
||||
#'reference_timestamp',
|
||||
|
||||
'address_type',
|
||||
'aircraft_type',
|
||||
'stealth',
|
||||
'address',
|
||||
'climb_rate',
|
||||
'turn_rate',
|
||||
'signal_quality',
|
||||
'error_count',
|
||||
'frequency_offset',
|
||||
'gps_quality_horizontal',
|
||||
'gps_quality_vertical',
|
||||
'software_version',
|
||||
'hardware_version',
|
||||
'real_address',
|
||||
'signal_power',
|
||||
|
||||
'distance',
|
||||
'radial',
|
||||
'quality',
|
||||
'location_mgrs']
|
||||
|
||||
'address_type',
|
||||
'aircraft_type',
|
||||
'stealth',
|
||||
'address',
|
||||
'climb_rate',
|
||||
'turn_rate',
|
||||
'signal_quality',
|
||||
'error_count',
|
||||
'frequency_offset',
|
||||
'gps_quality_horizontal',
|
||||
'gps_quality_vertical',
|
||||
'software_version',
|
||||
'hardware_version',
|
||||
'real_address',
|
||||
'signal_power',
|
||||
|
||||
'distance',
|
||||
'radial',
|
||||
'quality',
|
||||
'location_mgrs']
|
||||
|
||||
def get_csv_values(self):
|
||||
def get_values(self):
|
||||
return [
|
||||
self.location_wkt,
|
||||
int(self.altitude) if self.altitude else None,
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
from geoalchemy2.shape import to_shape
|
||||
from geoalchemy2.types import Geometry
|
||||
from sqlalchemy import Column, String, Integer, SmallInteger, Float, DateTime
|
||||
from sqlalchemy import Column, String, Integer, SmallInteger, Float, DateTime, BigInteger
|
||||
from sqlalchemy.ext.declarative import AbstractConcreteBase
|
||||
|
||||
from .base import Base
|
||||
|
@ -8,17 +8,15 @@ from .geo import Location
|
|||
|
||||
|
||||
class Beacon(AbstractConcreteBase, Base):
|
||||
id = Column(Integer, primary_key=True)
|
||||
|
||||
# APRS data
|
||||
location_wkt = Column('location', Geometry('POINT', srid=4326))
|
||||
altitude = Column(Float(precision=2))
|
||||
|
||||
name = Column(String)
|
||||
name = Column(String, primary_key=True)
|
||||
dstcall = Column(String)
|
||||
relay = Column(String)
|
||||
receiver_name = Column(String(9))
|
||||
timestamp = Column(DateTime, index=True)
|
||||
receiver_name = Column(String(9), primary_key=True)
|
||||
timestamp = Column(DateTime, primary_key=True)
|
||||
symboltable = None
|
||||
symbolcode = None
|
||||
track = Column(SmallInteger)
|
||||
|
|
|
@ -8,14 +8,12 @@ from .base import Base
|
|||
class Flight2D(Base):
|
||||
__tablename__ = "flights2d"
|
||||
|
||||
id = Column(Integer, primary_key=True)
|
||||
|
||||
date = Column(Date)
|
||||
date = Column(Date, primary_key=True)
|
||||
|
||||
path_wkt = Column('path', Geometry('MULTILINESTRING', srid=4326))
|
||||
|
||||
# Relations
|
||||
device_id = Column(Integer, ForeignKey('devices.id', ondelete='SET NULL'), index=True)
|
||||
device_id = Column(Integer, ForeignKey('devices.id', ondelete='SET NULL'), primary_key=True)
|
||||
device = relationship('Device', foreign_keys=[device_id], backref='flights2d')
|
||||
|
||||
def __repr__(self):
|
||||
|
|
|
@ -63,37 +63,37 @@ class ReceiverBeacon(Beacon):
|
|||
self.good_and_bad_senders)
|
||||
|
||||
@classmethod
|
||||
def get_csv_columns(self):
|
||||
return['location',
|
||||
'altitude',
|
||||
'name',
|
||||
'dstcall',
|
||||
'receiver_name',
|
||||
'timestamp',
|
||||
|
||||
# 'raw_message',
|
||||
# 'reference_timestamp',
|
||||
def get_columns(self):
|
||||
return ['location',
|
||||
'altitude',
|
||||
'name',
|
||||
'dstcall',
|
||||
'receiver_name',
|
||||
'timestamp',
|
||||
|
||||
# 'raw_message',
|
||||
# 'reference_timestamp',
|
||||
|
||||
'version',
|
||||
'platform',
|
||||
'cpu_load',
|
||||
'free_ram',
|
||||
'total_ram',
|
||||
'ntp_error',
|
||||
'rt_crystal_correction',
|
||||
'voltage',
|
||||
'amperage',
|
||||
'cpu_temp',
|
||||
'senders_visible',
|
||||
'senders_total',
|
||||
'rec_input_noise',
|
||||
'senders_signal',
|
||||
'senders_messages',
|
||||
'good_senders_signal',
|
||||
'good_senders',
|
||||
'good_and_bad_senders']
|
||||
|
||||
'version',
|
||||
'platform',
|
||||
'cpu_load',
|
||||
'free_ram',
|
||||
'total_ram',
|
||||
'ntp_error',
|
||||
'rt_crystal_correction',
|
||||
'voltage',
|
||||
'amperage',
|
||||
'cpu_temp',
|
||||
'senders_visible',
|
||||
'senders_total',
|
||||
'rec_input_noise',
|
||||
'senders_signal',
|
||||
'senders_messages',
|
||||
'good_senders_signal',
|
||||
'good_senders',
|
||||
'good_and_bad_senders']
|
||||
|
||||
def get_csv_values(self):
|
||||
def get_values(self):
|
||||
return [
|
||||
self.location_wkt,
|
||||
int(self.altitude) if self.altitude else None,
|
||||
|
|
36
ogn/utils.py
36
ogn/utils.py
|
@ -51,6 +51,7 @@ def get_ddb(csv_file=None, address_origin=DeviceInfoOrigin.unknown):
|
|||
|
||||
return device_infos
|
||||
|
||||
|
||||
def get_flarmnet(fln_file=None, address_origin=DeviceInfoOrigin.flarmnet):
|
||||
if fln_file is None:
|
||||
r = requests.get(FLARMNET_URL)
|
||||
|
@ -58,7 +59,7 @@ def get_flarmnet(fln_file=None, address_origin=DeviceInfoOrigin.flarmnet):
|
|||
else:
|
||||
with open(fln_file, 'r') as file:
|
||||
rows = [bytes.fromhex(line.strip()).decode('latin1') for line in file.readlines() in len(line) == 172]
|
||||
|
||||
|
||||
device_infos = list()
|
||||
for row in rows:
|
||||
device_info = DeviceInfo()
|
||||
|
@ -66,11 +67,12 @@ def get_flarmnet(fln_file=None, address_origin=DeviceInfoOrigin.flarmnet):
|
|||
device_info.aircraft = row[48:69].strip()
|
||||
device_info.registration = row[69:76].strip()
|
||||
device_info.competition = row[76:79].strip()
|
||||
|
||||
|
||||
device_infos.append(device_info)
|
||||
|
||||
return device_infos
|
||||
|
||||
|
||||
def get_trackable(ddb):
|
||||
l = []
|
||||
for i in ddb:
|
||||
|
@ -78,6 +80,7 @@ def get_trackable(ddb):
|
|||
l.append("{}{}".format(address_prefixes[i.address_type], i.address))
|
||||
return l
|
||||
|
||||
|
||||
def get_geolocator():
|
||||
geolocator = Nominatim()
|
||||
|
||||
|
@ -91,6 +94,7 @@ def get_geolocator():
|
|||
|
||||
return geolocator
|
||||
|
||||
|
||||
def get_country_code(latitude, longitude):
|
||||
geolocator = get_geolocator()
|
||||
try:
|
||||
|
@ -145,32 +149,8 @@ def open_file(filename):
|
|||
a = f.read(2)
|
||||
f.close()
|
||||
if (a == b'\x1f\x8b'):
|
||||
f = gzip.open(filename, 'rt')
|
||||
f = gzip.open(filename, 'rt', encoding="latin-1")
|
||||
return f
|
||||
else:
|
||||
f = open(filename, 'rt')
|
||||
f = open(filename, 'rt', encoding="latin-1")
|
||||
return f
|
||||
|
||||
from math import radians, cos, sin, asin, sqrt, atan2, degrees
|
||||
|
||||
def haversine(lat1, lon1, lat2, lon2):
|
||||
"""
|
||||
Calculate the great circle distance between two points
|
||||
on the earth (specified in decimal degrees)
|
||||
"""
|
||||
# convert decimal degrees to radians
|
||||
lon1, lat1, lon2, lat2 = map(radians, [lon1, lat1, lon2, lat2])
|
||||
|
||||
# haversine formula
|
||||
dlon = lon2 - lon1
|
||||
dlat = lat2 - lat1
|
||||
a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2
|
||||
c = 2 * asin(sqrt(a))
|
||||
r = 6371000.785 # Radius of earth in meters
|
||||
d = c * r
|
||||
|
||||
# calculate bearing
|
||||
bearing = atan2(sin(dlon)*cos(lat2), cos(lat1)*sin(lat2)-sin(lat1)*cos(lat2)*cos(dlon))
|
||||
bearing = (degrees(bearing) + 360) % 360
|
||||
|
||||
return d,bearing
|
|
@ -2,7 +2,7 @@ import unittest
|
|||
import os
|
||||
|
||||
from ogn.model import AircraftBeacon, ReceiverBeacon, Device, Receiver, DeviceInfo
|
||||
from ogn.collect.database import update_devices, update_receivers, import_ddb_file
|
||||
from ogn.collect.database import add_missing_devices, add_missing_receivers, import_ddb_file
|
||||
|
||||
|
||||
class TestDB(unittest.TestCase):
|
||||
|
@ -37,8 +37,8 @@ class TestDB(unittest.TestCase):
|
|||
r01 = Receiver(name='Koenigsdf')
|
||||
session.bulk_save_objects([ab01, rb01, d01, r01])
|
||||
|
||||
update_devices(session)
|
||||
update_receivers(session)
|
||||
add_missing_devices(session)
|
||||
add_missing_receivers(session)
|
||||
|
||||
aircraft_beacons = session.query(AircraftBeacon).all()
|
||||
self.assertEqual(len(aircraft_beacons), 1)
|
||||
|
|
Ładowanie…
Reference in New Issue