Store publish message flows

pull/8/head
Nicolas Jouanin 2015-07-06 21:09:34 +02:00
rodzic 1f084d5ca1
commit 039037f9c6
1 zmienionych plików z 9 dodań i 3 usunięć

Wyświetl plik

@ -31,6 +31,9 @@ class InFlightMessage:
def __init__(self, packet, qos):
self.packet = packet
self.qos = qos
self.puback = None
self.pubrec = None
self.pubcomp = None
self._init_states()
def _init_states(self):
@ -99,6 +102,7 @@ class ProtocolHandler:
waiter = futures.Future(loop=self._loop)
self._puback_waiters[packet_id] = waiter
yield from waiter
inflight_message.puback = waiter.result()
inflight_message.acknowledge()
del self._puback_waiters[packet_id]
if qos == 0x02:
@ -106,6 +110,7 @@ class ProtocolHandler:
waiter = futures.Future(loop=self._loop)
self._pubrec_waiters[packet_id] = waiter
yield from waiter
inflight_message.pubrec = waiter.result()
del self._pubrec_waiters[packet_id]
inflight_message.receive()
@ -118,6 +123,7 @@ class ProtocolHandler:
waiter = futures.Future(loop=self._loop)
self._pubcomp_waiters[packet_id] = waiter
yield from waiter
inflight_message.pubcomp = waiter.result()
del self._pubcomp_waiters[packet_id]
inflight_message.complete()
@ -250,7 +256,7 @@ class ProtocolHandler:
packet_id = puback.variable_header.packet_id
try:
waiter = self._puback_waiters[packet_id]
waiter.set_result(None)
waiter.set_result(puback)
except KeyError as ke:
self.logger.warn("Received PUBACK for unknown pending subscription with Id: %s" % packet_id)
@ -259,7 +265,7 @@ class ProtocolHandler:
packet_id = pubrec.variable_header.packet_id
try:
waiter = self._pubrec_waiters[packet_id]
waiter.set_result(None)
waiter.set_result(pubrec)
except KeyError as ke:
self.logger.warn("Received PUBREC for unknown pending subscription with Id: %s" % packet_id)
@ -268,7 +274,7 @@ class ProtocolHandler:
packet_id = pubcomp.variable_header.packet_id
try:
waiter = self._pubcomp_waiters[packet_id]
waiter.set_result(None)
waiter.set_result(pubcomp)
except KeyError as ke:
self.logger.warn("Received PUBCOMP for unknown pending subscription with Id: %s" % packet_id)