From 255e38a947afeb6429fbf21f2adddd8c93bfd166 Mon Sep 17 00:00:00 2001 From: Andrew Mirsky Date: Sun, 25 May 2025 12:44:54 -0400 Subject: [PATCH 1/5] adding debug statements to determine root cause of qos failures --- amqtt/mqtt/packet.py | 6 + pyproject.toml | 7 +- tests/mqtt/protocol/test_handler.py | 920 ++++++++++++++-------------- 3 files changed, 470 insertions(+), 463 deletions(-) diff --git a/amqtt/mqtt/packet.py b/amqtt/mqtt/packet.py index b674e95..2e7c808 100644 --- a/amqtt/mqtt/packet.py +++ b/amqtt/mqtt/packet.py @@ -1,3 +1,4 @@ +import logging from abc import ABC, abstractmethod import asyncio @@ -33,6 +34,9 @@ DISCONNECT = 0x0E RESERVED_15 = 0x0F +logger = logging.getLogger(__name__) + + class MQTTFixedHeader: """Represents the fixed header of an MQTT packet.""" @@ -206,6 +210,7 @@ class MQTTPacket(Generic[_VH, _P, _FH]): async def to_stream(self, writer: WriterAdapter) -> None: """Write the entire packet to the stream.""" + logger.debug(f">> writing packet to stream: {self}") writer.write(self.to_bytes()) await writer.drain() self.protocol_ts = datetime.now(UTC) @@ -248,6 +253,7 @@ class MQTTPacket(Generic[_VH, _P, _FH]): else: instance = cls(fixed_header, variable_header, payload) instance.protocol_ts = datetime.now(UTC) + logger.debug(f">> read packet from stream: {instance!r}") return instance @property diff --git a/pyproject.toml b/pyproject.toml index 5e319da..4d22e67 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -146,13 +146,14 @@ max-returns = 10 # ----------------------------------- PYTEST ----------------------------------- [tool.pytest.ini_options] -addopts = ["--cov=amqtt", "--cov-report=term-missing", "--cov-report=xml"] +#addopts = ["--cov=amqtt", "--cov-report=term-missing", "--cov-report=xml"] +addopts = ["--tb=short", "--capture=tee-sys"] testpaths = ["tests"] asyncio_mode = "auto" timeout = 10 asyncio_default_fixture_loop_scope = "function" -# log_cli = true -# log_level = "INFO" +log_cli = true +log_level = "DEBUG" # ------------------------------------ MYPY ------------------------------------ [tool.mypy] diff --git a/tests/mqtt/protocol/test_handler.py b/tests/mqtt/protocol/test_handler.py index 069a0fb..045e2fb 100644 --- a/tests/mqtt/protocol/test_handler.py +++ b/tests/mqtt/protocol/test_handler.py @@ -1,508 +1,508 @@ -# import asyncio -# import logging -# import secrets -# from typing import Any -# import unittest +import asyncio +import logging +import secrets +from typing import Any +import unittest -# from amqtt.adapters import StreamReaderAdapter, StreamWriterAdapter -# from amqtt.mqtt.constants import QOS_0, QOS_1, QOS_2 -# from amqtt.mqtt.protocol.handler import ProtocolHandler -# from amqtt.mqtt.puback import PubackPacket -# from amqtt.mqtt.pubcomp import PubcompPacket -# from amqtt.mqtt.publish import PublishPacket -# from amqtt.mqtt.pubrec import PubrecPacket -# from amqtt.mqtt.pubrel import PubrelPacket -# from amqtt.plugins.manager import PluginManager -# from amqtt.session import IncomingApplicationMessage, OutgoingApplicationMessage, Session +from amqtt.adapters import StreamReaderAdapter, StreamWriterAdapter +from amqtt.mqtt.constants import QOS_0, QOS_1, QOS_2 +from amqtt.mqtt.protocol.handler import ProtocolHandler +from amqtt.mqtt.puback import PubackPacket +from amqtt.mqtt.pubcomp import PubcompPacket +from amqtt.mqtt.publish import PublishPacket +from amqtt.mqtt.pubrec import PubrecPacket +from amqtt.mqtt.pubrel import PubrelPacket +from amqtt.plugins.manager import PluginManager +from amqtt.session import IncomingApplicationMessage, OutgoingApplicationMessage, Session -# formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s" -# logging.basicConfig(level=logging.DEBUG, format=formatter) -# log = logging.getLogger(__name__) +formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s" +logging.basicConfig(level=logging.DEBUG, format=formatter) +log = logging.getLogger(__name__) -# def rand_packet_id(): -# return secrets.randbelow(65536) +def rand_packet_id(): + return secrets.randbelow(65536) -# def adapt(reader, writer): -# return StreamReaderAdapter(reader), StreamWriterAdapter(writer) +def adapt(reader, writer): + return StreamReaderAdapter(reader), StreamWriterAdapter(writer) -# class ProtocolHandlerTest(unittest.TestCase): -# def setUp(self): -# self.loop = asyncio.new_event_loop() -# asyncio.set_event_loop(self.loop) -# self.plugin_manager = PluginManager("amqtt.test.plugins", context=None) +class ProtocolHandlerTest(unittest.TestCase): + def setUp(self): + self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.loop) + self.plugin_manager = PluginManager("amqtt.test.plugins", context=None) -# def tearDown(self): -# self.loop.close() + def tearDown(self): + self.loop.close() -# def test_init_handler(self): -# Session() -# handler = ProtocolHandler(self.plugin_manager) -# assert handler.session is None -# assert handler._loop is self.loop -# self.check_empty_waiters(handler) + def test_init_handler(self): + Session() + handler = ProtocolHandler(self.plugin_manager) + assert handler.session is None + assert handler._loop is self.loop + self.check_empty_waiters(handler) -# def test_start_stop(self): -# async def server_mock(reader, writer) -> None: -# pass + def test_start_stop(self): + async def server_mock(reader, writer) -> None: + pass -# async def test_coro() -> None: -# try: -# s = Session() -# reader, writer = await asyncio.open_connection("127.0.0.1", 8888) -# reader_adapted, writer_adapted = adapt(reader, writer) -# handler = ProtocolHandler(self.plugin_manager) -# handler.attach(s, reader_adapted, writer_adapted) -# await self.start_handler(handler, s) -# await self.stop_handler(handler, s) -# future.set_result(True) -# except Exception as ae: -# future.set_exception(ae) + async def test_coro() -> None: + try: + s = Session() + reader, writer = await asyncio.open_connection("127.0.0.1", 8888) + reader_adapted, writer_adapted = adapt(reader, writer) + handler = ProtocolHandler(self.plugin_manager) + handler.attach(s, reader_adapted, writer_adapted) + await self.start_handler(handler, s) + await self.stop_handler(handler, s) + future.set_result(True) + except Exception as ae: + future.set_exception(ae) -# future: asyncio.Future[Any] = asyncio.Future() -# coro = asyncio.start_server(server_mock, "127.0.0.1", 8888) -# server = self.loop.run_until_complete(coro) -# self.loop.run_until_complete(test_coro()) -# server.close() -# self.loop.run_until_complete(server.wait_closed()) -# exception = future.exception() -# if exception: -# raise exception + future: asyncio.Future[Any] = asyncio.Future() + coro = asyncio.start_server(server_mock, "127.0.0.1", 8888) + server = self.loop.run_until_complete(coro) + self.loop.run_until_complete(test_coro()) + server.close() + self.loop.run_until_complete(server.wait_closed()) + exception = future.exception() + if exception: + raise exception -# def test_publish_qos0(self): -# async def server_mock(reader, writer) -> None: -# try: -# packet = await PublishPacket.from_stream(reader) -# assert packet.variable_header.topic_name == "/topic" -# assert packet.qos == QOS_0 -# assert packet.packet_id is None -# except Exception as ae: -# future.set_exception(ae) + def test_publish_qos0(self): + async def server_mock(reader, writer) -> None: + try: + packet = await PublishPacket.from_stream(reader) + assert packet.variable_header.topic_name == "/topic" + assert packet.qos == QOS_0 + assert packet.packet_id is None + except Exception as ae: + future.set_exception(ae) -# async def test_coro() -> None: -# try: -# s = Session() -# reader, writer = await asyncio.open_connection("127.0.0.1", 8888) -# reader_adapted, writer_adapted = adapt(reader, writer) -# handler = ProtocolHandler(self.plugin_manager) -# handler.attach(s, reader_adapted, writer_adapted) -# await self.start_handler(handler, s) -# message = await handler.mqtt_publish( -# "/topic", -# b"test_data", -# QOS_0, -# False, -# ) -# assert isinstance(message, OutgoingApplicationMessage) -# assert message.publish_packet is not None -# assert message.puback_packet is None -# assert message.pubrec_packet is None -# assert message.pubrel_packet is None -# assert message.pubcomp_packet is None -# await self.stop_handler(handler, s) -# future.set_result(True) -# except Exception as ae: -# future.set_exception(ae) + async def test_coro() -> None: + try: + s = Session() + reader, writer = await asyncio.open_connection("127.0.0.1", 8888) + reader_adapted, writer_adapted = adapt(reader, writer) + handler = ProtocolHandler(self.plugin_manager) + handler.attach(s, reader_adapted, writer_adapted) + await self.start_handler(handler, s) + message = await handler.mqtt_publish( + "/topic", + b"test_data", + QOS_0, + False, + ) + assert isinstance(message, OutgoingApplicationMessage) + assert message.publish_packet is not None + assert message.puback_packet is None + assert message.pubrec_packet is None + assert message.pubrel_packet is None + assert message.pubcomp_packet is None + await self.stop_handler(handler, s) + future.set_result(True) + except Exception as ae: + future.set_exception(ae) -# future: asyncio.Future[Any] = asyncio.Future() -# coro = asyncio.start_server(server_mock, "127.0.0.1", 8888) -# server = self.loop.run_until_complete(coro) -# self.loop.run_until_complete(test_coro()) -# server.close() -# self.loop.run_until_complete(server.wait_closed()) -# exception = future.exception() -# if exception: -# raise exception + future: asyncio.Future[Any] = asyncio.Future() + coro = asyncio.start_server(server_mock, "127.0.0.1", 8888) + server = self.loop.run_until_complete(coro) + self.loop.run_until_complete(test_coro()) + server.close() + self.loop.run_until_complete(server.wait_closed()) + exception = future.exception() + if exception: + raise exception -# def test_publish_qos1(self): -# self.handler: ProtocolHandler | None = None + def test_publish_qos1(self): + self.handler: ProtocolHandler | None = None -# async def server_mock(reader, writer) -> None: -# packet = await PublishPacket.from_stream(reader) -# try: -# assert packet.variable_header.topic_name == "/topic" -# assert packet.qos == QOS_1 -# assert packet.packet_id is not None -# assert packet.packet_id in self.session.inflight_out -# assert self.handler is not None -# assert packet.packet_id in self.handler._puback_waiters -# puback = PubackPacket.build(packet.packet_id) -# await puback.to_stream(writer) -# except Exception as ae: -# future.set_exception(ae) + async def server_mock(reader, writer) -> None: + packet = await PublishPacket.from_stream(reader) + try: + assert packet.variable_header.topic_name == "/topic" + assert packet.qos == QOS_1 + assert packet.packet_id is not None + assert packet.packet_id in self.session.inflight_out + assert self.handler is not None + assert packet.packet_id in self.handler._puback_waiters + puback = PubackPacket.build(packet.packet_id) + await puback.to_stream(writer) + except Exception as ae: + future.set_exception(ae) -# async def test_coro() -> None: -# try: -# reader, writer = await asyncio.open_connection("127.0.0.1", 8888) -# reader_adapted, writer_adapted = adapt(reader, writer) -# self.handler = ProtocolHandler(self.plugin_manager) -# self.handler.attach(self.session, reader_adapted, writer_adapted) -# await self.start_handler(self.handler, self.session) -# message = await self.handler.mqtt_publish( -# "/topic", -# b"test_data", -# QOS_1, -# False, -# ) -# assert isinstance(message, OutgoingApplicationMessage) -# assert message.publish_packet is not None -# assert message.puback_packet is not None -# assert message.pubrec_packet is None -# assert message.pubrel_packet is None -# assert message.pubcomp_packet is None -# await self.stop_handler(self.handler, self.session) -# if not future.done(): -# future.set_result(True) -# except Exception as ae: -# future.set_exception(ae) + async def test_coro() -> None: + try: + reader, writer = await asyncio.open_connection("127.0.0.1", 8888) + reader_adapted, writer_adapted = adapt(reader, writer) + self.handler = ProtocolHandler(self.plugin_manager) + self.handler.attach(self.session, reader_adapted, writer_adapted) + await self.start_handler(self.handler, self.session) + message = await self.handler.mqtt_publish( + "/topic", + b"test_data", + QOS_1, + False, + ) + assert isinstance(message, OutgoingApplicationMessage) + assert message.publish_packet is not None + assert message.puback_packet is not None + assert message.pubrec_packet is None + assert message.pubrel_packet is None + assert message.pubcomp_packet is None + await self.stop_handler(self.handler, self.session) + if not future.done(): + future.set_result(True) + except Exception as ae: + future.set_exception(ae) -# self.handler = None -# self.session = Session() -# future: asyncio.Future[Any] = asyncio.Future() + self.handler = None + self.session = Session() + future: asyncio.Future[Any] = asyncio.Future() -# coro = asyncio.start_server(server_mock, "127.0.0.1", 8888) -# server = self.loop.run_until_complete(coro) -# self.loop.run_until_complete(test_coro()) -# server.close() -# self.loop.run_until_complete(server.wait_closed()) -# exception = future.exception() -# if exception: -# raise exception + coro = asyncio.start_server(server_mock, "127.0.0.1", 8888) + server = self.loop.run_until_complete(coro) + self.loop.run_until_complete(test_coro()) + server.close() + self.loop.run_until_complete(server.wait_closed()) + exception = future.exception() + if exception: + raise exception -# def test_publish_qos2(self): -# async def server_mock(reader, writer) -> None: -# try: -# packet = await PublishPacket.from_stream(reader) -# assert packet.topic_name == "/topic" -# assert packet.qos == QOS_2 -# assert packet.packet_id is not None -# assert packet.packet_id in self.session.inflight_out -# assert self.handler is not None -# assert packet.packet_id in self.handler._pubrec_waiters -# pubrec = PubrecPacket.build(packet.packet_id) -# await pubrec.to_stream(writer) + def test_publish_qos2(self): + async def server_mock(reader, writer) -> None: + try: + packet = await PublishPacket.from_stream(reader) + assert packet.topic_name == "/topic" + assert packet.qos == QOS_2 + assert packet.packet_id is not None + assert packet.packet_id in self.session.inflight_out + assert self.handler is not None + assert packet.packet_id in self.handler._pubrec_waiters + pubrec = PubrecPacket.build(packet.packet_id) + await pubrec.to_stream(writer) -# await PubrelPacket.from_stream(reader) -# assert packet.packet_id in self.handler._pubcomp_waiters -# pubcomp = PubcompPacket.build(packet.packet_id) -# await pubcomp.to_stream(writer) -# except Exception as ae: -# future.set_exception(ae) + await PubrelPacket.from_stream(reader) + assert packet.packet_id in self.handler._pubcomp_waiters + pubcomp = PubcompPacket.build(packet.packet_id) + await pubcomp.to_stream(writer) + except Exception as ae: + future.set_exception(ae) -# async def test_coro() -> None: -# try: -# reader, writer = await asyncio.open_connection("127.0.0.1", 8888) -# reader_adapted, writer_adapted = adapt(reader, writer) -# self.handler = ProtocolHandler(self.plugin_manager) -# self.handler.attach(self.session, reader_adapted, writer_adapted) -# await self.start_handler(self.handler, self.session) -# message = await self.handler.mqtt_publish( -# "/topic", -# b"test_data", -# QOS_2, -# False, -# ) -# assert isinstance(message, OutgoingApplicationMessage) -# assert message.publish_packet is not None -# assert message.puback_packet is None -# assert message.pubrec_packet is not None -# assert message.pubrel_packet is not None -# assert message.pubcomp_packet is not None -# await self.stop_handler(self.handler, self.session) -# if not future.done(): -# future.set_result(True) -# except Exception as ae: -# future.set_exception(ae) + async def test_coro() -> None: + try: + reader, writer = await asyncio.open_connection("127.0.0.1", 8888) + reader_adapted, writer_adapted = adapt(reader, writer) + self.handler = ProtocolHandler(self.plugin_manager) + self.handler.attach(self.session, reader_adapted, writer_adapted) + await self.start_handler(self.handler, self.session) + message = await self.handler.mqtt_publish( + "/topic", + b"test_data", + QOS_2, + False, + ) + assert isinstance(message, OutgoingApplicationMessage) + assert message.publish_packet is not None + assert message.puback_packet is None + assert message.pubrec_packet is not None + assert message.pubrel_packet is not None + assert message.pubcomp_packet is not None + await self.stop_handler(self.handler, self.session) + if not future.done(): + future.set_result(True) + except Exception as ae: + future.set_exception(ae) -# self.handler = None -# self.session = Session() -# future: asyncio.Future[Any] = asyncio.Future() + self.handler = None + self.session = Session() + future: asyncio.Future[Any] = asyncio.Future() -# coro = asyncio.start_server(server_mock, "127.0.0.1", 8888) -# server = self.loop.run_until_complete(coro) -# self.loop.run_until_complete(test_coro()) -# server.close() -# self.loop.run_until_complete(server.wait_closed()) -# exception = future.exception() -# if exception: -# raise exception + coro = asyncio.start_server(server_mock, "127.0.0.1", 8888) + server = self.loop.run_until_complete(coro) + self.loop.run_until_complete(test_coro()) + server.close() + self.loop.run_until_complete(server.wait_closed()) + exception = future.exception() + if exception: + raise exception -# def test_receive_qos0(self): -# async def server_mock(reader, writer) -> None: -# packet = PublishPacket.build( -# "/topic", -# b"test_data", -# rand_packet_id(), -# False, -# QOS_0, -# False, -# ) -# await packet.to_stream(writer) + def test_receive_qos0(self): + async def server_mock(reader, writer) -> None: + packet = PublishPacket.build( + "/topic", + b"test_data", + rand_packet_id(), + False, + QOS_0, + False, + ) + await packet.to_stream(writer) -# async def test_coro() -> None: -# try: -# reader, writer = await asyncio.open_connection("127.0.0.1", 8888) -# reader_adapted, writer_adapted = adapt(reader, writer) -# self.handler = ProtocolHandler(self.plugin_manager) -# self.handler.attach(self.session, reader_adapted, writer_adapted) -# await self.start_handler(self.handler, self.session) -# message = await self.handler.mqtt_deliver_next_message() -# assert isinstance(message, IncomingApplicationMessage) -# assert message.publish_packet is not None -# assert message.puback_packet is None -# assert message.pubrec_packet is None -# assert message.pubrel_packet is None -# assert message.pubcomp_packet is None -# await self.stop_handler(self.handler, self.session) -# future.set_result(True) -# except Exception as ae: -# future.set_exception(ae) + async def test_coro() -> None: + try: + reader, writer = await asyncio.open_connection("127.0.0.1", 8888) + reader_adapted, writer_adapted = adapt(reader, writer) + self.handler = ProtocolHandler(self.plugin_manager) + self.handler.attach(self.session, reader_adapted, writer_adapted) + await self.start_handler(self.handler, self.session) + message = await self.handler.mqtt_deliver_next_message() + assert isinstance(message, IncomingApplicationMessage) + assert message.publish_packet is not None + assert message.puback_packet is None + assert message.pubrec_packet is None + assert message.pubrel_packet is None + assert message.pubcomp_packet is None + await self.stop_handler(self.handler, self.session) + future.set_result(True) + except Exception as ae: + future.set_exception(ae) -# self.handler = None -# self.session = Session() -# future: asyncio.Future[Any] = asyncio.Future() -# coro = asyncio.start_server(server_mock, "127.0.0.1", 8888) -# server = self.loop.run_until_complete(coro) -# self.loop.run_until_complete(test_coro()) -# server.close() -# self.loop.run_until_complete(server.wait_closed()) -# exception = future.exception() -# if exception: -# raise exception + self.handler = None + self.session = Session() + future: asyncio.Future[Any] = asyncio.Future() + coro = asyncio.start_server(server_mock, "127.0.0.1", 8888) + server = self.loop.run_until_complete(coro) + self.loop.run_until_complete(test_coro()) + server.close() + self.loop.run_until_complete(server.wait_closed()) + exception = future.exception() + if exception: + raise exception -# def test_receive_qos1(self): -# async def server_mock(reader, writer) -> None: -# try: -# packet = PublishPacket.build( -# "/topic", -# b"test_data", -# rand_packet_id(), -# False, -# QOS_1, -# False, -# ) -# await packet.to_stream(writer) -# puback = await PubackPacket.from_stream(reader) -# assert puback is not None -# assert packet.packet_id == puback.packet_id -# except Exception as ae: -# future.set_exception(ae) + def test_receive_qos1(self): + async def server_mock(reader, writer) -> None: + try: + packet = PublishPacket.build( + "/topic", + b"test_data", + rand_packet_id(), + False, + QOS_1, + False, + ) + await packet.to_stream(writer) + puback = await PubackPacket.from_stream(reader) + assert puback is not None + assert packet.packet_id == puback.packet_id + except Exception as ae: + future.set_exception(ae) -# async def test_coro() -> None: -# try: -# reader, writer = await asyncio.open_connection("127.0.0.1", 8888) -# reader_adapted, writer_adapted = adapt(reader, writer) -# self.handler = ProtocolHandler(self.plugin_manager) -# self.handler.attach(self.session, reader_adapted, writer_adapted) -# await self.start_handler(self.handler, self.session) -# message = await self.handler.mqtt_deliver_next_message() -# assert isinstance(message, IncomingApplicationMessage) -# assert message.publish_packet is not None -# assert message.puback_packet is not None -# assert message.pubrec_packet is None -# assert message.pubrel_packet is None -# assert message.pubcomp_packet is None -# await self.stop_handler(self.handler, self.session) -# future.set_result(True) -# except Exception as ae: -# future.set_exception(ae) + async def test_coro() -> None: + try: + reader, writer = await asyncio.open_connection("127.0.0.1", 8888) + reader_adapted, writer_adapted = adapt(reader, writer) + self.handler = ProtocolHandler(self.plugin_manager) + self.handler.attach(self.session, reader_adapted, writer_adapted) + await self.start_handler(self.handler, self.session) + message = await self.handler.mqtt_deliver_next_message() + assert isinstance(message, IncomingApplicationMessage) + assert message.publish_packet is not None + assert message.puback_packet is not None + assert message.pubrec_packet is None + assert message.pubrel_packet is None + assert message.pubcomp_packet is None + await self.stop_handler(self.handler, self.session) + future.set_result(True) + except Exception as ae: + future.set_exception(ae) -# self.handler = None -# self.session = Session() -# future: asyncio.Future[Any] = asyncio.Future() -# self.event = asyncio.Event() -# coro = asyncio.start_server(server_mock, "127.0.0.1", 8888) -# server = self.loop.run_until_complete(coro) -# self.loop.run_until_complete(test_coro()) -# server.close() -# self.loop.run_until_complete(server.wait_closed()) -# exception = future.exception() -# if exception: -# raise exception + self.handler = None + self.session = Session() + future: asyncio.Future[Any] = asyncio.Future() + self.event = asyncio.Event() + coro = asyncio.start_server(server_mock, "127.0.0.1", 8888) + server = self.loop.run_until_complete(coro) + self.loop.run_until_complete(test_coro()) + server.close() + self.loop.run_until_complete(server.wait_closed()) + exception = future.exception() + if exception: + raise exception -# def test_receive_qos2(self): -# async def server_mock(reader, writer) -> None: -# try: -# packet = PublishPacket.build( -# "/topic", -# b"test_data", -# rand_packet_id(), -# False, -# QOS_2, -# False, -# ) -# await packet.to_stream(writer) -# pubrec = await PubrecPacket.from_stream(reader) -# assert pubrec is not None -# assert packet.packet_id == pubrec.packet_id -# assert self.handler is not None -# assert packet.packet_id in self.handler._pubrel_waiters -# pubrel = PubrelPacket.build(packet.packet_id) -# await pubrel.to_stream(writer) -# pubcomp = await PubcompPacket.from_stream(reader) -# assert pubcomp is not None -# assert packet.packet_id == pubcomp.packet_id -# except Exception as ae: -# future.set_exception(ae) + def test_receive_qos2(self): + async def server_mock(reader, writer) -> None: + try: + packet = PublishPacket.build( + "/topic", + b"test_data", + rand_packet_id(), + False, + QOS_2, + False, + ) + await packet.to_stream(writer) + pubrec = await PubrecPacket.from_stream(reader) + assert pubrec is not None + assert packet.packet_id == pubrec.packet_id + assert self.handler is not None + assert packet.packet_id in self.handler._pubrel_waiters + pubrel = PubrelPacket.build(packet.packet_id) + await pubrel.to_stream(writer) + pubcomp = await PubcompPacket.from_stream(reader) + assert pubcomp is not None + assert packet.packet_id == pubcomp.packet_id + except Exception as ae: + future.set_exception(ae) -# async def test_coro() -> None: -# try: -# reader, writer = await asyncio.open_connection("127.0.0.1", 8888) -# reader_adapted, writer_adapted = adapt(reader, writer) -# self.handler = ProtocolHandler(self.plugin_manager) -# self.handler.attach(self.session, reader_adapted, writer_adapted) -# await self.start_handler(self.handler, self.session) -# message = await self.handler.mqtt_deliver_next_message() -# assert isinstance(message, IncomingApplicationMessage) -# assert message.publish_packet is not None -# assert message.puback_packet is None -# assert message.pubrec_packet is not None -# assert message.pubrel_packet is not None -# assert message.pubcomp_packet is not None -# await self.stop_handler(self.handler, self.session) -# future.set_result(True) -# except Exception as ae: -# future.set_exception(ae) + async def test_coro() -> None: + try: + reader, writer = await asyncio.open_connection("127.0.0.1", 8888) + reader_adapted, writer_adapted = adapt(reader, writer) + self.handler = ProtocolHandler(self.plugin_manager) + self.handler.attach(self.session, reader_adapted, writer_adapted) + await self.start_handler(self.handler, self.session) + message = await self.handler.mqtt_deliver_next_message() + assert isinstance(message, IncomingApplicationMessage) + assert message.publish_packet is not None + assert message.puback_packet is None + assert message.pubrec_packet is not None + assert message.pubrel_packet is not None + assert message.pubcomp_packet is not None + await self.stop_handler(self.handler, self.session) + future.set_result(True) + except Exception as ae: + future.set_exception(ae) -# self.handler = None -# self.session = Session() -# future: asyncio.Future[Any] = asyncio.Future() -# coro = asyncio.start_server(server_mock, "127.0.0.1", 8888) -# server = self.loop.run_until_complete(coro) -# self.loop.run_until_complete(test_coro()) -# server.close() -# self.loop.run_until_complete(server.wait_closed()) -# exception = future.exception() -# if exception: -# raise exception + self.handler = None + self.session = Session() + future: asyncio.Future[Any] = asyncio.Future() + coro = asyncio.start_server(server_mock, "127.0.0.1", 8888) + server = self.loop.run_until_complete(coro) + self.loop.run_until_complete(test_coro()) + server.close() + self.loop.run_until_complete(server.wait_closed()) + exception = future.exception() + if exception: + raise exception -# async def start_handler(self, handler, session): -# self.check_empty_waiters(handler) -# self.check_no_message(session) -# await handler.start() -# assert handler._reader_ready + async def start_handler(self, handler, session): + self.check_empty_waiters(handler) + self.check_no_message(session) + await handler.start() + assert handler._reader_ready -# async def stop_handler(self, handler, session): -# await handler.stop() -# assert handler._reader_stopped -# self.check_empty_waiters(handler) -# self.check_no_message(session) + async def stop_handler(self, handler, session): + await handler.stop() + assert handler._reader_stopped + self.check_empty_waiters(handler) + self.check_no_message(session) -# def check_empty_waiters(self, handler): -# assert not handler._puback_waiters -# assert not handler._pubrec_waiters -# assert not handler._pubrel_waiters -# assert not handler._pubcomp_waiters + def check_empty_waiters(self, handler): + assert not handler._puback_waiters + assert not handler._pubrec_waiters + assert not handler._pubrel_waiters + assert not handler._pubcomp_waiters -# def check_no_message(self, session): -# assert not session.inflight_out -# assert not session.inflight_in + def check_no_message(self, session): + assert not session.inflight_out + assert not session.inflight_in -# def test_publish_qos1_retry(self): -# async def server_mock(reader, writer) -> None: -# packet = await PublishPacket.from_stream(reader) -# try: -# assert packet.topic_name == "/topic" -# assert packet.qos == QOS_1 -# assert packet.packet_id is not None -# assert packet.packet_id in self.session.inflight_out -# assert self.handler is not None -# assert packet.packet_id in self.handler._puback_waiters -# puback = PubackPacket.build(packet.packet_id) -# await puback.to_stream(writer) -# except Exception as ae: -# future.set_exception(ae) + def test_publish_qos1_retry(self): + async def server_mock(reader, writer) -> None: + packet = await PublishPacket.from_stream(reader) + try: + assert packet.topic_name == "/topic" + assert packet.qos == QOS_1 + assert packet.packet_id is not None + assert packet.packet_id in self.session.inflight_out + assert self.handler is not None + assert packet.packet_id in self.handler._puback_waiters + puback = PubackPacket.build(packet.packet_id) + await puback.to_stream(writer) + except Exception as ae: + future.set_exception(ae) -# async def test_coro() -> None: -# try: -# reader, writer = await asyncio.open_connection("127.0.0.1", 8888) -# reader_adapted, writer_adapted = adapt(reader, writer) -# self.handler = ProtocolHandler(self.plugin_manager) -# self.handler.attach(self.session, reader_adapted, writer_adapted) -# await self.handler.start() -# await self.stop_handler(self.handler, self.session) -# if not future.done(): -# future.set_result(True) -# except Exception as ae: -# future.set_exception(ae) + async def test_coro() -> None: + try: + reader, writer = await asyncio.open_connection("127.0.0.1", 8888) + reader_adapted, writer_adapted = adapt(reader, writer) + self.handler = ProtocolHandler(self.plugin_manager) + self.handler.attach(self.session, reader_adapted, writer_adapted) + await self.handler.start() + await self.stop_handler(self.handler, self.session) + if not future.done(): + future.set_result(True) + except Exception as ae: + future.set_exception(ae) -# self.handler = None -# self.session = Session() -# message = OutgoingApplicationMessage(1, "/topic", QOS_1, b"test_data", False) -# message.publish_packet = PublishPacket.build( -# "/topic", -# b"test_data", -# rand_packet_id(), -# False, -# QOS_1, -# False, -# ) -# self.session.inflight_out[1] = message -# future: asyncio.Future[Any] = asyncio.Future() + self.handler = None + self.session = Session() + message = OutgoingApplicationMessage(1, "/topic", QOS_1, b"test_data", False) + message.publish_packet = PublishPacket.build( + "/topic", + b"test_data", + rand_packet_id(), + False, + QOS_1, + False, + ) + self.session.inflight_out[1] = message + future: asyncio.Future[Any] = asyncio.Future() -# coro = asyncio.start_server(server_mock, "127.0.0.1", 8888) -# server = self.loop.run_until_complete(coro) -# self.loop.run_until_complete(test_coro()) -# server.close() -# self.loop.run_until_complete(server.wait_closed()) -# exception = future.exception() -# if exception: -# raise exception + coro = asyncio.start_server(server_mock, "127.0.0.1", 8888) + server = self.loop.run_until_complete(coro) + self.loop.run_until_complete(test_coro()) + server.close() + self.loop.run_until_complete(server.wait_closed()) + exception = future.exception() + if exception: + raise exception -# def test_publish_qos2_retry(self): -# async def server_mock(reader, writer) -> None: -# try: -# packet = await PublishPacket.from_stream(reader) -# assert packet.topic_name == "/topic" -# assert packet.qos == QOS_2 -# assert packet.packet_id is not None -# assert packet.packet_id in self.session.inflight_out -# assert self.handler is not None -# assert packet.packet_id in self.handler._pubrec_waiters -# pubrec = PubrecPacket.build(packet.packet_id) -# await pubrec.to_stream(writer) + def test_publish_qos2_retry(self): + async def server_mock(reader, writer) -> None: + try: + packet = await PublishPacket.from_stream(reader) + assert packet.topic_name == "/topic" + assert packet.qos == QOS_2 + assert packet.packet_id is not None + assert packet.packet_id in self.session.inflight_out + assert self.handler is not None + assert packet.packet_id in self.handler._pubrec_waiters + pubrec = PubrecPacket.build(packet.packet_id) + await pubrec.to_stream(writer) -# await PubrelPacket.from_stream(reader) -# assert packet.packet_id in self.handler._pubcomp_waiters -# pubcomp = PubcompPacket.build(packet.packet_id) -# await pubcomp.to_stream(writer) -# except Exception as ae: -# future.set_exception(ae) + await PubrelPacket.from_stream(reader) + assert packet.packet_id in self.handler._pubcomp_waiters + pubcomp = PubcompPacket.build(packet.packet_id) + await pubcomp.to_stream(writer) + except Exception as ae: + future.set_exception(ae) -# async def test_coro() -> None: -# try: -# reader, writer = await asyncio.open_connection("127.0.0.1", 8888) -# reader_adapted, writer_adapted = adapt(reader, writer) -# self.handler = ProtocolHandler(self.plugin_manager) -# self.handler.attach(self.session, reader_adapted, writer_adapted) -# await self.handler.start() -# await self.stop_handler(self.handler, self.session) -# if not future.done(): -# future.set_result(True) -# except Exception as ae: -# future.set_exception(ae) + async def test_coro() -> None: + try: + reader, writer = await asyncio.open_connection("127.0.0.1", 8888) + reader_adapted, writer_adapted = adapt(reader, writer) + self.handler = ProtocolHandler(self.plugin_manager) + self.handler.attach(self.session, reader_adapted, writer_adapted) + await self.handler.start() + await self.stop_handler(self.handler, self.session) + if not future.done(): + future.set_result(True) + except Exception as ae: + future.set_exception(ae) -# self.handler = None -# self.session = Session() -# message = OutgoingApplicationMessage(1, "/topic", QOS_2, b"test_data", False) -# message.publish_packet = PublishPacket.build( -# "/topic", -# b"test_data", -# rand_packet_id(), -# False, -# QOS_2, -# False, -# ) -# self.session.inflight_out[1] = message -# future: asyncio.Future[Any] = asyncio.Future() + self.handler = None + self.session = Session() + message = OutgoingApplicationMessage(1, "/topic", QOS_2, b"test_data", False) + message.publish_packet = PublishPacket.build( + "/topic", + b"test_data", + rand_packet_id(), + False, + QOS_2, + False, + ) + self.session.inflight_out[1] = message + future: asyncio.Future[Any] = asyncio.Future() -# coro = asyncio.start_server(server_mock, "127.0.0.1", 8888) -# server = self.loop.run_until_complete(coro) -# self.loop.run_until_complete(test_coro()) -# server.close() -# self.loop.run_until_complete(server.wait_closed()) -# exception = future.exception() -# if exception: -# raise exception + coro = asyncio.start_server(server_mock, "127.0.0.1", 8888) + server = self.loop.run_until_complete(coro) + self.loop.run_until_complete(test_coro()) + server.close() + self.loop.run_until_complete(server.wait_closed()) + exception = future.exception() + if exception: + raise exception From a7f2ae5746756af94c05bc18282f27d4a9099a94 Mon Sep 17 00:00:00 2001 From: Andrew Mirsky Date: Mon, 26 May 2025 08:48:10 -0400 Subject: [PATCH 2/5] test_init_handler: protocolhandler gets the current loop, if not provided and self.loop isn't the current loop, it's a new loop; ProtocolHandler.__init__: handler was using the session's keepalive, even though if the keepalive is zero, it needs to bypass --- amqtt/mqtt/packet.py | 6 ------ amqtt/mqtt/protocol/handler.py | 2 +- tests/mqtt/protocol/test_handler.py | 2 +- 3 files changed, 2 insertions(+), 8 deletions(-) diff --git a/amqtt/mqtt/packet.py b/amqtt/mqtt/packet.py index 2e7c808..b674e95 100644 --- a/amqtt/mqtt/packet.py +++ b/amqtt/mqtt/packet.py @@ -1,4 +1,3 @@ -import logging from abc import ABC, abstractmethod import asyncio @@ -34,9 +33,6 @@ DISCONNECT = 0x0E RESERVED_15 = 0x0F -logger = logging.getLogger(__name__) - - class MQTTFixedHeader: """Represents the fixed header of an MQTT packet.""" @@ -210,7 +206,6 @@ class MQTTPacket(Generic[_VH, _P, _FH]): async def to_stream(self, writer: WriterAdapter) -> None: """Write the entire packet to the stream.""" - logger.debug(f">> writing packet to stream: {self}") writer.write(self.to_bytes()) await writer.drain() self.protocol_ts = datetime.now(UTC) @@ -253,7 +248,6 @@ class MQTTPacket(Generic[_VH, _P, _FH]): else: instance = cls(fixed_header, variable_header, payload) instance.protocol_ts = datetime.now(UTC) - logger.debug(f">> read packet from stream: {instance!r}") return instance @property diff --git a/amqtt/mqtt/protocol/handler.py b/amqtt/mqtt/protocol/handler.py index 6bea05f..505de73 100644 --- a/amqtt/mqtt/protocol/handler.py +++ b/amqtt/mqtt/protocol/handler.py @@ -446,7 +446,7 @@ class ProtocolHandler: self.logger.debug(f"{self.session.client_id} Starting reader coro") running_tasks: collections.deque[asyncio.Task[None]] = collections.deque() keepalive_timeout: int | None = self.session.keep_alive - if keepalive_timeout and keepalive_timeout <= 0: + if keepalive_timeout is not None and keepalive_timeout <= 0: keepalive_timeout = None while True: try: diff --git a/tests/mqtt/protocol/test_handler.py b/tests/mqtt/protocol/test_handler.py index 045e2fb..ef8c6a9 100644 --- a/tests/mqtt/protocol/test_handler.py +++ b/tests/mqtt/protocol/test_handler.py @@ -39,7 +39,7 @@ class ProtocolHandlerTest(unittest.TestCase): def test_init_handler(self): Session() - handler = ProtocolHandler(self.plugin_manager) + handler = ProtocolHandler(self.plugin_manager, loop=self.loop) assert handler.session is None assert handler._loop is self.loop self.check_empty_waiters(handler) From 508f42ac30ca39806797cb254fced25728924caf Mon Sep 17 00:00:00 2001 From: Andrew Mirsky Date: Mon, 26 May 2025 08:49:06 -0400 Subject: [PATCH 3/5] reverting changes needed for debug --- pyproject.toml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 4d22e67..a51ead4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -146,14 +146,14 @@ max-returns = 10 # ----------------------------------- PYTEST ----------------------------------- [tool.pytest.ini_options] -#addopts = ["--cov=amqtt", "--cov-report=term-missing", "--cov-report=xml"] -addopts = ["--tb=short", "--capture=tee-sys"] +addopts = ["--cov=amqtt", "--cov-report=term-missing", "--cov-report=xml"] testpaths = ["tests"] asyncio_mode = "auto" timeout = 10 asyncio_default_fixture_loop_scope = "function" -log_cli = true -log_level = "DEBUG" +#addopts = ["--tb=short", "--capture=tee-sys"] +#log_cli = true +#log_level = "DEBUG" # ------------------------------------ MYPY ------------------------------------ [tool.mypy] From 5c5305baa6b89ec6a171cdfe967c9b3bf68ea90e Mon Sep 17 00:00:00 2001 From: Andrew Mirsky Date: Mon, 26 May 2025 09:03:51 -0400 Subject: [PATCH 4/5] while the explicit close to the asyncio.open_server docs say that you need to close the writer to have it properly exit, it seems that only python 3.12 requires the explicit close --- tests/mqtt/protocol/test_handler.py | 22 +++++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/tests/mqtt/protocol/test_handler.py b/tests/mqtt/protocol/test_handler.py index ef8c6a9..e110c94 100644 --- a/tests/mqtt/protocol/test_handler.py +++ b/tests/mqtt/protocol/test_handler.py @@ -46,7 +46,8 @@ class ProtocolHandlerTest(unittest.TestCase): def test_start_stop(self): async def server_mock(reader, writer) -> None: - pass + writer.close() # python 3.12 requires an explicit close + await writer.wait_closed() async def test_coro() -> None: try: @@ -78,6 +79,9 @@ class ProtocolHandlerTest(unittest.TestCase): assert packet.variable_header.topic_name == "/topic" assert packet.qos == QOS_0 assert packet.packet_id is None + writer.close() # python 3.12 requires an explicit close + await writer.wait_closed() + except Exception as ae: future.set_exception(ae) @@ -130,6 +134,8 @@ class ProtocolHandlerTest(unittest.TestCase): assert packet.packet_id in self.handler._puback_waiters puback = PubackPacket.build(packet.packet_id) await puback.to_stream(writer) + writer.close() # python 3.12 requires an explicit close + await writer.wait_closed() except Exception as ae: future.set_exception(ae) @@ -188,6 +194,8 @@ class ProtocolHandlerTest(unittest.TestCase): assert packet.packet_id in self.handler._pubcomp_waiters pubcomp = PubcompPacket.build(packet.packet_id) await pubcomp.to_stream(writer) + writer.close() # python 3.12 requires an explicit close + await writer.wait_closed() except Exception as ae: future.set_exception(ae) @@ -240,6 +248,8 @@ class ProtocolHandlerTest(unittest.TestCase): False, ) await packet.to_stream(writer) + writer.close() # python 3.12 requires an explicit close + await writer.wait_closed() async def test_coro() -> None: try: @@ -257,6 +267,7 @@ class ProtocolHandlerTest(unittest.TestCase): assert message.pubcomp_packet is None await self.stop_handler(self.handler, self.session) future.set_result(True) + except Exception as ae: future.set_exception(ae) @@ -287,6 +298,8 @@ class ProtocolHandlerTest(unittest.TestCase): puback = await PubackPacket.from_stream(reader) assert puback is not None assert packet.packet_id == puback.packet_id + writer.close() # python 3.12 requires an explicit close + await writer.wait_closed() except Exception as ae: future.set_exception(ae) @@ -344,6 +357,8 @@ class ProtocolHandlerTest(unittest.TestCase): pubcomp = await PubcompPacket.from_stream(reader) assert pubcomp is not None assert packet.packet_id == pubcomp.packet_id + writer.close() # python 3.12 requires an explicit close + await writer.wait_closed() except Exception as ae: future.set_exception(ae) @@ -412,6 +427,8 @@ class ProtocolHandlerTest(unittest.TestCase): assert packet.packet_id in self.handler._puback_waiters puback = PubackPacket.build(packet.packet_id) await puback.to_stream(writer) + writer.close() # python 3.12 requires an explicit close + await writer.wait_closed() except Exception as ae: future.set_exception(ae) @@ -468,6 +485,9 @@ class ProtocolHandlerTest(unittest.TestCase): assert packet.packet_id in self.handler._pubcomp_waiters pubcomp = PubcompPacket.build(packet.packet_id) await pubcomp.to_stream(writer) + writer.close() # python 3.12 requires an explicit close + await writer.wait_closed() + except Exception as ae: future.set_exception(ae) From ec31def92c368a2b9f4eb07016a8bacac0dae4e0 Mon Sep 17 00:00:00 2001 From: Andrew Mirsky Date: Mon, 26 May 2025 10:46:11 -0400 Subject: [PATCH 5/5] enhancing the 'test_init_handler' case so that it checks to make sure that the ProtocolHandler defaults to the correct loop. --- tests/mqtt/protocol/test_handler.py | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/tests/mqtt/protocol/test_handler.py b/tests/mqtt/protocol/test_handler.py index e110c94..349016c 100644 --- a/tests/mqtt/protocol/test_handler.py +++ b/tests/mqtt/protocol/test_handler.py @@ -38,11 +38,23 @@ class ProtocolHandlerTest(unittest.TestCase): self.loop.close() def test_init_handler(self): - Session() - handler = ProtocolHandler(self.plugin_manager, loop=self.loop) - assert handler.session is None - assert handler._loop is self.loop - self.check_empty_waiters(handler) + + async def test_coro() -> None: + try: + Session() + handler = ProtocolHandler(self.plugin_manager) + assert handler.session is None + assert handler._loop is self.loop + self.check_empty_waiters(handler) + future.set_result(True) + except Exception as ae: + future.set_exception(ae) + + future: asyncio.Future[Any] = asyncio.Future() + self.loop.run_until_complete(test_coro()) + exception = future.exception() + if exception: + raise exception def test_start_stop(self): async def server_mock(reader, writer) -> None: