SQL merger... WIP

pull/68/head
Konstantin Gründger 2019-01-28 22:06:38 +01:00
rodzic 90ab582ca3
commit e24129b13c
6 zmienionych plików z 76 dodań i 192 usunięć

Wyświetl plik

@ -485,7 +485,7 @@ class LogfileDbSaver():
def convert(sourcefile, datestr, saver): def convert(sourcefile, datestr, saver):
from ogn.gateway.process import string_to_message from ogn.gateway.process import string_to_message
from ogn.gateway.process_tools import AIRCRAFT_TYPES, RECEIVER_TYPES from ogn.gateway.process_tools import AIRCRAFT_BEACON_TYPES, RECEIVER_BEACON_TYPES
from datetime import datetime from datetime import datetime
fin = open_file(sourcefile) fin = open_file(sourcefile)
@ -515,14 +515,14 @@ def convert(sourcefile, datestr, saver):
dictfilt = lambda x, y: dict([(i, x[i]) for i in x if i in set(y)]) dictfilt = lambda x, y: dict([(i, x[i]) for i in x if i in set(y)])
try: try:
if message['beacon_type'] in AIRCRAFT_TYPES: if message['beacon_type'] in AIRCRAFT_BEACON_TYPES:
message = dictfilt(message, ('beacon_type', 'aprs_type', 'location_wkt', 'altitude', 'name', 'dstcall', 'relay', 'receiver_name', 'timestamp', 'track', 'ground_speed', message = dictfilt(message, ('beacon_type', 'aprs_type', 'location_wkt', 'altitude', 'name', 'dstcall', 'relay', 'receiver_name', 'timestamp', 'track', 'ground_speed',
'address_type', 'aircraft_type', 'stealth', 'address', 'climb_rate', 'turn_rate', 'signal_quality', 'error_count', 'frequency_offset', 'gps_quality_horizontal', 'gps_quality_vertical', 'software_version', 'hardware_version', 'real_address', 'signal_power', 'address_type', 'aircraft_type', 'stealth', 'address', 'climb_rate', 'turn_rate', 'signal_quality', 'error_count', 'frequency_offset', 'gps_quality_horizontal', 'gps_quality_vertical', 'software_version', 'hardware_version', 'real_address', 'signal_power',
'distance', 'radial', 'quality', 'agl', 'location_mgrs', 'location_mgrs_short', 'distance', 'radial', 'quality', 'agl', 'location_mgrs', 'location_mgrs_short',
'receiver_id', 'device_id')) 'receiver_id', 'device_id'))
beacon = AircraftBeacon(**message) beacon = AircraftBeacon(**message)
elif message['beacon_type'] in RECEIVER_TYPES: elif message['beacon_type'] in RECEIVER_BEACON_TYPES:
if 'rec_crystal_correction' in message: if 'rec_crystal_correction' in message:
del message['rec_crystal_correction'] del message['rec_crystal_correction']
del message['rec_crystal_correction_fine'] del message['rec_crystal_correction_fine']

Wyświetl plik

@ -4,7 +4,7 @@ from manager import Manager
from ogn.client import AprsClient from ogn.client import AprsClient
from ogn.gateway.process import string_to_message from ogn.gateway.process import string_to_message
from datetime import datetime from datetime import datetime
from ogn.gateway.process_tools import DummyMerger, Converter, DbSaver from ogn.gateway.process_tools import DbSaver
from ogn.commands.dbutils import session from ogn.commands.dbutils import session
manager = Manager() manager = Manager()
@ -12,16 +12,13 @@ manager = Manager()
logging_formatstr = '%(asctime)s - %(levelname).4s - %(name)s - %(message)s' logging_formatstr = '%(asctime)s - %(levelname).4s - %(name)s - %(message)s'
log_levels = ['CRITICAL', 'ERROR', 'WARNING', 'INFO', 'DEBUG'] log_levels = ['CRITICAL', 'ERROR', 'WARNING', 'INFO', 'DEBUG']
# Build the processing pipeline
saver = DbSaver(session=session) saver = DbSaver(session=session)
converter = Converter(callback=saver)
merger = DummyMerger(callback=converter)
def asdf(raw_string): def asdf(raw_string):
message = string_to_message(raw_string, reference_date=datetime.utcnow()) message = string_to_message(raw_string, reference_date=datetime.utcnow())
if message is not None: if message is not None:
merger.add_message(message) saver.add_message(message)
else: else:
print(message) print(message)
@ -53,6 +50,6 @@ def run(aprs_user='anon-dev', logfile='main.log', loglevel='INFO'):
except KeyboardInterrupt: except KeyboardInterrupt:
print('\nStop ogn gateway') print('\nStop ogn gateway')
merger.flush() saver.flush()
client.disconnect() client.disconnect()
logging.shutdown() logging.shutdown()

Wyświetl plik

@ -5,7 +5,7 @@ from mgrs import MGRS
from ogn.commands.dbutils import session from ogn.commands.dbutils import session
from ogn.model import Location from ogn.model import Location
from ogn.parser import parse, ParseError from ogn.parser import parse, ParseError
from ogn.gateway.process_tools import DbSaver, Converter, DummyMerger, AIRCRAFT_TYPES, RECEIVER_TYPES from ogn.gateway.process_tools import DbSaver, AIRCRAFT_BEACON_TYPES, RECEIVER_BEACON_TYPES
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -32,30 +32,29 @@ def string_to_message(raw_string, reference_date):
try: try:
message = parse(raw_string, reference_date) message = parse(raw_string, reference_date)
except NotImplementedError as e: except NotImplementedError as e:
#logger.w('No parser implemented for message: {}'.format(raw_string)) logger.w('No parser implemented for message: {}'.format(raw_string))
return None return None
except ParseError as e: except ParseError as e:
#logger.error('Parsing error with message: {}'.format(raw_string)) logger.error('Parsing error with message: {}'.format(raw_string))
return None return None
except TypeError as e: except TypeError as e:
#logger.error('TypeError with message: {}'.format(raw_string)) logger.error('TypeError with message: {}'.format(raw_string))
return None return None
except Exception as e: except Exception as e:
#logger.error(raw_string) logger.error(raw_string)
#logger.error(e) logger.error(e)
return None return None
# update reference receivers and distance to the receiver # update reference receivers and distance to the receiver
if message['aprs_type'] == 'position': if message['aprs_type'] == 'position':
if message['beacon_type'] in RECEIVER_TYPES: if message['beacon_type'] in AIRCRAFT_BEACON_TYPES + RECEIVER_BEACON_TYPES:
message = _replace_lonlat_with_wkt(message) message = _replace_lonlat_with_wkt(message)
elif message['beacon_type'] in AIRCRAFT_TYPES:
message = _replace_lonlat_with_wkt(message) if message['beacon_type'] in AIRCRAFT_BEACON_TYPES and 'gps_quality' in message:
if 'gps_quality' in message: if message['gps_quality'] is not None and 'horizontal' in message['gps_quality']:
if message['gps_quality'] is not None and 'horizontal' in message['gps_quality']: message['gps_quality_horizontal'] = message['gps_quality']['horizontal']
message['gps_quality_horizontal'] = message['gps_quality']['horizontal'] message['gps_quality_vertical'] = message['gps_quality']['vertical']
message['gps_quality_vertical'] = message['gps_quality']['vertical'] del message['gps_quality']
del message['gps_quality']
# update raw_message # update raw_message
message['raw_message'] = raw_string message['raw_message'] = raw_string
@ -63,15 +62,12 @@ def string_to_message(raw_string, reference_date):
return message return message
# Build the processing pipeline
saver = DbSaver(session=session) saver = DbSaver(session=session)
converter = Converter(callback=saver)
merger = DummyMerger(callback=converter)
def process_raw_message(raw_message, reference_date=None, merger=merger): def process_raw_message(raw_message, reference_date=None, saver=saver):
logger.debug('Received message: {}'.format(raw_message)) logger.debug('Received message: {}'.format(raw_message))
message = string_to_message(raw_message, reference_date) message = string_to_message(raw_message, reference_date)
merger.add_message(message) saver.add_message(message)

Wyświetl plik

@ -1,8 +1,15 @@
from datetime import datetime, timedelta from datetime import datetime, timedelta
from ogn.model import AircraftBeacon, ReceiverBeacon from ogn.model import AircraftBeacon, ReceiverBeacon
from ogn.collect.database import upsert
AIRCRAFT_TYPES = ['aprs_aircraft', 'flarm', 'tracker', 'fanet', 'lt24', 'naviter', 'skylines', 'spider', 'spot'] # define message types we want to proceed
RECEIVER_TYPES = ['aprs_receiver', 'receiver'] AIRCRAFT_BEACON_TYPES = ['aprs_aircraft', 'flarm', 'tracker', 'fanet', 'lt24', 'naviter', 'skylines', 'spider', 'spot']
RECEIVER_BEACON_TYPES = ['aprs_receiver', 'receiver']
# define fields we want to proceed
BEACON_KEY_FIELDS = ['name', 'receiver_name', 'timestamp']
AIRCRAFT_BEACON_FIELDS = ['location', 'altitude', 'dstcall', 'relay', 'track', 'ground_speed', 'address_type', 'aircraft_type', 'stealth', 'address', 'climb_rate', 'turn_rate', 'signal_quality', 'error_count', 'frequency_offset', 'gps_quality_horizontal', 'gps_quality_vertical', 'software_version', 'hardware_version', 'real_address', 'signal_power', 'distance', 'radial', 'quality', 'location_mgrs', 'location_mgrs_short', 'agl', 'receiver_id', 'device_id']
RECEIVER_BEACON_FIELDS = ['location', 'altitude', 'dstcall', 'relay', 'version', 'platform', 'cpu_load', 'free_ram', 'total_ram', 'ntp_error', 'rt_crystal_correction', 'voltage', 'amperage', 'cpu_temp', 'senders_visible', 'senders_total', 'rec_input_noise', 'senders_signal', 'senders_messages', 'good_senders_signal', 'good_senders', 'good_and_bad_senders']
class DummyMerger: class DummyMerger:
@ -16,99 +23,58 @@ class DummyMerger:
pass pass
class Merger: class DbSaver:
def __init__(self, callback, max_timedelta=None, max_lines=None): def __init__(self, session):
self.callback = callback self.session = session
self.max_timedelta = max_timedelta self.aircraft_message_map = dict()
self.max_lines = max_lines self.receiver_message_map = dict()
self.message_map = dict() self.last_commit = datetime.utcnow()
def _put_in_map(self, message, my_map):
key = message['name'] + message['receiver_name'] + message['timestamp'].strftime('%s')
if key in my_map:
other = my_map[key]
params1 = dict([(k, v) for k, v in message.items() if v is not None])
params2 = dict([(k, v) for k, v in other.items() if v is not None])
merged = {**params1, **params2}
my_map[key] = merged
else:
my_map[key] = message
def add_message(self, message): def add_message(self, message):
if message is None or ('raw_message' in message and message['raw_message'][0] == '#'): if message is None or ('raw_message' in message and message['raw_message'][0] == '#') or 'beacon_type' not in message:
return return
# release old messages if 'location_wkt' in message:
if self.max_timedelta is not None: message['location'] = message.pop('location_wkt') # total_time_wasted_here = 3
for receiver, v1 in self.message_map.items():
for name, v2 in v1.items():
for timestamp,message in v2.items():
if message['timestamp'] - timestamp > self.max_timedelta:
self.callback.add_message(message)
del self.message_map[receiver][name][timestamp]
# release messages > max_lines if message['beacon_type'] in AIRCRAFT_BEACON_TYPES:
if self.max_lines is not None: self._put_in_map(message=message, my_map=self.aircraft_message_map)
pass elif message['beacon_type'] in RECEIVER_BEACON_TYPES:
self._put_in_map(message=message, my_map=self.receiver_message_map)
# merge messages with same timestamp
receiver_name = message['receiver_name']
name = message['name']
timestamp = message['timestamp']
if receiver_name in self.message_map:
if name in self.message_map[receiver_name]:
timestamps = self.message_map[receiver_name][name]
if timestamp in timestamps:
other = timestamps[timestamp]
params1 = dict([(k, v) for k, v in message.items() if v is not None])
params2 = dict([(k, v) for k, v in other.items() if v is not None])
merged = {**params1, **params2}
# zum debuggen
if 'raw_message' in message and 'raw_message' in other:
merged['raw_message'] = '"{}","{}"'.format(message['raw_message'], other['raw_message'])
self.callback.add_message(merged)
del self.message_map[receiver_name][name][timestamp]
else:
self.message_map[receiver_name][name][timestamp] = message
# release previous messages
for ts in list(timestamps):
if ts < timestamp:
self.callback.add_message(timestamps[ts])
del self.message_map[receiver_name][name][ts]
else:
# add new message
self.message_map[receiver_name].update({name: {timestamp: message}})
else: else:
self.message_map.update({receiver_name: {name: {timestamp: message}}}) print("Ignore beacon_type: {}".format(message['beacon_type']))
return
elapsed_time = datetime.utcnow() - self.last_commit
if elapsed_time >= timedelta(seconds=5):
self.flush()
def flush(self): def flush(self):
for receiver, v1 in self.message_map.items(): if len(self.aircraft_message_map) > 0:
for name, v2 in v1.items(): messages = list(self.aircraft_message_map.values())
for timestamp in v2: even_messages = [{k: message[k] if k in message else None for k in BEACON_KEY_FIELDS + AIRCRAFT_BEACON_FIELDS} for message in messages]
self.callback.add_message(self.message_map[receiver][name][timestamp]) upsert(session=self.session, model=AircraftBeacon, rows=even_messages, update_cols=AIRCRAFT_BEACON_FIELDS)
if len(self.receiver_message_map) > 0:
messages = list(self.receiver_message_map.values())
even_messages = [{k: message[k] if k in message else None for k in BEACON_KEY_FIELDS + RECEIVER_BEACON_FIELDS} for message in messages]
upsert(session=self.session, model=ReceiverBeacon, rows=even_messages, update_cols=RECEIVER_BEACON_FIELDS)
self.session.commit()
self.callback.flush() self.aircraft_message_map = dict()
self.message_map = dict() self.receiver_message_map = dict()
self.last_commit = datetime.utcnow()
class Converter:
def __init__(self, callback):
self.callback = callback
def add_message(self, message):
if message['aprs_type'] in ['status', 'position']:
beacon = self.message_to_beacon(message)
self.callback.add_message(beacon)
def message_to_beacon(self, message):
# create beacons
if message['beacon_type'] in AIRCRAFT_TYPES:
beacon = AircraftBeacon(**message)
elif message['beacon_type'] in RECEIVER_TYPES:
if 'rec_crystal_correction' in message:
del message['rec_crystal_correction']
del message['rec_crystal_correction_fine']
beacon = ReceiverBeacon(**message)
else:
print("Whoops: what is this: {}".format(message))
return beacon
def flush(self):
self.callback.flush()
class DummySaver: class DummySaver:
@ -119,34 +85,6 @@ class DummySaver:
print("========== flush ==========") print("========== flush ==========")
class DbSaver:
def __init__(self, session):
self.session = session
self.beacons = list()
self.last_commit = datetime.utcnow()
def add_message(self, beacon):
global last_commit
global beacons
self.beacons.append(beacon)
elapsed_time = datetime.utcnow() - self.last_commit
if elapsed_time >= timedelta(seconds=1):
self.flush()
def flush(self):
try:
self.session.bulk_save_objects(self.beacons)
self.session.commit()
self.beacons = list()
self.last_commit = datetime.utcnow()
except Exception as e:
self.session.rollback()
print(e)
return
import os, gzip, csv import os, gzip, csv

Wyświetl plik

@ -1,7 +1,9 @@
from geoalchemy2.shape import to_shape from geoalchemy2.shape import to_shape
from geoalchemy2.types import Geometry from geoalchemy2.types import Geometry
from sqlalchemy import Column, String, SmallInteger, Float, DateTime from sqlalchemy import Column, String, SmallInteger, Float, DateTime
from sqlalchemy.sql import func
from sqlalchemy.ext.declarative import AbstractConcreteBase from sqlalchemy.ext.declarative import AbstractConcreteBase
from sqlalchemy.ext.hybrid import hybrid_property
from .base import Base from .base import Base
from .geo import Location from .geo import Location
@ -38,3 +40,4 @@ class Beacon(AbstractConcreteBase, Base):
coords = to_shape(self.location_wkt) coords = to_shape(self.location_wkt)
return Location(lat=coords.y, lon=coords.x) return Location(lat=coords.y, lon=coords.x)

Wyświetl plik

@ -47,56 +47,6 @@ class MergerTest(unittest.TestCase):
merger.flush() merger.flush()
callback.add_message.assert_called_once_with(merged) callback.add_message.assert_called_once_with(merged)
@unittest.skip('not finished yet')
def test_exceed_timedelta(self):
a = {'name': 'Jeff', 'receiver_name': 'Observer1', 'timestamp': datetime.datetime(2018, 5, 20, 18, 4, 45)}
b = {'name': 'John', 'receiver_name': 'Observer1', 'timestamp': datetime.datetime(2018, 5, 20, 18, 4, 45)}
c = {'name': 'Fred', 'receiver_name': 'Observer1', 'timestamp': datetime.datetime(2018, 5, 20, 18, 4, 59)}
callback = MagicMock()
merger = Merger(callback=callback, max_timedelta=datetime.timedelta(seconds=10))
merger.add_message(a)
callback.add_message.assert_not_called()
merger.add_message(b)
callback.add_message.assert_not_called()
merger.add_message(c)
calls = [call(a), call(b)]
callback.add_message.assert_has_calls(calls, any_order=True)
merger.flush()
calls = [call(a), call(b), call(c)]
callback.add_message.assert_has_calls(calls, any_order=True)
@unittest.skip('not finished yet')
def test_exceed_maxlines(self):
a = {'name': 'Albert', 'receiver_name': 'Observer1', 'timestamp': datetime.datetime(2018, 5, 20, 18, 4, 45)}
b = {'name': 'Bertram', 'receiver_name': 'Observer1', 'timestamp': datetime.datetime(2018, 5, 20, 18, 4, 46)}
c = {'name': 'Chlodwig', 'receiver_name': 'Observer1', 'timestamp': datetime.datetime(2018, 5, 20, 18, 4, 47)}
d = {'name': 'Dagobert', 'receiver_name': 'Observer1', 'timestamp': datetime.datetime(2018, 5, 20, 18, 4, 48)}
e = {'name': 'Erich', 'receiver_name': 'Observer1', 'timestamp': datetime.datetime(2018, 5, 20, 18, 4, 49)}
f = {'name': 'Frodo', 'receiver_name': 'Observer1', 'timestamp': datetime.datetime(2018, 5, 20, 18, 4, 50)}
callback = MagicMock()
merger = Merger(callback=callback, max_lines=5)
merger.add_message(a)
callback.add_message.assert_not_called()
merger.add_message(b)
callback.add_message.assert_not_called()
merger.add_message(c)
callback.add_message.assert_not_called()
merger.add_message(d)
callback.add_message.assert_not_called()
merger.add_message(e)
callback.add_message.assert_not_called()
merger.add_message(f)
callback.add_message.assert_called_once_with(a)
if __name__ == '__main__': if __name__ == '__main__':