kopia lustrzana https://github.com/Yakifo/amqtt
Manage already done handler (shouldn't happen)
rodzic
e85838512e
commit
cecf88c2c9
|
@ -4,6 +4,7 @@
|
||||||
import logging
|
import logging
|
||||||
import collections
|
import collections
|
||||||
|
|
||||||
|
from asyncio import InvalidStateError
|
||||||
from blinker import Signal
|
from blinker import Signal
|
||||||
|
|
||||||
from hbmqtt.mqtt import packet_class
|
from hbmqtt.mqtt import packet_class
|
||||||
|
@ -464,8 +465,9 @@ class ProtocolHandler:
|
||||||
waiter = self._puback_waiters[packet_id]
|
waiter = self._puback_waiters[packet_id]
|
||||||
waiter.set_result(puback)
|
waiter.set_result(puback)
|
||||||
except KeyError:
|
except KeyError:
|
||||||
self.logger.warning("%s Received PUBACK for unknown pending message Id: '%s'" %
|
self.logger.warning("Received PUBACK for unknown pending message Id: '%d'" % packet_id)
|
||||||
(self.session.client_id, packet_id))
|
except InvalidStateError:
|
||||||
|
self.logger.warning("PUBACK waiter with Id '%d' already done" % packet_id)
|
||||||
|
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def handle_pubrec(self, pubrec: PubrecPacket):
|
def handle_pubrec(self, pubrec: PubrecPacket):
|
||||||
|
@ -474,7 +476,9 @@ class ProtocolHandler:
|
||||||
waiter = self._pubrec_waiters[packet_id]
|
waiter = self._pubrec_waiters[packet_id]
|
||||||
waiter.set_result(pubrec)
|
waiter.set_result(pubrec)
|
||||||
except KeyError:
|
except KeyError:
|
||||||
self.logger.warning("Received PUBREC for unknown pending message with Id: %s" % packet_id)
|
self.logger.warning("Received PUBREC for unknown pending message with Id: %d" % packet_id)
|
||||||
|
except InvalidStateError:
|
||||||
|
self.logger.warning("PUBREC waiter with Id '%d' already done" % packet_id)
|
||||||
|
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def handle_pubcomp(self, pubcomp: PubcompPacket):
|
def handle_pubcomp(self, pubcomp: PubcompPacket):
|
||||||
|
@ -483,7 +487,9 @@ class ProtocolHandler:
|
||||||
waiter = self._pubcomp_waiters[packet_id]
|
waiter = self._pubcomp_waiters[packet_id]
|
||||||
waiter.set_result(pubcomp)
|
waiter.set_result(pubcomp)
|
||||||
except KeyError:
|
except KeyError:
|
||||||
self.logger.warning("Received PUBCOMP for unknown pending message with Id: %s" % packet_id)
|
self.logger.warning("Received PUBCOMP for unknown pending message with Id: %d" % packet_id)
|
||||||
|
except InvalidStateError:
|
||||||
|
self.logger.warning("PUBCOMP waiter with Id '%d' already done" % packet_id)
|
||||||
|
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def handle_pubrel(self, pubrel: PubrelPacket):
|
def handle_pubrel(self, pubrel: PubrelPacket):
|
||||||
|
@ -492,7 +498,9 @@ class ProtocolHandler:
|
||||||
waiter = self._pubrel_waiters[packet_id]
|
waiter = self._pubrel_waiters[packet_id]
|
||||||
waiter.set_result(pubrel)
|
waiter.set_result(pubrel)
|
||||||
except KeyError:
|
except KeyError:
|
||||||
self.logger.warning("Received PUBREL for unknown pending message with Id: %s" % packet_id)
|
self.logger.warning("Received PUBREL for unknown pending message with Id: %d" % packet_id)
|
||||||
|
except InvalidStateError:
|
||||||
|
self.logger.warning("PUBREL waiter with Id '%d' already done" % packet_id)
|
||||||
|
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def handle_publish(self, publish_packet: PublishPacket):
|
def handle_publish(self, publish_packet: PublishPacket):
|
||||||
|
@ -502,59 +510,3 @@ class ProtocolHandler:
|
||||||
incoming_message = IncomingApplicationMessage(packet_id, publish_packet.topic_name, qos, publish_packet.data, publish_packet.retain_flag)
|
incoming_message = IncomingApplicationMessage(packet_id, publish_packet.topic_name, qos, publish_packet.data, publish_packet.retain_flag)
|
||||||
incoming_message.publish_packet = publish_packet
|
incoming_message.publish_packet = publish_packet
|
||||||
yield from self._handle_message_flow(incoming_message)
|
yield from self._handle_message_flow(incoming_message)
|
||||||
# if qos == 0:
|
|
||||||
# if publish_packet.dup_flag:
|
|
||||||
# self.logger.warning("[MQTT-3.3.1-2] DUP flag must set to 0 for QOS 0 message. Message ignored: %s" %
|
|
||||||
# repr(publish_packet))
|
|
||||||
# else:
|
|
||||||
# # Assign packet_id as it's needed internally
|
|
||||||
# incoming_message.publish_packet = publish_packet
|
|
||||||
# yield from self.session.delivered_message_queue.put(incoming_message)
|
|
||||||
# else:
|
|
||||||
# # Check if publish is a retry
|
|
||||||
# if packet_id in self.session.inflight_in:
|
|
||||||
# incoming_message = self.session.inflight_in[packet_id]
|
|
||||||
# else:
|
|
||||||
# incoming_message = IncomingInFlightMessage(publish_packet,
|
|
||||||
# qos,
|
|
||||||
# self.session.publish_retry_delay,
|
|
||||||
# self._loop)
|
|
||||||
# self.session.inflight_in[packet_id] = incoming_message
|
|
||||||
# incoming_message.publish()
|
|
||||||
#
|
|
||||||
# if qos == 1:
|
|
||||||
# # Initiate delivery
|
|
||||||
# yield from self.session.delivered_message_queue.put(packet_id)
|
|
||||||
# ack = yield from incoming_message.wait_acknowledge()
|
|
||||||
# if ack:
|
|
||||||
# # Send PUBACK
|
|
||||||
# puback = PubackPacket.build(packet_id)
|
|
||||||
# yield from self._send_packet(puback)
|
|
||||||
# # Discard message
|
|
||||||
# del self.session.inflight_in[packet_id]
|
|
||||||
# self.logger.debug("Discarded incoming message %d" % packet_id)
|
|
||||||
# else:
|
|
||||||
# raise HBMQTTException("Something wrong, ack is False")
|
|
||||||
# if qos == 2:
|
|
||||||
# # Send PUBREC
|
|
||||||
# pubrec = PubrecPacket.build(packet_id)
|
|
||||||
# yield from self._send_packet(pubrec)
|
|
||||||
# incoming_message.sent_pubrec()
|
|
||||||
# # Wait for pubrel
|
|
||||||
# ack = yield from incoming_message.wait_pubrel()
|
|
||||||
# if ack:
|
|
||||||
# # Initiate delivery
|
|
||||||
# yield from self.session.delivered_message_queue.put(packet_id)
|
|
||||||
# else:
|
|
||||||
# raise HBMQTTException("Something wrong, ack is False")
|
|
||||||
# ack = yield from incoming_message.wait_acknowledge()
|
|
||||||
# if ack:
|
|
||||||
# # Send PUBCOMP
|
|
||||||
# pubcomp = PubcompPacket.build(packet_id)
|
|
||||||
# yield from self._send_packet(pubcomp)
|
|
||||||
# incoming_message.sent_pubcomp()
|
|
||||||
# # Discard message
|
|
||||||
# del self.session.inflight_in[packet_id]
|
|
||||||
# self.logger.debug("Discarded incoming message %d" % packet_id)
|
|
||||||
# else:
|
|
||||||
# raise HBMQTTException("Something wrong, ack is False")
|
|
||||||
|
|
Ładowanie…
Reference in New Issue