From a3391ffe8a90f3a051a7dc7b12949977228db628 Mon Sep 17 00:00:00 2001 From: nico Date: Thu, 2 Jul 2015 14:10:26 +0200 Subject: [PATCH] HBMQTT-6 Add publish messages reception co-routine (WIP) --- hbmqtt/mqtt/protocol.py | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/hbmqtt/mqtt/protocol.py b/hbmqtt/mqtt/protocol.py index 3751929..685d9d4 100644 --- a/hbmqtt/mqtt/protocol.py +++ b/hbmqtt/mqtt/protocol.py @@ -12,6 +12,9 @@ from hbmqtt.mqtt.disconnect import DisconnectPacket from hbmqtt.mqtt.pingreq import PingReqPacket from hbmqtt.mqtt.publish import PublishPacket from hbmqtt.mqtt.pubrel import PubrelPacket +from hbmqtt.mqtt.puback import PubackPacket +from hbmqtt.mqtt.pubrec import PubrecPacket +from hbmqtt.mqtt.pubcomp import PubcompPacket from hbmqtt.mqtt.subscribe import SubscribePacket from hbmqtt.mqtt.unsubscribe import UnsubscribePacket from hbmqtt.session import Session @@ -65,6 +68,7 @@ class ProtocolHandler: self.session.local_address, self.session.local_port = self.session.writer.get_extra_info('sockname') self.incoming_queues = dict() + self.application_messages = asyncio.Queue() for p in PacketType: self.incoming_queues[p] = asyncio.Queue() self.outgoing_queue = asyncio.Queue() @@ -234,6 +238,27 @@ class ProtocolHandler: self.logger.warn("Packet type incompatible with message QOS: %s" % me) self.logger.debug("In-flight messages polling coro stopped") + @asyncio.coroutine + def _receive_publish_coro(self): + while self._running: + message = yield from self.incoming_queues[PacketType.PUBLISH].get() + yield self.application_messages.put(message) + message_id = message.fixed_header.packet_id + if (message.fixed_header.flags >> 1) & 0x01: + # QOS 1 + yield from self.outgoing_queue.put(PubackPacket.build(message_id)) + if (message.fixed_header.flags >> 1) & 0x02: + # QOS 2 + yield from self.outgoing_queue.put(PubrecPacket.build(message_id)) + + @asyncio.coroutine + def mqtt_deliver_next_message(self): + message = yield from self.application_messages.get() + message_id = message.fixed_header.packet_id + if (message.fixed_header.flags >> 1) & 0x02: + # QOS 2 + yield from self.outgoing_queue.put(PubrecPacket.build(message_id)) + return message class Subscription: states = ['new', 'subscribed', 'acknowledged']