ogn-python/app/commands/gateway.py

135 wiersze
4.8 KiB
Python
Czysty Zwykły widok Historia

2019-09-25 21:17:30 +00:00
import os
2020-05-30 12:28:30 +00:00
from datetime import datetime, timezone
2020-10-27 19:46:14 +00:00
import time
2019-09-25 21:17:30 +00:00
2019-09-12 20:53:42 +00:00
from flask import current_app
2019-02-25 19:00:51 +00:00
from flask.cli import AppGroup
2019-09-25 21:17:30 +00:00
import click
2020-05-30 12:28:30 +00:00
from tqdm import tqdm
2019-02-25 19:00:51 +00:00
from ogn.client import AprsClient
2020-10-27 19:46:14 +00:00
from ogn.parser import parse
2019-09-25 21:17:30 +00:00
2020-05-30 12:28:30 +00:00
from app import redis_client
2020-10-27 19:46:14 +00:00
from app.gateway.beacon_conversion import aprs_string_to_message
from app.gateway.message_handling import receiver_status_message_to_csv_string, receiver_position_message_to_csv_string, sender_position_message_to_csv_string
from app.collect.gateway import transfer_from_redis_to_database
2019-02-25 19:00:51 +00:00
2019-08-31 08:14:41 +00:00
# Flask CLI command group "gateway"; the commands below are registered on it.
user_cli = AppGroup("gateway", help="Connection to APRS servers.")
2019-08-31 08:14:41 +00:00
@user_cli.command("run")
@click.option("--aprs_filter", default='')
def run(aprs_filter):
    """
    Run the aprs client, parse the incoming data and put it to redis.

    Every received APRS string is converted to a message dict. Receiver
    status/position beacons and sender position beacons are serialized to a
    CSV line and added to the redis sorted set ``receiver_status``,
    ``receiver_position`` or ``sender_position`` with the current UNIX time
    as score. All other messages are dropped. Whenever the UTC minute
    changes, the number of beacons stored since the previous change is
    logged.

    :param aprs_filter: filter string handed to the AprsClient ('' by default).
    """
    current_app.logger.warning("Start ogn gateway")
    client = AprsClient(current_app.config['APRS_USER'], aprs_filter=aprs_filter)
    client.connect()

    # Per-minute statistics shared with the callback below.
    beacon_counter = 0
    last_minute = datetime.now(timezone.utc).minute

    def insert_into_redis(aprs_string):
        nonlocal beacon_counter, last_minute

        # Convert aprs_string to message dict, add MGRS Position, flatten gps precision, etc. etc. ...
        message = aprs_string_to_message(aprs_string)
        if message is None:
            return

        # separate between tables (receiver/sender) and aprs_type (status/position)
        aprs_type = message['aprs_type']
        if message['beacon_type'] in ('aprs_receiver', 'receiver'):
            if aprs_type == 'status':
                redis_target = 'receiver_status'
                csv_string = receiver_status_message_to_csv_string(message, none_character=r'\N')
            elif aprs_type == 'position':
                redis_target = 'receiver_position'
                csv_string = receiver_position_message_to_csv_string(message, none_character=r'\N')
            else:
                return
        elif aprs_type == 'position':
            redis_target = 'sender_position'
            csv_string = sender_position_message_to_csv_string(message, none_character=r'\N')
        else:
            # sender status beacons carry no interesting data we want to keep,
            # and any other aprs_type is ignored as well
            return

        # nx=True: only add new members, never update the score of an existing one
        redis_client.zadd(name=redis_target, mapping={csv_string: str(time.time())}, nx=True)
        beacon_counter += 1

        # Log and reset the counter once the wall-clock minute has changed.
        current_minute = datetime.now(timezone.utc).minute
        if current_minute != last_minute:
            current_app.logger.warning(f"{beacon_counter:7d}")
            beacon_counter = 0
            last_minute = current_minute

    try:
        client.run(callback=insert_into_redis, autoreconnect=True)
    except KeyboardInterrupt:
        current_app.logger.warning("\nStop ogn gateway")
    finally:
        # Always release the connection, also when the callback raised.
        client.disconnect()
2019-09-25 21:17:30 +00:00
2020-10-27 19:46:14 +00:00
@user_cli.command("transfer")
def transfer():
    """Move the data buffered in redis over to the database.

    Thin CLI wrapper around ``transfer_from_redis_to_database``.
    """
    transfer_from_redis_to_database()
2020-05-30 12:28:30 +00:00
@user_cli.command("printout")
@click.option("--aprs_filter", default='')
def printout(aprs_filter):
    """Run the aprs client and just print out the data stream.

    Each received APRS string is printed to stdout, prefixed with the
    current UTC time.

    :param aprs_filter: filter string handed to the AprsClient ('' by default).
    """

    def print_with_timestamp(aprs_string):
        # Aware "now" with tzinfo stripped: same text as the deprecated
        # datetime.utcnow(), i.e. no "+00:00" suffix in the output.
        utc_now = datetime.now(timezone.utc).replace(tzinfo=None)
        print(f"{utc_now}: {aprs_string}")

    current_app.logger.warning("Start ogn gateway")
    client = AprsClient(current_app.config['APRS_USER'], aprs_filter=aprs_filter)
    client.connect()

    try:
        client.run(callback=print_with_timestamp, autoreconnect=True)
    except KeyboardInterrupt:
        current_app.logger.warning("\nStop ogn gateway")
    finally:
        # Always release the connection, not only after Ctrl-C.
        client.disconnect()
2019-09-25 21:17:30 +00:00
@user_cli.command("convert")
@click.argument("path")
def file_import(path):
    """Convert APRS logfiles into csv files for fast bulk import.

    Walks the directory tree below ``path`` and, per directory, handles the
    files in sorted order: the file name is printed and the full path is
    passed to ``convert``.
    """
    # NOTE(review): `convert` is neither defined nor imported in the visible
    # part of this module - confirm where it is supposed to come from.
    for directory, _subdirs, filenames in os.walk(path):
        for filename in sorted(filenames):
            print(filename)
            convert(os.path.join(directory, filename))
@user_cli.command("calculate")
@click.argument("path")
def file_calculate(path):
    """Import csv files, calculate geographic features (distance, radial, agl, ...) and make data distinct.

    Collects every ``aircraft_beacons*.csv.gz`` file below ``path`` together
    with the name of its ``receiver_beacons`` sibling and an output name
    (input name + '2'). Inputs whose output file already exists are skipped.
    The collected files are then processed with a progress bar.
    """
    # NOTE(review): `calculate` is neither defined nor imported in the visible
    # part of this module - confirm where it is supposed to come from.
    jobs = []
    for directory, _subdirs, filenames in os.walk(path):
        for filename in sorted(filenames):
            if not (filename.startswith('aircraft_beacons') and filename.endswith('.csv.gz')):
                continue

            aircraft_path = os.path.join(directory, filename)
            # 'aircraft' has 8 characters: swap that prefix for 'receiver'
            receiver_path = os.path.join(directory, 'receiver' + filename[8:])
            output_path = os.path.join(directory, filename + '2')

            if os.path.isfile(output_path):
                print("Outputfile {} already exists. Skipping".format(output_path))
                continue
            jobs.append((aircraft_path, receiver_path, output_path))

    progress = tqdm(jobs)
    for aircraft_path, receiver_path, output_path in progress:
        progress.set_description("Converting {}".format(aircraft_path))
        calculate(aircraft_path, receiver_path, output_path)