From 039037f9c67c4dd85c915bc48aa3e573e3864720 Mon Sep 17 00:00:00 2001 From: Nicolas Jouanin Date: Mon, 6 Jul 2015 21:09:34 +0200 Subject: [PATCH] Store publish message flows --- hbmqtt/mqtt/protocol.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/hbmqtt/mqtt/protocol.py b/hbmqtt/mqtt/protocol.py index 57aa168..a4ac249 100644 --- a/hbmqtt/mqtt/protocol.py +++ b/hbmqtt/mqtt/protocol.py @@ -31,6 +31,9 @@ class InFlightMessage: def __init__(self, packet, qos): self.packet = packet self.qos = qos + self.puback = None + self.pubrec = None + self.pubcomp = None self._init_states() def _init_states(self): @@ -99,6 +102,7 @@ class ProtocolHandler: waiter = futures.Future(loop=self._loop) self._puback_waiters[packet_id] = waiter yield from waiter + inflight_message.puback = waiter.result() inflight_message.acknowledge() del self._puback_waiters[packet_id] if qos == 0x02: @@ -106,6 +110,7 @@ class ProtocolHandler: waiter = futures.Future(loop=self._loop) self._pubrec_waiters[packet_id] = waiter yield from waiter + inflight_message.pubrec = waiter.result() del self._pubrec_waiters[packet_id] inflight_message.receive() @@ -118,6 +123,7 @@ class ProtocolHandler: waiter = futures.Future(loop=self._loop) self._pubcomp_waiters[packet_id] = waiter yield from waiter + inflight_message.pubcomp = waiter.result() del self._pubcomp_waiters[packet_id] inflight_message.complete() @@ -250,7 +256,7 @@ class ProtocolHandler: packet_id = puback.variable_header.packet_id try: waiter = self._puback_waiters[packet_id] - waiter.set_result(None) + waiter.set_result(puback) except KeyError as ke: self.logger.warn("Received PUBACK for unknown pending subscription with Id: %s" % packet_id) @@ -259,7 +265,7 @@ class ProtocolHandler: packet_id = pubrec.variable_header.packet_id try: waiter = self._pubrec_waiters[packet_id] - waiter.set_result(None) + waiter.set_result(pubrec) except KeyError as ke: self.logger.warn("Received PUBREC for unknown pending subscription with Id: %s" % packet_id) @@ -268,7 +274,7 @@ class ProtocolHandler: packet_id = pubcomp.variable_header.packet_id try: waiter = self._pubcomp_waiters[packet_id] - waiter.set_result(None) + waiter.set_result(pubcomp) except KeyError as ke: self.logger.warn("Received PUBCOMP for unknown pending subscription with Id: %s" % packet_id)