kopia lustrzana https://github.com/Yakifo/amqtt
Receive messages FIX
rodzic
4b7be2b879
commit
417bb01148
|
@ -212,6 +212,7 @@ class ProtocolHandler:
|
||||||
del self.session.inflight_out[app_message.packet_id]
|
del self.session.inflight_out[app_message.packet_id]
|
||||||
elif isinstance(app_message, IncomingApplicationMessage):
|
elif isinstance(app_message, IncomingApplicationMessage):
|
||||||
# Initiate delivery
|
# Initiate delivery
|
||||||
|
self.logger.debug("Add message to delivery")
|
||||||
yield from self.session.delivered_message_queue.put(app_message)
|
yield from self.session.delivered_message_queue.put(app_message)
|
||||||
# Send PUBACK
|
# Send PUBACK
|
||||||
puback = PubackPacket.build(app_message.packet_id)
|
puback = PubackPacket.build(app_message.packet_id)
|
||||||
|
@ -388,6 +389,7 @@ class ProtocolHandler:
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def mqtt_deliver_next_message(self):
|
def mqtt_deliver_next_message(self):
|
||||||
message = yield from self.session.delivered_message_queue.get()
|
message = yield from self.session.delivered_message_queue.get()
|
||||||
|
self.logger.debug("Delivering message %r" % message)
|
||||||
return message
|
return message
|
||||||
|
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
|
@ -474,13 +476,13 @@ class ProtocolHandler:
|
||||||
self.logger.warning("Received PUBCOMP for unknown pending message with Id: %s" % packet_id)
|
self.logger.warning("Received PUBCOMP for unknown pending message with Id: %s" % packet_id)
|
||||||
|
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def handle_pubrel(self, pubrel: PubrecPacket):
|
def handle_pubrel(self, pubrel: PubrelPacket):
|
||||||
packet_id = pubrel.packet_id
|
packet_id = pubrel.packet_id
|
||||||
try:
|
try:
|
||||||
inflight_message = self.session.inflight_in[packet_id]
|
waiter = self._pubrel_waiters[packet_id]
|
||||||
inflight_message.received_pubrel()
|
waiter.set_result(pubrel)
|
||||||
except KeyError as ke:
|
except KeyError as ke:
|
||||||
self.logger.warning("Received PUBREL for unknown pending subscription with Id: %s" % packet_id)
|
self.logger.warning("Received PUBREL for unknown pending message with Id: %s" % packet_id)
|
||||||
|
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def handle_publish(self, publish_packet: PublishPacket):
|
def handle_publish(self, publish_packet: PublishPacket):
|
||||||
|
|
1
setup.py
1
setup.py
|
@ -36,6 +36,7 @@ setup(
|
||||||
'hbmqtt.test.plugins': [
|
'hbmqtt.test.plugins': [
|
||||||
'test_plugin = tests.plugins.test_manager:TestPlugin',
|
'test_plugin = tests.plugins.test_manager:TestPlugin',
|
||||||
'event_plugin = tests.plugins.test_manager:EventTestPlugin',
|
'event_plugin = tests.plugins.test_manager:EventTestPlugin',
|
||||||
|
'packet_logger_plugin = hbmqtt.plugins.logging:PacketLoggerPlugin',
|
||||||
],
|
],
|
||||||
'hbmqtt.broker.plugins': [
|
'hbmqtt.broker.plugins': [
|
||||||
# 'event_logger_plugin = hbmqtt.plugins.logging:EventLoggerPlugin',
|
# 'event_logger_plugin = hbmqtt.plugins.logging:EventLoggerPlugin',
|
||||||
|
|
|
@ -54,7 +54,7 @@ class ProtocolHandlerTest(unittest.TestCase):
|
||||||
yield from self.start_handler(handler, s)
|
yield from self.start_handler(handler, s)
|
||||||
yield from self.stop_handler(handler, s)
|
yield from self.stop_handler(handler, s)
|
||||||
future.set_result(True)
|
future.set_result(True)
|
||||||
except AssertionError as ae:
|
except Exception as ae:
|
||||||
future.set_exception(ae)
|
future.set_exception(ae)
|
||||||
|
|
||||||
future = asyncio.Future(loop=self.loop)
|
future = asyncio.Future(loop=self.loop)
|
||||||
|
@ -74,7 +74,7 @@ class ProtocolHandlerTest(unittest.TestCase):
|
||||||
self.assertEquals(packet.topic_name, '/topic')
|
self.assertEquals(packet.topic_name, '/topic')
|
||||||
self.assertEquals(packet.qos, QOS_0)
|
self.assertEquals(packet.qos, QOS_0)
|
||||||
self.assertIsNone(packet.packet_id)
|
self.assertIsNone(packet.packet_id)
|
||||||
except AssertionError as ae:
|
except Exception as ae:
|
||||||
future.set_exception(ae)
|
future.set_exception(ae)
|
||||||
|
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
|
@ -94,7 +94,7 @@ class ProtocolHandlerTest(unittest.TestCase):
|
||||||
self.assertIsNone(message.pubcomp_packet)
|
self.assertIsNone(message.pubcomp_packet)
|
||||||
yield from self.stop_handler(handler, s)
|
yield from self.stop_handler(handler, s)
|
||||||
future.set_result(True)
|
future.set_result(True)
|
||||||
except AssertionError as ae:
|
except Exception as ae:
|
||||||
future.set_exception(ae)
|
future.set_exception(ae)
|
||||||
|
|
||||||
future = asyncio.Future(loop=self.loop)
|
future = asyncio.Future(loop=self.loop)
|
||||||
|
@ -116,7 +116,7 @@ class ProtocolHandlerTest(unittest.TestCase):
|
||||||
self.assertIsNotNone(packet.packet_id)
|
self.assertIsNotNone(packet.packet_id)
|
||||||
self.assertIn(packet.packet_id, self.session.inflight_out)
|
self.assertIn(packet.packet_id, self.session.inflight_out)
|
||||||
self.assertIn(packet.packet_id, self.handler._puback_waiters)
|
self.assertIn(packet.packet_id, self.handler._puback_waiters)
|
||||||
except AssertionError as ae:
|
except Exception as ae:
|
||||||
future.set_exception(ae)
|
future.set_exception(ae)
|
||||||
puback = PubackPacket.build(packet.packet_id)
|
puback = PubackPacket.build(packet.packet_id)
|
||||||
yield from puback.to_stream(writer)
|
yield from puback.to_stream(writer)
|
||||||
|
@ -138,7 +138,7 @@ class ProtocolHandlerTest(unittest.TestCase):
|
||||||
yield from self.stop_handler(self.handler, self.session)
|
yield from self.stop_handler(self.handler, self.session)
|
||||||
if not future.done():
|
if not future.done():
|
||||||
future.set_result(True)
|
future.set_result(True)
|
||||||
except AssertionError as ae:
|
except Exception as ae:
|
||||||
future.set_exception(ae)
|
future.set_exception(ae)
|
||||||
self.handler = None
|
self.handler = None
|
||||||
self.session = Session()
|
self.session = Session()
|
||||||
|
@ -171,7 +171,7 @@ class ProtocolHandlerTest(unittest.TestCase):
|
||||||
self.assertIn(packet.packet_id, self.handler._pubcomp_waiters)
|
self.assertIn(packet.packet_id, self.handler._pubcomp_waiters)
|
||||||
pubcomp = PubcompPacket.build(packet.packet_id)
|
pubcomp = PubcompPacket.build(packet.packet_id)
|
||||||
yield from pubcomp.to_stream(writer)
|
yield from pubcomp.to_stream(writer)
|
||||||
except AssertionError as ae:
|
except Exception as ae:
|
||||||
future.set_exception(ae)
|
future.set_exception(ae)
|
||||||
|
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
|
@ -191,7 +191,7 @@ class ProtocolHandlerTest(unittest.TestCase):
|
||||||
yield from self.stop_handler(self.handler, self.session)
|
yield from self.stop_handler(self.handler, self.session)
|
||||||
if not future.done():
|
if not future.done():
|
||||||
future.set_result(True)
|
future.set_result(True)
|
||||||
except AssertionError as ae:
|
except Exception as ae:
|
||||||
future.set_exception(ae)
|
future.set_exception(ae)
|
||||||
self.handler = None
|
self.handler = None
|
||||||
self.session = Session()
|
self.session = Session()
|
||||||
|
@ -215,19 +215,20 @@ class ProtocolHandlerTest(unittest.TestCase):
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def test_coro():
|
def test_coro():
|
||||||
try:
|
try:
|
||||||
s = Session()
|
|
||||||
reader, writer = yield from asyncio.open_connection('127.0.0.1', 8888, loop=self.loop)
|
reader, writer = yield from asyncio.open_connection('127.0.0.1', 8888, loop=self.loop)
|
||||||
s.reader, s.writer = adapt(reader, writer)
|
self.session.reader, self.session.writer = adapt(reader, writer)
|
||||||
handler = ProtocolHandler(s, self.plugin_manager, loop=self.loop)
|
self.handler = ProtocolHandler(self.session, self.plugin_manager, loop=self.loop)
|
||||||
yield from self.start_handler(handler, s)
|
yield from self.start_handler(self.handler, self.session)
|
||||||
message = yield from handler.mqtt_deliver_next_message()
|
message = yield from self.handler.mqtt_deliver_next_message()
|
||||||
self.assertIsInstance(message, IncomingApplicationMessage)
|
self.assertIsInstance(message, IncomingApplicationMessage)
|
||||||
self.assertIsNotNone(message.publish_packet)
|
self.assertIsNotNone(message.publish_packet)
|
||||||
yield from self.stop_handler(handler, s)
|
yield from self.stop_handler(self.handler, self.session)
|
||||||
future.set_result(True)
|
future.set_result(True)
|
||||||
except AssertionError as ae:
|
except Exception as ae:
|
||||||
future.set_exception(ae)
|
future.set_exception(ae)
|
||||||
|
|
||||||
|
self.handler = None
|
||||||
|
self.session = Session()
|
||||||
future = asyncio.Future(loop=self.loop)
|
future = asyncio.Future(loop=self.loop)
|
||||||
coro = asyncio.start_server(server_mock, '127.0.0.1', 8888, loop=self.loop)
|
coro = asyncio.start_server(server_mock, '127.0.0.1', 8888, loop=self.loop)
|
||||||
server = self.loop.run_until_complete(coro)
|
server = self.loop.run_until_complete(coro)
|
||||||
|
@ -241,31 +242,86 @@ class ProtocolHandlerTest(unittest.TestCase):
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def server_mock(reader, writer):
|
def server_mock(reader, writer):
|
||||||
try:
|
try:
|
||||||
|
yield from self.event.wait()
|
||||||
|
#self.event.clear()
|
||||||
packet = PublishPacket.build('/topic', b'test_data', 1, False, QOS_1, False)
|
packet = PublishPacket.build('/topic', b'test_data', 1, False, QOS_1, False)
|
||||||
yield from packet.to_stream(writer)
|
yield from packet.to_stream(writer)
|
||||||
puback = yield from PubackPacket.from_stream(reader)
|
puback = yield from PubackPacket.from_stream(reader)
|
||||||
self.assertIsNotNone(puback)
|
self.assertIsNotNone(puback)
|
||||||
self.assertEqual(packet.packet_id, puback.packet_id)
|
self.assertEqual(packet.packet_id, puback.packet_id)
|
||||||
writer.close()
|
self.assertEquals(self.session.delivered_message_queue.qsize(), 1)
|
||||||
except AssertionError as ae:
|
#yield from self.event.wait()
|
||||||
|
#writer.close()
|
||||||
|
except Exception as ae:
|
||||||
future.set_exception(ae)
|
future.set_exception(ae)
|
||||||
|
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def test_coro():
|
def test_coro():
|
||||||
try:
|
try:
|
||||||
s = Session()
|
|
||||||
reader, writer = yield from asyncio.open_connection('127.0.0.1', 8888, loop=self.loop)
|
reader, writer = yield from asyncio.open_connection('127.0.0.1', 8888, loop=self.loop)
|
||||||
s.reader, s.writer = adapt(reader, writer)
|
self.session.reader, self.session.writer = adapt(reader, writer)
|
||||||
handler = ProtocolHandler(s, self.plugin_manager, loop=self.loop)
|
self.handler = ProtocolHandler(self.session, self.plugin_manager, loop=self.loop)
|
||||||
yield from self.start_handler(handler, s)
|
yield from self.start_handler(self.handler, self.session)
|
||||||
message = yield from handler.mqtt_deliver_next_message()
|
self.event.set()
|
||||||
|
message = yield from self.handler.mqtt_deliver_next_message()
|
||||||
self.assertIsInstance(message, IncomingApplicationMessage)
|
self.assertIsInstance(message, IncomingApplicationMessage)
|
||||||
self.assertIsNotNone(message.publish_packet)
|
self.assertIsNotNone(message.publish_packet)
|
||||||
yield from self.stop_handler(handler, s)
|
self.assertIsNotNone(message.puback_packet)
|
||||||
|
yield from self.stop_handler(self.handler, self.session)
|
||||||
future.set_result(True)
|
future.set_result(True)
|
||||||
except AssertionError as ae:
|
#self.event.set()
|
||||||
|
except Exception as ae:
|
||||||
future.set_exception(ae)
|
future.set_exception(ae)
|
||||||
|
|
||||||
|
self.handler = None
|
||||||
|
self.session = Session()
|
||||||
|
future = asyncio.Future(loop=self.loop)
|
||||||
|
self.event = asyncio.Event(loop=self.loop)
|
||||||
|
coro = asyncio.start_server(server_mock, '127.0.0.1', 8888, loop=self.loop)
|
||||||
|
server = self.loop.run_until_complete(coro)
|
||||||
|
self.loop.run_until_complete(test_coro())
|
||||||
|
server.close()
|
||||||
|
self.loop.run_until_complete(server.wait_closed())
|
||||||
|
if future.exception():
|
||||||
|
raise future.exception()
|
||||||
|
|
||||||
|
def test_receive_qos2(self):
|
||||||
|
@asyncio.coroutine
|
||||||
|
def server_mock(reader, writer):
|
||||||
|
try:
|
||||||
|
packet = PublishPacket.build('/topic', b'test_data', 2, False, QOS_2, False)
|
||||||
|
yield from packet.to_stream(writer)
|
||||||
|
pubrec = yield from PubrecPacket.from_stream(reader)
|
||||||
|
self.assertIsNotNone(pubrec)
|
||||||
|
self.assertEqual(packet.packet_id, pubrec.packet_id)
|
||||||
|
self.assertIn(packet.packet_id, self.handler._pubrel_waiters)
|
||||||
|
pubrel = PubrelPacket.build(packet.packet_id)
|
||||||
|
yield from pubrel.to_stream(writer)
|
||||||
|
self.assertNotIn(packet.packet_id, self.handler._pubrel_waiters)
|
||||||
|
pubcomp = yield from PubcompPacket.from_stream(reader)
|
||||||
|
self.assertIsNotNone(pubcomp)
|
||||||
|
self.assertEqual(packet.packet_id, pubcomp.packet_id)
|
||||||
|
writer.close()
|
||||||
|
except Exception as ae:
|
||||||
|
future.set_exception(ae)
|
||||||
|
|
||||||
|
@asyncio.coroutine
|
||||||
|
def test_coro():
|
||||||
|
try:
|
||||||
|
reader, writer = yield from asyncio.open_connection('127.0.0.1', 8888, loop=self.loop)
|
||||||
|
self.session.reader, self.session.writer = adapt(reader, writer)
|
||||||
|
self.handler = ProtocolHandler(self.session, self.plugin_manager, loop=self.loop)
|
||||||
|
yield from self.start_handler(self.handler, self.session)
|
||||||
|
message = yield from self.handler.mqtt_deliver_next_message()
|
||||||
|
self.assertIsInstance(message, IncomingApplicationMessage)
|
||||||
|
self.assertIsNotNone(message.publish_packet)
|
||||||
|
yield from self.stop_handler(self.handler, self.session)
|
||||||
|
future.set_result(True)
|
||||||
|
except Exception as ae:
|
||||||
|
future.set_exception(ae)
|
||||||
|
|
||||||
|
self.handler = None
|
||||||
|
self.session = Session()
|
||||||
future = asyncio.Future(loop=self.loop)
|
future = asyncio.Future(loop=self.loop)
|
||||||
coro = asyncio.start_server(server_mock, '127.0.0.1', 8888, loop=self.loop)
|
coro = asyncio.start_server(server_mock, '127.0.0.1', 8888, loop=self.loop)
|
||||||
server = self.loop.run_until_complete(coro)
|
server = self.loop.run_until_complete(coro)
|
||||||
|
|
Ładowanie…
Reference in New Issue