2015-09-12 12:47:36 +00:00
|
|
|
# Copyright (c) 2015 Nicolas JOUANIN
|
|
|
|
#
|
|
|
|
# See the file license.txt for copying permission.
|
|
|
|
import unittest
|
|
|
|
import asyncio
|
2015-09-13 20:36:22 +00:00
|
|
|
import logging
|
2015-11-01 20:55:24 +00:00
|
|
|
import random
|
2015-09-12 12:47:36 +00:00
|
|
|
from hbmqtt.plugins.manager import PluginManager
|
2021-03-14 21:16:51 +00:00
|
|
|
from hbmqtt.session import (
|
|
|
|
Session,
|
|
|
|
OutgoingApplicationMessage,
|
|
|
|
IncomingApplicationMessage,
|
|
|
|
)
|
2015-09-12 12:47:36 +00:00
|
|
|
from hbmqtt.mqtt.protocol.handler import ProtocolHandler
|
2015-09-13 20:36:22 +00:00
|
|
|
from hbmqtt.adapters import StreamWriterAdapter, StreamReaderAdapter
|
2017-08-06 22:46:15 +00:00
|
|
|
from hbmqtt.mqtt.constants import QOS_0, QOS_1, QOS_2
|
2015-09-13 20:36:22 +00:00
|
|
|
from hbmqtt.mqtt.publish import PublishPacket
|
2015-09-14 20:31:16 +00:00
|
|
|
from hbmqtt.mqtt.puback import PubackPacket
|
2015-09-14 20:56:22 +00:00
|
|
|
from hbmqtt.mqtt.pubrec import PubrecPacket
|
|
|
|
from hbmqtt.mqtt.pubrel import PubrelPacket
|
|
|
|
from hbmqtt.mqtt.pubcomp import PubcompPacket
|
2015-09-13 20:36:22 +00:00
|
|
|
|
2021-03-14 21:16:51 +00:00
|
|
|
formatter = (
|
|
|
|
"[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s"
|
|
|
|
)
|
2015-09-15 19:47:42 +00:00
|
|
|
logging.basicConfig(level=logging.DEBUG, format=formatter)
|
2015-09-13 20:36:22 +00:00
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
2015-11-01 20:55:24 +00:00
|
|
|
def rand_packet_id():
|
|
|
|
return random.randint(0, 65535)
|
|
|
|
|
|
|
|
|
2015-09-13 20:36:22 +00:00
|
|
|
def adapt(reader, writer):
|
|
|
|
return StreamReaderAdapter(reader), StreamWriterAdapter(writer)
|
2015-09-12 12:47:36 +00:00
|
|
|
|
|
|
|
|
|
|
|
class ProtocolHandlerTest(unittest.TestCase):
|
|
|
|
def setUp(self):
|
|
|
|
self.loop = asyncio.new_event_loop()
|
2015-09-22 20:44:29 +00:00
|
|
|
asyncio.set_event_loop(self.loop)
|
2021-03-14 21:16:51 +00:00
|
|
|
self.plugin_manager = PluginManager(
|
|
|
|
"hbmqtt.test.plugins", context=None, loop=self.loop
|
|
|
|
)
|
2015-09-12 12:47:36 +00:00
|
|
|
|
2015-09-13 20:36:22 +00:00
|
|
|
def tearDown(self):
|
|
|
|
self.loop.close()
|
|
|
|
|
2015-09-12 12:47:36 +00:00
|
|
|
def test_init_handler(self):
|
2017-08-06 22:51:23 +00:00
|
|
|
Session()
|
2015-10-07 20:42:04 +00:00
|
|
|
handler = ProtocolHandler(self.plugin_manager, loop=self.loop)
|
|
|
|
self.assertIsNone(handler.session)
|
2015-09-12 12:47:36 +00:00
|
|
|
self.assertIs(handler._loop, self.loop)
|
2015-09-13 20:36:22 +00:00
|
|
|
self.check_empty_waiters(handler)
|
2015-09-12 13:28:40 +00:00
|
|
|
|
|
|
|
def test_start_stop(self):
|
2020-12-31 00:16:45 +00:00
|
|
|
async def server_mock(reader, writer):
|
2015-09-12 13:28:40 +00:00
|
|
|
pass
|
|
|
|
|
2020-12-31 00:16:45 +00:00
|
|
|
async def test_coro():
|
2015-09-14 20:08:14 +00:00
|
|
|
try:
|
|
|
|
s = Session()
|
2021-03-14 20:44:41 +00:00
|
|
|
reader, writer = await asyncio.open_connection("127.0.0.1", 8888)
|
2015-09-29 20:28:05 +00:00
|
|
|
reader_adapted, writer_adapted = adapt(reader, writer)
|
2015-10-07 20:42:04 +00:00
|
|
|
handler = ProtocolHandler(self.plugin_manager)
|
|
|
|
handler.attach(s, reader_adapted, writer_adapted)
|
2020-12-31 00:16:45 +00:00
|
|
|
await self.start_handler(handler, s)
|
|
|
|
await self.stop_handler(handler, s)
|
2015-09-14 20:08:14 +00:00
|
|
|
future.set_result(True)
|
2015-09-15 21:17:55 +00:00
|
|
|
except Exception as ae:
|
2015-09-14 20:08:14 +00:00
|
|
|
future.set_exception(ae)
|
2015-09-13 20:36:22 +00:00
|
|
|
|
2015-09-14 19:55:55 +00:00
|
|
|
future = asyncio.Future(loop=self.loop)
|
2021-03-14 20:44:41 +00:00
|
|
|
coro = asyncio.start_server(server_mock, "127.0.0.1", 8888)
|
2015-09-13 20:36:22 +00:00
|
|
|
server = self.loop.run_until_complete(coro)
|
|
|
|
self.loop.run_until_complete(test_coro())
|
|
|
|
server.close()
|
2015-09-14 19:55:55 +00:00
|
|
|
self.loop.run_until_complete(server.wait_closed())
|
|
|
|
if future.exception():
|
|
|
|
raise future.exception()
|
2015-09-13 20:36:22 +00:00
|
|
|
|
|
|
|
def test_publish_qos0(self):
|
2020-12-31 00:16:45 +00:00
|
|
|
async def server_mock(reader, writer):
|
2015-09-14 19:18:08 +00:00
|
|
|
try:
|
2020-12-31 00:16:45 +00:00
|
|
|
packet = await PublishPacket.from_stream(reader)
|
2021-03-14 20:44:41 +00:00
|
|
|
self.assertEqual(packet.variable_header.topic_name, "/topic")
|
2017-07-26 12:01:17 +00:00
|
|
|
self.assertEqual(packet.qos, QOS_0)
|
2015-09-14 19:18:08 +00:00
|
|
|
self.assertIsNone(packet.packet_id)
|
2015-09-15 21:17:55 +00:00
|
|
|
except Exception as ae:
|
2015-09-14 19:18:08 +00:00
|
|
|
future.set_exception(ae)
|
2015-09-13 20:36:22 +00:00
|
|
|
|
2020-12-31 00:16:45 +00:00
|
|
|
async def test_coro():
|
2015-09-14 20:08:14 +00:00
|
|
|
try:
|
|
|
|
s = Session()
|
2021-03-14 20:44:41 +00:00
|
|
|
reader, writer = await asyncio.open_connection(
|
|
|
|
"127.0.0.1", 8888, loop=self.loop
|
|
|
|
)
|
2015-09-29 20:28:05 +00:00
|
|
|
reader_adapted, writer_adapted = adapt(reader, writer)
|
2015-10-07 20:42:04 +00:00
|
|
|
handler = ProtocolHandler(self.plugin_manager, loop=self.loop)
|
|
|
|
handler.attach(s, reader_adapted, writer_adapted)
|
2020-12-31 00:16:45 +00:00
|
|
|
await self.start_handler(handler, s)
|
2021-03-14 21:16:51 +00:00
|
|
|
message = await handler.mqtt_publish(
|
|
|
|
"/topic", b"test_data", QOS_0, False
|
|
|
|
)
|
2015-09-14 20:08:14 +00:00
|
|
|
self.assertIsInstance(message, OutgoingApplicationMessage)
|
|
|
|
self.assertIsNotNone(message.publish_packet)
|
|
|
|
self.assertIsNone(message.puback_packet)
|
|
|
|
self.assertIsNone(message.pubrec_packet)
|
|
|
|
self.assertIsNone(message.pubrel_packet)
|
|
|
|
self.assertIsNone(message.pubcomp_packet)
|
2020-12-31 00:16:45 +00:00
|
|
|
await self.stop_handler(handler, s)
|
2015-09-14 20:08:14 +00:00
|
|
|
future.set_result(True)
|
2015-09-15 21:17:55 +00:00
|
|
|
except Exception as ae:
|
2015-09-14 20:08:14 +00:00
|
|
|
future.set_exception(ae)
|
2015-09-13 20:36:22 +00:00
|
|
|
|
2015-09-14 19:18:08 +00:00
|
|
|
future = asyncio.Future(loop=self.loop)
|
2021-03-14 20:44:41 +00:00
|
|
|
coro = asyncio.start_server(server_mock, "127.0.0.1", 8888, loop=self.loop)
|
2015-09-13 20:36:22 +00:00
|
|
|
server = self.loop.run_until_complete(coro)
|
|
|
|
self.loop.run_until_complete(test_coro())
|
|
|
|
server.close()
|
|
|
|
self.loop.run_until_complete(server.wait_closed())
|
2015-09-14 20:31:16 +00:00
|
|
|
if future.exception():
|
|
|
|
raise future.exception()
|
|
|
|
|
|
|
|
def test_publish_qos1(self):
|
2020-12-31 00:16:45 +00:00
|
|
|
async def server_mock(reader, writer):
|
|
|
|
packet = await PublishPacket.from_stream(reader)
|
2015-09-14 20:31:16 +00:00
|
|
|
try:
|
2021-03-14 20:44:41 +00:00
|
|
|
self.assertEqual(packet.variable_header.topic_name, "/topic")
|
2017-07-26 12:01:17 +00:00
|
|
|
self.assertEqual(packet.qos, QOS_1)
|
2015-09-14 20:31:16 +00:00
|
|
|
self.assertIsNotNone(packet.packet_id)
|
|
|
|
self.assertIn(packet.packet_id, self.session.inflight_out)
|
|
|
|
self.assertIn(packet.packet_id, self.handler._puback_waiters)
|
2015-09-23 19:18:09 +00:00
|
|
|
puback = PubackPacket.build(packet.packet_id)
|
2020-12-31 00:16:45 +00:00
|
|
|
await puback.to_stream(writer)
|
2015-09-15 21:17:55 +00:00
|
|
|
except Exception as ae:
|
2015-09-14 20:31:16 +00:00
|
|
|
future.set_exception(ae)
|
|
|
|
|
2020-12-31 00:16:45 +00:00
|
|
|
async def test_coro():
|
2015-09-14 20:31:16 +00:00
|
|
|
try:
|
2021-03-14 20:44:41 +00:00
|
|
|
reader, writer = await asyncio.open_connection(
|
|
|
|
"127.0.0.1", 8888, loop=self.loop
|
|
|
|
)
|
2015-09-29 20:28:05 +00:00
|
|
|
reader_adapted, writer_adapted = adapt(reader, writer)
|
2015-10-07 20:42:04 +00:00
|
|
|
self.handler = ProtocolHandler(self.plugin_manager, loop=self.loop)
|
|
|
|
self.handler.attach(self.session, reader_adapted, writer_adapted)
|
2020-12-31 00:16:45 +00:00
|
|
|
await self.start_handler(self.handler, self.session)
|
2021-03-14 21:16:51 +00:00
|
|
|
message = await self.handler.mqtt_publish(
|
|
|
|
"/topic", b"test_data", QOS_1, False
|
|
|
|
)
|
2015-09-14 20:31:16 +00:00
|
|
|
self.assertIsInstance(message, OutgoingApplicationMessage)
|
|
|
|
self.assertIsNotNone(message.publish_packet)
|
|
|
|
self.assertIsNotNone(message.puback_packet)
|
|
|
|
self.assertIsNone(message.pubrec_packet)
|
|
|
|
self.assertIsNone(message.pubrel_packet)
|
|
|
|
self.assertIsNone(message.pubcomp_packet)
|
2020-12-31 00:16:45 +00:00
|
|
|
await self.stop_handler(self.handler, self.session)
|
2015-09-14 20:31:16 +00:00
|
|
|
if not future.done():
|
|
|
|
future.set_result(True)
|
2015-09-15 21:17:55 +00:00
|
|
|
except Exception as ae:
|
2015-09-14 20:31:16 +00:00
|
|
|
future.set_exception(ae)
|
2021-03-14 20:44:41 +00:00
|
|
|
|
2015-09-14 20:31:16 +00:00
|
|
|
self.handler = None
|
|
|
|
self.session = Session()
|
|
|
|
future = asyncio.Future(loop=self.loop)
|
|
|
|
|
2021-03-14 20:44:41 +00:00
|
|
|
coro = asyncio.start_server(server_mock, "127.0.0.1", 8888, loop=self.loop)
|
2015-09-14 20:31:16 +00:00
|
|
|
server = self.loop.run_until_complete(coro)
|
|
|
|
self.loop.run_until_complete(test_coro())
|
|
|
|
server.close()
|
|
|
|
self.loop.run_until_complete(server.wait_closed())
|
2015-09-14 20:56:22 +00:00
|
|
|
if future.exception():
|
|
|
|
raise future.exception()
|
|
|
|
|
|
|
|
def test_publish_qos2(self):
|
2020-12-31 00:16:45 +00:00
|
|
|
async def server_mock(reader, writer):
|
2015-09-14 20:56:22 +00:00
|
|
|
try:
|
2020-12-31 00:16:45 +00:00
|
|
|
packet = await PublishPacket.from_stream(reader)
|
2021-03-14 20:44:41 +00:00
|
|
|
self.assertEqual(packet.topic_name, "/topic")
|
2017-07-26 12:01:17 +00:00
|
|
|
self.assertEqual(packet.qos, QOS_2)
|
2015-09-14 20:56:22 +00:00
|
|
|
self.assertIsNotNone(packet.packet_id)
|
|
|
|
self.assertIn(packet.packet_id, self.session.inflight_out)
|
|
|
|
self.assertIn(packet.packet_id, self.handler._pubrec_waiters)
|
|
|
|
pubrec = PubrecPacket.build(packet.packet_id)
|
2020-12-31 00:16:45 +00:00
|
|
|
await pubrec.to_stream(writer)
|
2015-09-14 20:56:22 +00:00
|
|
|
|
2020-12-31 00:16:45 +00:00
|
|
|
await PubrelPacket.from_stream(reader)
|
2015-09-14 20:56:22 +00:00
|
|
|
self.assertIn(packet.packet_id, self.handler._pubcomp_waiters)
|
|
|
|
pubcomp = PubcompPacket.build(packet.packet_id)
|
2020-12-31 00:16:45 +00:00
|
|
|
await pubcomp.to_stream(writer)
|
2015-09-15 21:17:55 +00:00
|
|
|
except Exception as ae:
|
2015-09-14 20:56:22 +00:00
|
|
|
future.set_exception(ae)
|
|
|
|
|
2020-12-31 00:16:45 +00:00
|
|
|
async def test_coro():
|
2015-09-14 20:56:22 +00:00
|
|
|
try:
|
2021-03-14 20:44:41 +00:00
|
|
|
reader, writer = await asyncio.open_connection(
|
|
|
|
"127.0.0.1", 8888, loop=self.loop
|
|
|
|
)
|
2015-09-29 20:28:05 +00:00
|
|
|
reader_adapted, writer_adapted = adapt(reader, writer)
|
2015-10-07 20:42:04 +00:00
|
|
|
self.handler = ProtocolHandler(self.plugin_manager, loop=self.loop)
|
|
|
|
self.handler.attach(self.session, reader_adapted, writer_adapted)
|
2020-12-31 00:16:45 +00:00
|
|
|
await self.start_handler(self.handler, self.session)
|
2021-03-14 21:16:51 +00:00
|
|
|
message = await self.handler.mqtt_publish(
|
|
|
|
"/topic", b"test_data", QOS_2, False
|
|
|
|
)
|
2015-09-14 20:56:22 +00:00
|
|
|
self.assertIsInstance(message, OutgoingApplicationMessage)
|
|
|
|
self.assertIsNotNone(message.publish_packet)
|
|
|
|
self.assertIsNone(message.puback_packet)
|
|
|
|
self.assertIsNotNone(message.pubrec_packet)
|
|
|
|
self.assertIsNotNone(message.pubrel_packet)
|
|
|
|
self.assertIsNotNone(message.pubcomp_packet)
|
2020-12-31 00:16:45 +00:00
|
|
|
await self.stop_handler(self.handler, self.session)
|
2015-09-14 20:56:22 +00:00
|
|
|
if not future.done():
|
|
|
|
future.set_result(True)
|
2015-09-15 21:17:55 +00:00
|
|
|
except Exception as ae:
|
2015-09-14 20:56:22 +00:00
|
|
|
future.set_exception(ae)
|
2021-03-14 20:44:41 +00:00
|
|
|
|
2015-09-14 20:56:22 +00:00
|
|
|
self.handler = None
|
|
|
|
self.session = Session()
|
|
|
|
future = asyncio.Future(loop=self.loop)
|
|
|
|
|
2021-03-14 20:44:41 +00:00
|
|
|
coro = asyncio.start_server(server_mock, "127.0.0.1", 8888, loop=self.loop)
|
2015-09-14 20:56:22 +00:00
|
|
|
server = self.loop.run_until_complete(coro)
|
|
|
|
self.loop.run_until_complete(test_coro())
|
|
|
|
server.close()
|
|
|
|
self.loop.run_until_complete(server.wait_closed())
|
2015-09-14 19:18:08 +00:00
|
|
|
if future.exception():
|
|
|
|
raise future.exception()
|
2015-09-13 20:36:22 +00:00
|
|
|
|
2015-09-15 19:31:07 +00:00
|
|
|
def test_receive_qos0(self):
|
2020-12-31 00:16:45 +00:00
|
|
|
async def server_mock(reader, writer):
|
2021-03-14 20:44:41 +00:00
|
|
|
packet = PublishPacket.build(
|
|
|
|
"/topic", b"test_data", rand_packet_id(), False, QOS_0, False
|
|
|
|
)
|
2020-12-31 00:16:45 +00:00
|
|
|
await packet.to_stream(writer)
|
2015-09-15 19:31:07 +00:00
|
|
|
|
2020-12-31 00:16:45 +00:00
|
|
|
async def test_coro():
|
2015-09-15 19:47:42 +00:00
|
|
|
try:
|
2021-03-14 20:44:41 +00:00
|
|
|
reader, writer = await asyncio.open_connection(
|
|
|
|
"127.0.0.1", 8888, loop=self.loop
|
|
|
|
)
|
2015-09-29 20:28:05 +00:00
|
|
|
reader_adapted, writer_adapted = adapt(reader, writer)
|
2015-10-07 20:42:04 +00:00
|
|
|
self.handler = ProtocolHandler(self.plugin_manager, loop=self.loop)
|
|
|
|
self.handler.attach(self.session, reader_adapted, writer_adapted)
|
2020-12-31 00:16:45 +00:00
|
|
|
await self.start_handler(self.handler, self.session)
|
|
|
|
message = await self.handler.mqtt_deliver_next_message()
|
2015-09-15 19:47:42 +00:00
|
|
|
self.assertIsInstance(message, IncomingApplicationMessage)
|
|
|
|
self.assertIsNotNone(message.publish_packet)
|
2015-09-23 19:18:09 +00:00
|
|
|
self.assertIsNone(message.puback_packet)
|
|
|
|
self.assertIsNone(message.pubrec_packet)
|
|
|
|
self.assertIsNone(message.pubrel_packet)
|
|
|
|
self.assertIsNone(message.pubcomp_packet)
|
2020-12-31 00:16:45 +00:00
|
|
|
await self.stop_handler(self.handler, self.session)
|
2015-09-15 19:47:42 +00:00
|
|
|
future.set_result(True)
|
2015-09-15 21:17:55 +00:00
|
|
|
except Exception as ae:
|
2015-09-15 19:47:42 +00:00
|
|
|
future.set_exception(ae)
|
|
|
|
|
2015-09-15 21:17:55 +00:00
|
|
|
self.handler = None
|
|
|
|
self.session = Session()
|
2015-09-15 19:47:42 +00:00
|
|
|
future = asyncio.Future(loop=self.loop)
|
2021-03-14 20:44:41 +00:00
|
|
|
coro = asyncio.start_server(server_mock, "127.0.0.1", 8888, loop=self.loop)
|
2015-09-15 19:47:42 +00:00
|
|
|
server = self.loop.run_until_complete(coro)
|
|
|
|
self.loop.run_until_complete(test_coro())
|
|
|
|
server.close()
|
|
|
|
self.loop.run_until_complete(server.wait_closed())
|
|
|
|
if future.exception():
|
|
|
|
raise future.exception()
|
|
|
|
|
|
|
|
def test_receive_qos1(self):
|
2020-12-31 00:16:45 +00:00
|
|
|
async def server_mock(reader, writer):
|
2015-09-15 19:47:42 +00:00
|
|
|
try:
|
2021-03-14 20:44:41 +00:00
|
|
|
packet = PublishPacket.build(
|
|
|
|
"/topic", b"test_data", rand_packet_id(), False, QOS_1, False
|
|
|
|
)
|
2020-12-31 00:16:45 +00:00
|
|
|
await packet.to_stream(writer)
|
|
|
|
puback = await PubackPacket.from_stream(reader)
|
2015-09-15 19:47:42 +00:00
|
|
|
self.assertIsNotNone(puback)
|
|
|
|
self.assertEqual(packet.packet_id, puback.packet_id)
|
2015-09-15 21:17:55 +00:00
|
|
|
except Exception as ae:
|
2015-09-15 21:38:24 +00:00
|
|
|
print(ae)
|
2015-09-15 21:17:55 +00:00
|
|
|
future.set_exception(ae)
|
|
|
|
|
2020-12-31 00:16:45 +00:00
|
|
|
async def test_coro():
|
2015-09-15 21:17:55 +00:00
|
|
|
try:
|
2021-03-14 20:44:41 +00:00
|
|
|
reader, writer = await asyncio.open_connection(
|
|
|
|
"127.0.0.1", 8888, loop=self.loop
|
|
|
|
)
|
2015-09-29 20:28:05 +00:00
|
|
|
reader_adapted, writer_adapted = adapt(reader, writer)
|
2015-10-07 20:42:04 +00:00
|
|
|
self.handler = ProtocolHandler(self.plugin_manager, loop=self.loop)
|
|
|
|
self.handler.attach(self.session, reader_adapted, writer_adapted)
|
2020-12-31 00:16:45 +00:00
|
|
|
await self.start_handler(self.handler, self.session)
|
|
|
|
message = await self.handler.mqtt_deliver_next_message()
|
2015-09-15 21:17:55 +00:00
|
|
|
self.assertIsInstance(message, IncomingApplicationMessage)
|
|
|
|
self.assertIsNotNone(message.publish_packet)
|
|
|
|
self.assertIsNotNone(message.puback_packet)
|
2015-09-23 19:18:09 +00:00
|
|
|
self.assertIsNone(message.pubrec_packet)
|
|
|
|
self.assertIsNone(message.pubrel_packet)
|
|
|
|
self.assertIsNone(message.pubcomp_packet)
|
2020-12-31 00:16:45 +00:00
|
|
|
await self.stop_handler(self.handler, self.session)
|
2015-09-15 21:17:55 +00:00
|
|
|
future.set_result(True)
|
|
|
|
except Exception as ae:
|
|
|
|
future.set_exception(ae)
|
|
|
|
|
|
|
|
self.handler = None
|
|
|
|
self.session = Session()
|
|
|
|
future = asyncio.Future(loop=self.loop)
|
|
|
|
self.event = asyncio.Event(loop=self.loop)
|
2021-03-14 20:44:41 +00:00
|
|
|
coro = asyncio.start_server(server_mock, "127.0.0.1", 8888, loop=self.loop)
|
2015-09-15 21:17:55 +00:00
|
|
|
server = self.loop.run_until_complete(coro)
|
|
|
|
self.loop.run_until_complete(test_coro())
|
|
|
|
server.close()
|
|
|
|
self.loop.run_until_complete(server.wait_closed())
|
|
|
|
if future.exception():
|
|
|
|
raise future.exception()
|
|
|
|
|
|
|
|
def test_receive_qos2(self):
|
2020-12-31 00:16:45 +00:00
|
|
|
async def server_mock(reader, writer):
|
2015-09-15 21:17:55 +00:00
|
|
|
try:
|
2021-03-14 20:44:41 +00:00
|
|
|
packet = PublishPacket.build(
|
|
|
|
"/topic", b"test_data", rand_packet_id(), False, QOS_2, False
|
|
|
|
)
|
2020-12-31 00:16:45 +00:00
|
|
|
await packet.to_stream(writer)
|
|
|
|
pubrec = await PubrecPacket.from_stream(reader)
|
2015-09-15 21:17:55 +00:00
|
|
|
self.assertIsNotNone(pubrec)
|
|
|
|
self.assertEqual(packet.packet_id, pubrec.packet_id)
|
2015-09-23 19:18:09 +00:00
|
|
|
self.assertIn(packet.packet_id, self.handler._pubrel_waiters)
|
2015-09-15 21:17:55 +00:00
|
|
|
pubrel = PubrelPacket.build(packet.packet_id)
|
2020-12-31 00:16:45 +00:00
|
|
|
await pubrel.to_stream(writer)
|
|
|
|
pubcomp = await PubcompPacket.from_stream(reader)
|
2015-09-15 21:17:55 +00:00
|
|
|
self.assertIsNotNone(pubcomp)
|
|
|
|
self.assertEqual(packet.packet_id, pubcomp.packet_id)
|
|
|
|
except Exception as ae:
|
2015-09-15 19:47:42 +00:00
|
|
|
future.set_exception(ae)
|
|
|
|
|
2020-12-31 00:16:45 +00:00
|
|
|
async def test_coro():
|
2015-09-15 19:31:07 +00:00
|
|
|
try:
|
2021-03-14 20:44:41 +00:00
|
|
|
reader, writer = await asyncio.open_connection(
|
|
|
|
"127.0.0.1", 8888, loop=self.loop
|
|
|
|
)
|
2015-10-16 20:13:37 +00:00
|
|
|
reader_adapted, writer_adapted = adapt(reader, writer)
|
|
|
|
self.handler = ProtocolHandler(self.plugin_manager, loop=self.loop)
|
|
|
|
self.handler.attach(self.session, reader_adapted, writer_adapted)
|
2020-12-31 00:16:45 +00:00
|
|
|
await self.start_handler(self.handler, self.session)
|
|
|
|
message = await self.handler.mqtt_deliver_next_message()
|
2015-09-15 19:31:07 +00:00
|
|
|
self.assertIsInstance(message, IncomingApplicationMessage)
|
|
|
|
self.assertIsNotNone(message.publish_packet)
|
2015-09-23 19:18:09 +00:00
|
|
|
self.assertIsNone(message.puback_packet)
|
|
|
|
self.assertIsNotNone(message.pubrec_packet)
|
|
|
|
self.assertIsNotNone(message.pubrel_packet)
|
|
|
|
self.assertIsNotNone(message.pubcomp_packet)
|
2020-12-31 00:16:45 +00:00
|
|
|
await self.stop_handler(self.handler, self.session)
|
2015-09-15 19:31:07 +00:00
|
|
|
future.set_result(True)
|
2015-09-15 21:17:55 +00:00
|
|
|
except Exception as ae:
|
2015-09-15 19:31:07 +00:00
|
|
|
future.set_exception(ae)
|
|
|
|
|
2015-09-15 21:17:55 +00:00
|
|
|
self.handler = None
|
|
|
|
self.session = Session()
|
2015-09-15 19:31:07 +00:00
|
|
|
future = asyncio.Future(loop=self.loop)
|
2021-03-14 20:44:41 +00:00
|
|
|
coro = asyncio.start_server(server_mock, "127.0.0.1", 8888, loop=self.loop)
|
2015-09-15 19:31:07 +00:00
|
|
|
server = self.loop.run_until_complete(coro)
|
|
|
|
self.loop.run_until_complete(test_coro())
|
|
|
|
server.close()
|
|
|
|
self.loop.run_until_complete(server.wait_closed())
|
|
|
|
if future.exception():
|
|
|
|
raise future.exception()
|
|
|
|
|
2020-12-31 00:16:45 +00:00
|
|
|
async def start_handler(self, handler, session):
|
2015-09-14 20:08:14 +00:00
|
|
|
self.check_empty_waiters(handler)
|
|
|
|
self.check_no_message(session)
|
2020-12-31 00:16:45 +00:00
|
|
|
await handler.start()
|
2021-03-06 17:37:23 +00:00
|
|
|
assert handler._reader_ready
|
2015-09-13 20:36:22 +00:00
|
|
|
|
2020-12-31 00:16:45 +00:00
|
|
|
async def stop_handler(self, handler, session):
|
|
|
|
await handler.stop()
|
2021-03-06 17:37:23 +00:00
|
|
|
assert handler._reader_stopped
|
2015-09-14 20:08:14 +00:00
|
|
|
self.check_empty_waiters(handler)
|
|
|
|
self.check_no_message(session)
|
|
|
|
|
|
|
|
def check_empty_waiters(self, handler):
|
|
|
|
self.assertFalse(handler._puback_waiters)
|
|
|
|
self.assertFalse(handler._pubrec_waiters)
|
|
|
|
self.assertFalse(handler._pubrel_waiters)
|
|
|
|
self.assertFalse(handler._pubcomp_waiters)
|
|
|
|
|
|
|
|
def check_no_message(self, session):
|
|
|
|
self.assertFalse(session.inflight_out)
|
|
|
|
self.assertFalse(session.inflight_in)
|
2015-09-30 19:22:46 +00:00
|
|
|
|
|
|
|
def test_publish_qos1_retry(self):
|
2020-12-31 00:16:45 +00:00
|
|
|
async def server_mock(reader, writer):
|
|
|
|
packet = await PublishPacket.from_stream(reader)
|
2015-09-30 19:22:46 +00:00
|
|
|
try:
|
2021-03-14 20:44:41 +00:00
|
|
|
self.assertEqual(packet.topic_name, "/topic")
|
2017-07-26 12:01:17 +00:00
|
|
|
self.assertEqual(packet.qos, QOS_1)
|
2015-09-30 19:22:46 +00:00
|
|
|
self.assertIsNotNone(packet.packet_id)
|
|
|
|
self.assertIn(packet.packet_id, self.session.inflight_out)
|
|
|
|
self.assertIn(packet.packet_id, self.handler._puback_waiters)
|
|
|
|
puback = PubackPacket.build(packet.packet_id)
|
2020-12-31 00:16:45 +00:00
|
|
|
await puback.to_stream(writer)
|
2015-09-30 19:22:46 +00:00
|
|
|
except Exception as ae:
|
|
|
|
future.set_exception(ae)
|
|
|
|
|
2020-12-31 00:16:45 +00:00
|
|
|
async def test_coro():
|
2015-09-30 19:22:46 +00:00
|
|
|
try:
|
2021-03-14 20:44:41 +00:00
|
|
|
reader, writer = await asyncio.open_connection(
|
|
|
|
"127.0.0.1", 8888, loop=self.loop
|
|
|
|
)
|
2015-09-30 19:22:46 +00:00
|
|
|
reader_adapted, writer_adapted = adapt(reader, writer)
|
2015-10-07 20:42:04 +00:00
|
|
|
self.handler = ProtocolHandler(self.plugin_manager, loop=self.loop)
|
|
|
|
self.handler.attach(self.session, reader_adapted, writer_adapted)
|
2020-12-31 00:16:45 +00:00
|
|
|
await self.handler.start()
|
|
|
|
await self.stop_handler(self.handler, self.session)
|
2015-09-30 19:22:46 +00:00
|
|
|
if not future.done():
|
|
|
|
future.set_result(True)
|
|
|
|
except Exception as ae:
|
|
|
|
future.set_exception(ae)
|
2021-03-14 20:44:41 +00:00
|
|
|
|
2015-09-30 19:22:46 +00:00
|
|
|
self.handler = None
|
|
|
|
self.session = Session()
|
2021-03-14 20:44:41 +00:00
|
|
|
message = OutgoingApplicationMessage(1, "/topic", QOS_1, b"test_data", False)
|
|
|
|
message.publish_packet = PublishPacket.build(
|
|
|
|
"/topic", b"test_data", rand_packet_id(), False, QOS_1, False
|
|
|
|
)
|
2015-09-30 19:22:46 +00:00
|
|
|
self.session.inflight_out[1] = message
|
|
|
|
future = asyncio.Future(loop=self.loop)
|
|
|
|
|
2021-03-14 20:44:41 +00:00
|
|
|
coro = asyncio.start_server(server_mock, "127.0.0.1", 8888, loop=self.loop)
|
2015-09-30 19:22:46 +00:00
|
|
|
server = self.loop.run_until_complete(coro)
|
|
|
|
self.loop.run_until_complete(test_coro())
|
|
|
|
server.close()
|
|
|
|
self.loop.run_until_complete(server.wait_closed())
|
|
|
|
if future.exception():
|
|
|
|
raise future.exception()
|
|
|
|
|
|
|
|
def test_publish_qos2_retry(self):
|
2020-12-31 00:16:45 +00:00
|
|
|
async def server_mock(reader, writer):
|
2015-09-30 19:22:46 +00:00
|
|
|
try:
|
2020-12-31 00:16:45 +00:00
|
|
|
packet = await PublishPacket.from_stream(reader)
|
2021-03-14 20:44:41 +00:00
|
|
|
self.assertEqual(packet.topic_name, "/topic")
|
2017-07-26 12:01:17 +00:00
|
|
|
self.assertEqual(packet.qos, QOS_2)
|
2015-09-30 19:22:46 +00:00
|
|
|
self.assertIsNotNone(packet.packet_id)
|
|
|
|
self.assertIn(packet.packet_id, self.session.inflight_out)
|
|
|
|
self.assertIn(packet.packet_id, self.handler._pubrec_waiters)
|
|
|
|
pubrec = PubrecPacket.build(packet.packet_id)
|
2020-12-31 00:16:45 +00:00
|
|
|
await pubrec.to_stream(writer)
|
2015-09-30 19:22:46 +00:00
|
|
|
|
2020-12-31 00:16:45 +00:00
|
|
|
await PubrelPacket.from_stream(reader)
|
2015-09-30 19:22:46 +00:00
|
|
|
self.assertIn(packet.packet_id, self.handler._pubcomp_waiters)
|
|
|
|
pubcomp = PubcompPacket.build(packet.packet_id)
|
2020-12-31 00:16:45 +00:00
|
|
|
await pubcomp.to_stream(writer)
|
2015-09-30 19:22:46 +00:00
|
|
|
except Exception as ae:
|
|
|
|
future.set_exception(ae)
|
|
|
|
|
2020-12-31 00:16:45 +00:00
|
|
|
async def test_coro():
|
2015-09-30 19:22:46 +00:00
|
|
|
try:
|
2021-03-14 20:44:41 +00:00
|
|
|
reader, writer = await asyncio.open_connection(
|
|
|
|
"127.0.0.1", 8888, loop=self.loop
|
|
|
|
)
|
2015-09-30 19:22:46 +00:00
|
|
|
reader_adapted, writer_adapted = adapt(reader, writer)
|
2015-10-07 20:42:04 +00:00
|
|
|
self.handler = ProtocolHandler(self.plugin_manager, loop=self.loop)
|
|
|
|
self.handler.attach(self.session, reader_adapted, writer_adapted)
|
2020-12-31 00:16:45 +00:00
|
|
|
await self.handler.start()
|
|
|
|
await self.stop_handler(self.handler, self.session)
|
2015-09-30 19:22:46 +00:00
|
|
|
if not future.done():
|
|
|
|
future.set_result(True)
|
|
|
|
except Exception as ae:
|
|
|
|
future.set_exception(ae)
|
2021-03-14 20:44:41 +00:00
|
|
|
|
2015-09-30 19:22:46 +00:00
|
|
|
self.handler = None
|
|
|
|
self.session = Session()
|
2021-03-14 20:44:41 +00:00
|
|
|
message = OutgoingApplicationMessage(1, "/topic", QOS_2, b"test_data", False)
|
|
|
|
message.publish_packet = PublishPacket.build(
|
|
|
|
"/topic", b"test_data", rand_packet_id(), False, QOS_2, False
|
|
|
|
)
|
2015-09-30 19:22:46 +00:00
|
|
|
self.session.inflight_out[1] = message
|
|
|
|
future = asyncio.Future(loop=self.loop)
|
|
|
|
|
2021-03-14 20:44:41 +00:00
|
|
|
coro = asyncio.start_server(server_mock, "127.0.0.1", 8888, loop=self.loop)
|
2015-09-30 19:22:46 +00:00
|
|
|
server = self.loop.run_until_complete(coro)
|
|
|
|
self.loop.run_until_complete(test_coro())
|
|
|
|
server.close()
|
|
|
|
self.loop.run_until_complete(server.wait_closed())
|
|
|
|
if future.exception():
|
|
|
|
raise future.exception()
|