Use temporary tables for import and updating relations (AircraftBeacon->Device, AircraftBeacon->Receiver, ReceiverBeacon->Receiver)

pull/68/head
Konstantin Gründger 2017-06-03 11:38:55 +02:00
rodzic 276763f37f
commit 0cf6eb56f7
4 zmienionych plików z 181 dodań i 45 usunięć

Wyświetl plik

@ -108,6 +108,9 @@ def update_relations():
ins2 = insert(Device).from_select([Device.address], missing_addresses_query) ins2 = insert(Device).from_select([Device.address], missing_addresses_query)
session.execute(ins2) session.execute(ins2)
session.commit()
print("Inserted {} Receivers and {} Devices".format(ins, ins2))
return
# Update AircraftBeacons # Update AircraftBeacons
upd = session.query(AircraftBeacon) \ upd = session.query(AircraftBeacon) \
@ -128,3 +131,179 @@ def update_relations():
session.commit() session.commit()
print("Updated {} AircraftBeacons and {} ReceiverBeacons". print("Updated {} AircraftBeacons and {} ReceiverBeacons".
format(upd, upd2)) format(upd, upd2))
@manager.command
def import_aircraft_beacon_logfile(csv_logfile, logfile='main.log', loglevel='INFO'):
"""Import csv logfile <arg: csv logfile>."""
SQL_TEMPTABLE_STATEMENT = """
CREATE TABLE aircraft_beacon_temp(
location geometry,
altitude integer,
name character varying,
receiver_name character varying(9),
"timestamp" timestamp without time zone,
track integer,
ground_speed double precision,
address_type smallint,
aircraft_type smallint,
stealth boolean,
address character varying(6),
climb_rate double precision,
turn_rate double precision,
flightlevel double precision,
signal_quality double precision,
error_count integer,
frequency_offset double precision,
gps_status character varying,
software_version double precision,
hardware_version smallint,
real_address character varying(6),
signal_power double precision
)
"""
session.execute(SQL_TEMPTABLE_STATEMENT)
SQL_COPY_STATEMENT = """
COPY aircraft_beacon_temp(%s) FROM STDIN WITH
CSV
HEADER
DELIMITER AS ','
"""
file = open(csv_logfile, 'r')
column_names = ','.join(AircraftBeacon.get_csv_columns())
sql = SQL_COPY_STATEMENT % column_names
print("Start importing logfile")
conn = session.connection().connection
cursor = conn.cursor()
cursor.copy_expert(sql=sql, file=file)
conn.commit()
cursor.close()
print("Read logfile into temporary table")
# create device if not exist
session.execute("""
INSERT INTO device(address)
SELECT DISTINCT(t.address)
FROM aircraft_beacon_temp t
WHERE NOT EXISTS (SELECT 1 FROM device d WHERE d.address = t.address)
""")
print("Inserted missing Devices")
# create receiver if not exist
session.execute("""
INSERT INTO receiver(name)
SELECT DISTINCT(t.receiver_name)
FROM aircraft_beacon_temp t
WHERE NOT EXISTS (SELECT 1 FROM receiver r WHERE r.name = t.receiver_name)
""")
print("Inserted missing Receivers")
session.execute("""
INSERT INTO aircraft_beacon(location, altitude, name, receiver_name, timestamp, track, ground_speed,
address_type, aircraft_type, stealth, address, climb_rate, turn_rate, flightlevel, signal_quality, error_count, frequency_offset, gps_status, software_version, hardware_version, real_address, signal_power,
status, receiver_id, device_id)
SELECT t.location, t.altitude, t.name, t.receiver_name, t.timestamp, t.track, t.ground_speed,
t.address_type, t.aircraft_type, t.stealth, t.address, t.climb_rate, t.turn_rate, t.flightlevel, t.signal_quality, t.error_count, t.frequency_offset, t.gps_status, t.software_version, t.hardware_version, t.real_address, t.signal_power,
0, r.id, d.id
FROM aircraft_beacon_temp t, receiver r, device d
WHERE t.receiver_name = r.name AND t.address = d.address
""")
print("Wrote AircraftBeacons from temporary table into final table")
session.execute("""DROP TABLE aircraft_beacon_temp""")
print("Dropped temporary table")
session.commit()
print("Finished")
@manager.command
def import_receiver_beacon_logfile(csv_logfile, logfile='main.log', loglevel='INFO'):
"""Import csv logfile <arg: csv logfile>."""
SQL_TEMPTABLE_STATEMENT = """
CREATE TABLE receiver_beacon_temp(
location geometry,
altitude integer,
name character varying,
receiver_name character varying(9),
"timestamp" timestamp without time zone,
track integer,
ground_speed double precision,
version character varying,
platform character varying,
cpu_load double precision,
free_ram double precision,
total_ram double precision,
ntp_error double precision,
rt_crystal_correction double precision,
voltage double precision,
amperage double precision,
cpu_temp double precision,
senders_visible integer,
senders_total integer,
rec_input_noise double precision,
senders_signal double precision,
senders_messages integer,
good_senders_signal double precision,
good_senders integer,
good_and_bad_senders integer
)
"""
session.execute(SQL_TEMPTABLE_STATEMENT)
SQL_COPY_STATEMENT = """
COPY receiver_beacon_temp(%s) FROM STDIN WITH
CSV
HEADER
DELIMITER AS ','
"""
file = open(csv_logfile, 'r')
column_names = ','.join(ReceiverBeacon.get_csv_columns())
sql = SQL_COPY_STATEMENT % column_names
print("Start importing logfile")
conn = session.connection().connection
cursor = conn.cursor()
cursor.copy_expert(sql=sql, file=file)
conn.commit()
cursor.close()
print("Read logfile into temporary table")
# create receiver if not exist
session.execute("""
INSERT INTO receiver(name)
SELECT DISTINCT(t.name)
FROM receiver_beacon_temp t
WHERE NOT EXISTS (SELECT 1 FROM receiver r WHERE r.name = t.name)
""")
print("Inserted missing Receivers")
session.execute("""
INSERT INTO receiver_beacon(location, altitude, name, receiver_name, timestamp, track, ground_speed,
version, platform, cpu_load, free_ram, total_ram, ntp_error, rt_crystal_correction, voltage,amperage, cpu_temp, senders_visible, senders_total, rec_input_noise, senders_signal, senders_messages, good_senders_signal, good_senders, good_and_bad_senders,
status, receiver_id)
SELECT t.location, t.altitude, t.name, t.receiver_name, t.timestamp, t.track, t.ground_speed,
t.version, t.platform, t.cpu_load, t.free_ram, t.total_ram, t.ntp_error, t.rt_crystal_correction, t.voltage,amperage, t.cpu_temp, t.senders_visible, t.senders_total, t.rec_input_noise, t.senders_signal, t.senders_messages, t.good_senders_signal, t.good_senders, t.good_and_bad_senders,
0, r.id
FROM receiver_beacon_temp t, receiver r
WHERE t.name = r.name
""")
print("Wrote ReceiverBeacons from temporary table into final table")
session.execute("""DROP TABLE receiver_beacon_temp""")
print("Dropped temporary table")
session.commit()
print("Finished")

Wyświetl plik

@ -2,7 +2,6 @@ import logging
from ogn.client import AprsClient from ogn.client import AprsClient
from ogn.gateway.process import process_beacon, message_to_beacon from ogn.gateway.process import process_beacon, message_to_beacon
from ogn.commands.dbutils import session
from datetime import datetime from datetime import datetime
from manager import Manager from manager import Manager
from ogn.model import AircraftBeacon, ReceiverBeacon from ogn.model import AircraftBeacon, ReceiverBeacon
@ -44,47 +43,6 @@ def run(aprs_user='anon-dev', logfile='main.log', loglevel='INFO'):
logging.shutdown() logging.shutdown()
@manager.command
def import_beacon_logfile(csv_logfile, logfile='main.log', loglevel='INFO'):
"""Import csv logfile <arg: csv logfile>."""
# Enable logging
log_handlers = [logging.StreamHandler()]
if logfile:
log_handlers.append(logging.FileHandler(logfile))
logging.basicConfig(format=logging_formatstr, level=loglevel, handlers=log_handlers)
logger = logging.getLogger(__name__)
SQL_STATEMENT = """
COPY %s(%s) FROM STDIN WITH
CSV
HEADER
DELIMITER AS ','
"""
file = open(csv_logfile, 'r')
first_line = file.readline().strip()
aircraft_beacon_columns = ','.join(AircraftBeacon.get_csv_columns())
receiver_beacon_columns = ','.join(ReceiverBeacon.get_csv_columns())
if first_line == aircraft_beacon_columns:
sql = SQL_STATEMENT % ('aircraft_beacon', aircraft_beacon_columns)
elif first_line == receiver_beacon_columns:
sql = SQL_STATEMENT % ('receiver_beacon', receiver_beacon_columns)
else:
print("Not a valid logfile: {}".format(csv_logfile))
return
logger.info("Reading logfile")
conn = session.connection().connection
cursor = conn.cursor()
cursor.copy_expert(sql=sql, file=file)
conn.commit()
cursor.close()
logger.info("Import finished")
# get total lines of the input file # get total lines of the input file
def file_len(fname): def file_len(fname):
with open(fname) as f: with open(fname) as f:
@ -130,7 +88,6 @@ def convert_logfile(ogn_logfile, logfile='main.log', loglevel='INFO'):
progress = -1 progress = -1
num_lines = 0 num_lines = 0
from ogn.model import AircraftBeacon, ReceiverBeacon
import csv import csv
wr_ab = csv.writer(fout_ab, delimiter=',') wr_ab = csv.writer(fout_ab, delimiter=',')
wr_ab.writerow(AircraftBeacon.get_csv_columns()) wr_ab.writerow(AircraftBeacon.get_csv_columns())

Wyświetl plik

@ -24,7 +24,7 @@ class AircraftBeacon(Beacon):
real_address = Column(String(6)) real_address = Column(String(6))
signal_power = Column(Float) signal_power = Column(Float)
status = Column(SmallInteger) status = Column(SmallInteger, index=True)
# Relations # Relations
receiver_id = Column(Integer, ForeignKey('receiver.id', ondelete='SET NULL'), index=True) receiver_id = Column(Integer, ForeignKey('receiver.id', ondelete='SET NULL'), index=True)

Wyświetl plik

@ -29,7 +29,7 @@ class ReceiverBeacon(Beacon):
good_senders = Column(Integer) good_senders = Column(Integer)
good_and_bad_senders = Column(Integer) good_and_bad_senders = Column(Integer)
status = Column(SmallInteger) status = Column(SmallInteger, index=True)
# Relations # Relations
receiver_id = Column(Integer, ForeignKey('receiver.id', ondelete='SET NULL'), index=True) receiver_id = Column(Integer, ForeignKey('receiver.id', ondelete='SET NULL'), index=True)