diff --git a/build/portal/backend/backend/db.py b/build/portal/backend/backend/db.py index c709c45..5939f07 100644 --- a/build/portal/backend/backend/db.py +++ b/build/portal/backend/backend/db.py @@ -3,7 +3,6 @@ import psycopg2 import sqlite3 import yaml - config=yaml.safe_load(open("config.yml")) diff --git a/build/portal/backend/backend/jobs/flights.py b/build/portal/backend/backend/jobs/flights.py new file mode 100644 index 0000000..0987125 --- /dev/null +++ b/build/portal/backend/backend/jobs/flights.py @@ -0,0 +1,152 @@ +import datetime +import fcntl +import json +import logging +import os + +from urllib import urlopen +from backend.db import create_connection + +class AircraftProcessor(object): + + # Read JSON supplied by dump1090 + def read_json(): + try: + raw_json = urlopen('http://127.0.0.1/dump1090/data/aircraft.json') + json_object = json.load(raw_json) + return json_object + except: + logging.error("There was a problem consuming aircraft.json") + return + + # Begin processing data retrived from dump1090 + def process_all_aircraft(self, all_aircraft): + + connection = create_connection() + self.cursor = connection.cursor() + + for aircraft in all_aircraft: + self.process_aircraft(aircraft) + + connection.commit() + connection.close() + + return + + # Process the aircraft + def process_aircraft(self, aircraft): + tracked=False + aircraft_id=None + + try: + self.cursor.execute("SELECT COUNT(*) FROM aircraft WHERE icao = %s", (aircraft["hex"],)) + if self.cursor.fetchone()[0] > 0: + tracked=True + except Exception as ex: + logging.error(f"Error encountered while checking if aircraft '{aircraft["hex"]}' has already been added", exc_info=ex) + return + + if tracked: + query = "UPDATE aircraft SET last_seen = %s WHERE icao = %s", + parameters = (now, aircraft["hex"]) + error_message = f"Error encountered while trying to update aircraft '{aircraft["hex"]}'" + else: + query = "INSERT INTO aircraft (icao, first_seen, last_seen) VALUES (%s, %s, %s)", + parameters = (aircraft["hex"], now, now) + error_message = f"Error encountered while trying to insert aircraft '{aircraft["hex"]}'" + + try: + self.cursor.execute(query, parameters) + aircraft_id = self.cursor.lastrowid + except Exception as ex: + logging.error(error_message, exc_info=ex) + return + + if 'flight' in aircraft: + self.process_flight(aircraft_id, aircraft) + + return + + # Process the flight + def process_flight(self, aircraft_id, aircraft): + tracked=False + try: + self.cursor.execute("SELECT COUNT(*) FROM flights WHERE flight = %s", (aircraft["flight"],)) + if self.cursor.fetchone()[0] > 0: + tracked=True + except Exception as ex: + logging.error(f"Error encountered while checking if flight '{aircraft["flight"]}' has already been added", exc_info=ex) + return + + if tracked: + query = "UPDATE flights SET last_seen = %s WHERE icao = %s", + parameters = (now, aircraft["flight"]) + error_message = f"Error encountered while trying to update flight '{aircraft["flight"]}'" + else: + query = "INSERT INTO flights (aircraft, flight, first_seen, last_seen) VALUES (%s, %s, %s, %s)", + parameters = (aircraft_id, aircraft["flight"], now, now) + error_message = f"Error encountered while trying to insert flight '{aircraft["flight"]}'" + + try: + self.cursor.execute(query, parameters) + flight_id = self.cursor.lastrowid + except Exception as ex: + logging.error(error_message, exc_info=ex) + return + + position_keys = ('lat', 'lon', 'nav_altitude', 'gs', 'track', 'geom_rate', 'hex') + if (all (key in aircraft for key in position_keys) and aircraft["altitude"] != "ground"): + self.process_positions(aircraft_id, flight_id, aircraft) + + return + + # Process positions + def process_positions(self, aircraft_id , flight_id, aircraft): + tracked=False + try: + self.cursor.execute("SELECT COUNT(*) FROM positions WHERE flight = %s AND message = %s", (flight_id, aircraft["messages"])) + if self.cursor.fetchone()[0] > 0: + tracked=True + except Exception as ex: + logging.error(f"Error encountered while checking if position has already been added for message ID '{aircraft["messages"]}' related to flight '{flight_id}'", exc_info=ex) + return + + if tracked: + return + + squawk = None + if 'squawk' in aircraft: + squawk = aircraft["squawk"] + + try: + self.cursor.execute( + "INSERT INTO positions (flight, time, message, squawk, latitude, longitude, track, altitude, verticleRate, speed, aircraft) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)", + (flight_id, now, aircraft["messages"], squawk, aircraft["lat"], aircraft["lon"], aircraft["track"], aircraft["nav_altitude"], aircraft["geom_rate"], aircraft["gs"], aircraft_id) + ) + flight_id = self.cursor.lastrowid + except Exception as ex: + logging.error(f"Error encountered while inserting position data for message ID '{aircraft["messages"]}' related to flight '{flight_id}'", exc_info=ex) + return + + return + +if __name__ == "__main__": + processor = AircraftProcessor() + + logging.info(f"Beginning flight recording job on {datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S")}") + + # Do not allow another instance of the job to run + lock_file = open('/tmp/flights.py.lock','w') + try: + fcntl.flock(lock_file, fcntl.LOCK_EX|fcntl.LOCK_NB) + except (IOError, OSError): + logging.info('Another instance already running') + quit() + + # Begin flight recording job + lock_file.write('%d\n'%os.getpid()) + now = datetime.datetime.now() + data = processor.read_json() + processor.process_all_aircraft(data["aircraft"]) + logging.info(f"Flight recording job ended on {datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S")}") + lock_file.flush() \ No newline at end of file diff --git a/build/portal/backend/backend/jobs/maintenance.py b/build/portal/backend/backend/jobs/maintenance.py new file mode 100644 index 0000000..d90443a --- /dev/null +++ b/build/portal/backend/backend/jobs/maintenance.py @@ -0,0 +1,90 @@ +import fcntl +import logging +import os +import yaml + +from datetime import datetime, timedelta +from backend.db import create_connection + +config = yaml.safe_load(open("config.yml")) + +class MaintenanceProcessor(object): + + def begin_maintenance(self): + + if config['database']['maintenance']['purge_old_aircraft']: + connection = create_connection() + self.cursor = connection.cursor() + + days_to_save = config['database']['maintenance']['days_to_save'], + cutoff_date = datetime.now() - timedelta(days = days_to_save) + + self.purge_aircraft(cutoff_date) + + connection.commit() + connection.close() + + return + + # Remove aircraft not seen since the specified date + def purge_aircraft(self, cutoff_date): + try: + self.cursor.execute("SELECT id FROM aircraft WHERE last_seen < %s", (cutoff_date,)) + aircraft_ids = self.cursor.fetchall() + except Exception as ex: + logging.error(f"Error encountered while getting aircraft IDs not seen since '{cutoff_date}'", exc_info=ex) + return + + if len(aircraft_ids) > 0: + id = tuple(aircraft_ids) + aircraft_id_params = {'id': id} + + try: + self.cursor.execute("DELETE FROM aircraft WHERE id IN %(t)s", aircraft_id_params) + except Exception as ex: + logging.error(f"Error deleting aircraft not seen since '{cutoff_date}'", exc_info=ex) + return + + self.purge_related_flights(aircraft_id_params, cutoff_date) + self.purge_related_positions(aircraft_id_params, cutoff_date) + + return + + # Remove flights related to aircraft not seen since the specified date + def purge_related_flights(self, aircraft_id_params, cutoff_date): + try: + self.cursor.execute("DELETE FROM flights WHERE aircraft = %(t)s", aircraft_id_params) + except Exception as ex: + logging.error(f"Error deleting flights related to aircraft not seen since '{cutoff_date}'", exc_info=ex) + return + + return + + # Remove positions related to aircraft not seen since the specified date + def purge_related_positions(self, aircraft_id_params, cutoff_date): + try: + self.cursor.execute("DELETE FROM positions WHERE aircraft = %(t)s", aircraft_id_params) + except Exception as ex: + logging.error(f"Error deleting positions related to aircraft not seen since '{cutoff_date}'", exc_info=ex) + return + + return + +if __name__ == "__main__": + processor = MaintenanceProcessor() + + logging.info(f"Beginning maintenance job on {datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S")}") + + # Do not allow another instance of the job to run + lock_file = open('/tmp/maintenance.py.lock','w') + try: + fcntl.flock(lock_file, fcntl.LOCK_EX|fcntl.LOCK_NB) + except (IOError, OSError): + logging.info('Another instance already running') + quit() + + # Begin maintenance job + lock_file.write('%d\n'%os.getpid()) + processor.begin_maintenance() + logging.info(f"Maintenance job ended on {datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S")}") + lock_file.flush() \ No newline at end of file diff --git a/build/portal/backend/backend/routes/blog.py b/build/portal/backend/backend/routes/blog.py index 8d3fcd6..1e512b0 100644 --- a/build/portal/backend/backend/routes/blog.py +++ b/build/portal/backend/backend/routes/blog.py @@ -8,7 +8,6 @@ from marshmallow import Schema, fields, ValidationError from backend.db import create_connection blog = Blueprint('blog', __name__) -config=yaml.safe_load(open("config.yml")) class CreateBlogPostRequestSchema(Schema): diff --git a/build/portal/backend/backend/routes/flights.py b/build/portal/backend/backend/routes/flights.py index dc0889a..afdccb3 100644 --- a/build/portal/backend/backend/routes/flights.py +++ b/build/portal/backend/backend/routes/flights.py @@ -6,7 +6,6 @@ from flask_jwt_extended import jwt_required from backend.db import create_connection flights = Blueprint('flights', __name__) -config=yaml.safe_load(open("config.yml")) @flights.route('/api/flight/', methods=['GET']) diff --git a/build/portal/backend/backend/routes/links.py b/build/portal/backend/backend/routes/links.py index 2b839f8..eff79be 100644 --- a/build/portal/backend/backend/routes/links.py +++ b/build/portal/backend/backend/routes/links.py @@ -7,7 +7,6 @@ from marshmallow import Schema, fields, ValidationError from backend.db import create_connection links = Blueprint('links', __name__) -config=yaml.safe_load(open("config.yml")) class CreateLinkRequestSchema(Schema): diff --git a/build/portal/backend/backend/routes/notifications.py b/build/portal/backend/backend/routes/notifications.py index 65c8e1d..90c46a4 100644 --- a/build/portal/backend/backend/routes/notifications.py +++ b/build/portal/backend/backend/routes/notifications.py @@ -5,9 +5,7 @@ from flask import abort, Blueprint, jsonify, request from flask_jwt_extended import jwt_required from backend.db import create_connection - notifications = Blueprint('notifications', __name__) -config=yaml.safe_load(open("config.yml")) @notifications.route('/api/notification/', methods=['DELETE']) diff --git a/build/portal/backend/backend/routes/settings.py b/build/portal/backend/backend/routes/settings.py index a82fc1a..565b5d9 100644 --- a/build/portal/backend/backend/routes/settings.py +++ b/build/portal/backend/backend/routes/settings.py @@ -7,7 +7,6 @@ from marshmallow import Schema, fields, ValidationError from backend.db import create_connection settings = Blueprint('settings', __name__) -config=yaml.safe_load(open("config.yml")) class UpdateSettingRequestSchema(Schema): diff --git a/build/portal/backend/backend/routes/users.py b/build/portal/backend/backend/routes/users.py index 53a2bda..c337b26 100644 --- a/build/portal/backend/backend/routes/users.py +++ b/build/portal/backend/backend/routes/users.py @@ -7,7 +7,6 @@ from marshmallow import Schema, fields, ValidationError from backend.db import create_connection users = Blueprint('users', __name__) -config=yaml.safe_load(open("config.yml")) class CreateUserRequestSchema(Schema): diff --git a/build/portal/backend/config.yml b/build/portal/backend/config.yml index 844f267..1002e04 100644 --- a/build/portal/backend/config.yml +++ b/build/portal/backend/config.yml @@ -11,4 +11,7 @@ database: host: "127.0.0.1" user: "portaluser" password: "password" - database: "adsbportal" \ No newline at end of file + database: "adsbportal" + maintenance: + purge_old_aircraft: True + days_to_save: 30 \ No newline at end of file