diff --git a/hbmqtt/mqtt/protocol/handler.py b/hbmqtt/mqtt/protocol/handler.py index 4390481..8530759 100644 --- a/hbmqtt/mqtt/protocol/handler.py +++ b/hbmqtt/mqtt/protocol/handler.py @@ -168,7 +168,7 @@ class ProtocolHandler: :return: """ assert app_message.qos == QOS_1 - if app_message.is_acknowledged(): + if app_message.puback_packet: raise HBMQTTException("Message '%d' has already been acknowledged" % app_message.packet_id) if isinstance(app_message, OutgoingApplicationMessage): if app_message.packet_id not in self.session.inflight_out: @@ -190,11 +190,14 @@ class ProtocolHandler: yield from waiter del self._puback_waiters[app_message.packet_id] app_message.puback_packet = waiter.result() + + # Discard inflight message + del self.session.inflight_out[app_message.packet_id] elif isinstance(app_message, IncomingApplicationMessage): # Initiate delivery yield from self.session.delivered_message_queue.put(app_message) # Send PUBACK - puback = app_message.build_puback_packet() + puback = PubackPacket.build(self.packet_id) yield from self._send_packet(puback) app_message.puback_packet = puback @@ -210,56 +213,70 @@ class ProtocolHandler: """ assert app_message.qos == QOS_2 if isinstance(app_message, OutgoingApplicationMessage): - # Store message - publish_packet = None - if app_message.publish_packet is not None: - # This is a retry flow, no need to store just check the message exists in session - if app_message.packet_id not in self.session.inflight_out: - raise HBMQTTException("Unknown inflight message '%d' in session" % app_message.packet_id) - publish_packet = app_message.build_publish_packet() - else: - # Store message in session - self.session.inflight_out[app_message.packet_id] = app_message - publish_packet = app_message.build_publish_packet() - - # Send PUBLISH packet - yield from self._send_packet(publish_packet) - app_message.publish_packet = publish_packet - - if not app_message.pubrec_packet: + if app_message.pubrel_packet and app_message.pubcomp_packet: + raise HBMQTTException("Message '%d' has already been acknowledged" % app_message.packet_id) + if not app_message.pubrel_packet: + # Store message + publish_packet = None + if app_message.publish_packet is not None: + # This is a retry flow, no need to store just check the message exists in session + if app_message.packet_id not in self.session.inflight_out: + raise HBMQTTException("Unknown inflight message '%d' in session" % app_message.packet_id) + publish_packet = app_message.build_publish_packet(dup=True) + else: + # Store message in session + self.session.inflight_out[app_message.packet_id] = app_message + publish_packet = app_message.build_publish_packet() + # Send PUBLISH packet + yield from self._send_packet(publish_packet) + app_message.publish_packet = publish_packet + # Wait PUBREC if app_message.packet_id in self._pubrec_waiters: # PUBREC waiter already exists for this packet ID message = "Can't add PUBREC waiter, a waiter already exists for message Id '%s'" \ % app_message.packet_id self.logger.warning(message) raise HBMQTTException(message) - # Wait for PUBREC waiter = asyncio.Future(loop=self._loop) self._pubrec_waiters[app_message.packet_id] = waiter yield from waiter del self._pubrec_waiters[app_message.packet_id] app_message.pubrec_packet = waiter.result() - - if not app_message.pubrel_packet: + if not app_message.pubcomp_packet: # Send pubrel app_message.pubrel_packet = PubrelPacket.build(app_message.packet_id) yield from self._send_packet(app_message.pubrel_packet) - - if not app_message.pubcomp_packet: # Wait for PUBCOMP waiter = asyncio.Future(loop=self._loop) self._pubcomp_waiters[app_message.packet_id] = waiter yield from waiter del self._pubcomp_waiters[app_message.packet_id] app_message.pubcomp_packet = waiter.result() - + # Discard inflight message + del self.session.inflight_out[app_message.packet_id] elif isinstance(app_message, IncomingApplicationMessage): - if app_message.publish_packet.dup_flag: - self.logger.warning("[MQTT-3.3.1-2] DUP flag must set to 0 for QOS 0 message. Message ignored: %s" % - repr(app_message.publish_packet)) - else: - # Assign packet_id as it's needed internally - yield from self.session.delivered_message_queue.put(app_message) + self.session.inflight_in[app_message.packet_id] = app_message + # Send pubrec + pubrec_packet = PubrecPacket.build(app_message.packet_id) + yield from self._send_packet(pubrec_packet) + app_message.pubrec_packet = pubrec_packet + # Wait PUBREL + if app_message.packet_id in self._pubrel_waiters: + # PUBREL waiter already exists for this packet ID + message = "Can't add PUBREC waiter, a waiter already exists for message Id '%s'" \ + % app_message.packet_id + self.logger.warning(message) + raise HBMQTTException(message) + waiter = asyncio.Future(loop=self._loop) + self._pubrel_waiters[app_message.packet_id] = waiter + yield from waiter + del self._pubrel_waiters[app_message.packet_id] + app_message.pubrel_packet = waiter.result() + #Discard message + del self.session.inflight_in[app_message.packet_id] + # Send pubcomp + pubcomp_packet = PubcompPacket.build(app_message.packet_id) + yield from self._send_packet(pubcomp_packet) @asyncio.coroutine def stop(self):