adding debug statements to determine root cause of qos failures

pull/182/head
Andrew Mirsky 2025-05-25 12:44:54 -04:00
rodzic 7af3709750
commit 255e38a947
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: A98E67635CDF2C39
3 zmienionych plików z 470 dodań i 463 usunięć

Wyświetl plik

@ -1,3 +1,4 @@
import logging
from abc import ABC, abstractmethod
import asyncio
@ -33,6 +34,9 @@ DISCONNECT = 0x0E
RESERVED_15 = 0x0F
logger = logging.getLogger(__name__)
class MQTTFixedHeader:
"""Represents the fixed header of an MQTT packet."""
@ -206,6 +210,7 @@ class MQTTPacket(Generic[_VH, _P, _FH]):
async def to_stream(self, writer: WriterAdapter) -> None:
"""Write the entire packet to the stream."""
logger.debug(f">> writing packet to stream: {self}")
writer.write(self.to_bytes())
await writer.drain()
self.protocol_ts = datetime.now(UTC)
@ -248,6 +253,7 @@ class MQTTPacket(Generic[_VH, _P, _FH]):
else:
instance = cls(fixed_header, variable_header, payload)
instance.protocol_ts = datetime.now(UTC)
logger.debug(f">> read packet from stream: {instance!r}")
return instance
@property

Wyświetl plik

@ -146,13 +146,14 @@ max-returns = 10
# ----------------------------------- PYTEST -----------------------------------
[tool.pytest.ini_options]
addopts = ["--cov=amqtt", "--cov-report=term-missing", "--cov-report=xml"]
#addopts = ["--cov=amqtt", "--cov-report=term-missing", "--cov-report=xml"]
addopts = ["--tb=short", "--capture=tee-sys"]
testpaths = ["tests"]
asyncio_mode = "auto"
timeout = 10
asyncio_default_fixture_loop_scope = "function"
# log_cli = true
# log_level = "INFO"
log_cli = true
log_level = "DEBUG"
# ------------------------------------ MYPY ------------------------------------
[tool.mypy]

Wyświetl plik

@ -1,508 +1,508 @@
# import asyncio
# import logging
# import secrets
# from typing import Any
# import unittest
import asyncio
import logging
import secrets
from typing import Any
import unittest
# from amqtt.adapters import StreamReaderAdapter, StreamWriterAdapter
# from amqtt.mqtt.constants import QOS_0, QOS_1, QOS_2
# from amqtt.mqtt.protocol.handler import ProtocolHandler
# from amqtt.mqtt.puback import PubackPacket
# from amqtt.mqtt.pubcomp import PubcompPacket
# from amqtt.mqtt.publish import PublishPacket
# from amqtt.mqtt.pubrec import PubrecPacket
# from amqtt.mqtt.pubrel import PubrelPacket
# from amqtt.plugins.manager import PluginManager
# from amqtt.session import IncomingApplicationMessage, OutgoingApplicationMessage, Session
from amqtt.adapters import StreamReaderAdapter, StreamWriterAdapter
from amqtt.mqtt.constants import QOS_0, QOS_1, QOS_2
from amqtt.mqtt.protocol.handler import ProtocolHandler
from amqtt.mqtt.puback import PubackPacket
from amqtt.mqtt.pubcomp import PubcompPacket
from amqtt.mqtt.publish import PublishPacket
from amqtt.mqtt.pubrec import PubrecPacket
from amqtt.mqtt.pubrel import PubrelPacket
from amqtt.plugins.manager import PluginManager
from amqtt.session import IncomingApplicationMessage, OutgoingApplicationMessage, Session
# formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s"
# logging.basicConfig(level=logging.DEBUG, format=formatter)
# log = logging.getLogger(__name__)
formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s"
logging.basicConfig(level=logging.DEBUG, format=formatter)
log = logging.getLogger(__name__)
# def rand_packet_id():
# return secrets.randbelow(65536)
def rand_packet_id():
return secrets.randbelow(65536)
# def adapt(reader, writer):
# return StreamReaderAdapter(reader), StreamWriterAdapter(writer)
def adapt(reader, writer):
return StreamReaderAdapter(reader), StreamWriterAdapter(writer)
# class ProtocolHandlerTest(unittest.TestCase):
# def setUp(self):
# self.loop = asyncio.new_event_loop()
# asyncio.set_event_loop(self.loop)
# self.plugin_manager = PluginManager("amqtt.test.plugins", context=None)
class ProtocolHandlerTest(unittest.TestCase):
def setUp(self):
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
self.plugin_manager = PluginManager("amqtt.test.plugins", context=None)
# def tearDown(self):
# self.loop.close()
def tearDown(self):
self.loop.close()
# def test_init_handler(self):
# Session()
# handler = ProtocolHandler(self.plugin_manager)
# assert handler.session is None
# assert handler._loop is self.loop
# self.check_empty_waiters(handler)
def test_init_handler(self):
Session()
handler = ProtocolHandler(self.plugin_manager)
assert handler.session is None
assert handler._loop is self.loop
self.check_empty_waiters(handler)
# def test_start_stop(self):
# async def server_mock(reader, writer) -> None:
# pass
def test_start_stop(self):
async def server_mock(reader, writer) -> None:
pass
# async def test_coro() -> None:
# try:
# s = Session()
# reader, writer = await asyncio.open_connection("127.0.0.1", 8888)
# reader_adapted, writer_adapted = adapt(reader, writer)
# handler = ProtocolHandler(self.plugin_manager)
# handler.attach(s, reader_adapted, writer_adapted)
# await self.start_handler(handler, s)
# await self.stop_handler(handler, s)
# future.set_result(True)
# except Exception as ae:
# future.set_exception(ae)
async def test_coro() -> None:
try:
s = Session()
reader, writer = await asyncio.open_connection("127.0.0.1", 8888)
reader_adapted, writer_adapted = adapt(reader, writer)
handler = ProtocolHandler(self.plugin_manager)
handler.attach(s, reader_adapted, writer_adapted)
await self.start_handler(handler, s)
await self.stop_handler(handler, s)
future.set_result(True)
except Exception as ae:
future.set_exception(ae)
# future: asyncio.Future[Any] = asyncio.Future()
# coro = asyncio.start_server(server_mock, "127.0.0.1", 8888)
# server = self.loop.run_until_complete(coro)
# self.loop.run_until_complete(test_coro())
# server.close()
# self.loop.run_until_complete(server.wait_closed())
# exception = future.exception()
# if exception:
# raise exception
future: asyncio.Future[Any] = asyncio.Future()
coro = asyncio.start_server(server_mock, "127.0.0.1", 8888)
server = self.loop.run_until_complete(coro)
self.loop.run_until_complete(test_coro())
server.close()
self.loop.run_until_complete(server.wait_closed())
exception = future.exception()
if exception:
raise exception
# def test_publish_qos0(self):
# async def server_mock(reader, writer) -> None:
# try:
# packet = await PublishPacket.from_stream(reader)
# assert packet.variable_header.topic_name == "/topic"
# assert packet.qos == QOS_0
# assert packet.packet_id is None
# except Exception as ae:
# future.set_exception(ae)
def test_publish_qos0(self):
async def server_mock(reader, writer) -> None:
try:
packet = await PublishPacket.from_stream(reader)
assert packet.variable_header.topic_name == "/topic"
assert packet.qos == QOS_0
assert packet.packet_id is None
except Exception as ae:
future.set_exception(ae)
# async def test_coro() -> None:
# try:
# s = Session()
# reader, writer = await asyncio.open_connection("127.0.0.1", 8888)
# reader_adapted, writer_adapted = adapt(reader, writer)
# handler = ProtocolHandler(self.plugin_manager)
# handler.attach(s, reader_adapted, writer_adapted)
# await self.start_handler(handler, s)
# message = await handler.mqtt_publish(
# "/topic",
# b"test_data",
# QOS_0,
# False,
# )
# assert isinstance(message, OutgoingApplicationMessage)
# assert message.publish_packet is not None
# assert message.puback_packet is None
# assert message.pubrec_packet is None
# assert message.pubrel_packet is None
# assert message.pubcomp_packet is None
# await self.stop_handler(handler, s)
# future.set_result(True)
# except Exception as ae:
# future.set_exception(ae)
async def test_coro() -> None:
try:
s = Session()
reader, writer = await asyncio.open_connection("127.0.0.1", 8888)
reader_adapted, writer_adapted = adapt(reader, writer)
handler = ProtocolHandler(self.plugin_manager)
handler.attach(s, reader_adapted, writer_adapted)
await self.start_handler(handler, s)
message = await handler.mqtt_publish(
"/topic",
b"test_data",
QOS_0,
False,
)
assert isinstance(message, OutgoingApplicationMessage)
assert message.publish_packet is not None
assert message.puback_packet is None
assert message.pubrec_packet is None
assert message.pubrel_packet is None
assert message.pubcomp_packet is None
await self.stop_handler(handler, s)
future.set_result(True)
except Exception as ae:
future.set_exception(ae)
# future: asyncio.Future[Any] = asyncio.Future()
# coro = asyncio.start_server(server_mock, "127.0.0.1", 8888)
# server = self.loop.run_until_complete(coro)
# self.loop.run_until_complete(test_coro())
# server.close()
# self.loop.run_until_complete(server.wait_closed())
# exception = future.exception()
# if exception:
# raise exception
future: asyncio.Future[Any] = asyncio.Future()
coro = asyncio.start_server(server_mock, "127.0.0.1", 8888)
server = self.loop.run_until_complete(coro)
self.loop.run_until_complete(test_coro())
server.close()
self.loop.run_until_complete(server.wait_closed())
exception = future.exception()
if exception:
raise exception
# def test_publish_qos1(self):
# self.handler: ProtocolHandler | None = None
def test_publish_qos1(self):
self.handler: ProtocolHandler | None = None
# async def server_mock(reader, writer) -> None:
# packet = await PublishPacket.from_stream(reader)
# try:
# assert packet.variable_header.topic_name == "/topic"
# assert packet.qos == QOS_1
# assert packet.packet_id is not None
# assert packet.packet_id in self.session.inflight_out
# assert self.handler is not None
# assert packet.packet_id in self.handler._puback_waiters
# puback = PubackPacket.build(packet.packet_id)
# await puback.to_stream(writer)
# except Exception as ae:
# future.set_exception(ae)
async def server_mock(reader, writer) -> None:
packet = await PublishPacket.from_stream(reader)
try:
assert packet.variable_header.topic_name == "/topic"
assert packet.qos == QOS_1
assert packet.packet_id is not None
assert packet.packet_id in self.session.inflight_out
assert self.handler is not None
assert packet.packet_id in self.handler._puback_waiters
puback = PubackPacket.build(packet.packet_id)
await puback.to_stream(writer)
except Exception as ae:
future.set_exception(ae)
# async def test_coro() -> None:
# try:
# reader, writer = await asyncio.open_connection("127.0.0.1", 8888)
# reader_adapted, writer_adapted = adapt(reader, writer)
# self.handler = ProtocolHandler(self.plugin_manager)
# self.handler.attach(self.session, reader_adapted, writer_adapted)
# await self.start_handler(self.handler, self.session)
# message = await self.handler.mqtt_publish(
# "/topic",
# b"test_data",
# QOS_1,
# False,
# )
# assert isinstance(message, OutgoingApplicationMessage)
# assert message.publish_packet is not None
# assert message.puback_packet is not None
# assert message.pubrec_packet is None
# assert message.pubrel_packet is None
# assert message.pubcomp_packet is None
# await self.stop_handler(self.handler, self.session)
# if not future.done():
# future.set_result(True)
# except Exception as ae:
# future.set_exception(ae)
async def test_coro() -> None:
try:
reader, writer = await asyncio.open_connection("127.0.0.1", 8888)
reader_adapted, writer_adapted = adapt(reader, writer)
self.handler = ProtocolHandler(self.plugin_manager)
self.handler.attach(self.session, reader_adapted, writer_adapted)
await self.start_handler(self.handler, self.session)
message = await self.handler.mqtt_publish(
"/topic",
b"test_data",
QOS_1,
False,
)
assert isinstance(message, OutgoingApplicationMessage)
assert message.publish_packet is not None
assert message.puback_packet is not None
assert message.pubrec_packet is None
assert message.pubrel_packet is None
assert message.pubcomp_packet is None
await self.stop_handler(self.handler, self.session)
if not future.done():
future.set_result(True)
except Exception as ae:
future.set_exception(ae)
# self.handler = None
# self.session = Session()
# future: asyncio.Future[Any] = asyncio.Future()
self.handler = None
self.session = Session()
future: asyncio.Future[Any] = asyncio.Future()
# coro = asyncio.start_server(server_mock, "127.0.0.1", 8888)
# server = self.loop.run_until_complete(coro)
# self.loop.run_until_complete(test_coro())
# server.close()
# self.loop.run_until_complete(server.wait_closed())
# exception = future.exception()
# if exception:
# raise exception
coro = asyncio.start_server(server_mock, "127.0.0.1", 8888)
server = self.loop.run_until_complete(coro)
self.loop.run_until_complete(test_coro())
server.close()
self.loop.run_until_complete(server.wait_closed())
exception = future.exception()
if exception:
raise exception
# def test_publish_qos2(self):
# async def server_mock(reader, writer) -> None:
# try:
# packet = await PublishPacket.from_stream(reader)
# assert packet.topic_name == "/topic"
# assert packet.qos == QOS_2
# assert packet.packet_id is not None
# assert packet.packet_id in self.session.inflight_out
# assert self.handler is not None
# assert packet.packet_id in self.handler._pubrec_waiters
# pubrec = PubrecPacket.build(packet.packet_id)
# await pubrec.to_stream(writer)
def test_publish_qos2(self):
async def server_mock(reader, writer) -> None:
try:
packet = await PublishPacket.from_stream(reader)
assert packet.topic_name == "/topic"
assert packet.qos == QOS_2
assert packet.packet_id is not None
assert packet.packet_id in self.session.inflight_out
assert self.handler is not None
assert packet.packet_id in self.handler._pubrec_waiters
pubrec = PubrecPacket.build(packet.packet_id)
await pubrec.to_stream(writer)
# await PubrelPacket.from_stream(reader)
# assert packet.packet_id in self.handler._pubcomp_waiters
# pubcomp = PubcompPacket.build(packet.packet_id)
# await pubcomp.to_stream(writer)
# except Exception as ae:
# future.set_exception(ae)
await PubrelPacket.from_stream(reader)
assert packet.packet_id in self.handler._pubcomp_waiters
pubcomp = PubcompPacket.build(packet.packet_id)
await pubcomp.to_stream(writer)
except Exception as ae:
future.set_exception(ae)
# async def test_coro() -> None:
# try:
# reader, writer = await asyncio.open_connection("127.0.0.1", 8888)
# reader_adapted, writer_adapted = adapt(reader, writer)
# self.handler = ProtocolHandler(self.plugin_manager)
# self.handler.attach(self.session, reader_adapted, writer_adapted)
# await self.start_handler(self.handler, self.session)
# message = await self.handler.mqtt_publish(
# "/topic",
# b"test_data",
# QOS_2,
# False,
# )
# assert isinstance(message, OutgoingApplicationMessage)
# assert message.publish_packet is not None
# assert message.puback_packet is None
# assert message.pubrec_packet is not None
# assert message.pubrel_packet is not None
# assert message.pubcomp_packet is not None
# await self.stop_handler(self.handler, self.session)
# if not future.done():
# future.set_result(True)
# except Exception as ae:
# future.set_exception(ae)
async def test_coro() -> None:
try:
reader, writer = await asyncio.open_connection("127.0.0.1", 8888)
reader_adapted, writer_adapted = adapt(reader, writer)
self.handler = ProtocolHandler(self.plugin_manager)
self.handler.attach(self.session, reader_adapted, writer_adapted)
await self.start_handler(self.handler, self.session)
message = await self.handler.mqtt_publish(
"/topic",
b"test_data",
QOS_2,
False,
)
assert isinstance(message, OutgoingApplicationMessage)
assert message.publish_packet is not None
assert message.puback_packet is None
assert message.pubrec_packet is not None
assert message.pubrel_packet is not None
assert message.pubcomp_packet is not None
await self.stop_handler(self.handler, self.session)
if not future.done():
future.set_result(True)
except Exception as ae:
future.set_exception(ae)
# self.handler = None
# self.session = Session()
# future: asyncio.Future[Any] = asyncio.Future()
self.handler = None
self.session = Session()
future: asyncio.Future[Any] = asyncio.Future()
# coro = asyncio.start_server(server_mock, "127.0.0.1", 8888)
# server = self.loop.run_until_complete(coro)
# self.loop.run_until_complete(test_coro())
# server.close()
# self.loop.run_until_complete(server.wait_closed())
# exception = future.exception()
# if exception:
# raise exception
coro = asyncio.start_server(server_mock, "127.0.0.1", 8888)
server = self.loop.run_until_complete(coro)
self.loop.run_until_complete(test_coro())
server.close()
self.loop.run_until_complete(server.wait_closed())
exception = future.exception()
if exception:
raise exception
# def test_receive_qos0(self):
# async def server_mock(reader, writer) -> None:
# packet = PublishPacket.build(
# "/topic",
# b"test_data",
# rand_packet_id(),
# False,
# QOS_0,
# False,
# )
# await packet.to_stream(writer)
def test_receive_qos0(self):
async def server_mock(reader, writer) -> None:
packet = PublishPacket.build(
"/topic",
b"test_data",
rand_packet_id(),
False,
QOS_0,
False,
)
await packet.to_stream(writer)
# async def test_coro() -> None:
# try:
# reader, writer = await asyncio.open_connection("127.0.0.1", 8888)
# reader_adapted, writer_adapted = adapt(reader, writer)
# self.handler = ProtocolHandler(self.plugin_manager)
# self.handler.attach(self.session, reader_adapted, writer_adapted)
# await self.start_handler(self.handler, self.session)
# message = await self.handler.mqtt_deliver_next_message()
# assert isinstance(message, IncomingApplicationMessage)
# assert message.publish_packet is not None
# assert message.puback_packet is None
# assert message.pubrec_packet is None
# assert message.pubrel_packet is None
# assert message.pubcomp_packet is None
# await self.stop_handler(self.handler, self.session)
# future.set_result(True)
# except Exception as ae:
# future.set_exception(ae)
async def test_coro() -> None:
try:
reader, writer = await asyncio.open_connection("127.0.0.1", 8888)
reader_adapted, writer_adapted = adapt(reader, writer)
self.handler = ProtocolHandler(self.plugin_manager)
self.handler.attach(self.session, reader_adapted, writer_adapted)
await self.start_handler(self.handler, self.session)
message = await self.handler.mqtt_deliver_next_message()
assert isinstance(message, IncomingApplicationMessage)
assert message.publish_packet is not None
assert message.puback_packet is None
assert message.pubrec_packet is None
assert message.pubrel_packet is None
assert message.pubcomp_packet is None
await self.stop_handler(self.handler, self.session)
future.set_result(True)
except Exception as ae:
future.set_exception(ae)
# self.handler = None
# self.session = Session()
# future: asyncio.Future[Any] = asyncio.Future()
# coro = asyncio.start_server(server_mock, "127.0.0.1", 8888)
# server = self.loop.run_until_complete(coro)
# self.loop.run_until_complete(test_coro())
# server.close()
# self.loop.run_until_complete(server.wait_closed())
# exception = future.exception()
# if exception:
# raise exception
self.handler = None
self.session = Session()
future: asyncio.Future[Any] = asyncio.Future()
coro = asyncio.start_server(server_mock, "127.0.0.1", 8888)
server = self.loop.run_until_complete(coro)
self.loop.run_until_complete(test_coro())
server.close()
self.loop.run_until_complete(server.wait_closed())
exception = future.exception()
if exception:
raise exception
# def test_receive_qos1(self):
# async def server_mock(reader, writer) -> None:
# try:
# packet = PublishPacket.build(
# "/topic",
# b"test_data",
# rand_packet_id(),
# False,
# QOS_1,
# False,
# )
# await packet.to_stream(writer)
# puback = await PubackPacket.from_stream(reader)
# assert puback is not None
# assert packet.packet_id == puback.packet_id
# except Exception as ae:
# future.set_exception(ae)
def test_receive_qos1(self):
async def server_mock(reader, writer) -> None:
try:
packet = PublishPacket.build(
"/topic",
b"test_data",
rand_packet_id(),
False,
QOS_1,
False,
)
await packet.to_stream(writer)
puback = await PubackPacket.from_stream(reader)
assert puback is not None
assert packet.packet_id == puback.packet_id
except Exception as ae:
future.set_exception(ae)
# async def test_coro() -> None:
# try:
# reader, writer = await asyncio.open_connection("127.0.0.1", 8888)
# reader_adapted, writer_adapted = adapt(reader, writer)
# self.handler = ProtocolHandler(self.plugin_manager)
# self.handler.attach(self.session, reader_adapted, writer_adapted)
# await self.start_handler(self.handler, self.session)
# message = await self.handler.mqtt_deliver_next_message()
# assert isinstance(message, IncomingApplicationMessage)
# assert message.publish_packet is not None
# assert message.puback_packet is not None
# assert message.pubrec_packet is None
# assert message.pubrel_packet is None
# assert message.pubcomp_packet is None
# await self.stop_handler(self.handler, self.session)
# future.set_result(True)
# except Exception as ae:
# future.set_exception(ae)
async def test_coro() -> None:
try:
reader, writer = await asyncio.open_connection("127.0.0.1", 8888)
reader_adapted, writer_adapted = adapt(reader, writer)
self.handler = ProtocolHandler(self.plugin_manager)
self.handler.attach(self.session, reader_adapted, writer_adapted)
await self.start_handler(self.handler, self.session)
message = await self.handler.mqtt_deliver_next_message()
assert isinstance(message, IncomingApplicationMessage)
assert message.publish_packet is not None
assert message.puback_packet is not None
assert message.pubrec_packet is None
assert message.pubrel_packet is None
assert message.pubcomp_packet is None
await self.stop_handler(self.handler, self.session)
future.set_result(True)
except Exception as ae:
future.set_exception(ae)
# self.handler = None
# self.session = Session()
# future: asyncio.Future[Any] = asyncio.Future()
# self.event = asyncio.Event()
# coro = asyncio.start_server(server_mock, "127.0.0.1", 8888)
# server = self.loop.run_until_complete(coro)
# self.loop.run_until_complete(test_coro())
# server.close()
# self.loop.run_until_complete(server.wait_closed())
# exception = future.exception()
# if exception:
# raise exception
self.handler = None
self.session = Session()
future: asyncio.Future[Any] = asyncio.Future()
self.event = asyncio.Event()
coro = asyncio.start_server(server_mock, "127.0.0.1", 8888)
server = self.loop.run_until_complete(coro)
self.loop.run_until_complete(test_coro())
server.close()
self.loop.run_until_complete(server.wait_closed())
exception = future.exception()
if exception:
raise exception
# def test_receive_qos2(self):
# async def server_mock(reader, writer) -> None:
# try:
# packet = PublishPacket.build(
# "/topic",
# b"test_data",
# rand_packet_id(),
# False,
# QOS_2,
# False,
# )
# await packet.to_stream(writer)
# pubrec = await PubrecPacket.from_stream(reader)
# assert pubrec is not None
# assert packet.packet_id == pubrec.packet_id
# assert self.handler is not None
# assert packet.packet_id in self.handler._pubrel_waiters
# pubrel = PubrelPacket.build(packet.packet_id)
# await pubrel.to_stream(writer)
# pubcomp = await PubcompPacket.from_stream(reader)
# assert pubcomp is not None
# assert packet.packet_id == pubcomp.packet_id
# except Exception as ae:
# future.set_exception(ae)
def test_receive_qos2(self):
async def server_mock(reader, writer) -> None:
try:
packet = PublishPacket.build(
"/topic",
b"test_data",
rand_packet_id(),
False,
QOS_2,
False,
)
await packet.to_stream(writer)
pubrec = await PubrecPacket.from_stream(reader)
assert pubrec is not None
assert packet.packet_id == pubrec.packet_id
assert self.handler is not None
assert packet.packet_id in self.handler._pubrel_waiters
pubrel = PubrelPacket.build(packet.packet_id)
await pubrel.to_stream(writer)
pubcomp = await PubcompPacket.from_stream(reader)
assert pubcomp is not None
assert packet.packet_id == pubcomp.packet_id
except Exception as ae:
future.set_exception(ae)
# async def test_coro() -> None:
# try:
# reader, writer = await asyncio.open_connection("127.0.0.1", 8888)
# reader_adapted, writer_adapted = adapt(reader, writer)
# self.handler = ProtocolHandler(self.plugin_manager)
# self.handler.attach(self.session, reader_adapted, writer_adapted)
# await self.start_handler(self.handler, self.session)
# message = await self.handler.mqtt_deliver_next_message()
# assert isinstance(message, IncomingApplicationMessage)
# assert message.publish_packet is not None
# assert message.puback_packet is None
# assert message.pubrec_packet is not None
# assert message.pubrel_packet is not None
# assert message.pubcomp_packet is not None
# await self.stop_handler(self.handler, self.session)
# future.set_result(True)
# except Exception as ae:
# future.set_exception(ae)
async def test_coro() -> None:
try:
reader, writer = await asyncio.open_connection("127.0.0.1", 8888)
reader_adapted, writer_adapted = adapt(reader, writer)
self.handler = ProtocolHandler(self.plugin_manager)
self.handler.attach(self.session, reader_adapted, writer_adapted)
await self.start_handler(self.handler, self.session)
message = await self.handler.mqtt_deliver_next_message()
assert isinstance(message, IncomingApplicationMessage)
assert message.publish_packet is not None
assert message.puback_packet is None
assert message.pubrec_packet is not None
assert message.pubrel_packet is not None
assert message.pubcomp_packet is not None
await self.stop_handler(self.handler, self.session)
future.set_result(True)
except Exception as ae:
future.set_exception(ae)
# self.handler = None
# self.session = Session()
# future: asyncio.Future[Any] = asyncio.Future()
# coro = asyncio.start_server(server_mock, "127.0.0.1", 8888)
# server = self.loop.run_until_complete(coro)
# self.loop.run_until_complete(test_coro())
# server.close()
# self.loop.run_until_complete(server.wait_closed())
# exception = future.exception()
# if exception:
# raise exception
self.handler = None
self.session = Session()
future: asyncio.Future[Any] = asyncio.Future()
coro = asyncio.start_server(server_mock, "127.0.0.1", 8888)
server = self.loop.run_until_complete(coro)
self.loop.run_until_complete(test_coro())
server.close()
self.loop.run_until_complete(server.wait_closed())
exception = future.exception()
if exception:
raise exception
# async def start_handler(self, handler, session):
# self.check_empty_waiters(handler)
# self.check_no_message(session)
# await handler.start()
# assert handler._reader_ready
async def start_handler(self, handler, session):
self.check_empty_waiters(handler)
self.check_no_message(session)
await handler.start()
assert handler._reader_ready
# async def stop_handler(self, handler, session):
# await handler.stop()
# assert handler._reader_stopped
# self.check_empty_waiters(handler)
# self.check_no_message(session)
async def stop_handler(self, handler, session):
await handler.stop()
assert handler._reader_stopped
self.check_empty_waiters(handler)
self.check_no_message(session)
# def check_empty_waiters(self, handler):
# assert not handler._puback_waiters
# assert not handler._pubrec_waiters
# assert not handler._pubrel_waiters
# assert not handler._pubcomp_waiters
def check_empty_waiters(self, handler):
assert not handler._puback_waiters
assert not handler._pubrec_waiters
assert not handler._pubrel_waiters
assert not handler._pubcomp_waiters
# def check_no_message(self, session):
# assert not session.inflight_out
# assert not session.inflight_in
def check_no_message(self, session):
assert not session.inflight_out
assert not session.inflight_in
# def test_publish_qos1_retry(self):
# async def server_mock(reader, writer) -> None:
# packet = await PublishPacket.from_stream(reader)
# try:
# assert packet.topic_name == "/topic"
# assert packet.qos == QOS_1
# assert packet.packet_id is not None
# assert packet.packet_id in self.session.inflight_out
# assert self.handler is not None
# assert packet.packet_id in self.handler._puback_waiters
# puback = PubackPacket.build(packet.packet_id)
# await puback.to_stream(writer)
# except Exception as ae:
# future.set_exception(ae)
def test_publish_qos1_retry(self):
async def server_mock(reader, writer) -> None:
packet = await PublishPacket.from_stream(reader)
try:
assert packet.topic_name == "/topic"
assert packet.qos == QOS_1
assert packet.packet_id is not None
assert packet.packet_id in self.session.inflight_out
assert self.handler is not None
assert packet.packet_id in self.handler._puback_waiters
puback = PubackPacket.build(packet.packet_id)
await puback.to_stream(writer)
except Exception as ae:
future.set_exception(ae)
# async def test_coro() -> None:
# try:
# reader, writer = await asyncio.open_connection("127.0.0.1", 8888)
# reader_adapted, writer_adapted = adapt(reader, writer)
# self.handler = ProtocolHandler(self.plugin_manager)
# self.handler.attach(self.session, reader_adapted, writer_adapted)
# await self.handler.start()
# await self.stop_handler(self.handler, self.session)
# if not future.done():
# future.set_result(True)
# except Exception as ae:
# future.set_exception(ae)
async def test_coro() -> None:
try:
reader, writer = await asyncio.open_connection("127.0.0.1", 8888)
reader_adapted, writer_adapted = adapt(reader, writer)
self.handler = ProtocolHandler(self.plugin_manager)
self.handler.attach(self.session, reader_adapted, writer_adapted)
await self.handler.start()
await self.stop_handler(self.handler, self.session)
if not future.done():
future.set_result(True)
except Exception as ae:
future.set_exception(ae)
# self.handler = None
# self.session = Session()
# message = OutgoingApplicationMessage(1, "/topic", QOS_1, b"test_data", False)
# message.publish_packet = PublishPacket.build(
# "/topic",
# b"test_data",
# rand_packet_id(),
# False,
# QOS_1,
# False,
# )
# self.session.inflight_out[1] = message
# future: asyncio.Future[Any] = asyncio.Future()
self.handler = None
self.session = Session()
message = OutgoingApplicationMessage(1, "/topic", QOS_1, b"test_data", False)
message.publish_packet = PublishPacket.build(
"/topic",
b"test_data",
rand_packet_id(),
False,
QOS_1,
False,
)
self.session.inflight_out[1] = message
future: asyncio.Future[Any] = asyncio.Future()
# coro = asyncio.start_server(server_mock, "127.0.0.1", 8888)
# server = self.loop.run_until_complete(coro)
# self.loop.run_until_complete(test_coro())
# server.close()
# self.loop.run_until_complete(server.wait_closed())
# exception = future.exception()
# if exception:
# raise exception
coro = asyncio.start_server(server_mock, "127.0.0.1", 8888)
server = self.loop.run_until_complete(coro)
self.loop.run_until_complete(test_coro())
server.close()
self.loop.run_until_complete(server.wait_closed())
exception = future.exception()
if exception:
raise exception
# def test_publish_qos2_retry(self):
# async def server_mock(reader, writer) -> None:
# try:
# packet = await PublishPacket.from_stream(reader)
# assert packet.topic_name == "/topic"
# assert packet.qos == QOS_2
# assert packet.packet_id is not None
# assert packet.packet_id in self.session.inflight_out
# assert self.handler is not None
# assert packet.packet_id in self.handler._pubrec_waiters
# pubrec = PubrecPacket.build(packet.packet_id)
# await pubrec.to_stream(writer)
def test_publish_qos2_retry(self):
async def server_mock(reader, writer) -> None:
try:
packet = await PublishPacket.from_stream(reader)
assert packet.topic_name == "/topic"
assert packet.qos == QOS_2
assert packet.packet_id is not None
assert packet.packet_id in self.session.inflight_out
assert self.handler is not None
assert packet.packet_id in self.handler._pubrec_waiters
pubrec = PubrecPacket.build(packet.packet_id)
await pubrec.to_stream(writer)
# await PubrelPacket.from_stream(reader)
# assert packet.packet_id in self.handler._pubcomp_waiters
# pubcomp = PubcompPacket.build(packet.packet_id)
# await pubcomp.to_stream(writer)
# except Exception as ae:
# future.set_exception(ae)
await PubrelPacket.from_stream(reader)
assert packet.packet_id in self.handler._pubcomp_waiters
pubcomp = PubcompPacket.build(packet.packet_id)
await pubcomp.to_stream(writer)
except Exception as ae:
future.set_exception(ae)
# async def test_coro() -> None:
# try:
# reader, writer = await asyncio.open_connection("127.0.0.1", 8888)
# reader_adapted, writer_adapted = adapt(reader, writer)
# self.handler = ProtocolHandler(self.plugin_manager)
# self.handler.attach(self.session, reader_adapted, writer_adapted)
# await self.handler.start()
# await self.stop_handler(self.handler, self.session)
# if not future.done():
# future.set_result(True)
# except Exception as ae:
# future.set_exception(ae)
async def test_coro() -> None:
try:
reader, writer = await asyncio.open_connection("127.0.0.1", 8888)
reader_adapted, writer_adapted = adapt(reader, writer)
self.handler = ProtocolHandler(self.plugin_manager)
self.handler.attach(self.session, reader_adapted, writer_adapted)
await self.handler.start()
await self.stop_handler(self.handler, self.session)
if not future.done():
future.set_result(True)
except Exception as ae:
future.set_exception(ae)
# self.handler = None
# self.session = Session()
# message = OutgoingApplicationMessage(1, "/topic", QOS_2, b"test_data", False)
# message.publish_packet = PublishPacket.build(
# "/topic",
# b"test_data",
# rand_packet_id(),
# False,
# QOS_2,
# False,
# )
# self.session.inflight_out[1] = message
# future: asyncio.Future[Any] = asyncio.Future()
self.handler = None
self.session = Session()
message = OutgoingApplicationMessage(1, "/topic", QOS_2, b"test_data", False)
message.publish_packet = PublishPacket.build(
"/topic",
b"test_data",
rand_packet_id(),
False,
QOS_2,
False,
)
self.session.inflight_out[1] = message
future: asyncio.Future[Any] = asyncio.Future()
# coro = asyncio.start_server(server_mock, "127.0.0.1", 8888)
# server = self.loop.run_until_complete(coro)
# self.loop.run_until_complete(test_coro())
# server.close()
# self.loop.run_until_complete(server.wait_closed())
# exception = future.exception()
# if exception:
# raise exception
coro = asyncio.start_server(server_mock, "127.0.0.1", 8888)
server = self.loop.run_until_complete(coro)
self.loop.run_until_complete(test_coro())
server.close()
self.loop.run_until_complete(server.wait_closed())
exception = future.exception()
if exception:
raise exception