kopia lustrzana https://github.com/jprochazka/adsb-receiver
				
				
				
			Added flight recording and maintenance jobs to the backend
							rodzic
							
								
									116e14dd04
								
							
						
					
					
						commit
						f443d068ce
					
				|  | @ -3,7 +3,6 @@ import psycopg2 | |||
| import sqlite3 | ||||
| import yaml | ||||
| 
 | ||||
| 
 | ||||
| config=yaml.safe_load(open("config.yml")) | ||||
| 
 | ||||
| 
 | ||||
|  |  | |||
|  | @ -0,0 +1,152 @@ | |||
| import datetime | ||||
| import fcntl | ||||
| import json | ||||
| import logging | ||||
| import os | ||||
| 
 | ||||
| from urllib import urlopen | ||||
| from backend.db import create_connection | ||||
| 
 | ||||
| class AircraftProcessor(object): | ||||
| 
 | ||||
|     # Read JSON supplied by dump1090 | ||||
|     def read_json(): | ||||
|         try: | ||||
|             raw_json = urlopen('http://127.0.0.1/dump1090/data/aircraft.json') | ||||
|             json_object = json.load(raw_json) | ||||
|             return json_object | ||||
|         except: | ||||
|             logging.error("There was a problem consuming aircraft.json") | ||||
|             return | ||||
| 
 | ||||
|     # Begin processing data retrived from dump1090 | ||||
|     def process_all_aircraft(self, all_aircraft): | ||||
| 
 | ||||
|         connection = create_connection() | ||||
|         self.cursor = connection.cursor() | ||||
| 
 | ||||
|         for aircraft in all_aircraft: | ||||
|             self.process_aircraft(aircraft) | ||||
| 
 | ||||
|         connection.commit() | ||||
|         connection.close() | ||||
| 
 | ||||
|         return | ||||
| 
 | ||||
|     # Process the aircraft | ||||
|     def process_aircraft(self, aircraft): | ||||
|         tracked=False | ||||
|         aircraft_id=None | ||||
|          | ||||
|         try: | ||||
|             self.cursor.execute("SELECT COUNT(*) FROM aircraft WHERE icao = %s", (aircraft["hex"],)) | ||||
|             if self.cursor.fetchone()[0] > 0: | ||||
|                 tracked=True | ||||
|         except Exception as ex: | ||||
|             logging.error(f"Error encountered while checking if aircraft '{aircraft["hex"]}' has already been added", exc_info=ex) | ||||
|             return | ||||
| 
 | ||||
|         if tracked: | ||||
|             query = "UPDATE aircraft SET last_seen = %s WHERE icao = %s", | ||||
|             parameters = (now, aircraft["hex"]) | ||||
|             error_message = f"Error encountered while trying to update aircraft '{aircraft["hex"]}'" | ||||
|         else: | ||||
|             query = "INSERT INTO aircraft (icao, first_seen, last_seen) VALUES (%s, %s, %s)", | ||||
|             parameters = (aircraft["hex"], now, now) | ||||
|             error_message = f"Error encountered while trying to insert aircraft '{aircraft["hex"]}'" | ||||
| 
 | ||||
|         try: | ||||
|             self.cursor.execute(query, parameters) | ||||
|             aircraft_id = self.cursor.lastrowid | ||||
|         except Exception as ex: | ||||
|             logging.error(error_message, exc_info=ex) | ||||
|             return | ||||
| 
 | ||||
|         if 'flight' in aircraft: | ||||
|             self.process_flight(aircraft_id, aircraft) | ||||
| 
 | ||||
|         return | ||||
| 
 | ||||
|     # Process the flight | ||||
|     def process_flight(self, aircraft_id, aircraft): | ||||
|         tracked=False | ||||
|         try: | ||||
|             self.cursor.execute("SELECT COUNT(*) FROM flights WHERE flight = %s", (aircraft["flight"],)) | ||||
|             if self.cursor.fetchone()[0] > 0: | ||||
|                 tracked=True | ||||
|         except Exception as ex: | ||||
|             logging.error(f"Error encountered while checking if flight '{aircraft["flight"]}' has already been added", exc_info=ex) | ||||
|             return | ||||
| 
 | ||||
|         if tracked: | ||||
|             query = "UPDATE flights SET last_seen = %s WHERE icao = %s", | ||||
|             parameters = (now, aircraft["flight"]) | ||||
|             error_message = f"Error encountered while trying to update flight '{aircraft["flight"]}'" | ||||
|         else: | ||||
|             query = "INSERT INTO flights (aircraft, flight, first_seen, last_seen) VALUES (%s, %s, %s, %s)", | ||||
|             parameters = (aircraft_id, aircraft["flight"], now, now) | ||||
|             error_message = f"Error encountered while trying to insert flight '{aircraft["flight"]}'" | ||||
| 
 | ||||
|         try: | ||||
|             self.cursor.execute(query, parameters) | ||||
|             flight_id = self.cursor.lastrowid | ||||
|         except Exception as ex: | ||||
|             logging.error(error_message, exc_info=ex) | ||||
|             return | ||||
| 
 | ||||
|         position_keys = ('lat', 'lon', 'nav_altitude', 'gs', 'track', 'geom_rate', 'hex') | ||||
|         if (all (key in aircraft for key in position_keys) and aircraft["altitude"] != "ground"): | ||||
|             self.process_positions(aircraft_id, flight_id, aircraft) | ||||
| 
 | ||||
|         return | ||||
| 
 | ||||
|     # Process positions | ||||
|     def process_positions(self, aircraft_id , flight_id, aircraft): | ||||
|         tracked=False | ||||
|         try: | ||||
|             self.cursor.execute("SELECT COUNT(*) FROM positions WHERE flight = %s AND message = %s", (flight_id, aircraft["messages"])) | ||||
|             if self.cursor.fetchone()[0] > 0: | ||||
|                 tracked=True | ||||
|         except Exception as ex: | ||||
|             logging.error(f"Error encountered while checking if position has already been added for message ID '{aircraft["messages"]}' related to flight '{flight_id}'", exc_info=ex) | ||||
|             return | ||||
| 
 | ||||
|         if tracked: | ||||
|             return | ||||
| 
 | ||||
|         squawk = None | ||||
|         if 'squawk' in aircraft: | ||||
|             squawk = aircraft["squawk"] | ||||
| 
 | ||||
|         try: | ||||
|             self.cursor.execute( | ||||
|                 "INSERT INTO positions (flight, time, message, squawk, latitude, longitude, track, altitude, verticleRate, speed, aircraft) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)",  | ||||
|                 (flight_id, now, aircraft["messages"], squawk, aircraft["lat"], aircraft["lon"], aircraft["track"], aircraft["nav_altitude"], aircraft["geom_rate"], aircraft["gs"], aircraft_id) | ||||
|             ) | ||||
|             flight_id = self.cursor.lastrowid | ||||
|         except Exception as ex: | ||||
|             logging.error(f"Error encountered while inserting position data for message ID '{aircraft["messages"]}' related to flight '{flight_id}'", exc_info=ex) | ||||
|             return | ||||
|              | ||||
|         return | ||||
| 
 | ||||
| if __name__ == "__main__": | ||||
|     processor = AircraftProcessor() | ||||
| 
 | ||||
|     logging.info(f"Beginning flight recording job on {datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S")}") | ||||
| 
 | ||||
|     # Do not allow another instance of the job to run | ||||
|     lock_file = open('/tmp/flights.py.lock','w') | ||||
|     try: | ||||
|         fcntl.flock(lock_file, fcntl.LOCK_EX|fcntl.LOCK_NB) | ||||
|     except (IOError, OSError): | ||||
|         logging.info('Another instance already running') | ||||
|         quit() | ||||
| 
 | ||||
|     # Begin flight recording job | ||||
|     lock_file.write('%d\n'%os.getpid()) | ||||
|     now = datetime.datetime.now() | ||||
|     data = processor.read_json() | ||||
|     processor.process_all_aircraft(data["aircraft"]) | ||||
|     logging.info(f"Flight recording job ended on {datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S")}") | ||||
|     lock_file.flush() | ||||
|  | @ -0,0 +1,90 @@ | |||
| import fcntl | ||||
| import logging | ||||
| import os | ||||
| import yaml | ||||
| 
 | ||||
| from datetime import datetime, timedelta | ||||
| from backend.db import create_connection | ||||
| 
 | ||||
| config = yaml.safe_load(open("config.yml")) | ||||
| 
 | ||||
| class MaintenanceProcessor(object): | ||||
| 
 | ||||
|     def begin_maintenance(self): | ||||
| 
 | ||||
|         if config['database']['maintenance']['purge_old_aircraft']: | ||||
|             connection = create_connection() | ||||
|             self.cursor = connection.cursor() | ||||
| 
 | ||||
|             days_to_save = config['database']['maintenance']['days_to_save'], | ||||
|             cutoff_date = datetime.now() - timedelta(days = days_to_save) | ||||
| 
 | ||||
|             self.purge_aircraft(cutoff_date) | ||||
| 
 | ||||
|             connection.commit() | ||||
|             connection.close() | ||||
| 
 | ||||
|         return | ||||
| 
 | ||||
|     # Remove aircraft not seen since the specified date | ||||
|     def purge_aircraft(self, cutoff_date): | ||||
|         try: | ||||
|             self.cursor.execute("SELECT id FROM aircraft WHERE last_seen < %s", (cutoff_date,)) | ||||
|             aircraft_ids = self.cursor.fetchall() | ||||
|         except Exception as ex: | ||||
|             logging.error(f"Error encountered while getting aircraft IDs not seen since '{cutoff_date}'", exc_info=ex) | ||||
|             return | ||||
| 
 | ||||
|         if len(aircraft_ids) > 0: | ||||
|             id = tuple(aircraft_ids) | ||||
|             aircraft_id_params = {'id': id} | ||||
| 
 | ||||
|             try: | ||||
|                 self.cursor.execute("DELETE FROM aircraft WHERE id IN %(t)s", aircraft_id_params) | ||||
|             except Exception as ex: | ||||
|                 logging.error(f"Error deleting aircraft not seen since '{cutoff_date}'", exc_info=ex) | ||||
|                 return | ||||
|          | ||||
|             self.purge_related_flights(aircraft_id_params, cutoff_date) | ||||
|             self.purge_related_positions(aircraft_id_params, cutoff_date) | ||||
| 
 | ||||
|         return | ||||
| 
 | ||||
|     # Remove flights related to aircraft not seen since the specified date | ||||
|     def purge_related_flights(self, aircraft_id_params, cutoff_date): | ||||
|         try: | ||||
|             self.cursor.execute("DELETE FROM flights WHERE aircraft = %(t)s", aircraft_id_params) | ||||
|         except Exception as ex: | ||||
|             logging.error(f"Error deleting flights related to aircraft not seen since '{cutoff_date}'", exc_info=ex) | ||||
|             return | ||||
|          | ||||
|         return | ||||
| 
 | ||||
|     # Remove positions related to aircraft not seen since the specified date | ||||
|     def purge_related_positions(self, aircraft_id_params, cutoff_date): | ||||
|         try: | ||||
|             self.cursor.execute("DELETE FROM positions WHERE aircraft = %(t)s", aircraft_id_params) | ||||
|         except Exception as ex: | ||||
|             logging.error(f"Error deleting positions related to aircraft not seen since '{cutoff_date}'", exc_info=ex) | ||||
|             return | ||||
|          | ||||
|         return | ||||
|      | ||||
| if __name__ == "__main__": | ||||
|     processor = MaintenanceProcessor() | ||||
| 
 | ||||
|     logging.info(f"Beginning maintenance job on {datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S")}") | ||||
| 
 | ||||
|     # Do not allow another instance of the job to run | ||||
|     lock_file = open('/tmp/maintenance.py.lock','w') | ||||
|     try: | ||||
|         fcntl.flock(lock_file, fcntl.LOCK_EX|fcntl.LOCK_NB) | ||||
|     except (IOError, OSError): | ||||
|         logging.info('Another instance already running') | ||||
|         quit() | ||||
|      | ||||
|     # Begin maintenance job | ||||
|     lock_file.write('%d\n'%os.getpid()) | ||||
|     processor.begin_maintenance() | ||||
|     logging.info(f"Maintenance job ended on {datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S")}") | ||||
|     lock_file.flush() | ||||
|  | @ -8,7 +8,6 @@ from marshmallow import Schema, fields, ValidationError | |||
| from backend.db import create_connection | ||||
| 
 | ||||
| blog = Blueprint('blog', __name__) | ||||
| config=yaml.safe_load(open("config.yml")) | ||||
| 
 | ||||
| 
 | ||||
| class CreateBlogPostRequestSchema(Schema): | ||||
|  |  | |||
|  | @ -6,7 +6,6 @@ from flask_jwt_extended import jwt_required | |||
| from backend.db import create_connection | ||||
| 
 | ||||
| flights = Blueprint('flights', __name__) | ||||
| config=yaml.safe_load(open("config.yml")) | ||||
|          | ||||
| 
 | ||||
| @flights.route('/api/flight/<string:flight>', methods=['GET']) | ||||
|  |  | |||
|  | @ -7,7 +7,6 @@ from marshmallow import Schema, fields, ValidationError | |||
| from backend.db import create_connection | ||||
| 
 | ||||
| links = Blueprint('links', __name__) | ||||
| config=yaml.safe_load(open("config.yml")) | ||||
| 
 | ||||
| 
 | ||||
| class CreateLinkRequestSchema(Schema): | ||||
|  |  | |||
|  | @ -5,9 +5,7 @@ from flask import abort, Blueprint, jsonify, request | |||
| from flask_jwt_extended import jwt_required | ||||
| from backend.db import create_connection | ||||
| 
 | ||||
| 
 | ||||
| notifications = Blueprint('notifications', __name__) | ||||
| config=yaml.safe_load(open("config.yml")) | ||||
| 
 | ||||
| 
 | ||||
| @notifications.route('/api/notification/<string:flight>', methods=['DELETE']) | ||||
|  |  | |||
|  | @ -7,7 +7,6 @@ from marshmallow import Schema, fields, ValidationError | |||
| from backend.db import create_connection | ||||
| 
 | ||||
| settings = Blueprint('settings', __name__) | ||||
| config=yaml.safe_load(open("config.yml")) | ||||
| 
 | ||||
| 
 | ||||
| class UpdateSettingRequestSchema(Schema): | ||||
|  |  | |||
|  | @ -7,7 +7,6 @@ from marshmallow import Schema, fields, ValidationError | |||
| from backend.db import create_connection | ||||
| 
 | ||||
| users = Blueprint('users', __name__) | ||||
| config=yaml.safe_load(open("config.yml")) | ||||
| 
 | ||||
| 
 | ||||
| class CreateUserRequestSchema(Schema): | ||||
|  |  | |||
|  | @ -11,4 +11,7 @@ database: | |||
|         host: "127.0.0.1" | ||||
|         user: "portaluser" | ||||
|         password: "password" | ||||
|         database: "adsbportal" | ||||
|         database: "adsbportal" | ||||
|     maintenance: | ||||
|         purge_old_aircraft: True | ||||
|         days_to_save: 30 | ||||
		Ładowanie…
	
		Reference in New Issue
	
	 jprochazka
						jprochazka