fixes Yakifo/amqtt#157 : get_event_loop is deprecated, replacing with asyncio.run and new_event_loop (where applicable)

pull/193/head
Andrew Mirsky 2025-06-03 11:42:50 -04:00
rodzic 2683e0772e
commit 69a1052382
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: A98E67635CDF2C39
5 zmienionych plików z 9 dodań i 16 usunięć

Wyświetl plik

@ -16,8 +16,6 @@ repos:
hooks: hooks:
- id: ruff - id: ruff
args: args:
- --fix
- --unsafe-fixes
- --line-length=130 - --line-length=130
- --exit-non-zero-on-fix - --exit-non-zero-on-fix

Wyświetl plik

@ -142,7 +142,7 @@ class Broker:
Args: Args:
config: dictionary of configuration options (see [broker configuration](broker_config.md)). config: dictionary of configuration options (see [broker configuration](broker_config.md)).
loop: asyncio loop. defaults to `asyncio.get_event_loop()`. loop: asyncio loop. defaults to `asyncio.new_event_loop()`.
plugin_namespace: plugin namespace to use when loading plugin entry_points. defaults to `amqtt.broker.plugins`. plugin_namespace: plugin namespace to use when loading plugin entry_points. defaults to `amqtt.broker.plugins`.
""" """
@ -170,7 +170,7 @@ class Broker:
self.config.update(config) self.config.update(config)
self._build_listeners_config(self.config) self._build_listeners_config(self.config)
self._loop = loop or asyncio.get_event_loop() self._loop = loop or asyncio.new_event_loop()
self._servers: dict[str, Server] = {} self._servers: dict[str, Server] = {}
self._init_states() self._init_states()
self._sessions: dict[str, tuple[Session, BrokerProtocolHandler]] = {} self._sessions: dict[str, tuple[Session, BrokerProtocolHandler]] = {}

Wyświetl plik

@ -55,20 +55,21 @@ def broker_main(
raise typer.Exit(code=1) from exc raise typer.Exit(code=1) from exc
loop = asyncio.get_event_loop() loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try: try:
broker = Broker(config) broker = Broker(config)
except (BrokerError, ParserError) as exc: except (BrokerError, ParserError) as exc:
typer.echo(f"❌ Broker failed to start: {exc}", err=True) typer.echo(f"❌ Broker failed to start: {exc}", err=True)
raise typer.Exit(code=1) from exc raise typer.Exit(code=1) from exc
_ = loop.create_task(broker.start()) #noqa : RUF006
try: try:
loop.run_until_complete(broker.start())
loop.run_forever() loop.run_forever()
except KeyboardInterrupt: except KeyboardInterrupt:
loop.run_until_complete(broker.shutdown()) loop.run_until_complete(broker.shutdown())
except Exception as exc: except Exception as exc:
typer.echo("Connection failed", err=True) typer.echo("Broker execution halted", err=True)
raise typer.Exit(code=1) from exc raise typer.Exit(code=1) from exc
finally: finally:
loop.close() loop.close()

Wyświetl plik

@ -182,8 +182,6 @@ def publisher_main( # pylint: disable=R0914,R0917 # noqa : PLR0913
logger.debug(f"Using default configuration from {default_config_path}") logger.debug(f"Using default configuration from {default_config_path}")
config = read_yaml_config(default_config_path) config = read_yaml_config(default_config_path)
loop = asyncio.get_event_loop()
if not client_id: if not client_id:
client_id = _gen_client_id() client_id = _gen_client_id()
@ -217,7 +215,7 @@ def publisher_main( # pylint: disable=R0914,R0917 # noqa : PLR0913
) )
with contextlib.suppress(KeyboardInterrupt): with contextlib.suppress(KeyboardInterrupt):
try: try:
loop.run_until_complete( asyncio.run(
do_pub( do_pub(
client=client, client=client,
message_input=message_input, message_input=message_input,
@ -234,8 +232,6 @@ def publisher_main( # pylint: disable=R0914,R0917 # noqa : PLR0913
typer.echo("❌ Connection failed", err=True) typer.echo("❌ Connection failed", err=True)
raise typer.Exit(code=1) from exc raise typer.Exit(code=1) from exc
loop.close()
if __name__ == "__main__": if __name__ == "__main__":
typer.run(main) typer.run(main)

Wyświetl plik

@ -147,8 +147,6 @@ def subscribe_main( # pylint: disable=R0914,R0917 # noqa : PLR0913
logger.debug(f"Using default configuration from {default_config_path}") logger.debug(f"Using default configuration from {default_config_path}")
config = read_yaml_config(default_config_path) config = read_yaml_config(default_config_path)
loop = asyncio.get_event_loop()
if not client_id: if not client_id:
client_id = _gen_client_id() client_id = _gen_client_id()
@ -175,7 +173,7 @@ def subscribe_main( # pylint: disable=R0914,R0917 # noqa : PLR0913
) )
with contextlib.suppress(KeyboardInterrupt): with contextlib.suppress(KeyboardInterrupt):
try: try:
loop.run_until_complete(do_sub(client, asyncio.run(do_sub(client,
url=url, url=url,
topics=topics, topics=topics,
ca_info=ca_info, ca_info=ca_info,
@ -184,10 +182,10 @@ def subscribe_main( # pylint: disable=R0914,R0917 # noqa : PLR0913
max_count=max_count, max_count=max_count,
clean_session=clean_session, clean_session=clean_session,
)) ))
except (ClientError, ConnectError) as exc: except (ClientError, ConnectError) as exc:
typer.echo("❌ Connection failed", err=True) typer.echo("❌ Connection failed", err=True)
raise typer.Exit(code=1) from exc raise typer.Exit(code=1) from exc
loop.close()
if __name__ == "__main__": if __name__ == "__main__":