diff --git a/ogn/commands/bulkimport.py b/ogn/commands/bulkimport.py index 442c5ca..8d9d091 100644 --- a/ogn/commands/bulkimport.py +++ b/ogn/commands/bulkimport.py @@ -485,7 +485,7 @@ class LogfileDbSaver(): def convert(sourcefile, datestr, saver): from ogn.gateway.process import string_to_message - from ogn.gateway.process_tools import AIRCRAFT_TYPES, RECEIVER_TYPES + from ogn.gateway.process_tools import AIRCRAFT_BEACON_TYPES, RECEIVER_BEACON_TYPES from datetime import datetime fin = open_file(sourcefile) @@ -515,14 +515,14 @@ def convert(sourcefile, datestr, saver): dictfilt = lambda x, y: dict([(i, x[i]) for i in x if i in set(y)]) try: - if message['beacon_type'] in AIRCRAFT_TYPES: + if message['beacon_type'] in AIRCRAFT_BEACON_TYPES: message = dictfilt(message, ('beacon_type', 'aprs_type', 'location_wkt', 'altitude', 'name', 'dstcall', 'relay', 'receiver_name', 'timestamp', 'track', 'ground_speed', 'address_type', 'aircraft_type', 'stealth', 'address', 'climb_rate', 'turn_rate', 'signal_quality', 'error_count', 'frequency_offset', 'gps_quality_horizontal', 'gps_quality_vertical', 'software_version', 'hardware_version', 'real_address', 'signal_power', 'distance', 'radial', 'quality', 'agl', 'location_mgrs', 'location_mgrs_short', 'receiver_id', 'device_id')) beacon = AircraftBeacon(**message) - elif message['beacon_type'] in RECEIVER_TYPES: + elif message['beacon_type'] in RECEIVER_BEACON_TYPES: if 'rec_crystal_correction' in message: del message['rec_crystal_correction'] del message['rec_crystal_correction_fine'] diff --git a/ogn/gateway/manage.py b/ogn/gateway/manage.py index 813661e..2093aac 100644 --- a/ogn/gateway/manage.py +++ b/ogn/gateway/manage.py @@ -4,7 +4,7 @@ from manager import Manager from ogn.client import AprsClient from ogn.gateway.process import string_to_message from datetime import datetime -from ogn.gateway.process_tools import DummyMerger, Converter, DbSaver +from ogn.gateway.process_tools import DbSaver from ogn.commands.dbutils import session manager = Manager() @@ -12,16 +12,13 @@ manager = Manager() logging_formatstr = '%(asctime)s - %(levelname).4s - %(name)s - %(message)s' log_levels = ['CRITICAL', 'ERROR', 'WARNING', 'INFO', 'DEBUG'] -# Build the processing pipeline saver = DbSaver(session=session) -converter = Converter(callback=saver) -merger = DummyMerger(callback=converter) def asdf(raw_string): message = string_to_message(raw_string, reference_date=datetime.utcnow()) if message is not None: - merger.add_message(message) + saver.add_message(message) else: print(message) @@ -53,6 +50,6 @@ def run(aprs_user='anon-dev', logfile='main.log', loglevel='INFO'): except KeyboardInterrupt: print('\nStop ogn gateway') - merger.flush() + saver.flush() client.disconnect() logging.shutdown() diff --git a/ogn/gateway/process.py b/ogn/gateway/process.py index 2f105d8..62f400a 100644 --- a/ogn/gateway/process.py +++ b/ogn/gateway/process.py @@ -5,7 +5,7 @@ from mgrs import MGRS from ogn.commands.dbutils import session from ogn.model import Location from ogn.parser import parse, ParseError -from ogn.gateway.process_tools import DbSaver, Converter, DummyMerger, AIRCRAFT_TYPES, RECEIVER_TYPES +from ogn.gateway.process_tools import DbSaver, AIRCRAFT_BEACON_TYPES, RECEIVER_BEACON_TYPES logger = logging.getLogger(__name__) @@ -32,30 +32,29 @@ def string_to_message(raw_string, reference_date): try: message = parse(raw_string, reference_date) except NotImplementedError as e: - #logger.w('No parser implemented for message: {}'.format(raw_string)) + logger.w('No parser implemented for message: {}'.format(raw_string)) return None except ParseError as e: - #logger.error('Parsing error with message: {}'.format(raw_string)) + logger.error('Parsing error with message: {}'.format(raw_string)) return None except TypeError as e: - #logger.error('TypeError with message: {}'.format(raw_string)) + logger.error('TypeError with message: {}'.format(raw_string)) return None except Exception as e: - #logger.error(raw_string) - #logger.error(e) + logger.error(raw_string) + logger.error(e) return None # update reference receivers and distance to the receiver if message['aprs_type'] == 'position': - if message['beacon_type'] in RECEIVER_TYPES: + if message['beacon_type'] in AIRCRAFT_BEACON_TYPES + RECEIVER_BEACON_TYPES: message = _replace_lonlat_with_wkt(message) - elif message['beacon_type'] in AIRCRAFT_TYPES: - message = _replace_lonlat_with_wkt(message) - if 'gps_quality' in message: - if message['gps_quality'] is not None and 'horizontal' in message['gps_quality']: - message['gps_quality_horizontal'] = message['gps_quality']['horizontal'] - message['gps_quality_vertical'] = message['gps_quality']['vertical'] - del message['gps_quality'] + + if message['beacon_type'] in AIRCRAFT_BEACON_TYPES and 'gps_quality' in message: + if message['gps_quality'] is not None and 'horizontal' in message['gps_quality']: + message['gps_quality_horizontal'] = message['gps_quality']['horizontal'] + message['gps_quality_vertical'] = message['gps_quality']['vertical'] + del message['gps_quality'] # update raw_message message['raw_message'] = raw_string @@ -63,15 +62,12 @@ def string_to_message(raw_string, reference_date): return message -# Build the processing pipeline saver = DbSaver(session=session) -converter = Converter(callback=saver) -merger = DummyMerger(callback=converter) -def process_raw_message(raw_message, reference_date=None, merger=merger): +def process_raw_message(raw_message, reference_date=None, saver=saver): logger.debug('Received message: {}'.format(raw_message)) message = string_to_message(raw_message, reference_date) - merger.add_message(message) + saver.add_message(message) diff --git a/ogn/gateway/process_tools.py b/ogn/gateway/process_tools.py index c17c3a4..a684262 100644 --- a/ogn/gateway/process_tools.py +++ b/ogn/gateway/process_tools.py @@ -1,8 +1,15 @@ from datetime import datetime, timedelta from ogn.model import AircraftBeacon, ReceiverBeacon +from ogn.collect.database import upsert -AIRCRAFT_TYPES = ['aprs_aircraft', 'flarm', 'tracker', 'fanet', 'lt24', 'naviter', 'skylines', 'spider', 'spot'] -RECEIVER_TYPES = ['aprs_receiver', 'receiver'] +# define message types we want to proceed +AIRCRAFT_BEACON_TYPES = ['aprs_aircraft', 'flarm', 'tracker', 'fanet', 'lt24', 'naviter', 'skylines', 'spider', 'spot'] +RECEIVER_BEACON_TYPES = ['aprs_receiver', 'receiver'] + +# define fields we want to proceed +BEACON_KEY_FIELDS = ['name', 'receiver_name', 'timestamp'] +AIRCRAFT_BEACON_FIELDS = ['location', 'altitude', 'dstcall', 'relay', 'track', 'ground_speed', 'address_type', 'aircraft_type', 'stealth', 'address', 'climb_rate', 'turn_rate', 'signal_quality', 'error_count', 'frequency_offset', 'gps_quality_horizontal', 'gps_quality_vertical', 'software_version', 'hardware_version', 'real_address', 'signal_power', 'distance', 'radial', 'quality', 'location_mgrs', 'location_mgrs_short', 'agl', 'receiver_id', 'device_id'] +RECEIVER_BEACON_FIELDS = ['location', 'altitude', 'dstcall', 'relay', 'version', 'platform', 'cpu_load', 'free_ram', 'total_ram', 'ntp_error', 'rt_crystal_correction', 'voltage', 'amperage', 'cpu_temp', 'senders_visible', 'senders_total', 'rec_input_noise', 'senders_signal', 'senders_messages', 'good_senders_signal', 'good_senders', 'good_and_bad_senders'] class DummyMerger: @@ -16,99 +23,58 @@ class DummyMerger: pass -class Merger: - def __init__(self, callback, max_timedelta=None, max_lines=None): - self.callback = callback - self.max_timedelta = max_timedelta - self.max_lines = max_lines - self.message_map = dict() +class DbSaver: + def __init__(self, session): + self.session = session + self.aircraft_message_map = dict() + self.receiver_message_map = dict() + self.last_commit = datetime.utcnow() + + def _put_in_map(self, message, my_map): + key = message['name'] + message['receiver_name'] + message['timestamp'].strftime('%s') + + if key in my_map: + other = my_map[key] + params1 = dict([(k, v) for k, v in message.items() if v is not None]) + params2 = dict([(k, v) for k, v in other.items() if v is not None]) + merged = {**params1, **params2} + my_map[key] = merged + else: + my_map[key] = message def add_message(self, message): - if message is None or ('raw_message' in message and message['raw_message'][0] == '#'): + if message is None or ('raw_message' in message and message['raw_message'][0] == '#') or 'beacon_type' not in message: return - # release old messages - if self.max_timedelta is not None: - for receiver, v1 in self.message_map.items(): - for name, v2 in v1.items(): - for timestamp,message in v2.items(): - if message['timestamp'] - timestamp > self.max_timedelta: - self.callback.add_message(message) - del self.message_map[receiver][name][timestamp] + if 'location_wkt' in message: + message['location'] = message.pop('location_wkt') # total_time_wasted_here = 3 - # release messages > max_lines - if self.max_lines is not None: - pass - - # merge messages with same timestamp - receiver_name = message['receiver_name'] - name = message['name'] - timestamp = message['timestamp'] - - if receiver_name in self.message_map: - if name in self.message_map[receiver_name]: - timestamps = self.message_map[receiver_name][name] - if timestamp in timestamps: - other = timestamps[timestamp] - params1 = dict([(k, v) for k, v in message.items() if v is not None]) - params2 = dict([(k, v) for k, v in other.items() if v is not None]) - merged = {**params1, **params2} - - # zum debuggen - if 'raw_message' in message and 'raw_message' in other: - merged['raw_message'] = '"{}","{}"'.format(message['raw_message'], other['raw_message']) - - self.callback.add_message(merged) - del self.message_map[receiver_name][name][timestamp] - else: - self.message_map[receiver_name][name][timestamp] = message - - # release previous messages - for ts in list(timestamps): - if ts < timestamp: - self.callback.add_message(timestamps[ts]) - del self.message_map[receiver_name][name][ts] - else: - # add new message - self.message_map[receiver_name].update({name: {timestamp: message}}) + if message['beacon_type'] in AIRCRAFT_BEACON_TYPES: + self._put_in_map(message=message, my_map=self.aircraft_message_map) + elif message['beacon_type'] in RECEIVER_BEACON_TYPES: + self._put_in_map(message=message, my_map=self.receiver_message_map) else: - self.message_map.update({receiver_name: {name: {timestamp: message}}}) + print("Ignore beacon_type: {}".format(message['beacon_type'])) + return + + elapsed_time = datetime.utcnow() - self.last_commit + if elapsed_time >= timedelta(seconds=5): + self.flush() def flush(self): - for receiver, v1 in self.message_map.items(): - for name, v2 in v1.items(): - for timestamp in v2: - self.callback.add_message(self.message_map[receiver][name][timestamp]) + if len(self.aircraft_message_map) > 0: + messages = list(self.aircraft_message_map.values()) + even_messages = [{k: message[k] if k in message else None for k in BEACON_KEY_FIELDS + AIRCRAFT_BEACON_FIELDS} for message in messages] + upsert(session=self.session, model=AircraftBeacon, rows=even_messages, update_cols=AIRCRAFT_BEACON_FIELDS) + if len(self.receiver_message_map) > 0: + messages = list(self.receiver_message_map.values()) + even_messages = [{k: message[k] if k in message else None for k in BEACON_KEY_FIELDS + RECEIVER_BEACON_FIELDS} for message in messages] + upsert(session=self.session, model=ReceiverBeacon, rows=even_messages, update_cols=RECEIVER_BEACON_FIELDS) + self.session.commit() - self.callback.flush() - self.message_map = dict() - - -class Converter: - def __init__(self, callback): - self.callback = callback - - def add_message(self, message): - if message['aprs_type'] in ['status', 'position']: - beacon = self.message_to_beacon(message) - self.callback.add_message(beacon) - - def message_to_beacon(self, message): - # create beacons - if message['beacon_type'] in AIRCRAFT_TYPES: - beacon = AircraftBeacon(**message) - elif message['beacon_type'] in RECEIVER_TYPES: - if 'rec_crystal_correction' in message: - del message['rec_crystal_correction'] - del message['rec_crystal_correction_fine'] - beacon = ReceiverBeacon(**message) - else: - print("Whoops: what is this: {}".format(message)) - - return beacon - - def flush(self): - self.callback.flush() + self.aircraft_message_map = dict() + self.receiver_message_map = dict() + self.last_commit = datetime.utcnow() class DummySaver: @@ -119,34 +85,6 @@ class DummySaver: print("========== flush ==========") -class DbSaver: - def __init__(self, session): - self.session = session - self.beacons = list() - self.last_commit = datetime.utcnow() - - def add_message(self, beacon): - global last_commit - global beacons - - self.beacons.append(beacon) - - elapsed_time = datetime.utcnow() - self.last_commit - if elapsed_time >= timedelta(seconds=1): - self.flush() - - def flush(self): - try: - self.session.bulk_save_objects(self.beacons) - self.session.commit() - self.beacons = list() - self.last_commit = datetime.utcnow() - except Exception as e: - self.session.rollback() - print(e) - return - - import os, gzip, csv diff --git a/ogn/model/beacon.py b/ogn/model/beacon.py index da63268..7d43cff 100644 --- a/ogn/model/beacon.py +++ b/ogn/model/beacon.py @@ -1,7 +1,9 @@ from geoalchemy2.shape import to_shape from geoalchemy2.types import Geometry from sqlalchemy import Column, String, SmallInteger, Float, DateTime +from sqlalchemy.sql import func from sqlalchemy.ext.declarative import AbstractConcreteBase +from sqlalchemy.ext.hybrid import hybrid_property from .base import Base from .geo import Location @@ -38,3 +40,4 @@ class Beacon(AbstractConcreteBase, Base): coords = to_shape(self.location_wkt) return Location(lat=coords.y, lon=coords.x) + diff --git a/tests/gateway/test_merger.py b/tests/gateway/test_merger.py index e03fb20..73c1c65 100644 --- a/tests/gateway/test_merger.py +++ b/tests/gateway/test_merger.py @@ -47,56 +47,6 @@ class MergerTest(unittest.TestCase): merger.flush() callback.add_message.assert_called_once_with(merged) - @unittest.skip('not finished yet') - def test_exceed_timedelta(self): - a = {'name': 'Jeff', 'receiver_name': 'Observer1', 'timestamp': datetime.datetime(2018, 5, 20, 18, 4, 45)} - b = {'name': 'John', 'receiver_name': 'Observer1', 'timestamp': datetime.datetime(2018, 5, 20, 18, 4, 45)} - c = {'name': 'Fred', 'receiver_name': 'Observer1', 'timestamp': datetime.datetime(2018, 5, 20, 18, 4, 59)} - - callback = MagicMock() - merger = Merger(callback=callback, max_timedelta=datetime.timedelta(seconds=10)) - merger.add_message(a) - callback.add_message.assert_not_called() - - merger.add_message(b) - callback.add_message.assert_not_called() - - merger.add_message(c) - calls = [call(a), call(b)] - callback.add_message.assert_has_calls(calls, any_order=True) - - merger.flush() - calls = [call(a), call(b), call(c)] - callback.add_message.assert_has_calls(calls, any_order=True) - - @unittest.skip('not finished yet') - def test_exceed_maxlines(self): - a = {'name': 'Albert', 'receiver_name': 'Observer1', 'timestamp': datetime.datetime(2018, 5, 20, 18, 4, 45)} - b = {'name': 'Bertram', 'receiver_name': 'Observer1', 'timestamp': datetime.datetime(2018, 5, 20, 18, 4, 46)} - c = {'name': 'Chlodwig', 'receiver_name': 'Observer1', 'timestamp': datetime.datetime(2018, 5, 20, 18, 4, 47)} - d = {'name': 'Dagobert', 'receiver_name': 'Observer1', 'timestamp': datetime.datetime(2018, 5, 20, 18, 4, 48)} - e = {'name': 'Erich', 'receiver_name': 'Observer1', 'timestamp': datetime.datetime(2018, 5, 20, 18, 4, 49)} - f = {'name': 'Frodo', 'receiver_name': 'Observer1', 'timestamp': datetime.datetime(2018, 5, 20, 18, 4, 50)} - - callback = MagicMock() - merger = Merger(callback=callback, max_lines=5) - merger.add_message(a) - callback.add_message.assert_not_called() - - merger.add_message(b) - callback.add_message.assert_not_called() - - merger.add_message(c) - callback.add_message.assert_not_called() - - merger.add_message(d) - callback.add_message.assert_not_called() - - merger.add_message(e) - callback.add_message.assert_not_called() - - merger.add_message(f) - callback.add_message.assert_called_once_with(a) if __name__ == '__main__':