2024-12-29 18:23:27 +00:00
|
|
|
# import asyncio
|
|
|
|
# import logging
|
|
|
|
# import secrets
|
|
|
|
# from typing import Any
|
|
|
|
# import unittest
|
|
|
|
|
|
|
|
# from amqtt.adapters import StreamReaderAdapter, StreamWriterAdapter
|
|
|
|
# from amqtt.mqtt.constants import QOS_0, QOS_1, QOS_2
|
|
|
|
# from amqtt.mqtt.protocol.handler import ProtocolHandler
|
|
|
|
# from amqtt.mqtt.puback import PubackPacket
|
|
|
|
# from amqtt.mqtt.pubcomp import PubcompPacket
|
|
|
|
# from amqtt.mqtt.publish import PublishPacket
|
|
|
|
# from amqtt.mqtt.pubrec import PubrecPacket
|
|
|
|
# from amqtt.mqtt.pubrel import PubrelPacket
|
|
|
|
# from amqtt.plugins.manager import PluginManager
|
2025-04-04 17:18:17 +00:00
|
|
|
# from amqtt.session import IncomingApplicationMessage, OutgoingApplicationMessage, Session
|
2024-12-29 18:23:27 +00:00
|
|
|
|
|
|
|
# formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s"
|
|
|
|
# logging.basicConfig(level=logging.DEBUG, format=formatter)
|
|
|
|
# log = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
|
|
|
# def rand_packet_id():
|
|
|
|
# return secrets.randbelow(65536)
|
|
|
|
|
|
|
|
|
|
|
|
# def adapt(reader, writer):
|
|
|
|
# return StreamReaderAdapter(reader), StreamWriterAdapter(writer)
|
|
|
|
|
|
|
|
|
|
|
|
# class ProtocolHandlerTest(unittest.TestCase):
|
|
|
|
# def setUp(self):
|
|
|
|
# self.loop = asyncio.new_event_loop()
|
|
|
|
# asyncio.set_event_loop(self.loop)
|
|
|
|
# self.plugin_manager = PluginManager("amqtt.test.plugins", context=None)
|
|
|
|
|
|
|
|
# def tearDown(self):
|
|
|
|
# self.loop.close()
|
|
|
|
|
|
|
|
# def test_init_handler(self):
|
|
|
|
# Session()
|
|
|
|
# handler = ProtocolHandler(self.plugin_manager)
|
|
|
|
# assert handler.session is None
|
|
|
|
# assert handler._loop is self.loop
|
|
|
|
# self.check_empty_waiters(handler)
|
|
|
|
|
|
|
|
# def test_start_stop(self):
|
|
|
|
# async def server_mock(reader, writer) -> None:
|
|
|
|
# pass
|
|
|
|
|
|
|
|
# async def test_coro() -> None:
|
|
|
|
# try:
|
|
|
|
# s = Session()
|
|
|
|
# reader, writer = await asyncio.open_connection("127.0.0.1", 8888)
|
|
|
|
# reader_adapted, writer_adapted = adapt(reader, writer)
|
|
|
|
# handler = ProtocolHandler(self.plugin_manager)
|
|
|
|
# handler.attach(s, reader_adapted, writer_adapted)
|
|
|
|
# await self.start_handler(handler, s)
|
|
|
|
# await self.stop_handler(handler, s)
|
|
|
|
# future.set_result(True)
|
|
|
|
# except Exception as ae:
|
|
|
|
# future.set_exception(ae)
|
|
|
|
|
|
|
|
# future: asyncio.Future[Any] = asyncio.Future()
|
|
|
|
# coro = asyncio.start_server(server_mock, "127.0.0.1", 8888)
|
|
|
|
# server = self.loop.run_until_complete(coro)
|
|
|
|
# self.loop.run_until_complete(test_coro())
|
|
|
|
# server.close()
|
|
|
|
# self.loop.run_until_complete(server.wait_closed())
|
|
|
|
# exception = future.exception()
|
|
|
|
# if exception:
|
|
|
|
# raise exception
|
|
|
|
|
|
|
|
# def test_publish_qos0(self):
|
|
|
|
# async def server_mock(reader, writer) -> None:
|
|
|
|
# try:
|
|
|
|
# packet = await PublishPacket.from_stream(reader)
|
|
|
|
# assert packet.variable_header.topic_name == "/topic"
|
|
|
|
# assert packet.qos == QOS_0
|
|
|
|
# assert packet.packet_id is None
|
|
|
|
# except Exception as ae:
|
|
|
|
# future.set_exception(ae)
|
|
|
|
|
|
|
|
# async def test_coro() -> None:
|
|
|
|
# try:
|
|
|
|
# s = Session()
|
|
|
|
# reader, writer = await asyncio.open_connection("127.0.0.1", 8888)
|
|
|
|
# reader_adapted, writer_adapted = adapt(reader, writer)
|
|
|
|
# handler = ProtocolHandler(self.plugin_manager)
|
|
|
|
# handler.attach(s, reader_adapted, writer_adapted)
|
|
|
|
# await self.start_handler(handler, s)
|
|
|
|
# message = await handler.mqtt_publish(
|
|
|
|
# "/topic",
|
|
|
|
# b"test_data",
|
|
|
|
# QOS_0,
|
|
|
|
# False,
|
|
|
|
# )
|
|
|
|
# assert isinstance(message, OutgoingApplicationMessage)
|
|
|
|
# assert message.publish_packet is not None
|
|
|
|
# assert message.puback_packet is None
|
|
|
|
# assert message.pubrec_packet is None
|
|
|
|
# assert message.pubrel_packet is None
|
|
|
|
# assert message.pubcomp_packet is None
|
|
|
|
# await self.stop_handler(handler, s)
|
|
|
|
# future.set_result(True)
|
|
|
|
# except Exception as ae:
|
|
|
|
# future.set_exception(ae)
|
|
|
|
|
|
|
|
# future: asyncio.Future[Any] = asyncio.Future()
|
|
|
|
# coro = asyncio.start_server(server_mock, "127.0.0.1", 8888)
|
|
|
|
# server = self.loop.run_until_complete(coro)
|
|
|
|
# self.loop.run_until_complete(test_coro())
|
|
|
|
# server.close()
|
|
|
|
# self.loop.run_until_complete(server.wait_closed())
|
|
|
|
# exception = future.exception()
|
|
|
|
# if exception:
|
|
|
|
# raise exception
|
|
|
|
|
|
|
|
# def test_publish_qos1(self):
|
|
|
|
# self.handler: ProtocolHandler | None = None
|
|
|
|
|
|
|
|
# async def server_mock(reader, writer) -> None:
|
|
|
|
# packet = await PublishPacket.from_stream(reader)
|
|
|
|
# try:
|
|
|
|
# assert packet.variable_header.topic_name == "/topic"
|
|
|
|
# assert packet.qos == QOS_1
|
|
|
|
# assert packet.packet_id is not None
|
|
|
|
# assert packet.packet_id in self.session.inflight_out
|
|
|
|
# assert self.handler is not None
|
|
|
|
# assert packet.packet_id in self.handler._puback_waiters
|
|
|
|
# puback = PubackPacket.build(packet.packet_id)
|
|
|
|
# await puback.to_stream(writer)
|
|
|
|
# except Exception as ae:
|
|
|
|
# future.set_exception(ae)
|
|
|
|
|
|
|
|
# async def test_coro() -> None:
|
|
|
|
# try:
|
|
|
|
# reader, writer = await asyncio.open_connection("127.0.0.1", 8888)
|
|
|
|
# reader_adapted, writer_adapted = adapt(reader, writer)
|
|
|
|
# self.handler = ProtocolHandler(self.plugin_manager)
|
|
|
|
# self.handler.attach(self.session, reader_adapted, writer_adapted)
|
|
|
|
# await self.start_handler(self.handler, self.session)
|
|
|
|
# message = await self.handler.mqtt_publish(
|
|
|
|
# "/topic",
|
|
|
|
# b"test_data",
|
|
|
|
# QOS_1,
|
|
|
|
# False,
|
|
|
|
# )
|
|
|
|
# assert isinstance(message, OutgoingApplicationMessage)
|
|
|
|
# assert message.publish_packet is not None
|
|
|
|
# assert message.puback_packet is not None
|
|
|
|
# assert message.pubrec_packet is None
|
|
|
|
# assert message.pubrel_packet is None
|
|
|
|
# assert message.pubcomp_packet is None
|
|
|
|
# await self.stop_handler(self.handler, self.session)
|
|
|
|
# if not future.done():
|
|
|
|
# future.set_result(True)
|
|
|
|
# except Exception as ae:
|
|
|
|
# future.set_exception(ae)
|
|
|
|
|
|
|
|
# self.handler = None
|
|
|
|
# self.session = Session()
|
|
|
|
# future: asyncio.Future[Any] = asyncio.Future()
|
|
|
|
|
|
|
|
# coro = asyncio.start_server(server_mock, "127.0.0.1", 8888)
|
|
|
|
# server = self.loop.run_until_complete(coro)
|
|
|
|
# self.loop.run_until_complete(test_coro())
|
|
|
|
# server.close()
|
|
|
|
# self.loop.run_until_complete(server.wait_closed())
|
|
|
|
# exception = future.exception()
|
|
|
|
# if exception:
|
|
|
|
# raise exception
|
|
|
|
|
|
|
|
# def test_publish_qos2(self):
|
|
|
|
# async def server_mock(reader, writer) -> None:
|
|
|
|
# try:
|
|
|
|
# packet = await PublishPacket.from_stream(reader)
|
|
|
|
# assert packet.topic_name == "/topic"
|
|
|
|
# assert packet.qos == QOS_2
|
|
|
|
# assert packet.packet_id is not None
|
|
|
|
# assert packet.packet_id in self.session.inflight_out
|
|
|
|
# assert self.handler is not None
|
|
|
|
# assert packet.packet_id in self.handler._pubrec_waiters
|
|
|
|
# pubrec = PubrecPacket.build(packet.packet_id)
|
|
|
|
# await pubrec.to_stream(writer)
|
|
|
|
|
|
|
|
# await PubrelPacket.from_stream(reader)
|
|
|
|
# assert packet.packet_id in self.handler._pubcomp_waiters
|
|
|
|
# pubcomp = PubcompPacket.build(packet.packet_id)
|
|
|
|
# await pubcomp.to_stream(writer)
|
|
|
|
# except Exception as ae:
|
|
|
|
# future.set_exception(ae)
|
|
|
|
|
|
|
|
# async def test_coro() -> None:
|
|
|
|
# try:
|
|
|
|
# reader, writer = await asyncio.open_connection("127.0.0.1", 8888)
|
|
|
|
# reader_adapted, writer_adapted = adapt(reader, writer)
|
|
|
|
# self.handler = ProtocolHandler(self.plugin_manager)
|
|
|
|
# self.handler.attach(self.session, reader_adapted, writer_adapted)
|
|
|
|
# await self.start_handler(self.handler, self.session)
|
|
|
|
# message = await self.handler.mqtt_publish(
|
|
|
|
# "/topic",
|
|
|
|
# b"test_data",
|
|
|
|
# QOS_2,
|
|
|
|
# False,
|
|
|
|
# )
|
|
|
|
# assert isinstance(message, OutgoingApplicationMessage)
|
|
|
|
# assert message.publish_packet is not None
|
|
|
|
# assert message.puback_packet is None
|
|
|
|
# assert message.pubrec_packet is not None
|
|
|
|
# assert message.pubrel_packet is not None
|
|
|
|
# assert message.pubcomp_packet is not None
|
|
|
|
# await self.stop_handler(self.handler, self.session)
|
|
|
|
# if not future.done():
|
|
|
|
# future.set_result(True)
|
|
|
|
# except Exception as ae:
|
|
|
|
# future.set_exception(ae)
|
|
|
|
|
|
|
|
# self.handler = None
|
|
|
|
# self.session = Session()
|
|
|
|
# future: asyncio.Future[Any] = asyncio.Future()
|
|
|
|
|
|
|
|
# coro = asyncio.start_server(server_mock, "127.0.0.1", 8888)
|
|
|
|
# server = self.loop.run_until_complete(coro)
|
|
|
|
# self.loop.run_until_complete(test_coro())
|
|
|
|
# server.close()
|
|
|
|
# self.loop.run_until_complete(server.wait_closed())
|
|
|
|
# exception = future.exception()
|
|
|
|
# if exception:
|
|
|
|
# raise exception
|
|
|
|
|
|
|
|
# def test_receive_qos0(self):
|
|
|
|
# async def server_mock(reader, writer) -> None:
|
|
|
|
# packet = PublishPacket.build(
|
|
|
|
# "/topic",
|
|
|
|
# b"test_data",
|
|
|
|
# rand_packet_id(),
|
|
|
|
# False,
|
|
|
|
# QOS_0,
|
|
|
|
# False,
|
|
|
|
# )
|
|
|
|
# await packet.to_stream(writer)
|
|
|
|
|
|
|
|
# async def test_coro() -> None:
|
|
|
|
# try:
|
|
|
|
# reader, writer = await asyncio.open_connection("127.0.0.1", 8888)
|
|
|
|
# reader_adapted, writer_adapted = adapt(reader, writer)
|
|
|
|
# self.handler = ProtocolHandler(self.plugin_manager)
|
|
|
|
# self.handler.attach(self.session, reader_adapted, writer_adapted)
|
|
|
|
# await self.start_handler(self.handler, self.session)
|
|
|
|
# message = await self.handler.mqtt_deliver_next_message()
|
|
|
|
# assert isinstance(message, IncomingApplicationMessage)
|
|
|
|
# assert message.publish_packet is not None
|
|
|
|
# assert message.puback_packet is None
|
|
|
|
# assert message.pubrec_packet is None
|
|
|
|
# assert message.pubrel_packet is None
|
|
|
|
# assert message.pubcomp_packet is None
|
|
|
|
# await self.stop_handler(self.handler, self.session)
|
|
|
|
# future.set_result(True)
|
|
|
|
# except Exception as ae:
|
|
|
|
# future.set_exception(ae)
|
|
|
|
|
|
|
|
# self.handler = None
|
|
|
|
# self.session = Session()
|
|
|
|
# future: asyncio.Future[Any] = asyncio.Future()
|
|
|
|
# coro = asyncio.start_server(server_mock, "127.0.0.1", 8888)
|
|
|
|
# server = self.loop.run_until_complete(coro)
|
|
|
|
# self.loop.run_until_complete(test_coro())
|
|
|
|
# server.close()
|
|
|
|
# self.loop.run_until_complete(server.wait_closed())
|
|
|
|
# exception = future.exception()
|
|
|
|
# if exception:
|
|
|
|
# raise exception
|
|
|
|
|
|
|
|
# def test_receive_qos1(self):
|
|
|
|
# async def server_mock(reader, writer) -> None:
|
|
|
|
# try:
|
|
|
|
# packet = PublishPacket.build(
|
|
|
|
# "/topic",
|
|
|
|
# b"test_data",
|
|
|
|
# rand_packet_id(),
|
|
|
|
# False,
|
|
|
|
# QOS_1,
|
|
|
|
# False,
|
|
|
|
# )
|
|
|
|
# await packet.to_stream(writer)
|
|
|
|
# puback = await PubackPacket.from_stream(reader)
|
|
|
|
# assert puback is not None
|
|
|
|
# assert packet.packet_id == puback.packet_id
|
|
|
|
# except Exception as ae:
|
|
|
|
# future.set_exception(ae)
|
|
|
|
|
|
|
|
# async def test_coro() -> None:
|
|
|
|
# try:
|
|
|
|
# reader, writer = await asyncio.open_connection("127.0.0.1", 8888)
|
|
|
|
# reader_adapted, writer_adapted = adapt(reader, writer)
|
|
|
|
# self.handler = ProtocolHandler(self.plugin_manager)
|
|
|
|
# self.handler.attach(self.session, reader_adapted, writer_adapted)
|
|
|
|
# await self.start_handler(self.handler, self.session)
|
|
|
|
# message = await self.handler.mqtt_deliver_next_message()
|
|
|
|
# assert isinstance(message, IncomingApplicationMessage)
|
|
|
|
# assert message.publish_packet is not None
|
|
|
|
# assert message.puback_packet is not None
|
|
|
|
# assert message.pubrec_packet is None
|
|
|
|
# assert message.pubrel_packet is None
|
|
|
|
# assert message.pubcomp_packet is None
|
|
|
|
# await self.stop_handler(self.handler, self.session)
|
|
|
|
# future.set_result(True)
|
|
|
|
# except Exception as ae:
|
|
|
|
# future.set_exception(ae)
|
|
|
|
|
|
|
|
# self.handler = None
|
|
|
|
# self.session = Session()
|
|
|
|
# future: asyncio.Future[Any] = asyncio.Future()
|
|
|
|
# self.event = asyncio.Event()
|
|
|
|
# coro = asyncio.start_server(server_mock, "127.0.0.1", 8888)
|
|
|
|
# server = self.loop.run_until_complete(coro)
|
|
|
|
# self.loop.run_until_complete(test_coro())
|
|
|
|
# server.close()
|
|
|
|
# self.loop.run_until_complete(server.wait_closed())
|
|
|
|
# exception = future.exception()
|
|
|
|
# if exception:
|
|
|
|
# raise exception
|
|
|
|
|
|
|
|
# def test_receive_qos2(self):
|
|
|
|
# async def server_mock(reader, writer) -> None:
|
|
|
|
# try:
|
|
|
|
# packet = PublishPacket.build(
|
|
|
|
# "/topic",
|
|
|
|
# b"test_data",
|
|
|
|
# rand_packet_id(),
|
|
|
|
# False,
|
|
|
|
# QOS_2,
|
|
|
|
# False,
|
|
|
|
# )
|
|
|
|
# await packet.to_stream(writer)
|
|
|
|
# pubrec = await PubrecPacket.from_stream(reader)
|
|
|
|
# assert pubrec is not None
|
|
|
|
# assert packet.packet_id == pubrec.packet_id
|
|
|
|
# assert self.handler is not None
|
|
|
|
# assert packet.packet_id in self.handler._pubrel_waiters
|
|
|
|
# pubrel = PubrelPacket.build(packet.packet_id)
|
|
|
|
# await pubrel.to_stream(writer)
|
|
|
|
# pubcomp = await PubcompPacket.from_stream(reader)
|
|
|
|
# assert pubcomp is not None
|
|
|
|
# assert packet.packet_id == pubcomp.packet_id
|
|
|
|
# except Exception as ae:
|
|
|
|
# future.set_exception(ae)
|
|
|
|
|
|
|
|
# async def test_coro() -> None:
|
|
|
|
# try:
|
|
|
|
# reader, writer = await asyncio.open_connection("127.0.0.1", 8888)
|
|
|
|
# reader_adapted, writer_adapted = adapt(reader, writer)
|
|
|
|
# self.handler = ProtocolHandler(self.plugin_manager)
|
|
|
|
# self.handler.attach(self.session, reader_adapted, writer_adapted)
|
|
|
|
# await self.start_handler(self.handler, self.session)
|
|
|
|
# message = await self.handler.mqtt_deliver_next_message()
|
|
|
|
# assert isinstance(message, IncomingApplicationMessage)
|
|
|
|
# assert message.publish_packet is not None
|
|
|
|
# assert message.puback_packet is None
|
|
|
|
# assert message.pubrec_packet is not None
|
|
|
|
# assert message.pubrel_packet is not None
|
|
|
|
# assert message.pubcomp_packet is not None
|
|
|
|
# await self.stop_handler(self.handler, self.session)
|
|
|
|
# future.set_result(True)
|
|
|
|
# except Exception as ae:
|
|
|
|
# future.set_exception(ae)
|
|
|
|
|
|
|
|
# self.handler = None
|
|
|
|
# self.session = Session()
|
|
|
|
# future: asyncio.Future[Any] = asyncio.Future()
|
|
|
|
# coro = asyncio.start_server(server_mock, "127.0.0.1", 8888)
|
|
|
|
# server = self.loop.run_until_complete(coro)
|
|
|
|
# self.loop.run_until_complete(test_coro())
|
|
|
|
# server.close()
|
|
|
|
# self.loop.run_until_complete(server.wait_closed())
|
|
|
|
# exception = future.exception()
|
|
|
|
# if exception:
|
|
|
|
# raise exception
|
|
|
|
|
|
|
|
# async def start_handler(self, handler, session):
|
|
|
|
# self.check_empty_waiters(handler)
|
|
|
|
# self.check_no_message(session)
|
|
|
|
# await handler.start()
|
|
|
|
# assert handler._reader_ready
|
|
|
|
|
|
|
|
# async def stop_handler(self, handler, session):
|
|
|
|
# await handler.stop()
|
|
|
|
# assert handler._reader_stopped
|
|
|
|
# self.check_empty_waiters(handler)
|
|
|
|
# self.check_no_message(session)
|
|
|
|
|
|
|
|
# def check_empty_waiters(self, handler):
|
|
|
|
# assert not handler._puback_waiters
|
|
|
|
# assert not handler._pubrec_waiters
|
|
|
|
# assert not handler._pubrel_waiters
|
|
|
|
# assert not handler._pubcomp_waiters
|
|
|
|
|
|
|
|
# def check_no_message(self, session):
|
|
|
|
# assert not session.inflight_out
|
|
|
|
# assert not session.inflight_in
|
|
|
|
|
|
|
|
# def test_publish_qos1_retry(self):
|
|
|
|
# async def server_mock(reader, writer) -> None:
|
|
|
|
# packet = await PublishPacket.from_stream(reader)
|
|
|
|
# try:
|
|
|
|
# assert packet.topic_name == "/topic"
|
|
|
|
# assert packet.qos == QOS_1
|
|
|
|
# assert packet.packet_id is not None
|
|
|
|
# assert packet.packet_id in self.session.inflight_out
|
|
|
|
# assert self.handler is not None
|
|
|
|
# assert packet.packet_id in self.handler._puback_waiters
|
|
|
|
# puback = PubackPacket.build(packet.packet_id)
|
|
|
|
# await puback.to_stream(writer)
|
|
|
|
# except Exception as ae:
|
|
|
|
# future.set_exception(ae)
|
|
|
|
|
|
|
|
# async def test_coro() -> None:
|
|
|
|
# try:
|
|
|
|
# reader, writer = await asyncio.open_connection("127.0.0.1", 8888)
|
|
|
|
# reader_adapted, writer_adapted = adapt(reader, writer)
|
|
|
|
# self.handler = ProtocolHandler(self.plugin_manager)
|
|
|
|
# self.handler.attach(self.session, reader_adapted, writer_adapted)
|
|
|
|
# await self.handler.start()
|
|
|
|
# await self.stop_handler(self.handler, self.session)
|
|
|
|
# if not future.done():
|
|
|
|
# future.set_result(True)
|
|
|
|
# except Exception as ae:
|
|
|
|
# future.set_exception(ae)
|
|
|
|
|
|
|
|
# self.handler = None
|
|
|
|
# self.session = Session()
|
|
|
|
# message = OutgoingApplicationMessage(1, "/topic", QOS_1, b"test_data", False)
|
|
|
|
# message.publish_packet = PublishPacket.build(
|
|
|
|
# "/topic",
|
|
|
|
# b"test_data",
|
|
|
|
# rand_packet_id(),
|
|
|
|
# False,
|
|
|
|
# QOS_1,
|
|
|
|
# False,
|
|
|
|
# )
|
|
|
|
# self.session.inflight_out[1] = message
|
|
|
|
# future: asyncio.Future[Any] = asyncio.Future()
|
|
|
|
|
|
|
|
# coro = asyncio.start_server(server_mock, "127.0.0.1", 8888)
|
|
|
|
# server = self.loop.run_until_complete(coro)
|
|
|
|
# self.loop.run_until_complete(test_coro())
|
|
|
|
# server.close()
|
|
|
|
# self.loop.run_until_complete(server.wait_closed())
|
|
|
|
# exception = future.exception()
|
|
|
|
# if exception:
|
|
|
|
# raise exception
|
|
|
|
|
|
|
|
# def test_publish_qos2_retry(self):
|
|
|
|
# async def server_mock(reader, writer) -> None:
|
|
|
|
# try:
|
|
|
|
# packet = await PublishPacket.from_stream(reader)
|
|
|
|
# assert packet.topic_name == "/topic"
|
|
|
|
# assert packet.qos == QOS_2
|
|
|
|
# assert packet.packet_id is not None
|
|
|
|
# assert packet.packet_id in self.session.inflight_out
|
|
|
|
# assert self.handler is not None
|
|
|
|
# assert packet.packet_id in self.handler._pubrec_waiters
|
|
|
|
# pubrec = PubrecPacket.build(packet.packet_id)
|
|
|
|
# await pubrec.to_stream(writer)
|
|
|
|
|
|
|
|
# await PubrelPacket.from_stream(reader)
|
|
|
|
# assert packet.packet_id in self.handler._pubcomp_waiters
|
|
|
|
# pubcomp = PubcompPacket.build(packet.packet_id)
|
|
|
|
# await pubcomp.to_stream(writer)
|
|
|
|
# except Exception as ae:
|
|
|
|
# future.set_exception(ae)
|
|
|
|
|
|
|
|
# async def test_coro() -> None:
|
|
|
|
# try:
|
|
|
|
# reader, writer = await asyncio.open_connection("127.0.0.1", 8888)
|
|
|
|
# reader_adapted, writer_adapted = adapt(reader, writer)
|
|
|
|
# self.handler = ProtocolHandler(self.plugin_manager)
|
|
|
|
# self.handler.attach(self.session, reader_adapted, writer_adapted)
|
|
|
|
# await self.handler.start()
|
|
|
|
# await self.stop_handler(self.handler, self.session)
|
|
|
|
# if not future.done():
|
|
|
|
# future.set_result(True)
|
|
|
|
# except Exception as ae:
|
|
|
|
# future.set_exception(ae)
|
|
|
|
|
|
|
|
# self.handler = None
|
|
|
|
# self.session = Session()
|
|
|
|
# message = OutgoingApplicationMessage(1, "/topic", QOS_2, b"test_data", False)
|
|
|
|
# message.publish_packet = PublishPacket.build(
|
|
|
|
# "/topic",
|
|
|
|
# b"test_data",
|
|
|
|
# rand_packet_id(),
|
|
|
|
# False,
|
|
|
|
# QOS_2,
|
|
|
|
# False,
|
|
|
|
# )
|
|
|
|
# self.session.inflight_out[1] = message
|
|
|
|
# future: asyncio.Future[Any] = asyncio.Future()
|
|
|
|
|
|
|
|
# coro = asyncio.start_server(server_mock, "127.0.0.1", 8888)
|
|
|
|
# server = self.loop.run_until_complete(coro)
|
|
|
|
# self.loop.run_until_complete(test_coro())
|
|
|
|
# server.close()
|
|
|
|
# self.loop.run_until_complete(server.wait_closed())
|
|
|
|
# exception = future.exception()
|
|
|
|
# if exception:
|
|
|
|
# raise exception
|