gateway: Add callback 'process_beacon'

This change enables us to define a custom callback which is
called when the gateway receives a packet
(e.g. save the incoming beacon in a database and generate
some statistics/aggregate data).
pull/14/head
Fabian P. Schmidt 2015-11-30 15:11:10 +01:00
rodzic 851559aced
commit 81fccc154e
2 zmienionych plików z 53 dodań i 54 usunięć

Wyświetl plik

@ -3,44 +3,47 @@ import logging
from time import time from time import time
from ogn.gateway import settings from ogn.gateway import settings
from ogn.commands.dbutils import session
from ogn.aprs_parser import parse_aprs from ogn.aprs_parser import parse_aprs
from ogn.aprs_utils import create_aprs_login from ogn.aprs_utils import create_aprs_login
from ogn.exceptions import AprsParseError, OgnParseError from ogn.exceptions import AprsParseError, OgnParseError
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from ogn.model import Base
class ognGateway: class ognGateway:
def __init__(self): def __init__(self, aprs_user, aprs_filter=''):
pass
def connect_db(self):
self.session = session
self.logger = logging.getLogger(__name__) self.logger = logging.getLogger(__name__)
self.logger.info("Connect to OGN as {} with filter '{}'".format(aprs_user, (aprs_filter if aprs_filter else 'full-feed')))
self.aprs_user = aprs_user
self.aprs_filter = aprs_filter
def connect(self, aprs_user): def connect(self):
# create socket, connect to server, login and make a file object associated with the socket # create socket, connect to server, login and make a file object associated with the socket
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
self.sock.connect((settings.APRS_SERVER_HOST, settings.APRS_SERVER_PORT)) self.sock.connect((settings.APRS_SERVER_HOST, settings.APRS_SERVER_PORT))
login = create_aprs_login(aprs_user, -1, settings.APRS_APP_NAME, settings.APRS_APP_VER, settings.APRS_FILTER) login = create_aprs_login(self.aprs_user, -1, settings.APRS_APP_NAME, settings.APRS_APP_VER, self.aprs_filter)
self.sock.send(login.encode()) self.sock.send(login.encode())
self.sock_file = self.sock.makefile('rw') self.sock_file = self.sock.makefile('rw')
def disconnect(self): def disconnect(self):
self.logger.info('Disconnect')
try:
# close everything # close everything
self.sock.shutdown(0) self.sock.shutdown(0)
self.sock.close() self.sock.close()
except OSError as e:
self.logger.error('Socket close error', exc_info=True)
def run(self): def run(self, callback, autoreconnect=False):
self.process_beacon = callback
while True:
try:
keepalive_time = time() keepalive_time = time()
while True: while True:
if time() - keepalive_time > settings.APRS_KEEPALIVE_TIME: if time() - keepalive_time > settings.APRS_KEEPALIVE_TIME:
logger.debug('Sending keepalive') self.logger.info('Send keepalive')
self.sock.send("#keepalive".encode()) self.sock.send('#keepalive'.encode())
keepalive_time = time() keepalive_time = time()
# Read packet string from socket # Read packet string from socket
@ -49,10 +52,19 @@ class ognGateway:
# A zero length line should not be return if keepalives are being sent # A zero length line should not be return if keepalives are being sent
# A zero length line will only be returned after ~30m if keepalives are not sent # A zero length line will only be returned after ~30m if keepalives are not sent
if len(packet_str) == 0: if len(packet_str) == 0:
logger.warning('Read returns zero length string. Failure. Orderly closeout') self.logger.warning('Read returns zero length string. Failure. Orderly closeout')
break break
self.proceed_line(packet_str) self.proceed_line(packet_str)
except BrokenPipeError:
self.logger.error('BrokenPipeError', exc_info=True)
except socket.error:
self.logger.error('socket.error', exc_info=True)
if autoreconnect:
self.connect()
else:
return
def proceed_line(self, line): def proceed_line(self, line):
try: try:
@ -66,5 +78,4 @@ class ognGateway:
return return
if beacon is not None: if beacon is not None:
self.session.add(beacon) self.process_beacon(beacon)
self.session.commit()

Wyświetl plik

@ -1,12 +1,11 @@
import socket
import logging import logging
from ogn.gateway.client import ognGateway from ogn.gateway.client import ognGateway
from ogn.commands.dbutils import session
from manager import Manager from manager import Manager
manager = Manager() manager = Manager()
DB_URI = 'sqlite:///beacons.db'
logging_formatstr = '%(asctime)s - %(levelname).4s - %(name)s - %(message)s' logging_formatstr = '%(asctime)s - %(levelname).4s - %(name)s - %(message)s'
log_levels = ['CRITICAL', 'ERROR', 'WARNING', 'INFO', 'DEBUG'] log_levels = ['CRITICAL', 'ERROR', 'WARNING', 'INFO', 'DEBUG']
@ -30,29 +29,18 @@ def run(aprs_user='anon-dev', logfile='main.log', loglevel='INFO'):
logging.basicConfig(format=logging_formatstr, level=loglevel, handlers=log_handlers) logging.basicConfig(format=logging_formatstr, level=loglevel, handlers=log_handlers)
user_interrupted = False print('Start ogn gateway')
gateway = ognGateway() gateway = ognGateway(aprs_user)
gateway.connect()
print("Connect to DB") def process_beacon(beacon):
gateway.connect_db() session.add(beacon)
session.commit()
while user_interrupted is False:
gateway.connect(aprs_user)
try: try:
gateway.run() gateway.run(callback=process_beacon, autoreconnect=True)
except KeyboardInterrupt: except KeyboardInterrupt:
logger.error('User interrupted', exc_info=True) print('\nStop ogn gateway')
user_interrupted = True
except BrokenPipeError:
logger.error('BrokenPipeError', exc_info=True)
except socket.error:
logger.error('Socket error', exc_info=True)
try:
logger.info('Close socket')
gateway.disconnect() gateway.disconnect()
except OSError as e: logging.shutdown()
print('Socket close error: {}'.format(e.strerror))
logger.error('Socket close error', exc_info=True)
print("\nExit OGN gateway")