diff --git a/README.md b/README.md index 3ed1ab2..28d135c 100644 --- a/README.md +++ b/README.md @@ -30,12 +30,16 @@ beacon = parse("FLRDDDEAD>APRS,qAS,EDER:/114500h5029.86N/00956.98E'342/049/A=005 reference_timestamp=datetime(2015, 7, 31, 12, 34, 56)) ``` -### Connect to OGN and display all incoming beacons. +### Simple Example: +Connect to OGN and display all incoming beacons. ```python +import asyncio + from ogn.client import AprsClient from ogn.parser import parse, ParseError + def process_beacon(raw_message): try: beacon = parse(raw_message) @@ -45,16 +49,69 @@ def process_beacon(raw_message): except NotImplementedError as e: print('{}: {}'.format(e, raw_message)) -client = AprsClient(aprs_user='N0CALL') -client.connect() -try: - client.run(callback=process_beacon, autoreconnect=True) -except KeyboardInterrupt: - print('\nStop ogn gateway') - client.disconnect() +async def run_aprs(): + client = AprsClient(aprs_user='N0CALL') + await client.connect() + await client.run(callback=process_beacon, autoreconnect=True) + +if __name__ == '__main__': + try: + asyncio.run(run_aprs()) + except KeyboardInterrupt: + print('\nStop ogn gateway') + # await client.disconnect() + ``` +### Concurrent Example: +Same as above, but also concurrently execute your custom long-running function: + +```python +import asyncio + +from ogn.client import AprsClient +from ogn.parser import parse, ParseError + + +def process_beacon(raw_message): + try: + beacon = parse(raw_message) + print('Received {aprs_type}: {raw_message}'.format(**beacon)) + except ParseError as e: + print('Error, {}'.format(e.message)) + except NotImplementedError as e: + print('{}: {}'.format(e, raw_message)) + + +async def run_aprs(): + client = AprsClient(aprs_user='N0CALL') + await client.connect() + await client.run(callback=process_beacon, autoreconnect=True) + + +async def run_another_long_running_function(): + import datetime + while True: + print(f'The time is {datetime.datetime.now()}') + await asyncio.sleep(10) + + +async def main(): + await asyncio.gather(run_aprs(), run_another_long_running_function()) + + +if __name__ == '__main__': + asyncio.run(main()) +``` + + + + + + + + ### Connect to telnet console and display all decoded beacons. ```python diff --git a/ogn/client/client.py b/ogn/client/client.py index d172046..21498a8 100644 --- a/ogn/client/client.py +++ b/ogn/client/client.py @@ -1,3 +1,4 @@ +import asyncio import socket import logging from time import time, sleep @@ -24,26 +25,34 @@ class AprsClient: self._sock_peer_ip = None self._kill = False - def connect(self, retries=1, wait_period=15): + self.reader = None + self.writer = None + + async def connect(self, retries=1, wait_period=15): # create socket, connect to server, login and make a file object associated with the socket while retries > 0: retries -= 1 try: - self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) - self.sock.settimeout(5) + #self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + #self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) + #self.sock.settimeout(5) if self.aprs_filter: port = self.settings.APRS_SERVER_PORT_CLIENT_DEFINED_FILTERS else: port = self.settings.APRS_SERVER_PORT_FULL_FEED - self.sock.connect((self.settings.APRS_SERVER_HOST, port)) - self._sock_peer_ip = self.sock.getpeername()[0] + # self.sock.connect((self.settings.APRS_SERVER_HOST, port)) + self.reader, self.writer = await asyncio.open_connection(self.settings.APRS_SERVER_HOST, port) + # self._sock_peer_ip = self.sock.getpeername()[0] + if (sock := self.writer.get_extra_info('socket')): + self._sock_peer_ip = sock.getpeername()[0] login = create_aprs_login(self.aprs_user, -1, self.settings.APRS_APP_NAME, self.settings.APRS_APP_VER, self.aprs_filter) - self.sock.send(login.encode()) - self.sock_file = self.sock.makefile('rb') + #self.sock.send(login.encode()) + self.writer.write(login.encode()) + await self.writer.drain() + #self.sock_file = self.sock.makefile('rb') self._kill = False self.logger.info("Connect to OGN ({}/{}:{}) as {} with filter: {}". @@ -59,18 +68,20 @@ class AprsClient: self._kill = True self.logger.critical('Could not connect to OGN.') - def disconnect(self): + async def disconnect(self): self.logger.info('Disconnect from {}'.format(self._sock_peer_ip)) try: # close everything - self.sock.shutdown(0) - self.sock.close() + # self.sock.shutdown(0) + # self.sock.close() + self.writer.close() + await self.writer.wait_closed() except OSError: self.logger.error('Socket close error') self._kill = True - def run(self, callback, timed_callback=lambda client: None, autoreconnect=False, ignore_decoding_error=True, + async def run(self, callback, timed_callback=lambda client: None, autoreconnect=False, ignore_decoding_error=True, **kwargs): while not self._kill: try: @@ -78,13 +89,16 @@ class AprsClient: while not self._kill: if time() - keepalive_time > self.settings.APRS_KEEPALIVE_TIME: self.logger.info('Send keepalive to {}'.format(self._sock_peer_ip)) - self.sock.send('#keepalive\n'.encode()) + #self.sock.send('#keepalive\n'.encode()) + self.writer.write('#keepalive\n'.encode()) + await self.writer.drain() timed_callback(self) keepalive_time = time() # Read packet string from socket - packet_b = self.sock_file.readline().strip() - packet_str = packet_b.decode(errors="replace") if ignore_decoding_error else packet_b.decode() + #packet_b = self.sock_file.readline().strip() + packet_b = await self.reader.readline() + packet_str = packet_b.strip().decode(errors="replace") if ignore_decoding_error else packet_b.decode() # A zero length line should not be return if keepalives are being sent # A zero length line will only be returned after ~30m if keepalives are not sent @@ -103,7 +117,7 @@ class AprsClient: self.logger.debug(packet_b) if autoreconnect and not self._kill: - self.connect(retries=100) + await self.connect(retries=100) else: return diff --git a/setup.py b/setup.py index 633f618..69b6667 100644 --- a/setup.py +++ b/setup.py @@ -43,7 +43,7 @@ setup( 'dev': [ 'nose==1.3.7', 'coveralls==3.2.0', - 'flake8==3.9.2' + 'flake8==4.0.0' ] }, zip_safe=False