From aaac0029f6225eef7962ebfd1146c9a9705f36bf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Konstantin=20Gru=CC=88ndger?= Date: Mon, 31 Oct 2016 08:58:19 +0100 Subject: [PATCH] Better bulk import: separate beacon creation from db insert --- ogn/gateway/manage.py | 11 ++- ogn/gateway/process.py | 166 +++++++++++++++++++++-------------------- 2 files changed, 93 insertions(+), 84 deletions(-) diff --git a/ogn/gateway/manage.py b/ogn/gateway/manage.py index da21b93..9716d37 100644 --- a/ogn/gateway/manage.py +++ b/ogn/gateway/manage.py @@ -1,7 +1,8 @@ import logging from ogn.client import AprsClient -from ogn.gateway.process import process_beacon +from ogn.gateway.process import process_beacon, message_to_beacon +from ogn.commands.dbutils import session from datetime import datetime from manager import Manager @@ -65,9 +66,15 @@ def import_logfile(ogn_logfile, reference_date, logfile='main.log', loglevel='IN log_handlers.append(logging.FileHandler(logfile)) logging.basicConfig(format=logging_formatstr, level=loglevel, handlers=log_handlers) + beacons = list() + print('Start importing ogn-logfile') for line in f: - process_beacon(line, reference_date=reference_date) + beacon = message_to_beacon(line, reference_date=reference_date) + if beacon is not None: + beacons.append(beacon) + + session.bulk_save_objects(beacons) f.close() logging.shutdown() diff --git a/ogn/gateway/process.py b/ogn/gateway/process.py index d1fef12..805e4dc 100644 --- a/ogn/gateway/process.py +++ b/ogn/gateway/process.py @@ -14,92 +14,94 @@ def replace_lonlat_with_wkt(message): return message -def process_beacon(raw_message, reference_date=None, reference_time=None): - if raw_message[0] == '#': - return - try: - message = parse_aprs(raw_message, reference_date, reference_time) - # symboltable / symbolcodes used by OGN: - # I&: used as receiver - # /X: helicopter_rotorcraft - # /': glider_or_motorglider - # \^: powered_aircraft - # /g: para_glider - # /O: ? - # /^: ? - # \n: ? - # /z: ? - # /o: ? - if 'symboltable' not in message and 'symbolcode' not in message: - # we have a receiver_beacon (status message) - message.update(parse_ogn_receiver_beacon(message['comment'])) - beacon = ReceiverBeacon(**message) +def message_to_beacon(raw_message, reference_date): + beacon = None - # connect beacon with receiver - receiver = session.query(Receiver.id) \ - .filter(Receiver.name == beacon.name) \ - .first() - if receiver is None: - receiver = Receiver() - receiver.name = beacon.name - session.add(receiver) - beacon.receiver_id = receiver.id - elif message['symboltable'] == "I" and message['symbolcode'] == '&': - # ... we have a receiver_beacon - message.update(parse_ogn_receiver_beacon(message['comment'])) - message = replace_lonlat_with_wkt(message) - beacon = ReceiverBeacon(**message) + if raw_message[0] != '#': + try: + message = parse_aprs(raw_message, reference_date) + # symboltable / symbolcodes used by OGN: + # I&: used as receiver + # /X: helicopter_rotorcraft + # /': glider_or_motorglider + # \^: powered_aircraft + # /g: para_glider + # /O: ? + # /^: ? + # \n: ? + # /z: ? + # /o: ? + if 'symboltable' not in message and 'symbolcode' not in message: + # we have a receiver_beacon (status message) + message.update(parse_ogn_receiver_beacon(message['comment'])) + beacon = ReceiverBeacon(**message) + elif message['symboltable'] == "I" and message['symbolcode'] == '&': + # ... we have a receiver_beacon + message.update(parse_ogn_receiver_beacon(message['comment'])) + message = replace_lonlat_with_wkt(message) + beacon = ReceiverBeacon(**message) + else: + # ... we have a aircraft_beacon + message.update(parse_ogn_aircraft_beacon(message['comment'])) + message = replace_lonlat_with_wkt(message) + beacon = AircraftBeacon(**message) + except ParseError as e: + logger.error('Received message: {}'.format(raw_message)) + logger.error('Drop packet, {}'.format(e.message)) + except TypeError as e: + logger.error('TypeError: {}'.format(raw_message)) - # connect beacon with receiver - receiver = session.query(Receiver.id) \ - .filter(Receiver.name == beacon.name) \ - .first() - if receiver is None: - receiver = Receiver() - receiver.name = beacon.name - session.add(receiver) - beacon.receiver_id = receiver.id - else: - # ... we have a aircraft_beacon - message.update(parse_ogn_aircraft_beacon(message['comment'])) - message = replace_lonlat_with_wkt(message) - beacon = AircraftBeacon(**message) + return beacon - # connect beacon with device - device = session.query(Device) \ - .filter(Device.address == beacon.address) \ - .first() - if device is None: - device = Device() - device.address = beacon.address - session.add(device) - beacon.device_id = device.id - # update device - device.aircraft_type = beacon.aircraft_type - device.stealth = beacon.stealth - if beacon.hardware_version is not None: - device.hardware_version = beacon.hardware_version - if beacon.software_version is not None: - device.software_version = beacon.software_version - if beacon.real_address is not None: - device.real_address = beacon.real_address +def add_beacon_to_db(beacon): + if type(beacon == ReceiverBeacon): + # connect beacon with receiver + receiver = session.query(Receiver.id) \ + .filter(Receiver.name == beacon.name) \ + .first() + if receiver is None: + receiver = Receiver() + receiver.name = beacon.name + session.add(receiver) + beacon.receiver_id = receiver.id + elif type(beacon == AircraftBeacon): + # connect beacon with device + device = session.query(Device) \ + .filter(Device.address == beacon.address) \ + .first() + if device is None: + device = Device() + device.address = beacon.address + session.add(device) + beacon.device_id = device.id - # connect beacon with receiver - receiver = session.query(Receiver.id) \ - .filter(Receiver.name == beacon.receiver_name) \ - .first() - if receiver is None: - receiver = Receiver() - receiver.name = beacon.receiver_name - session.add(receiver) - beacon.receiver_id = receiver.id + # update device + device.aircraft_type = beacon.aircraft_type + device.stealth = beacon.stealth + if beacon.hardware_version is not None: + device.hardware_version = beacon.hardware_version + if beacon.software_version is not None: + device.software_version = beacon.software_version + if beacon.real_address is not None: + device.real_address = beacon.real_address - session.add(beacon) - session.commit() + # connect beacon with receiver + receiver = session.query(Receiver.id) \ + .filter(Receiver.name == beacon.receiver_name) \ + .first() + if receiver is None: + receiver = Receiver() + receiver.name = beacon.receiver_name + session.add(receiver) + beacon.receiver_id = receiver.id + + session.add(beacon) + session.commit() + + +def process_beacon(raw_message, reference_date=None): + beacon = message_to_beacon(raw_message, reference_date) + if beacon is not None: + add_beacon_to_db(beacon) logger.debug('Received message: {}'.format(raw_message)) - except ParseError as e: - logger.error('Received message: {}'.format(raw_message)) - logger.error('Drop packet, {}'.format(e.message)) - except TypeError as e: - logger.error('TypeError: {}'.format(raw_message))