Add publish messages reception co-routine (WIP)
pull/8/head
nico 2015-07-02 14:10:26 +02:00
rodzic b719a8283c
commit a3391ffe8a
1 zmienionych plików z 25 dodań i 0 usunięć

Wyświetl plik

@ -12,6 +12,9 @@ from hbmqtt.mqtt.disconnect import DisconnectPacket
from hbmqtt.mqtt.pingreq import PingReqPacket
from hbmqtt.mqtt.publish import PublishPacket
from hbmqtt.mqtt.pubrel import PubrelPacket
from hbmqtt.mqtt.puback import PubackPacket
from hbmqtt.mqtt.pubrec import PubrecPacket
from hbmqtt.mqtt.pubcomp import PubcompPacket
from hbmqtt.mqtt.subscribe import SubscribePacket
from hbmqtt.mqtt.unsubscribe import UnsubscribePacket
from hbmqtt.session import Session
@ -65,6 +68,7 @@ class ProtocolHandler:
self.session.local_address, self.session.local_port = self.session.writer.get_extra_info('sockname')
self.incoming_queues = dict()
self.application_messages = asyncio.Queue()
for p in PacketType:
self.incoming_queues[p] = asyncio.Queue()
self.outgoing_queue = asyncio.Queue()
@ -234,6 +238,27 @@ class ProtocolHandler:
self.logger.warn("Packet type incompatible with message QOS: %s" % me)
self.logger.debug("In-flight messages polling coro stopped")
@asyncio.coroutine
def _receive_publish_coro(self):
while self._running:
message = yield from self.incoming_queues[PacketType.PUBLISH].get()
yield self.application_messages.put(message)
message_id = message.fixed_header.packet_id
if (message.fixed_header.flags >> 1) & 0x01:
# QOS 1
yield from self.outgoing_queue.put(PubackPacket.build(message_id))
if (message.fixed_header.flags >> 1) & 0x02:
# QOS 2
yield from self.outgoing_queue.put(PubrecPacket.build(message_id))
@asyncio.coroutine
def mqtt_deliver_next_message(self):
message = yield from self.application_messages.get()
message_id = message.fixed_header.packet_id
if (message.fixed_header.flags >> 1) & 0x02:
# QOS 2
yield from self.outgoing_queue.put(PubrecPacket.build(message_id))
return message
class Subscription:
states = ['new', 'subscribed', 'acknowledged']