Prepare csv and use super fast „COPY FROM“ sql command

pull/68/head
Konstantin Gründger 2017-05-26 22:56:38 +02:00
rodzic b414d5ba84
commit 276763f37f
7 zmienionych plików z 273 dodań i 97 usunięć

Wyświetl plik

@ -5,6 +5,7 @@ from ogn.gateway.process import process_beacon, message_to_beacon
from ogn.commands.dbutils import session from ogn.commands.dbutils import session
from datetime import datetime from datetime import datetime
from manager import Manager from manager import Manager
from ogn.model import AircraftBeacon, ReceiverBeacon
manager = Manager() manager = Manager()
@ -44,21 +45,8 @@ def run(aprs_user='anon-dev', logfile='main.log', loglevel='INFO'):
@manager.command @manager.command
def import_logfile(ogn_logfile, reference_date, logfile='main.log', loglevel='INFO'): def import_beacon_logfile(csv_logfile, logfile='main.log', loglevel='INFO'):
"""Import OGN-data from ogn-log-files <arg: ogn-logfile, reference_date>. Reference date must be given in YYYY-MM-DD.""" """Import csv logfile <arg: csv logfile>."""
# Check if filename exists
try:
f = open(ogn_logfile, 'r')
except:
print('\nError reading ogn-logfile:', ogn_logfile)
return
try:
reference_date = datetime.strptime(reference_date, "%Y-%m-%d")
except:
print('\nError in reference_date argument', reference_date)
return
# Enable logging # Enable logging
log_handlers = [logging.StreamHandler()] log_handlers = [logging.StreamHandler()]
@ -66,15 +54,121 @@ def import_logfile(ogn_logfile, reference_date, logfile='main.log', loglevel='IN
log_handlers.append(logging.FileHandler(logfile)) log_handlers.append(logging.FileHandler(logfile))
logging.basicConfig(format=logging_formatstr, level=loglevel, handlers=log_handlers) logging.basicConfig(format=logging_formatstr, level=loglevel, handlers=log_handlers)
beacons = list() logger = logging.getLogger(__name__)
SQL_STATEMENT = """
COPY %s(%s) FROM STDIN WITH
CSV
HEADER
DELIMITER AS ','
"""
file = open(csv_logfile, 'r')
first_line = file.readline().strip()
aircraft_beacon_columns = ','.join(AircraftBeacon.get_csv_columns())
receiver_beacon_columns = ','.join(ReceiverBeacon.get_csv_columns())
if first_line == aircraft_beacon_columns:
sql = SQL_STATEMENT % ('aircraft_beacon', aircraft_beacon_columns)
elif first_line == receiver_beacon_columns:
sql = SQL_STATEMENT % ('receiver_beacon', receiver_beacon_columns)
else:
print("Not a valid logfile: {}".format(csv_logfile))
return
logger.info("Reading logfile")
conn = session.connection().connection
cursor = conn.cursor()
cursor.copy_expert(sql=sql, file=file)
conn.commit()
cursor.close()
logger.info("Import finished")
# get total lines of the input file
def file_len(fname):
with open(fname) as f:
for i, l in enumerate(f):
pass
return i + 1
@manager.command
def convert_logfile(ogn_logfile, logfile='main.log', loglevel='INFO'):
"""Convert ogn logfiles to csv logfiles (one for aircraft beacons and one for receiver beacons) <arg: ogn-logfile>. Logfile name: blablabla.txt_YYYY-MM-DD."""
# Enable logging
log_handlers = [logging.StreamHandler()]
if logfile:
log_handlers.append(logging.FileHandler(logfile))
logging.basicConfig(format=logging_formatstr, level=loglevel, handlers=log_handlers)
logger = logging.getLogger(__name__)
import re
match = re.search('^.+\.txt\_(\d{4}\-\d{2}\-\d{2})', ogn_logfile)
if match:
reference_date = match.group(1)
else:
print("filename does not match pattern")
return
fin = open(ogn_logfile, 'r')
fout_ab = open('aircraft_beacons.csv_' + reference_date, 'w')
fout_rb = open('receiver_beacons.csv_' + reference_date, 'w')
try:
reference_date = datetime.strptime(reference_date, "%Y-%m-%d")
except:
print('\nError in reference_date argument', reference_date)
return
aircraft_beacons = list()
receiver_beacons = list()
total = file_len(ogn_logfile)
progress = -1
num_lines = 0
from ogn.model import AircraftBeacon, ReceiverBeacon
import csv
wr_ab = csv.writer(fout_ab, delimiter=',')
wr_ab.writerow(AircraftBeacon.get_csv_columns())
wr_rb = csv.writer(fout_rb, delimiter=',')
wr_rb.writerow(ReceiverBeacon.get_csv_columns())
print('Start importing ogn-logfile') print('Start importing ogn-logfile')
for line in f: for line in fin:
num_lines += 1
if int(100 * num_lines / total) != progress:
progress = round(100 * num_lines / total)
logger.info("Reading line {} ({}%)".format(num_lines, progress))
if len(aircraft_beacons) > 0:
for beacon in aircraft_beacons:
wr_ab.writerow(beacon.get_csv_values())
aircraft_beacons = list()
if len(receiver_beacons) > 0:
for beacon in receiver_beacons:
wr_rb.writerow(beacon.get_csv_values())
receiver_beacons = list()
beacon = message_to_beacon(line, reference_date=reference_date) beacon = message_to_beacon(line, reference_date=reference_date)
if beacon is not None: if beacon is not None:
beacons.append(beacon) if isinstance(beacon, AircraftBeacon):
aircraft_beacons.append(beacon)
elif isinstance(beacon, ReceiverBeacon):
receiver_beacons.append(beacon)
session.bulk_save_objects(beacons) if len(aircraft_beacons) > 0:
for beacon in aircraft_beacons:
wr_ab.writerow([beacon.get_csv_values()])
if len(receiver_beacons) > 0:
for beacon in receiver_beacons:
wr_rb.writerow(beacon.get_csv_values())
fin.close()
fout_ab.close()
fout_rb.close()
f.close()
logging.shutdown() logging.shutdown()

Wyświetl plik

@ -1,6 +1,6 @@
import logging import logging
from ogn.commands.dbutils import session from ogn.commands.dbutils import session
from ogn.model import AircraftBeacon, ReceiverBeacon, Device, Receiver, Location from ogn.model import AircraftBeacon, ReceiverBeacon, Location
from ogn.parser import parse_aprs, parse_ogn_receiver_beacon, parse_ogn_aircraft_beacon, ParseError from ogn.parser import parse_aprs, parse_ogn_receiver_beacon, parse_ogn_aircraft_beacon, ParseError
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -54,54 +54,9 @@ def message_to_beacon(raw_message, reference_date):
return beacon return beacon
def add_beacon_to_db(beacon):
if type(beacon == ReceiverBeacon):
# connect beacon with receiver
receiver = session.query(Receiver.id) \
.filter(Receiver.name == beacon.name) \
.first()
if receiver is None:
receiver = Receiver()
receiver.name = beacon.name
session.add(receiver)
beacon.receiver_id = receiver.id
elif type(beacon == AircraftBeacon):
# connect beacon with device
device = session.query(Device) \
.filter(Device.address == beacon.address) \
.first()
if device is None:
device = Device()
device.address = beacon.address
session.add(device)
beacon.device_id = device.id
# update device
device.aircraft_type = beacon.aircraft_type
device.stealth = beacon.stealth
if beacon.hardware_version is not None:
device.hardware_version = beacon.hardware_version
if beacon.software_version is not None:
device.software_version = beacon.software_version
if beacon.real_address is not None:
device.real_address = beacon.real_address
# connect beacon with receiver
receiver = session.query(Receiver.id) \
.filter(Receiver.name == beacon.receiver_name) \
.first()
if receiver is None:
receiver = Receiver()
receiver.name = beacon.receiver_name
session.add(receiver)
beacon.receiver_id = receiver.id
session.add(beacon)
session.commit()
def process_beacon(raw_message, reference_date=None): def process_beacon(raw_message, reference_date=None):
beacon = message_to_beacon(raw_message, reference_date) beacon = message_to_beacon(raw_message, reference_date)
if beacon is not None: if beacon is not None:
add_beacon_to_db(beacon) session.add(beacon)
session.commit()
logger.debug('Received message: {}'.format(raw_message)) logger.debug('Received message: {}'.format(raw_message))

Wyświetl plik

@ -14,15 +14,14 @@ class AircraftBeacon(Beacon):
address = Column(String(6)) address = Column(String(6))
climb_rate = Column(Float) climb_rate = Column(Float)
turn_rate = Column(Float) turn_rate = Column(Float)
flightlevel = Column(Float)
signal_quality = Column(Float) signal_quality = Column(Float)
error_count = Column(Integer) error_count = Column(Integer)
frequency_offset = Column(Float) frequency_offset = Column(Float)
gps_status = Column(String) gps_status = Column(String)
software_version = Column(Float) software_version = Column(Float)
hardware_version = Column(SmallInteger) hardware_version = Column(SmallInteger)
real_address = Column(String(6)) real_address = Column(String(6))
flightlevel = Column(Float)
signal_power = Column(Float) signal_power = Column(Float)
status = Column(SmallInteger) status = Column(SmallInteger)
@ -36,17 +35,72 @@ class AircraftBeacon(Beacon):
def __repr__(self): def __repr__(self):
return "<AircraftBeacon %s: %s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s>" % ( return "<AircraftBeacon %s: %s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s>" % (
self.name,
self.address_type,
self.aircraft_type,
self.timestamp,
self.address_type, self.address_type,
self.aircraft_type, self.aircraft_type,
self.stealth, self.stealth,
self.address, self.address,
self.climb_rate, self.climb_rate,
self.turn_rate, self.turn_rate,
self.signal_power, self.flightlevel,
self.signal_quality,
self.error_count, self.error_count,
self.frequency_offset, self.frequency_offset,
self.gps_status) self.gps_status,
self.software_version,
self.hardware_version,
self.real_address,
self.signal_power,
self.status)
@classmethod
def get_csv_columns(self):
return['location',
'altitude',
'name',
'receiver_name',
'timestamp',
'track',
'ground_speed',
'address_type',
'aircraft_type',
'stealth',
'address',
'climb_rate',
'turn_rate',
'flightlevel',
'signal_quality',
'error_count',
'frequency_offset',
'gps_status',
'software_version',
'hardware_version',
'real_address',
'signal_power']
def get_csv_values(self):
return [
self.location_wkt,
int(self.altitude),
self.name,
self.receiver_name,
self.timestamp,
self.track,
round(self.ground_speed, 1) if self.ground_speed else None,
self.address_type,
self.aircraft_type,
self.stealth,
self.address,
round(self.climb_rate, 1) if self.climb_rate else None,
round(self.turn_rate, 1) if self.turn_rate else None,
self.flightlevel,
self.signal_quality,
int(self.error_count) if self.error_count else None,
self.frequency_offset,
self.gps_status,
self.software_version,
self.hardware_version,
self.real_address,
self.signal_power]

Wyświetl plik

@ -1,4 +1,4 @@
from sqlalchemy import Column, Float, String, Integer, ForeignKey from sqlalchemy import Column, Float, String, Integer, SmallInteger, ForeignKey
from sqlalchemy.orm import relationship from sqlalchemy.orm import relationship
from .beacon import Beacon from .beacon import Beacon
@ -11,30 +11,113 @@ class ReceiverBeacon(Beacon):
version = Column(String) version = Column(String)
platform = Column(String) platform = Column(String)
cpu_load = Column(Float) cpu_load = Column(Float)
cpu_temp = Column(Float)
free_ram = Column(Float) free_ram = Column(Float)
total_ram = Column(Float) total_ram = Column(Float)
ntp_error = Column(Float) ntp_error = Column(Float)
rt_crystal_correction = Column(Float) rt_crystal_correction = Column(Float)
voltage = Column(Float)
amperage = Column(Float)
cpu_temp = Column(Float)
senders_visible = Column(Integer)
senders_total = Column(Integer)
rec_crystal_correction = 0 # obsolete since 0.2.0 rec_crystal_correction = 0 # obsolete since 0.2.0
rec_crystal_correction_fine = 0 # obsolete since 0.2.0 rec_crystal_correction_fine = 0 # obsolete since 0.2.0
rec_input_noise = Column(Float) rec_input_noise = Column(Float)
senders_visible = Column(Integer)
senders_total = Column(Integer)
senders_signal = Column(Float) senders_signal = Column(Float)
senders_messages = Column(Integer) senders_messages = Column(Integer)
good_senders_signal = Column(Float) good_senders_signal = Column(Float)
good_senders = Column(Integer) good_senders = Column(Integer)
good_and_bad_senders = Column(Integer) good_and_bad_senders = Column(Integer)
voltage = Column(Float) status = Column(SmallInteger)
amperage = Column(Float)
# Relations # Relations
receiver_id = Column(Integer, ForeignKey('receiver.id', ondelete='SET NULL'), index=True) receiver_id = Column(Integer, ForeignKey('receiver.id', ondelete='SET NULL'), index=True)
receiver = relationship('Receiver', foreign_keys=[receiver_id]) receiver = relationship('Receiver', foreign_keys=[receiver_id])
def __repr__(self): def __repr__(self):
return "<ReceiverBeacon %s: %s>" % (self.name, self.version) return "<ReceiverBeacon %s: %s>" % (
self.version,
self.platform,
self.cpu_load,
self.free_ram,
self.total_ram,
self.ntp_error,
self.rt_crystal_correction,
self.voltage,
self.amperage,
self.cpu_temp,
self.senders_visible,
self.senders_total,
# self.rec_crystal_correction,
# self.rec_crystal_correction_fine,
self.rec_input_noise,
self.senders_signal,
self.senders_messages,
self.good_senders_signal,
self.good_senders,
self.good_and_bad_senders,
self.status)
@classmethod
def get_csv_columns(self):
return['location',
'altitude',
'name',
'receiver_name',
'timestamp',
'track',
'ground_speed',
'version',
'platform',
'cpu_load',
'free_ram',
'total_ram',
'ntp_error',
'rt_crystal_correction',
'voltage',
'amperage',
'cpu_temp',
'senders_visible',
'senders_total',
# 'rec_crystal_correction',
# 'rec_crystal_correction_fine',
'rec_input_noise',
'senders_signal',
'senders_messages',
'good_senders_signal',
'good_senders',
'good_and_bad_senders']
def get_csv_values(self):
return [
self.location_wkt,
int(self.altitude) if self.altitude else None,
self.name,
self.receiver_name,
self.timestamp,
self.track,
self.ground_speed,
self.version,
self.platform,
self.cpu_load,
self.free_ram,
self.total_ram,
self.ntp_error,
self.rt_crystal_correction,
self.voltage,
self.amperage,
self.cpu_temp,
int(self.senders_visible) if self.senders_visible else None,
int(self.senders_total) if self.senders_visible else None,
# self.rec_crystal_correction,
# self.rec_crystal_correction_fine,
self.rec_input_noise,
self.senders_signal,
int(self.senders_messages) if self.senders_messages else None,
self.good_senders_signal,
int(self.good_senders) if self.good_senders else None,
int(self.good_and_bad_senders) if self.good_and_bad_senders else None]

Wyświetl plik

@ -137,5 +137,6 @@ class TestDB(unittest.TestCase):
entries_changed = compute_logbook_entries(session) entries_changed = compute_logbook_entries(session)
self.assertEqual(entries_changed, '1/0') self.assertEqual(entries_changed, '1/0')
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()

Wyświetl plik

@ -98,5 +98,6 @@ class TestDB(unittest.TestCase):
compute_takeoff_and_landing(session) compute_takeoff_and_landing(session)
self.assertEqual(self.count_takeoff_and_landings(), 2) self.assertEqual(self.count_takeoff_and_landings(), 2)
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()

Wyświetl plik

@ -18,18 +18,6 @@ class GatewayManagerTest(unittest.TestCase):
self.assertEqual(instance.run.call_count, 1) self.assertEqual(instance.run.call_count, 1)
instance.disconnect.assert_called_once_with() instance.disconnect.assert_called_once_with()
# try to import stored OGN logfile
@mock.patch('ogn.gateway.manage.import_logfile')
def test_run_import_logfile(self, mock_import_logfile):
# instance = mock_import_logfile.return_value
# import_logfile(ogn_logfile="tests/OGN_log.txt_2016-09-21", reference_date="2016-09-21")
# instance.connect.assert_called_once_with()
# self.assertEqual(import_logfile.call_count, 1)
# instance.disconnect.assert_called_once_with()
pass
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()