kopia lustrzana https://github.com/Yakifo/amqtt
implement QOS_2 handling
rodzic
9fe5e4e851
commit
e3ac527924
|
@ -168,7 +168,7 @@ class ProtocolHandler:
|
||||||
:return:
|
:return:
|
||||||
"""
|
"""
|
||||||
assert app_message.qos == QOS_1
|
assert app_message.qos == QOS_1
|
||||||
if app_message.is_acknowledged():
|
if app_message.puback_packet:
|
||||||
raise HBMQTTException("Message '%d' has already been acknowledged" % app_message.packet_id)
|
raise HBMQTTException("Message '%d' has already been acknowledged" % app_message.packet_id)
|
||||||
if isinstance(app_message, OutgoingApplicationMessage):
|
if isinstance(app_message, OutgoingApplicationMessage):
|
||||||
if app_message.packet_id not in self.session.inflight_out:
|
if app_message.packet_id not in self.session.inflight_out:
|
||||||
|
@ -190,11 +190,14 @@ class ProtocolHandler:
|
||||||
yield from waiter
|
yield from waiter
|
||||||
del self._puback_waiters[app_message.packet_id]
|
del self._puback_waiters[app_message.packet_id]
|
||||||
app_message.puback_packet = waiter.result()
|
app_message.puback_packet = waiter.result()
|
||||||
|
|
||||||
|
# Discard inflight message
|
||||||
|
del self.session.inflight_out[app_message.packet_id]
|
||||||
elif isinstance(app_message, IncomingApplicationMessage):
|
elif isinstance(app_message, IncomingApplicationMessage):
|
||||||
# Initiate delivery
|
# Initiate delivery
|
||||||
yield from self.session.delivered_message_queue.put(app_message)
|
yield from self.session.delivered_message_queue.put(app_message)
|
||||||
# Send PUBACK
|
# Send PUBACK
|
||||||
puback = app_message.build_puback_packet()
|
puback = PubackPacket.build(self.packet_id)
|
||||||
yield from self._send_packet(puback)
|
yield from self._send_packet(puback)
|
||||||
app_message.puback_packet = puback
|
app_message.puback_packet = puback
|
||||||
|
|
||||||
|
@ -210,56 +213,70 @@ class ProtocolHandler:
|
||||||
"""
|
"""
|
||||||
assert app_message.qos == QOS_2
|
assert app_message.qos == QOS_2
|
||||||
if isinstance(app_message, OutgoingApplicationMessage):
|
if isinstance(app_message, OutgoingApplicationMessage):
|
||||||
# Store message
|
if app_message.pubrel_packet and app_message.pubcomp_packet:
|
||||||
publish_packet = None
|
raise HBMQTTException("Message '%d' has already been acknowledged" % app_message.packet_id)
|
||||||
if app_message.publish_packet is not None:
|
if not app_message.pubrel_packet:
|
||||||
# This is a retry flow, no need to store just check the message exists in session
|
# Store message
|
||||||
if app_message.packet_id not in self.session.inflight_out:
|
publish_packet = None
|
||||||
raise HBMQTTException("Unknown inflight message '%d' in session" % app_message.packet_id)
|
if app_message.publish_packet is not None:
|
||||||
publish_packet = app_message.build_publish_packet()
|
# This is a retry flow, no need to store just check the message exists in session
|
||||||
else:
|
if app_message.packet_id not in self.session.inflight_out:
|
||||||
# Store message in session
|
raise HBMQTTException("Unknown inflight message '%d' in session" % app_message.packet_id)
|
||||||
self.session.inflight_out[app_message.packet_id] = app_message
|
publish_packet = app_message.build_publish_packet(dup=True)
|
||||||
publish_packet = app_message.build_publish_packet()
|
else:
|
||||||
|
# Store message in session
|
||||||
# Send PUBLISH packet
|
self.session.inflight_out[app_message.packet_id] = app_message
|
||||||
yield from self._send_packet(publish_packet)
|
publish_packet = app_message.build_publish_packet()
|
||||||
app_message.publish_packet = publish_packet
|
# Send PUBLISH packet
|
||||||
|
yield from self._send_packet(publish_packet)
|
||||||
if not app_message.pubrec_packet:
|
app_message.publish_packet = publish_packet
|
||||||
|
# Wait PUBREC
|
||||||
if app_message.packet_id in self._pubrec_waiters:
|
if app_message.packet_id in self._pubrec_waiters:
|
||||||
# PUBREC waiter already exists for this packet ID
|
# PUBREC waiter already exists for this packet ID
|
||||||
message = "Can't add PUBREC waiter, a waiter already exists for message Id '%s'" \
|
message = "Can't add PUBREC waiter, a waiter already exists for message Id '%s'" \
|
||||||
% app_message.packet_id
|
% app_message.packet_id
|
||||||
self.logger.warning(message)
|
self.logger.warning(message)
|
||||||
raise HBMQTTException(message)
|
raise HBMQTTException(message)
|
||||||
# Wait for PUBREC
|
|
||||||
waiter = asyncio.Future(loop=self._loop)
|
waiter = asyncio.Future(loop=self._loop)
|
||||||
self._pubrec_waiters[app_message.packet_id] = waiter
|
self._pubrec_waiters[app_message.packet_id] = waiter
|
||||||
yield from waiter
|
yield from waiter
|
||||||
del self._pubrec_waiters[app_message.packet_id]
|
del self._pubrec_waiters[app_message.packet_id]
|
||||||
app_message.pubrec_packet = waiter.result()
|
app_message.pubrec_packet = waiter.result()
|
||||||
|
if not app_message.pubcomp_packet:
|
||||||
if not app_message.pubrel_packet:
|
|
||||||
# Send pubrel
|
# Send pubrel
|
||||||
app_message.pubrel_packet = PubrelPacket.build(app_message.packet_id)
|
app_message.pubrel_packet = PubrelPacket.build(app_message.packet_id)
|
||||||
yield from self._send_packet(app_message.pubrel_packet)
|
yield from self._send_packet(app_message.pubrel_packet)
|
||||||
|
|
||||||
if not app_message.pubcomp_packet:
|
|
||||||
# Wait for PUBCOMP
|
# Wait for PUBCOMP
|
||||||
waiter = asyncio.Future(loop=self._loop)
|
waiter = asyncio.Future(loop=self._loop)
|
||||||
self._pubcomp_waiters[app_message.packet_id] = waiter
|
self._pubcomp_waiters[app_message.packet_id] = waiter
|
||||||
yield from waiter
|
yield from waiter
|
||||||
del self._pubcomp_waiters[app_message.packet_id]
|
del self._pubcomp_waiters[app_message.packet_id]
|
||||||
app_message.pubcomp_packet = waiter.result()
|
app_message.pubcomp_packet = waiter.result()
|
||||||
|
# Discard inflight message
|
||||||
|
del self.session.inflight_out[app_message.packet_id]
|
||||||
elif isinstance(app_message, IncomingApplicationMessage):
|
elif isinstance(app_message, IncomingApplicationMessage):
|
||||||
if app_message.publish_packet.dup_flag:
|
self.session.inflight_in[app_message.packet_id] = app_message
|
||||||
self.logger.warning("[MQTT-3.3.1-2] DUP flag must set to 0 for QOS 0 message. Message ignored: %s" %
|
# Send pubrec
|
||||||
repr(app_message.publish_packet))
|
pubrec_packet = PubrecPacket.build(app_message.packet_id)
|
||||||
else:
|
yield from self._send_packet(pubrec_packet)
|
||||||
# Assign packet_id as it's needed internally
|
app_message.pubrec_packet = pubrec_packet
|
||||||
yield from self.session.delivered_message_queue.put(app_message)
|
# Wait PUBREL
|
||||||
|
if app_message.packet_id in self._pubrel_waiters:
|
||||||
|
# PUBREL waiter already exists for this packet ID
|
||||||
|
message = "Can't add PUBREC waiter, a waiter already exists for message Id '%s'" \
|
||||||
|
% app_message.packet_id
|
||||||
|
self.logger.warning(message)
|
||||||
|
raise HBMQTTException(message)
|
||||||
|
waiter = asyncio.Future(loop=self._loop)
|
||||||
|
self._pubrel_waiters[app_message.packet_id] = waiter
|
||||||
|
yield from waiter
|
||||||
|
del self._pubrel_waiters[app_message.packet_id]
|
||||||
|
app_message.pubrel_packet = waiter.result()
|
||||||
|
#Discard message
|
||||||
|
del self.session.inflight_in[app_message.packet_id]
|
||||||
|
# Send pubcomp
|
||||||
|
pubcomp_packet = PubcompPacket.build(app_message.packet_id)
|
||||||
|
yield from self._send_packet(pubcomp_packet)
|
||||||
|
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def stop(self):
|
def stop(self):
|
||||||
|
|
Ładowanie…
Reference in New Issue