diff --git a/hbmqtt/mqtt/protocol/handler.py b/hbmqtt/mqtt/protocol/handler.py index 5701282..1d64789 100644 --- a/hbmqtt/mqtt/protocol/handler.py +++ b/hbmqtt/mqtt/protocol/handler.py @@ -4,6 +4,7 @@ import logging import collections +from asyncio import InvalidStateError from blinker import Signal from hbmqtt.mqtt import packet_class @@ -464,8 +465,9 @@ class ProtocolHandler: waiter = self._puback_waiters[packet_id] waiter.set_result(puback) except KeyError: - self.logger.warning("%s Received PUBACK for unknown pending message Id: '%s'" % - (self.session.client_id, packet_id)) + self.logger.warning("Received PUBACK for unknown pending message Id: '%d'" % packet_id) + except InvalidStateError: + self.logger.warning("PUBACK waiter with Id '%d' already done" % packet_id) @asyncio.coroutine def handle_pubrec(self, pubrec: PubrecPacket): @@ -474,7 +476,9 @@ class ProtocolHandler: waiter = self._pubrec_waiters[packet_id] waiter.set_result(pubrec) except KeyError: - self.logger.warning("Received PUBREC for unknown pending message with Id: %s" % packet_id) + self.logger.warning("Received PUBREC for unknown pending message with Id: %d" % packet_id) + except InvalidStateError: + self.logger.warning("PUBREC waiter with Id '%d' already done" % packet_id) @asyncio.coroutine def handle_pubcomp(self, pubcomp: PubcompPacket): @@ -483,7 +487,9 @@ class ProtocolHandler: waiter = self._pubcomp_waiters[packet_id] waiter.set_result(pubcomp) except KeyError: - self.logger.warning("Received PUBCOMP for unknown pending message with Id: %s" % packet_id) + self.logger.warning("Received PUBCOMP for unknown pending message with Id: %d" % packet_id) + except InvalidStateError: + self.logger.warning("PUBCOMP waiter with Id '%d' already done" % packet_id) @asyncio.coroutine def handle_pubrel(self, pubrel: PubrelPacket): @@ -492,7 +498,9 @@ class ProtocolHandler: waiter = self._pubrel_waiters[packet_id] waiter.set_result(pubrel) except KeyError: - self.logger.warning("Received PUBREL for unknown pending message with Id: %s" % packet_id) + self.logger.warning("Received PUBREL for unknown pending message with Id: %d" % packet_id) + except InvalidStateError: + self.logger.warning("PUBREL waiter with Id '%d' already done" % packet_id) @asyncio.coroutine def handle_publish(self, publish_packet: PublishPacket): @@ -502,59 +510,3 @@ class ProtocolHandler: incoming_message = IncomingApplicationMessage(packet_id, publish_packet.topic_name, qos, publish_packet.data, publish_packet.retain_flag) incoming_message.publish_packet = publish_packet yield from self._handle_message_flow(incoming_message) - # if qos == 0: - # if publish_packet.dup_flag: - # self.logger.warning("[MQTT-3.3.1-2] DUP flag must set to 0 for QOS 0 message. Message ignored: %s" % - # repr(publish_packet)) - # else: - # # Assign packet_id as it's needed internally - # incoming_message.publish_packet = publish_packet - # yield from self.session.delivered_message_queue.put(incoming_message) - # else: - # # Check if publish is a retry - # if packet_id in self.session.inflight_in: - # incoming_message = self.session.inflight_in[packet_id] - # else: - # incoming_message = IncomingInFlightMessage(publish_packet, - # qos, - # self.session.publish_retry_delay, - # self._loop) - # self.session.inflight_in[packet_id] = incoming_message - # incoming_message.publish() - # - # if qos == 1: - # # Initiate delivery - # yield from self.session.delivered_message_queue.put(packet_id) - # ack = yield from incoming_message.wait_acknowledge() - # if ack: - # # Send PUBACK - # puback = PubackPacket.build(packet_id) - # yield from self._send_packet(puback) - # # Discard message - # del self.session.inflight_in[packet_id] - # self.logger.debug("Discarded incoming message %d" % packet_id) - # else: - # raise HBMQTTException("Something wrong, ack is False") - # if qos == 2: - # # Send PUBREC - # pubrec = PubrecPacket.build(packet_id) - # yield from self._send_packet(pubrec) - # incoming_message.sent_pubrec() - # # Wait for pubrel - # ack = yield from incoming_message.wait_pubrel() - # if ack: - # # Initiate delivery - # yield from self.session.delivered_message_queue.put(packet_id) - # else: - # raise HBMQTTException("Something wrong, ack is False") - # ack = yield from incoming_message.wait_acknowledge() - # if ack: - # # Send PUBCOMP - # pubcomp = PubcompPacket.build(packet_id) - # yield from self._send_packet(pubcomp) - # incoming_message.sent_pubcomp() - # # Discard message - # del self.session.inflight_in[packet_id] - # self.logger.debug("Discarded incoming message %d" % packet_id) - # else: - # raise HBMQTTException("Something wrong, ack is False")