ogn-python/ogn_python/gateway/bulkimport.py

284 wiersze
11 KiB
Python
Czysty Zwykły widok Historia

2019-03-04 21:14:13 +00:00
from datetime import datetime, timedelta
from io import StringIO
from flask.cli import AppGroup
import click
from tqdm import tqdm
from mgrs import MGRS
from ogn.parser import parse, ParseError
from ogn_python.model import AircraftBeacon, ReceiverBeacon, Location
from ogn_python.utils import open_file
from ogn_python.gateway.process_tools import *
from ogn_python import db
from ogn_python import app
# Flask CLI command group under which the bulk import commands are registered.
user_cli = AppGroup('bulkimport')
user_cli.help = "Tools for accelerated data import."

# Values of a parsed message's 'beacon_type' that we want to process.
AIRCRAFT_BEACON_TYPES = [
    'aprs_aircraft',
    'flarm',
    'tracker',
    'fanet',
    'lt24',
    'naviter',
    'skylines',
    'spider',
    'spot',
]
RECEIVER_BEACON_TYPES = [
    'aprs_receiver',
    'receiver',
]

# Message fields we want to process. The key fields are shared by both beacon
# kinds and are always written first, followed by the kind-specific fields.
BEACON_KEY_FIELDS = [
    'name',
    'receiver_name',
    'timestamp',
]
AIRCRAFT_BEACON_FIELDS = [
    'location', 'altitude', 'dstcall', 'relay',
    'track', 'ground_speed',
    'address_type', 'aircraft_type', 'stealth', 'address',
    'climb_rate', 'turn_rate',
    'signal_quality', 'error_count', 'frequency_offset',
    'gps_quality_horizontal', 'gps_quality_vertical',
    'software_version', 'hardware_version', 'real_address', 'signal_power',
    'distance', 'radial', 'quality',
    'location_mgrs', 'location_mgrs_short',
    'agl',
    'receiver_id', 'device_id',
]
RECEIVER_BEACON_FIELDS = [
    'location', 'altitude', 'dstcall', 'relay',
    'version', 'platform',
    'cpu_load', 'free_ram', 'total_ram',
    'ntp_error', 'rt_crystal_correction',
    'voltage', 'amperage', 'cpu_temp',
    'senders_visible', 'senders_total',
    'rec_input_noise',
    'senders_signal', 'senders_messages',
    'good_senders_signal', 'good_senders', 'good_and_bad_senders',
]

# Shared converter used to turn latitude/longitude into an MGRS string.
myMGRS = MGRS()
def string_to_message(raw_string, reference_date):
    """Parse a raw APRS string and enrich the resulting message for import.

    The string is handed to ``ogn.parser.parse``. On success the message dict
    is post-processed in place:

    * position messages of a known aircraft/receiver beacon type get
      ``location`` (WKT), ``location_mgrs`` and ``location_mgrs_short``;
    * for aircraft beacons the nested ``gps_quality`` entry is replaced by the
      flat ``gps_quality_horizontal`` / ``gps_quality_vertical`` keys;
    * the sender counters ``senders_messages``, ``good_senders`` and
      ``good_and_bad_senders`` are coerced to ``int`` when present.

    :param raw_string: the raw beacon line to parse.
    :param reference_date: reference datetime passed through to the parser.
    :return: the enriched message dict, or ``None`` if parsing failed (the
        failure is logged via ``app.logger``).
    """
    try:
        message = parse(raw_string, reference_date)
    except NotImplementedError:
        app.logger.error('No parser implemented for message: {}'.format(raw_string))
        return None
    except ParseError:
        app.logger.error('Parsing error with message: {}'.format(raw_string))
        return None
    except TypeError:
        app.logger.error('TypeError with message: {}'.format(raw_string))
        return None
    except Exception:
        # Catch-all so a single bad line never aborts an import; log the
        # traceback because the cause is unknown by definition here.
        app.logger.exception('Other Exception with string: {}'.format(raw_string))
        return None

    # .get(): do not fail on messages that carry no aprs_type/beacon_type.
    beacon_type = message.get('beacon_type')

    if message.get('aprs_type') == 'position':
        if beacon_type in AIRCRAFT_BEACON_TYPES or beacon_type in RECEIVER_BEACON_TYPES:
            latitude = message['latitude']
            longitude = message['longitude']

            message['location'] = Location(longitude, latitude).to_wkt()

            location_mgrs = myMGRS.toMGRS(latitude, longitude)
            # NOTE(review): the original always called .decode('utf-8'); the
            # guard keeps that behaviour for bytes and also tolerates a str
            # result (believed to depend on the installed mgrs version).
            if isinstance(location_mgrs, bytes):
                location_mgrs = location_mgrs.decode('utf-8')
            message['location_mgrs'] = location_mgrs
            # Short form: characters 0-6 followed by characters 10-11.
            message['location_mgrs_short'] = location_mgrs[0:7] + location_mgrs[10:12]

        if beacon_type in AIRCRAFT_BEACON_TYPES and 'gps_quality' in message:
            # The nested entry is always removed; its values are kept only
            # when it actually carries a 'horizontal' component.
            # NOTE(review): SOURCE lost its indentation; the removal is assumed
            # to be unconditional within this branch - confirm.
            gps_quality = message.pop('gps_quality')
            if gps_quality is not None and 'horizontal' in gps_quality:
                message['gps_quality_horizontal'] = gps_quality['horizontal']
                message['gps_quality_vertical'] = gps_quality['vertical']

    # TODO: Fix python-ogn-client 0.91
    for counter in ('senders_messages', 'good_senders', 'good_and_bad_senders'):
        if message.get(counter) is not None:
            message[counter] = int(message[counter])

    return message
class ContinuousDbFeeder:
    """Feed a continuous stream of raw beacon strings into the database.

    Parsed beacons are buffered as comma separated rows in two in-memory
    buffers (aircraft and receiver). Once at least one second has passed since
    the last flush, ``add`` copies the buffers into the staging tables named
    with ``self.postfix``, runs the prepare/transfer steps and deletes the
    transferred staging rows.
    """

    def __init__(self):
        """Create empty buffers and the staging tables/indices for the postfix."""
        self.postfix = 'continuous_import'
        self.last_flush = datetime.utcnow()

        self.aircraft_buffer = StringIO()
        self.receiver_buffer = StringIO()

        create_tables(self.postfix)
        create_indices(self.postfix)

    @staticmethod
    def _to_copy_row(message, fields):
        """Return ``message`` as one comma separated row in ``fields`` order.

        Missing keys and ``None`` values are written as ``\\N``. Values are
        not escaped, so a value containing a comma would corrupt the row.
        """
        return ','.join(
            str(message[field]) if message.get(field) is not None else '\\N'
            for field in fields
        )

    def add(self, raw_string):
        """Parse ``raw_string``, buffer it and flush if a second has elapsed.

        Strings that fail to parse, comment lines (raw message starting with
        ``#``) and messages without a ``beacon_type`` are dropped silently;
        unknown beacon types are logged and dropped.
        """
        message = string_to_message(raw_string, reference_date=datetime.utcnow())
        if message is None or 'beacon_type' not in message:
            return
        # startswith() instead of [0] so an empty raw message cannot raise.
        if (message.get('raw_message') or '').startswith('#'):
            return

        beacon_type = message['beacon_type']
        if beacon_type in AIRCRAFT_BEACON_TYPES:
            row = self._to_copy_row(message, BEACON_KEY_FIELDS + AIRCRAFT_BEACON_FIELDS)
            self.aircraft_buffer.write(row)
            self.aircraft_buffer.write('\n')
        elif beacon_type in RECEIVER_BEACON_TYPES:
            row = self._to_copy_row(message, BEACON_KEY_FIELDS + RECEIVER_BEACON_FIELDS)
            self.receiver_buffer.write(row)
            self.receiver_buffer.write('\n')
        else:
            app.logger.error("Ignore beacon_type: {}".format(message['beacon_type']))
            return

        if datetime.utcnow() - self.last_flush >= timedelta(seconds=1):
            self.flush()  # also replaces both buffers with fresh ones
            self.prepare()
            self.transfer()
            self.delete_beacons()

            self.last_flush = datetime.utcnow()

    def flush(self):
        """Copy both buffers into their staging tables and reset the buffers."""
        self.aircraft_buffer.seek(0)
        self.receiver_buffer.seek(0)

        connection = db.engine.raw_connection()
        try:
            cursor = connection.cursor()
            try:
                cursor.copy_from(self.aircraft_buffer, 'aircraft_beacons_{0}'.format(self.postfix), sep=',', columns=BEACON_KEY_FIELDS + AIRCRAFT_BEACON_FIELDS)
                cursor.copy_from(self.receiver_buffer, 'receiver_beacons_{0}'.format(self.postfix), sep=',', columns=BEACON_KEY_FIELDS + RECEIVER_BEACON_FIELDS)
                connection.commit()
            finally:
                cursor.close()
        finally:
            # flush() runs about once per second; without an explicit close
            # every call would hold on to a connection of its own.
            connection.close()

        self.aircraft_buffer = StringIO()
        self.receiver_buffer = StringIO()

    def prepare(self):
        """Complete receivers and devices, then prepare beacons for transfer."""
        # make receivers complete
        add_missing_receivers(self.postfix)
        update_receiver_beacons(self.postfix)
        update_receiver_location(self.postfix)

        # make devices complete
        add_missing_devices(self.postfix)

        # prepare beacons for transfer
        update_aircraft_beacons(self.postfix)

    def transfer(self):
        """Transfer the staged aircraft and receiver beacons."""
        transfer_aircraft_beacons(self.postfix)
        transfer_receiver_beacons(self.postfix)

    def delete_beacons(self):
        """Delete the already transferred beacons from the staging tables."""
        delete_receiver_beacons(self.postfix)
        delete_aircraft_beacons(self.postfix)
def prepare_bigdata(postfix):
    """Run the preparation steps for the tables identified by ``postfix``.

    Every step is called with ``postfix`` as its only argument, strictly in
    the order listed below.
    """
    pipeline = (
        # make receivers complete
        add_missing_receivers,
        update_receiver_location,
        # make devices complete
        add_missing_devices,
        # prepare beacons for transfer
        create_indices,
        update_receiver_beacons_bigdata,
        update_aircraft_beacons_bigdata,
    )
    for step in pipeline:
        step(postfix)
def get_aircraft_beacons_postfixes():
    """Get the postfixes from imported aircraft_beacons logs.

    Looks up the public tables whose name matches ``aircraft_beacons_20______``
    and returns the distinct last 8 characters of their names, sorted.

    :return: list of result rows as returned by ``fetchall()``; each row holds
        the postfix as its single column.
    """
    # Raw string: the backslashes escaping '_' for LIKE must reach the database
    # unchanged, and '\_' is not a valid Python escape sequence.
    # ORDER BY must repeat the selected expression, because PostgreSQL rejects
    # SELECT DISTINCT queries ordered by an expression outside the select list.
    rows = db.session.execute(r"""
        SELECT DISTINCT(RIGHT(tablename, 8))
        FROM pg_catalog.pg_tables
        WHERE schemaname = 'public' AND tablename LIKE 'aircraft\_beacons\_20______'
        ORDER BY RIGHT(tablename, 8);
    """).fetchall()

    return list(rows)
def export_to_path(postfix):
    """Export merged aircraft and receiver beacons as gzip-compressed CSV files.

    Writes ``aircraft_beacons_<postfix>.csv.gz`` and
    ``receiver_beacons_<postfix>.csv.gz`` below ``path`` by streaming a
    ``COPY ... TO STDOUT`` (CSV with header, ',' delimiter, UTF-8) into each file.

    NOTE(review): ``path`` and ``self`` are neither parameters nor names
    assigned anywhere in the visible part of this module, so calling this
    function as written is expected to raise NameError. It reads like a method
    moved out of a saver class - confirm the intended source of the target
    directory, the cursor and the two subqueries before using it.
    """
    import os, gzip
    aircraft_beacons_file = os.path.join(path, 'aircraft_beacons_{0}.csv.gz'.format(postfix))
    with gzip.open(aircraft_beacons_file, 'wt', encoding='utf-8') as gzip_file:
        self.cur.copy_expert("COPY ({}) TO STDOUT WITH (DELIMITER ',', FORMAT CSV, HEADER, ENCODING 'UTF-8');".format(self.get_merged_aircraft_beacons_subquery()), gzip_file)
    receiver_beacons_file = os.path.join(path, 'receiver_beacons_{0}.csv.gz'.format(postfix))
    # NOTE(review): unlike the aircraft export above, no encoding is passed
    # here, so the platform default text encoding is used - confirm.
    with gzip.open(receiver_beacons_file, 'wt') as gzip_file:
        self.cur.copy_expert("COPY ({}) TO STDOUT WITH (DELIMITER ',', FORMAT CSV, HEADER, ENCODING 'UTF-8');".format(self.get_merged_receiver_beacons_subquery()), gzip_file)
def convert(sourcefile, datestr, saver):
    """Parse every line of ``sourcefile`` and hand the beacons to ``saver``.

    The file is read twice: once to count the lines for the progress bar and
    once to import them. ``saver.flush()`` is called every 100000 lines and
    once more after the last line. Lines that cannot be parsed and messages of
    an unhandled beacon type are skipped; errors while building or adding a
    beacon are printed and the import continues with the next line.

    :param sourcefile: path of the logfile, opened through ``open_file``.
    :param datestr: date of the logfile as ``YYYY-MM-DD``; noon of that day is
        used as the reference date for parsing.
    :param saver: object providing ``add(beacon)`` and ``flush()``.
    :raises ValueError: if ``datestr`` is not a valid ``YYYY-MM-DD`` date.
    """
    # These function-scope imports deliberately take precedence over the
    # module-level names of the same name.
    from ogn_python.gateway.process import string_to_message
    from ogn_python.gateway.process_tools import AIRCRAFT_BEACON_TYPES, RECEIVER_BEACON_TYPES

    # Message keys that are passed on to AircraftBeacon; built once per call
    # instead of once per key of every message.
    aircraft_keys = frozenset((
        'beacon_type', 'aprs_type', 'location_wkt', 'altitude', 'name', 'dstcall', 'relay', 'receiver_name', 'timestamp', 'track', 'ground_speed',
        'address_type', 'aircraft_type', 'stealth', 'address', 'climb_rate', 'turn_rate', 'signal_quality', 'error_count', 'frequency_offset', 'gps_quality_horizontal', 'gps_quality_vertical', 'software_version', 'hardware_version', 'real_address', 'signal_power',
        'distance', 'radial', 'quality', 'agl', 'location_mgrs', 'location_mgrs_short',
        'receiver_id', 'device_id',
    ))

    steps = 100000
    # Validate the date before opening the file so a bad datestr leaks nothing.
    reference_date = datetime.strptime(datestr + ' 12:00:00', '%Y-%m-%d %H:%M:%S')

    fin = open_file(sourcefile)
    try:
        # get total lines of the input file
        total_lines = sum(1 for _ in fin)
        fin.seek(0)

        pbar = tqdm(fin, total=total_lines)
        pbar.set_description('Importing {}'.format(sourcefile))
        for current_line, line in enumerate(pbar, start=1):
            if current_line % steps == 0:
                saver.flush()

            message = string_to_message(line.strip(), reference_date=reference_date)
            if message is None:
                continue

            try:
                beacon_type = message['beacon_type']
                if beacon_type in AIRCRAFT_BEACON_TYPES:
                    beacon = AircraftBeacon(**{key: value for key, value in message.items() if key in aircraft_keys})
                elif beacon_type in RECEIVER_BEACON_TYPES:
                    if 'rec_crystal_correction' in message:
                        del message['rec_crystal_correction']
                        # The fine correction is not guaranteed to accompany it.
                        message.pop('rec_crystal_correction_fine', None)
                    beacon = ReceiverBeacon(**message)
                else:
                    # Unhandled beacon type: without this the beacon built for
                    # a previous line would be added a second time.
                    continue
                saver.add(beacon)
            except Exception as e:
                print(e)

        saver.flush()
    finally:
        fin.close()
@user_cli.command('file_import')
@click.argument('path')
def file_import(path):
    """Import APRS logfiles into separate logfile tables."""
    # The docstring above doubles as the CLI help text, so details live here:
    # the directory tree below `path` is searched for files named
    # OGN_log.txt_<YYYY-MM-DD>.gz; dates the saver reports as already imported
    # are skipped, every remaining file is converted into the tables of its
    # date, and missing devices/receivers are added afterwards.
    import os
    import re

    # Raw string: '\.' and '\-' are regex escapes, not Python string escapes.
    logfile_pattern = re.compile(r'OGN_log\.txt_([0-9]{4}\-[0-9]{2}\-[0-9]{2})\.gz$')

    # Get Filepaths and dates to import
    results = []
    for root, _dirs, files in os.walk(path):
        for filename in sorted(files):
            match = logfile_pattern.match(filename)
            if match:
                results.append({'filepath': os.path.join(root, filename),
                                'datestr': match.group(1)})

    with LogfileDbSaver() as saver:
        already_imported = saver.get_datestrs()

        results = [result for result in results if result['datestr'] not in already_imported]

        pbar = tqdm(results)
        for result in pbar:
            filepath = result['filepath']
            datestr = result['datestr']
            pbar.set_description("Importing data for {}".format(datestr))

            saver.set_datestr(datestr)
            saver.create_tables()
            convert(filepath, datestr, saver)
            # NOTE(review): SOURCE lost its indentation; these two calls are
            # assumed to run once per imported date (inside the loop) - confirm.
            saver.add_missing_devices()
            saver.add_missing_receivers()