Implement SQL merger, fixes #63

pull/68/head
Konstantin Gründger 2019-01-29 23:06:35 +01:00
rodzic e24129b13c
commit 1866bd9d4e
4 zmienionych plików z 32 dodań i 67 usunięć

Wyświetl plik

@ -35,9 +35,7 @@ class DbSaver:
if key in my_map:
other = my_map[key]
params1 = dict([(k, v) for k, v in message.items() if v is not None])
params2 = dict([(k, v) for k, v in other.items() if v is not None])
merged = {**params1, **params2}
merged = {k: message[k] if message[k] is not None else other[k] for k in message.keys()}
my_map[key] = merged
else:
my_map[key] = message
@ -50,9 +48,11 @@ class DbSaver:
message['location'] = message.pop('location_wkt') # total_time_wasted_here = 3
if message['beacon_type'] in AIRCRAFT_BEACON_TYPES:
self._put_in_map(message=message, my_map=self.aircraft_message_map)
even_messages = {k: message[k] if k in message else None for k in BEACON_KEY_FIELDS + AIRCRAFT_BEACON_FIELDS}
self._put_in_map(message=even_messages, my_map=self.aircraft_message_map)
elif message['beacon_type'] in RECEIVER_BEACON_TYPES:
self._put_in_map(message=message, my_map=self.receiver_message_map)
even_messages = {k: message[k] if k in message else None for k in BEACON_KEY_FIELDS + RECEIVER_BEACON_FIELDS}
self._put_in_map(message=even_messages, my_map=self.receiver_message_map)
else:
print("Ignore beacon_type: {}".format(message['beacon_type']))
return
@ -64,12 +64,10 @@ class DbSaver:
def flush(self):
if len(self.aircraft_message_map) > 0:
messages = list(self.aircraft_message_map.values())
even_messages = [{k: message[k] if k in message else None for k in BEACON_KEY_FIELDS + AIRCRAFT_BEACON_FIELDS} for message in messages]
upsert(session=self.session, model=AircraftBeacon, rows=even_messages, update_cols=AIRCRAFT_BEACON_FIELDS)
upsert(session=self.session, model=AircraftBeacon, rows=messages, update_cols=AIRCRAFT_BEACON_FIELDS)
if len(self.receiver_message_map) > 0:
messages = list(self.receiver_message_map.values())
even_messages = [{k: message[k] if k in message else None for k in BEACON_KEY_FIELDS + RECEIVER_BEACON_FIELDS} for message in messages]
upsert(session=self.session, model=ReceiverBeacon, rows=even_messages, update_cols=RECEIVER_BEACON_FIELDS)
upsert(session=self.session, model=ReceiverBeacon, rows=messages, update_cols=RECEIVER_BEACON_FIELDS)
self.session.commit()
self.aircraft_message_map = dict()

Wyświetl plik

@ -33,7 +33,7 @@ class Beacon(AbstractConcreteBase, Base):
raw_message = None #Column(String)
reference_timestamp = None #Column(DateTime, index=True)
@property
@hybrid_property
def location(self):
if self.location_wkt is None:
return None
@ -41,3 +41,6 @@ class Beacon(AbstractConcreteBase, Base):
coords = to_shape(self.location_wkt)
return Location(lat=coords.y, lon=coords.x)
@location.expression
def location(cls):
return cls.location_wkt

Wyświetl plik

@ -6,46 +6,7 @@ from ogn.gateway.process_tools import Merger
class MergerTest(unittest.TestCase):
def test_different_keys(self):
a = {'name': 'Jeff', 'receiver_name': 'Observer1', 'timestamp': datetime.datetime(2018, 5, 20, 18, 4, 45)}
b = {'name': 'John', 'receiver_name': 'Observer1', 'timestamp': datetime.datetime(2018, 5, 20, 18, 4, 45)}
c = {'name': 'John', 'receiver_name': 'Observer2', 'timestamp': datetime.datetime(2018, 5, 20, 18, 4, 45)}
d = {'name': 'John', 'receiver_name': 'Observer1', 'timestamp': datetime.datetime(2018, 5, 20, 18, 4, 46)}
callback = MagicMock()
merger = Merger(callback=callback)
merger.add_message(a)
callback.add_message.assert_not_called()
merger.add_message(b)
callback.add_message.assert_not_called()
merger.add_message(c)
callback.add_message.assert_not_called()
merger.add_message(d)
callback.add_message.assert_called_once_with(b)
merger.flush()
calls = [call(a), call(c), call(d)]
callback.add_message.assert_has_calls(calls, any_order=True)
def test_pair(self):
a = {'name': 'Jeff', 'receiver_name': 'Observer1', 'timestamp': datetime.datetime(2018, 5, 20, 18, 4, 45), 'field_a': None, 'field_b': 3.141}
b = {'name': 'Jeff', 'receiver_name': 'Observer1', 'timestamp': datetime.datetime(2018, 5, 20, 18, 4, 45), 'field_a': 'WTF', 'field_c': None, 'field_d': 1.4142}
merged = {'name': 'Jeff', 'receiver_name': 'Observer1', 'timestamp': datetime.datetime(2018, 5, 20, 18, 4, 45), 'field_a': 'WTF', 'field_b': 3.141, 'field_d': 1.4142}
callback = MagicMock()
merger = Merger(callback=callback)
merger.add_message(a)
callback.add_message.assert_not_called()
merger.add_message(b)
callback.add_message.assert_called_once_with(merged)
merger.flush()
callback.add_message.assert_called_once_with(merged)

Wyświetl plik

@ -1,4 +1,4 @@
import time
import datetime
import unittest
from unittest.mock import MagicMock
@ -6,36 +6,39 @@ from ogn.gateway.process_tools import DbSaver
class DbSaverTest(unittest.TestCase):
def test(self):
a = "Albert"
b = "Bertram"
c = "Caspar"
def test_different_keys(self):
a = {'name': 'Jeff', 'receiver_name': 'Observer1', 'timestamp': datetime.datetime(2018, 5, 20, 18, 4, 45)}
b = {'name': 'John', 'receiver_name': 'Observer1', 'timestamp': datetime.datetime(2018, 5, 20, 18, 4, 45)}
c = {'name': 'John', 'receiver_name': 'Observer2', 'timestamp': datetime.datetime(2018, 5, 20, 18, 4, 45)}
d = {'name': 'John', 'receiver_name': 'Observer1', 'timestamp': datetime.datetime(2018, 5, 20, 18, 4, 46)}
session = MagicMock()
saver = DbSaver(session=session)
saver.add_message(a)
session.bulk_save_objects.assert_not_called()
saver.add_message(b)
session.bulk_save_objects.assert_not_called()
saver.add_message(c)
saver.flush()
session.bulk_save_objects.assert_called_once_with([a, b, c])
saver.add_message(d)
session.commit.assert_not_called()
def test_timeout(self):
a = "Xanthippe"
b = "Yvonne"
saver.flush()
session.commit.assert_called_once()
def test_pair(self):
a = {'name': 'Jeff', 'receiver_name': 'Observer1', 'timestamp': datetime.datetime(2018, 5, 20, 18, 4, 45), 'field_a': None, 'field_b': 3.141}
b = {'name': 'Jeff', 'receiver_name': 'Observer1', 'timestamp': datetime.datetime(2018, 5, 20, 18, 4, 45), 'field_a': 'WTF', 'field_c': None, 'field_d': 1.4142}
merged = {'name': 'Jeff', 'receiver_name': 'Observer1', 'timestamp': datetime.datetime(2018, 5, 20, 18, 4, 45), 'field_a': 'WTF', 'field_b': 3.141, 'field_d': 1.4142}
session = MagicMock()
saver = DbSaver(session=session)
saver.add_message(a)
session.bulk_save_objects.assert_not_called()
time.sleep(1)
session.commit.assert_not_called()
saver.add_message(b)
session.bulk_save_objects.assert_called_once_with([a, b])
session.commit.assert_not_called()
saver.flush()
session.commit.assert_not_called()
if __name__ == '__main__':