diff --git a/ogn/gateway/client.py b/ogn/gateway/client.py index d73e300..2309120 100644 --- a/ogn/gateway/client.py +++ b/ogn/gateway/client.py @@ -3,56 +3,68 @@ import logging from time import time from ogn.gateway import settings -from ogn.commands.dbutils import session from ogn.aprs_parser import parse_aprs from ogn.aprs_utils import create_aprs_login from ogn.exceptions import AprsParseError, OgnParseError -from sqlalchemy import create_engine -from sqlalchemy.orm import sessionmaker -from ogn.model import Base - class ognGateway: - def __init__(self): - pass - def connect_db(self): - self.session = session + def __init__(self, aprs_user, aprs_filter=''): self.logger = logging.getLogger(__name__) + self.logger.info("Connect to OGN as {} with filter '{}'".format(aprs_user, (aprs_filter if aprs_filter else 'full-feed'))) + self.aprs_user = aprs_user + self.aprs_filter = aprs_filter - def connect(self, aprs_user): + def connect(self): # create socket, connect to server, login and make a file object associated with the socket self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) self.sock.connect((settings.APRS_SERVER_HOST, settings.APRS_SERVER_PORT)) - login = create_aprs_login(aprs_user, -1, settings.APRS_APP_NAME, settings.APRS_APP_VER, settings.APRS_FILTER) + login = create_aprs_login(self.aprs_user, -1, settings.APRS_APP_NAME, settings.APRS_APP_VER, self.aprs_filter) self.sock.send(login.encode()) self.sock_file = self.sock.makefile('rw') def disconnect(self): - # close everything - self.sock.shutdown(0) - self.sock.close() + self.logger.info('Disconnect') + try: + # close everything + self.sock.shutdown(0) + self.sock.close() + except OSError as e: + self.logger.error('Socket close error', exc_info=True) + + def run(self, callback, autoreconnect=False): + self.process_beacon = callback - def run(self): - keepalive_time = time() while True: - if time() - keepalive_time > settings.APRS_KEEPALIVE_TIME: - logger.debug('Sending keepalive') - self.sock.send("#keepalive".encode()) + try: keepalive_time = time() + while True: + if time() - keepalive_time > settings.APRS_KEEPALIVE_TIME: + self.logger.info('Send keepalive') + self.sock.send('#keepalive'.encode()) + keepalive_time = time() - # Read packet string from socket - packet_str = self.sock_file.readline().strip() + # Read packet string from socket + packet_str = self.sock_file.readline().strip() - # A zero length line should not be return if keepalives are being sent - # A zero length line will only be returned after ~30m if keepalives are not sent - if len(packet_str) == 0: - logger.warning('Read returns zero length string. Failure. Orderly closeout') - break + # A zero length line should not be return if keepalives are being sent + # A zero length line will only be returned after ~30m if keepalives are not sent + if len(packet_str) == 0: + self.logger.warning('Read returns zero length string. Failure. Orderly closeout') + break - self.proceed_line(packet_str) + self.proceed_line(packet_str) + except BrokenPipeError: + self.logger.error('BrokenPipeError', exc_info=True) + except socket.error: + self.logger.error('socket.error', exc_info=True) + + if autoreconnect: + self.connect() + else: + return def proceed_line(self, line): try: @@ -66,5 +78,4 @@ class ognGateway: return if beacon is not None: - self.session.add(beacon) - self.session.commit() + self.process_beacon(beacon) diff --git a/ogn/gateway/manage.py b/ogn/gateway/manage.py index 75a15ad..6c409fc 100644 --- a/ogn/gateway/manage.py +++ b/ogn/gateway/manage.py @@ -1,12 +1,11 @@ -import socket import logging from ogn.gateway.client import ognGateway +from ogn.commands.dbutils import session from manager import Manager manager = Manager() -DB_URI = 'sqlite:///beacons.db' logging_formatstr = '%(asctime)s - %(levelname).4s - %(name)s - %(message)s' log_levels = ['CRITICAL', 'ERROR', 'WARNING', 'INFO', 'DEBUG'] @@ -30,29 +29,18 @@ def run(aprs_user='anon-dev', logfile='main.log', loglevel='INFO'): logging.basicConfig(format=logging_formatstr, level=loglevel, handlers=log_handlers) - user_interrupted = False - gateway = ognGateway() + print('Start ogn gateway') + gateway = ognGateway(aprs_user) + gateway.connect() - print("Connect to DB") - gateway.connect_db() + def process_beacon(beacon): + session.add(beacon) + session.commit() - while user_interrupted is False: - gateway.connect(aprs_user) - try: - gateway.run() - except KeyboardInterrupt: - logger.error('User interrupted', exc_info=True) - user_interrupted = True - except BrokenPipeError: - logger.error('BrokenPipeError', exc_info=True) - except socket.error: - logger.error('Socket error', exc_info=True) + try: + gateway.run(callback=process_beacon, autoreconnect=True) + except KeyboardInterrupt: + print('\nStop ogn gateway') - try: - logger.info('Close socket') - gateway.disconnect() - except OSError as e: - print('Socket close error: {}'.format(e.strerror)) - logger.error('Socket close error', exc_info=True) - - print("\nExit OGN gateway") + gateway.disconnect() + logging.shutdown()