Added receiver status beacon and merge them to the recever_beacon table

pull/78/head
Konstantin Gründger 2020-05-22 13:59:25 +02:00
rodzic 2c31c685ac
commit f443294553
3 zmienionych plików z 199 dodań i 51 usunięć

Wyświetl plik

@ -28,7 +28,7 @@ AIRCRAFT_BEACON_TYPES = ["aprs_aircraft", "flarm", "tracker", "fanet", "lt24", "
RECEIVER_BEACON_TYPES = ["aprs_receiver", "receiver"]
# define fields we want to proceed
AIRCRAFT_BEACON_FIELDS = [
AIRCRAFT_POSITION_BEACON_FIELDS = [
"location",
"altitude",
"name",
@ -59,15 +59,29 @@ AIRCRAFT_BEACON_FIELDS = [
"location_mgrs",
"location_mgrs_short",
"agl",
"reference_timestamp"
]
RECEIVER_BEACON_FIELDS = [
RECEIVER_POSITION_BEACON_FIELDS = [
"location",
"altitude",
"name",
"dstcall",
"receiver_name",
"timestamp",
"reference_timestamp"
]
RECEIVER_STATUS_BEACON_FIELDS = [
"name",
"dstcall",
"receiver_name",
"timestamp",
"version",
"platform",
]
@ -122,7 +136,7 @@ class StringConverter:
current_app.logger.error("Other Exception with string: {}".format(raw_string))
return
if message['aprs_type'] not in ('server', 'position'):
if message['aprs_type'] not in ('server', 'position', 'status'):
return
elif message['aprs_type'] == 'server' and self.auto_update_timestamp is True:
@ -150,8 +164,8 @@ class StringConverter:
return message
def _get_aircraft_beacon_csv_string(self, message, none_character=''):
csv_string = "{0},{1},{2},{3},{4},{5},{6},{7},{8},{9},{10},{11},{12},{13},{14},{15},{16},{17},{18},{19},{20},{21},{22},{23},{24},{25},{26},{27},{28},{29}\n".format(
def _get_aircraft_position_beacon_csv_string(self, message, none_character=''):
csv_string = "{0},{1},{2},{3},{4},{5},{6},{7},{8},{9},{10},{11},{12},{13},{14},{15},{16},{17},{18},{19},{20},{21},{22},{23},{24},{25},{26},{27},{28},{29},{30}\n".format(
message['location'],
int(message['altitude']) if message['altitude'] else none_character,
message['name'],
@ -182,17 +196,33 @@ class StringConverter:
message['location_mgrs'],
message['location_mgrs_short'],
message['agl'] if 'agl' in message else none_character, #29
message['reference_timestamp'],
)
return csv_string
def _get_receiver_beacon_csv_string(self, message, none_character=''):
csv_string = "{0},{1},{2},{3},{4},{5}\n".format(
def _get_receiver_position_beacon_csv_string(self, message, none_character=''):
csv_string = "{0},{1},{2},{3},{4},{5},{6}\n".format(
message['location'],
int(message['altitude']) if message['altitude'] else none_character,
message['name'],
message['dstcall'],
message['receiver_name'],
message['timestamp'],
message['reference_timestamp'],
)
return csv_string
def _get_receiver_status_beacon_csv_string(self, message, none_character=''):
csv_string = "{0},{1},{2},{3},{4},{5}\n".format(
message['name'],
message['dstcall'],
message['receiver_name'],
message['timestamp'],
message['version'] if 'version' in message and message['version'] else none_character,
message['platform'] if 'platform' in message and message['platform'] else none_character,
)
return csv_string
@ -208,8 +238,8 @@ class FileFeeder(StringConverter):
super().__init__(reference_timestamp, reference_timestamp_autoupdate)
def __enter__(self):
self.aircraft_beacons_file.write(','.join(AIRCRAFT_BEACON_FIELDS))
self.receiver_beacons_file.write(','.join(RECEIVER_BEACON_FIELDS))
self.aircraft_beacons_file.write(','.join(AIRCRAFT_POSITION_BEACON_FIELDS))
self.receiver_beacons_file.write(','.join(RECEIVER_POSITION_BEACON_FIELDS))
return self
def __exit__(self, *args):
@ -219,10 +249,10 @@ class FileFeeder(StringConverter):
def add(self, raw_string):
message = self._convert(raw_string)
if message['beacon_type'] in AIRCRAFT_BEACON_TYPES:
csv_string = self._get_aircraft_beacon_csv_string(message)
csv_string = self._get_aircraft_position_beacon_csv_string(message)
self.aircraft_beacons_file.write(csv_string)
elif message['beacon_type'] in RECEIVER_BEACON_TYPES:
csv_string = self._get_receiver_beacon_csv_string(message)
csv_string = self._get_receiver_position_beacon_csv_string(message)
self.receiver_beacons_file.write(csv_string)
@ -231,8 +261,10 @@ class DbFeeder(StringConverter):
self.reference_timestamp = reference_timestamp
self.reference_timestamp_autoupdate = reference_timestamp_autoupdate
self.aircraft_beacons_buffer = StringIO()
self.receiver_beacons_buffer = StringIO()
self.aircraft_position_beacons_buffer = StringIO()
self.aircraft_status_beacons_buffer = StringIO()
self.receiver_position_beacons_buffer = StringIO()
self.receiver_status_beacons_buffer = StringIO()
self.last_flush = datetime.utcnow()
@ -248,51 +280,56 @@ class DbFeeder(StringConverter):
if not message:
return
if message['beacon_type'] in AIRCRAFT_BEACON_TYPES:
csv_string = self._get_aircraft_beacon_csv_string(message, none_character=r'\N')
self.aircraft_beacons_buffer.write(csv_string)
elif message['beacon_type'] in RECEIVER_BEACON_TYPES:
csv_string = self._get_receiver_beacon_csv_string(message, none_character=r'\N')
self.receiver_beacons_buffer.write(csv_string)
if message['beacon_type'] in AIRCRAFT_BEACON_TYPES and message['aprs_type'] == 'position':
csv_string = self._get_aircraft_position_beacon_csv_string(message, none_character=r'\N')
self.aircraft_position_beacons_buffer.write(csv_string)
elif message['beacon_type'] in AIRCRAFT_BEACON_TYPES and message['aprs_type'] == 'status':
pass # ignore it
elif message['beacon_type'] in RECEIVER_BEACON_TYPES and message['aprs_type'] == 'position':
csv_string = self._get_receiver_position_beacon_csv_string(message, none_character=r'\N')
self.receiver_position_beacons_buffer.write(csv_string)
elif message['beacon_type'] in RECEIVER_BEACON_TYPES and message['aprs_type'] == 'status':
csv_string = self._get_receiver_status_beacon_csv_string(message, none_character=r'\N')
self.receiver_status_beacons_buffer.write(csv_string)
else:
current_app.logger.error(f"Not supported beacon type, skipped: {raw_string}")
current_app.logger.error(f"Not supported. beacon_type: '{message['beacon_type']}', aprs_type: '{message['aprs_type']}', skipped: {raw_string}")
if datetime.utcnow() - self.last_flush >= timedelta(seconds=1):
self.flush()
self.last_flush = datetime.utcnow()
def flush(self):
def _flush_position_beacons(self):
connection = db.engine.raw_connection()
cursor = connection.cursor()
self.aircraft_beacons_buffer.seek(0)
self.receiver_beacons_buffer.seek(0)
self.aircraft_position_beacons_buffer.seek(0)
self.receiver_position_beacons_buffer.seek(0)
cursor.execute("CREATE TEMPORARY TABLE aircraft_beacons_temp (LIKE aircraft_beacons) ON COMMIT DROP;")
cursor.execute("CREATE TEMPORARY TABLE receiver_beacons_temp (LIKE receiver_beacons) ON COMMIT DROP;")
cursor.execute("CREATE TEMPORARY TABLE aircraft_position_beacons_temp (LIKE aircraft_beacons) ON COMMIT DROP;")
cursor.execute("CREATE TEMPORARY TABLE receiver_position_beacons_temp (LIKE receiver_beacons) ON COMMIT DROP;")
cursor.copy_from(file=self.aircraft_beacons_buffer, table="aircraft_beacons_temp", sep=",", columns=AIRCRAFT_BEACON_FIELDS)
cursor.copy_from(file=self.receiver_beacons_buffer, table="receiver_beacons_temp", sep=",", columns=RECEIVER_BEACON_FIELDS)
cursor.copy_from(file=self.aircraft_position_beacons_buffer, table="aircraft_position_beacons_temp", sep=",", columns=AIRCRAFT_POSITION_BEACON_FIELDS)
cursor.copy_from(file=self.receiver_position_beacons_buffer, table="receiver_position_beacons_temp", sep=",", columns=RECEIVER_POSITION_BEACON_FIELDS)
# Update receivers
cursor.execute("""
INSERT INTO receivers AS r (name, location, altitude, firstseen, lastseen, timestamp)
SELECT DISTINCT ON (rbt.name)
rbt.name,
rbt.location,
rbt.altitude,
SELECT DISTINCT ON (rpbt.name)
rpbt.name,
rpbt.location,
rpbt.altitude,
timezone('utc', NOW()) AS firstseen,
timezone('utc', NOW()) AS lastseen,
rbt.timestamp
FROM receiver_beacons_temp AS rbt,
rpbt.timestamp
FROM receiver_position_beacons_temp AS rpbt,
(
SELECT
rbt.name,
rpbt.name,
MAX(timestamp) AS timestamp
FROM receiver_beacons_temp AS rbt
GROUP BY rbt.name
FROM receiver_position_beacons_temp AS rpbt
GROUP BY rpbt.name
) AS sq
WHERE rbt.name = sq.name AND rbt.timestamp = sq.timestamp
WHERE rpbt.name = sq.name AND rpbt.timestamp = sq.timestamp
ON CONFLICT (name) DO UPDATE
SET
location = EXCLUDED.location,
@ -303,33 +340,65 @@ class DbFeeder(StringConverter):
# Update agl
cursor.execute("""
UPDATE aircraft_beacons_temp AS abt
UPDATE aircraft_position_beacons_temp AS apbt
SET
agl = ST_Value(e.rast, abt.location)
agl = ST_Value(e.rast, apbt.location)
FROM elevation AS e
WHERE ST_Intersects(abt.location, e.rast)
WHERE ST_Intersects(apbt.location, e.rast)
""")
# ... update receiver related attributes: distance, radial, quality
cursor.execute("""
UPDATE aircraft_beacons_temp AS abt
UPDATE aircraft_position_beacons_temp AS apbt
SET
distance = CAST(ST_DistanceSphere(r.location, abt.location) AS REAL),
radial = CASE WHEN Degrees(ST_Azimuth(r.location, abt.location)) >= 359.5 THEN 0 ELSE CAST(Degrees(ST_Azimuth(r.location, abt.location)) AS INT) END,
quality = CASE WHEN ST_DistanceSphere(r.location, abt.location) > 0 THEN CAST(abt.signal_quality + 20.0 * LOG(ST_DistanceSphere(r.location, abt.location) / 10000) AS REAL) ELSE NULL END
distance = CAST(ST_DistanceSphere(r.location, apbt.location) AS REAL),
radial = CASE WHEN Degrees(ST_Azimuth(r.location, apbt.location)) >= 359.5 THEN 0 ELSE CAST(Degrees(ST_Azimuth(r.location, apbt.location)) AS INT) END,
quality = CASE WHEN ST_DistanceSphere(r.location, apbt.location) > 0 THEN CAST(apbt.signal_quality + 20.0 * LOG(ST_DistanceSphere(r.location, apbt.location) / 10000) AS REAL) ELSE NULL END
FROM receivers AS r
WHERE abt.receiver_name = r.name
WHERE apbt.receiver_name = r.name
""")
# Update devices
cursor.execute("""
INSERT INTO devices AS d (name, address, firstseen, lastseen, aircraft_type, stealth, software_version, hardware_version, real_address)
SELECT DISTINCT ON (apbt.name)
apbt.name,
apbt.address,
timezone('utc', NOW()) AS firstseen,
timezone('utc', NOW()) AS lastseen,
apbt.aircraft_type,
apbt.stealth,
apbt.software_version,
apbt.hardware_version,
apbt.real_address
FROM aircraft_position_beacons_temp AS apbt,
(
SELECT
apbt.name,
MAX(timestamp) AS timestamp
FROM aircraft_position_beacons_temp AS apbt
GROUP BY apbt.name
) AS sq
WHERE apbt.name = sq.name AND apbt.timestamp = sq.timestamp
ON CONFLICT (name) DO UPDATE
SET
lastseen = timezone('utc', NOW()),
aircraft_type = EXCLUDED.aircraft_type,
stealth = EXCLUDED.stealth,
software_version = COALESCE(EXCLUDED.software_version, d.software_version),
hardware_version = COALESCE(EXCLUDED.hardware_version, d.hardware_version),
real_address = COALESCE(EXCLUDED.real_address, d.real_address);
""")
# Insert all the beacons
cursor.execute("""
INSERT INTO aircraft_beacons
SELECT * FROM aircraft_beacons_temp
SELECT * FROM aircraft_position_beacons_temp
ON CONFLICT DO NOTHING;
""")
cursor.execute("""
INSERT INTO receiver_beacons
SELECT * FROM receiver_beacons_temp
SELECT * FROM receiver_position_beacons_temp
ON CONFLICT DO NOTHING;
""")
connection.commit()
@ -337,9 +406,80 @@ class DbFeeder(StringConverter):
cursor.close()
connection.close()
self.aircraft_beacons_buffer = StringIO()
self.receiver_beacons_buffer = StringIO()
self.aircraft_position_beacons_buffer = StringIO()
self.receiver_position_beacons_buffer = StringIO()
def _flush_status_beacons(self):
connection = db.engine.raw_connection()
cursor = connection.cursor()
self.aircraft_status_beacons_buffer.seek(0)
self.receiver_status_beacons_buffer.seek(0)
cursor.execute("CREATE TEMPORARY TABLE aircraft_status_beacons_temp (LIKE aircraft_beacons) ON COMMIT DROP;")
cursor.execute("CREATE TEMPORARY TABLE receiver_status_beacons_temp (LIKE receiver_beacons) ON COMMIT DROP;")
#cursor.copy_from(file=self.aircraft_status_beacons_buffer, table="aircraft_status_beacons_temp", sep=",", columns=AIRCRAFT_STATUS_BEACON_FIELDS)
cursor.copy_from(file=self.receiver_status_beacons_buffer, table="receiver_status_beacons_temp", sep=",", columns=RECEIVER_STATUS_BEACON_FIELDS)
# Update receivers
cursor.execute("""
INSERT INTO receivers AS r (name, timestamp, version, platform)
SELECT DISTINCT ON (rsbt.name)
rsbt.name,
rsbt.timestamp,
rsbt.version,
rsbt.platform
FROM receiver_status_beacons_temp AS rsbt,
(
SELECT
rsbt.name,
MAX(timestamp) AS timestamp
FROM receiver_status_beacons_temp AS rsbt
GROUP BY rsbt.name
) AS sq
WHERE rsbt.name = sq.name AND rsbt.timestamp = sq.timestamp
ON CONFLICT (name) DO UPDATE
SET
version = EXCLUDED.version,
platform = EXCLUDED.platform;
""")
# Update receiver_beacons
cursor.execute("""
INSERT INTO receiver_beacons AS rb (name, dstcall, receiver_name, timestamp, version, platform)
SELECT DISTINCT ON (rsbt.name)
rsbt.name,
rsbt.dstcall,
rsbt.receiver_name,
rsbt.timestamp,
rsbt.version,
rsbt.platform
FROM receiver_status_beacons_temp AS rsbt,
(
SELECT
rsbt.name,
MAX(timestamp) AS timestamp
FROM receiver_status_beacons_temp AS rsbt
GROUP BY rsbt.name
) AS sq
WHERE rsbt.name = sq.name AND rsbt.timestamp = sq.timestamp
ON CONFLICT (name, receiver_name, timestamp) DO UPDATE
SET
version = EXCLUDED.version,
platform = EXCLUDED.platform;
""")
connection.commit()
cursor.close()
connection.close()
self.aircraft_status_beacons_buffer = StringIO()
self.receiver_status_beacons_buffer = StringIO()
def flush(self):
self._flush_position_beacons()
self._flush_status_beacons()
def convert(sourcefile):
print("Fast scan of file '{}'...".format(sourcefile), end='')

Wyświetl plik

@ -30,4 +30,4 @@ class Beacon(AbstractConcreteBase, db.Model):
# Debug information
raw_message = None
reference_timestamp = None
reference_timestamp = db.Column(db.DateTime, nullable=False)

Wyświetl plik

@ -1,3 +1,5 @@
from app import db
from .beacon import Beacon
@ -9,12 +11,18 @@ class ReceiverBeacon(Beacon):
track = None
ground_speed = None
# Receiver specific data
version = db.Column(db.String)
platform = db.Column(db.String)
def __repr__(self):
return "<ReceiverBeacon %s: %s,%s,%s,%s,%s>" % (
return "<ReceiverBeacon %s: %s,%s,%s,%s,%s,%s,%s>" % (
self.name,
self.location,
self.altitude,
self.dstcall,
self.receiver_name,
self.timestamp,
self.version,
self.platform
)