kopia lustrzana https://github.com/projecthorus/chasemapper
Add logging of chase data to disk.
rodzic
78f3dbdc4b
commit
ccc271c35b
|
@ -0,0 +1,167 @@
|
|||
#!/usr/bin/env python
|
||||
#
|
||||
# Project Horus - Chase Logging
|
||||
#
|
||||
# Copyright (C) 2019 Mark Jessop <vk5qi@rfhead.net>
|
||||
# Released under GNU GPL v3 or later
|
||||
#
|
||||
import datetime
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import time
|
||||
from threading import Thread
|
||||
try:
|
||||
# Python 2
|
||||
from Queue import Queue
|
||||
except ImportError:
|
||||
# Python 3
|
||||
from queue import Queue
|
||||
|
||||
|
||||
class ChaseLogger(object):
|
||||
""" Chase Data Logger Class.
|
||||
Log all chase data into a file as lines of JSON.
|
||||
"""
|
||||
|
||||
def __init__(self, filename):
|
||||
|
||||
self.filename = filename
|
||||
|
||||
|
||||
# Input Queue.
|
||||
self.input_queue = Queue()
|
||||
|
||||
# Open the file.
|
||||
try:
|
||||
self.f = open(self.filename, 'a')
|
||||
except Exception as e:
|
||||
self.log_error("Logging - Could not open log file - %s" % str(e))
|
||||
return
|
||||
|
||||
# Start queue processing thread.
|
||||
self.input_processing_running = True
|
||||
self.log_process_thread = Thread(target=self.process_queue)
|
||||
self.log_process_thread.start()
|
||||
|
||||
|
||||
def add_car_position(self, data):
|
||||
""" Log a chase car position update.
|
||||
Input dict expected to be in the format:
|
||||
{
|
||||
'time' : _time_dt,
|
||||
'lat' : _lat,
|
||||
'lon' : _lon,
|
||||
'alt' : _alt,
|
||||
'comment': _comment
|
||||
}
|
||||
|
||||
"""
|
||||
|
||||
data['log_type'] = 'CAR POSITION'
|
||||
data['log_time'] = datetime.datetime.utcnow().isoformat()
|
||||
|
||||
# Convert the input datetime object into a string.
|
||||
data['time'] = data['time'].isoformat()
|
||||
|
||||
# Add it to the queue if we are running.
|
||||
if self.input_processing_running:
|
||||
self.input_queue.put(data)
|
||||
else:
|
||||
self.log_error("Processing not running, discarding.")
|
||||
|
||||
def add_balloon_telemetry(self, data):
|
||||
""" Log balloon telemetry.
|
||||
"""
|
||||
|
||||
data['log_type'] = 'BALLOON TELEMETRY'
|
||||
data['log_time'] = datetime.datetime.utcnow().isoformat()
|
||||
|
||||
# Convert the input datetime object into a string.
|
||||
data['time'] = data['time_dt'].isoformat()
|
||||
# Remove the time_dt element (this cannot be serialised to JSON).
|
||||
data.pop('time_dt')
|
||||
|
||||
# Add it to the queue if we are running.
|
||||
if self.input_processing_running:
|
||||
self.input_queue.put(data)
|
||||
else:
|
||||
self.log_error("Processing not running, discarding.")
|
||||
|
||||
|
||||
def add_balloon_prediction(self, data):
|
||||
""" Log a prediction run """
|
||||
|
||||
data['log_type'] = 'PREDICTION'
|
||||
data['log_time'] = datetime.datetime.utcnow().isoformat()
|
||||
|
||||
|
||||
# Add it to the queue if we are running.
|
||||
if self.input_processing_running:
|
||||
self.input_queue.put(data)
|
||||
else:
|
||||
self.log_error("Processing not running, discarding.")
|
||||
|
||||
|
||||
def process_queue(self):
|
||||
""" Process data from the input queue, and write telemetry to log files.
|
||||
"""
|
||||
self.log_info("Started Chase Logger Thread.")
|
||||
|
||||
while self.input_processing_running:
|
||||
|
||||
# Process everything in the queue.
|
||||
while self.input_queue.qsize() > 0:
|
||||
try:
|
||||
_data = self.input_queue.get_nowait()
|
||||
_data_str = json.dumps(_data)
|
||||
self.f.write(_data_str + "\n")
|
||||
except Exception as e:
|
||||
self.log_error("Error processing data - %s" % str(e))
|
||||
|
||||
# Sleep while waiting for some new data.
|
||||
time.sleep(5)
|
||||
|
||||
|
||||
def running(self):
|
||||
""" Check if the logging thread is running.
|
||||
|
||||
Returns:
|
||||
bool: True if the logging thread is running.
|
||||
"""
|
||||
return self.input_processing_running
|
||||
|
||||
|
||||
def close(self):
|
||||
try:
|
||||
self.input_processing_running = False
|
||||
self.f.close()
|
||||
except Exception as e:
|
||||
self.log_error("Error when closing - %s" % str(e))
|
||||
|
||||
self.log_info("Stopped Telemetry Logger Thread.")
|
||||
|
||||
|
||||
def log_debug(self, line):
|
||||
""" Helper function to log a debug message with a descriptive heading.
|
||||
Args:
|
||||
line (str): Message to be logged.
|
||||
"""
|
||||
logging.debug("Chase Logger - %s" % line)
|
||||
|
||||
|
||||
def log_info(self, line):
|
||||
""" Helper function to log an informational message with a descriptive heading.
|
||||
Args:
|
||||
line (str): Message to be logged.
|
||||
"""
|
||||
logging.info("Chase Logger - %s" % line)
|
||||
|
||||
|
||||
def log_error(self, line):
|
||||
""" Helper function to log an error message with a descriptive heading.
|
||||
Args:
|
||||
line (str): Message to be logged.
|
||||
"""
|
||||
logging.error("Chase Logger - %s" % line)
|
||||
|
|
@ -26,6 +26,7 @@ from chasemapper.atmosphere import time_to_landing
|
|||
from chasemapper.listeners import OziListener, UDPListener
|
||||
from chasemapper.predictor import predictor_spawn_download, model_download_running
|
||||
from chasemapper.habitat import HabitatChaseUploader
|
||||
from chasemapper.logger import ChaseLogger
|
||||
|
||||
|
||||
# Define Flask Application, and allow automatic reloading of templates for dev work
|
||||
|
@ -38,6 +39,8 @@ app.jinja_env.auto_reload = True
|
|||
socketio = SocketIO(app)
|
||||
|
||||
|
||||
# Chase Logger Instance (Initialised in main)
|
||||
chase_logger = None
|
||||
|
||||
# Global stores of data.
|
||||
|
||||
|
@ -249,6 +252,9 @@ def handle_new_payload_position(data):
|
|||
# Update the web client.
|
||||
flask_emit_event('telemetry_event', current_payloads[_callsign]['telem'])
|
||||
|
||||
# Add the position into the logger
|
||||
chase_logger.add_balloon_telemetry(data)
|
||||
|
||||
|
||||
#
|
||||
# Predictor Code
|
||||
|
@ -399,7 +405,10 @@ def run_prediction():
|
|||
}
|
||||
flask_emit_event('predictor_update', _client_data)
|
||||
|
||||
# Clear the predictor-runnign semaphore
|
||||
# Add the prediction run to the logger.
|
||||
chase_logger.add_balloon_prediction(_client_data)
|
||||
|
||||
# Clear the predictor-running semaphore
|
||||
predictor_semaphore = False
|
||||
|
||||
|
||||
|
@ -592,6 +601,8 @@ def udp_listener_car_callback(data):
|
|||
if habitat_uploader != None:
|
||||
habitat_uploader.update_position(data)
|
||||
|
||||
# Add the car position to the logger.
|
||||
chase_logger.add_car_position(_car_position_update)
|
||||
|
||||
# Add other listeners here...
|
||||
|
||||
|
@ -744,6 +755,7 @@ if __name__ == "__main__":
|
|||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("-c", "--config", type=str, default="horusmapper.cfg", help="Configuration file.")
|
||||
parser.add_argument("-v", "--verbose", action="store_true", default=False, help="Verbose output.")
|
||||
parser.add_argument("-l", "--log", type=str, default="chaselog.log", help="Log file name.")
|
||||
args = parser.parse_args()
|
||||
|
||||
# Configure logging
|
||||
|
@ -763,6 +775,9 @@ if __name__ == "__main__":
|
|||
web_handler = WebHandler()
|
||||
logging.getLogger().addHandler(web_handler)
|
||||
|
||||
# Start the Chase Logger
|
||||
chase_logger = ChaseLogger(args.log)
|
||||
|
||||
# Attempt to read in config file.
|
||||
chasemapper_config = read_config(args.config)
|
||||
# Die if we cannot read a valid config file.
|
||||
|
@ -806,6 +821,9 @@ if __name__ == "__main__":
|
|||
predictor_thread_running = False
|
||||
data_monitor_thread_running = False
|
||||
|
||||
# Close the chase logger
|
||||
chase_logger.close()
|
||||
|
||||
if habitat_uploader != None:
|
||||
habitat_uploader.close()
|
||||
|
||||
|
|
Ładowanie…
Reference in New Issue