kopia lustrzana https://github.com/Yakifo/amqtt
rename structure
rodzic
c72f5e3bec
commit
238069e5d8
|
@ -203,8 +203,8 @@ class ProtocolHandler:
|
|||
:return:
|
||||
"""
|
||||
self.logger.debug("Begin messages delivery retries")
|
||||
for packet_id in self.session.inflight_out:
|
||||
message = self.session.inflight_out[packet_id]
|
||||
for packet_id in self.session.outgoing_msg:
|
||||
message = self.session.outgoing_msg[packet_id]
|
||||
if message.is_new():
|
||||
self.logger.debug("Retrying publish message Id=%d", packet_id)
|
||||
message.publish_packet.dup_flag = True
|
||||
|
@ -213,7 +213,7 @@ class ProtocolHandler:
|
|||
yield from self.outgoing_queue.put(message.publish_packet)
|
||||
message.retry_publish()
|
||||
ack = yield from message.wait_acknowledge()
|
||||
del self.session.inflight_out[packet_id]
|
||||
del self.session.outgoing_msg[packet_id]
|
||||
if message.is_received():
|
||||
self.logger.debug("Retrying pubrel message Id=%d", packet_id)
|
||||
yield from self.outgoing_queue.put(PubrelPacket.build(packet_id))
|
||||
|
@ -223,14 +223,14 @@ class ProtocolHandler:
|
|||
@asyncio.coroutine
|
||||
def mqtt_publish(self, topic, message, qos, retain):
|
||||
packet_id = self.session.next_packet_id
|
||||
if packet_id in self.session.inflight_out:
|
||||
if packet_id in self.session.outgoing_msg:
|
||||
self.logger.warn("%s A message with the same packet ID is already in flight" % self.session.client_id)
|
||||
packet = PublishPacket.build(topic, message, packet_id, False, qos, retain)
|
||||
yield from self.outgoing_queue.put(packet)
|
||||
if qos != QOS_0:
|
||||
inflight_message = OutgoingInFlightMessage(packet, qos, loop=self._loop)
|
||||
inflight_message.sent_publish()
|
||||
self.session.inflight_out[packet_id] = inflight_message
|
||||
self.session.outgoing_msg[packet_id] = inflight_message
|
||||
ack = yield from inflight_message.wait_acknowledge()
|
||||
while not ack:
|
||||
#Retry publish
|
||||
|
@ -239,7 +239,7 @@ class ProtocolHandler:
|
|||
yield from self.outgoing_queue.put(packet)
|
||||
inflight_message.retry_publish()
|
||||
ack = yield from inflight_message.wait_acknowledge()
|
||||
del self.session.inflight_out[packet_id]
|
||||
del self.session.outgoing_msg[packet_id]
|
||||
|
||||
@asyncio.coroutine
|
||||
def stop(self):
|
||||
|
@ -405,7 +405,7 @@ class ProtocolHandler:
|
|||
def handle_puback(self, puback: PubackPacket):
|
||||
packet_id = puback.variable_header.packet_id
|
||||
try:
|
||||
inflight_message = self.session.inflight_out[packet_id]
|
||||
inflight_message = self.session.outgoing_msg[packet_id]
|
||||
inflight_message.received_puback()
|
||||
except KeyError as ke:
|
||||
self.logger.warn("%s Received PUBACK for unknown pending subscription with Id: %s" %
|
||||
|
@ -415,7 +415,7 @@ class ProtocolHandler:
|
|||
def handle_pubrec(self, pubrec: PubrecPacket):
|
||||
packet_id = pubrec.variable_header.packet_id
|
||||
try:
|
||||
inflight_message = self.session.inflight_out[packet_id]
|
||||
inflight_message = self.session.outgoing_msg[packet_id]
|
||||
inflight_message.received_pubrec()
|
||||
yield from self.outgoing_queue.put(PubrelPacket.build(packet_id))
|
||||
inflight_message.sent_pubrel()
|
||||
|
@ -426,7 +426,7 @@ class ProtocolHandler:
|
|||
def handle_pubcomp(self, pubcomp: PubcompPacket):
|
||||
packet_id = pubcomp.variable_header.packet_id
|
||||
try:
|
||||
inflight_message = self.session.inflight_out[packet_id]
|
||||
inflight_message = self.session.outgoing_msg[packet_id]
|
||||
inflight_message.received_pubcomp()
|
||||
except KeyError as ke:
|
||||
self.logger.warn("Received PUBCOMP for unknown pending subscription with Id: %s" % packet_id)
|
||||
|
@ -450,11 +450,11 @@ class ProtocolHandler:
|
|||
inflight_message = IncomingInFlightMessage(publish_packet, qos)
|
||||
yield from self.delivered_message.put(inflight_message)
|
||||
else:
|
||||
if packet_id in self.session.inflight_in:
|
||||
inflight_message = self.session.inflight_in[packet_id]
|
||||
if packet_id in self.session.incoming_msg:
|
||||
inflight_message = self.session.incoming_msg[packet_id]
|
||||
else:
|
||||
inflight_message = InFlightMessage(publish_packet, qos)
|
||||
self.session.inflight_in[packet_id] = inflight_message
|
||||
self.session.incoming_msg[packet_id] = inflight_message
|
||||
inflight_message.publish()
|
||||
|
||||
if qos == 1:
|
||||
|
@ -475,4 +475,4 @@ class ProtocolHandler:
|
|||
yield from self.outgoing_queue.put(pubcomp)
|
||||
inflight_message.complete()
|
||||
yield from self.delivered_message.put(inflight_message)
|
||||
del self.session.inflight_in[packet_id]
|
||||
del self.session.incoming_msg[packet_id]
|
||||
|
|
|
@ -31,8 +31,8 @@ class Session:
|
|||
self.parent = 0
|
||||
self.handler = None
|
||||
|
||||
self.inflight_out = dict()
|
||||
self.inflight_in = dict()
|
||||
self.outgoing_msg = dict()
|
||||
self.incoming_msg = dict()
|
||||
self.retained_messages = Queue()
|
||||
|
||||
def _init_states(self):
|
||||
|
|
Ładowanie…
Reference in New Issue