Refactoring: use f-string instead of format function

master
Konstantin Gründger 2025-04-23 12:20:30 +02:00
rodzic 3059b43dce
commit 2dd2934dba
4 zmienionych plików z 17 dodań i 19 usunięć

Wyświetl plik

@ -7,9 +7,9 @@ from ogn.client import settings
def create_aprs_login(user_name, pass_code, app_name, app_version, aprs_filter=None):
if not aprs_filter:
return "user {} pass {} vers {} {}\n".format(user_name, pass_code, app_name, app_version)
return f"user {user_name} pass {pass_code} vers {app_name} {app_version}\n"
else:
return "user {} pass {} vers {} {} filter {}\n".format(user_name, pass_code, app_name, app_version, aprs_filter)
return f"user {user_name} pass {pass_code} vers {app_name} {app_version} filter {aprs_filter}\n"
class AprsClient:
@ -46,21 +46,19 @@ class AprsClient:
self.sock_file = self.sock.makefile('rb')
self._kill = False
self.logger.info("Connect to OGN ({}/{}:{}) as {} with filter: {}".
format(self.settings.APRS_SERVER_HOST, self._sock_peer_ip, port, self.aprs_user,
"'" + self.aprs_filter + "'" if self.aprs_filter else 'none (full-feed)'))
self.logger.info(f"Connect to OGN ({self.settings.APRS_SERVER_HOST}/{self._sock_peer_ip}:{port}) as {self.aprs_user} with filter: '{self.aprs_filter}'" if self.aprs_filter else 'none (full-feed)')
break
except (socket.error, ConnectionError) as e:
self.logger.error('Connect error: {}'.format(e))
self.logger.error(f'Connect error: {e}')
if retries > 0:
self.logger.info('Waiting {}s before next connection try ({} attempts left).'.format(wait_period, retries))
self.logger.info(f'Waiting {wait_period}s before next connection try ({retries} attempts left).')
sleep(wait_period)
else:
self._kill = True
self.logger.critical('Could not connect to OGN.')
def disconnect(self):
self.logger.info('Disconnect from {}'.format(self._sock_peer_ip))
self.logger.info(f'Disconnect from {self._sock_peer_ip}')
try:
# close everything
self.sock.shutdown(0)
@ -77,7 +75,7 @@ class AprsClient:
keepalive_time = time()
while not self._kill:
if time() - keepalive_time > self.settings.APRS_KEEPALIVE_TIME:
self.logger.info('Send keepalive to {}'.format(self._sock_peer_ip))
self.logger.info(f'Send keepalive to {self._sock_peer_ip}')
self.sock.send('#keepalive\n'.encode())
timed_callback(self)
keepalive_time = time()
@ -95,7 +93,7 @@ class AprsClient:
callback(packet_str, **kwargs)
except (socket.error, ConnectionError) as e:
self.logger.error('Connect error: {}'.format(e))
self.logger.error(f'Connect error: {e}')
except OSError:
self.logger.error('OSError')
except UnicodeDecodeError:

Wyświetl plik

@ -8,12 +8,12 @@ class BaseParser():
elif aprs_type.startswith('status'):
data = self.parse_status(aprs_comment)
else:
raise ValueError("aprs_type {} unknown".format(aprs_type))
raise ValueError(f"aprs_type '{aprs_type}' unknown")
data.update({'beacon_type': self.beacon_type})
return data
def parse_position(self, aprs_comment):
raise NotImplementedError("Position parser for parser '{}' not yet implemented".format(self.beacon_type))
raise NotImplementedError(f"Position parser for parser '{self.beacon_type}' not yet implemented")
def parse_status(self, aprs_comment):
raise NotImplementedError("Status parser for parser '{}' not yet implemented".format(self.beacon_type))
raise NotImplementedError(f"Status parser for parser '{self.beacon_type}' not yet implemented")

Wyświetl plik

@ -12,7 +12,7 @@ class AprsParseError(ParseError):
def __init__(self, aprs_string):
self.aprs_string = aprs_string
self.message = "This is not a valid APRS packet: {}".format(aprs_string)
self.message = f"This is not a valid APRS packet: {aprs_string}"
super(AprsParseError, self).__init__(self.message)
@ -21,5 +21,5 @@ class OgnParseError(ParseError):
def __init__(self, aprs_comment):
self.aprs_comment = aprs_comment
self.message = "This is not a valid OGN message: {}".format(aprs_comment)
self.message = f"This is not a valid OGN message: {aprs_comment}"
super(OgnParseError, self).__init__(self.message)

Wyświetl plik

@ -70,7 +70,7 @@ def test_run(mock_socket):
KeyboardInterrupt()]
try:
client.run(callback=lambda msg: print("got: {}".format(msg)), autoreconnect=True)
client.run(callback=lambda msg: print(f"got: {msg}"), autoreconnect=True)
except KeyboardInterrupt:
pass
finally:
@ -95,7 +95,7 @@ def test_run_keepalive(mock_socket, mock_time):
timed_callback = mock.MagicMock()
try:
client.run(callback=lambda msg: print("got: {}".format(msg)), timed_callback=timed_callback)
client.run(callback=lambda msg: print(f"got: {msg}"), timed_callback=timed_callback)
except KeyboardInterrupt:
pass
finally:
@ -137,9 +137,9 @@ def test_50_live_messages():
return
try:
message = parse(raw_message)
print("{}: {}".format(message['aprs_type'], raw_message))
print(f"{message['aprs_type']}: {raw_message}")
except NotImplementedError as e:
print("{}: {}".format(e, raw_message))
print(f"{e}: {raw_message}")
return
if remaining_messages > 0:
remaining_messages -= 1