kopia lustrzana https://github.com/Yakifo/amqtt
Add QOS_2 publish test
rodzic
484c445f12
commit
dace44a491
|
@ -11,6 +11,9 @@ from hbmqtt.adapters import StreamWriterAdapter, StreamReaderAdapter
|
||||||
from hbmqtt.mqtt.constants import *
|
from hbmqtt.mqtt.constants import *
|
||||||
from hbmqtt.mqtt.publish import PublishPacket
|
from hbmqtt.mqtt.publish import PublishPacket
|
||||||
from hbmqtt.mqtt.puback import PubackPacket
|
from hbmqtt.mqtt.puback import PubackPacket
|
||||||
|
from hbmqtt.mqtt.pubrec import PubrecPacket
|
||||||
|
from hbmqtt.mqtt.pubrel import PubrelPacket
|
||||||
|
from hbmqtt.mqtt.pubcomp import PubcompPacket
|
||||||
|
|
||||||
formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s"
|
formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s"
|
||||||
logging.basicConfig(level=logging.INFO, format=formatter)
|
logging.basicConfig(level=logging.INFO, format=formatter)
|
||||||
|
@ -149,6 +152,59 @@ class ProtocolHandlerTest(unittest.TestCase):
|
||||||
if future.exception():
|
if future.exception():
|
||||||
raise future.exception()
|
raise future.exception()
|
||||||
|
|
||||||
|
def test_publish_qos2(self):
|
||||||
|
@asyncio.coroutine
|
||||||
|
def server_mock(reader, writer):
|
||||||
|
try:
|
||||||
|
packet = yield from PublishPacket.from_stream(reader)
|
||||||
|
self.assertEquals(packet.topic_name, '/topic')
|
||||||
|
self.assertEquals(packet.qos, QOS_2)
|
||||||
|
self.assertIsNotNone(packet.packet_id)
|
||||||
|
self.assertIn(packet.packet_id, self.session.inflight_out)
|
||||||
|
|
||||||
|
self.assertIn(packet.packet_id, self.handler._pubrec_waiters)
|
||||||
|
pubrec = PubrecPacket.build(packet.packet_id)
|
||||||
|
yield from pubrec.to_stream(writer)
|
||||||
|
|
||||||
|
pubrel = yield from PubrelPacket.from_stream(reader)
|
||||||
|
|
||||||
|
self.assertIn(packet.packet_id, self.handler._pubcomp_waiters)
|
||||||
|
pubcomp = PubcompPacket.build(packet.packet_id)
|
||||||
|
yield from pubcomp.to_stream(writer)
|
||||||
|
except AssertionError as ae:
|
||||||
|
future.set_exception(ae)
|
||||||
|
|
||||||
|
@asyncio.coroutine
|
||||||
|
def test_coro():
|
||||||
|
try:
|
||||||
|
reader, writer = yield from asyncio.open_connection('127.0.0.1', 8888, loop=self.loop)
|
||||||
|
self.session.reader, self.session.writer = adapt(reader, writer)
|
||||||
|
self.handler = ProtocolHandler(self.session, self.plugin_manager, loop=self.loop)
|
||||||
|
yield from self.start_handler(self.handler, self.session)
|
||||||
|
message = yield from self.handler.mqtt_publish('/topic', b'test_data', QOS_2, False)
|
||||||
|
self.assertIsInstance(message, OutgoingApplicationMessage)
|
||||||
|
self.assertIsNotNone(message.publish_packet)
|
||||||
|
self.assertIsNone(message.puback_packet)
|
||||||
|
self.assertIsNotNone(message.pubrec_packet)
|
||||||
|
self.assertIsNotNone(message.pubrel_packet)
|
||||||
|
self.assertIsNotNone(message.pubcomp_packet)
|
||||||
|
yield from self.stop_handler(self.handler, self.session)
|
||||||
|
if not future.done():
|
||||||
|
future.set_result(True)
|
||||||
|
except AssertionError as ae:
|
||||||
|
future.set_exception(ae)
|
||||||
|
self.handler = None
|
||||||
|
self.session = Session()
|
||||||
|
future = asyncio.Future(loop=self.loop)
|
||||||
|
|
||||||
|
coro = asyncio.start_server(server_mock, '127.0.0.1', 8888, loop=self.loop)
|
||||||
|
server = self.loop.run_until_complete(coro)
|
||||||
|
self.loop.run_until_complete(test_coro())
|
||||||
|
server.close()
|
||||||
|
self.loop.run_until_complete(server.wait_closed())
|
||||||
|
if future.exception():
|
||||||
|
raise future.exception()
|
||||||
|
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def start_handler(self, handler, session):
|
def start_handler(self, handler, session):
|
||||||
yield from handler.start()
|
yield from handler.start()
|
||||||
|
|
Ładowanie…
Reference in New Issue