kopia lustrzana https://github.com/Yakifo/amqtt
Create InflightMessage subclass for incoming/outgoing messages
rodzic
2dc782adc0
commit
6c61d2577e
|
@ -144,6 +144,14 @@ class InFlightMessage:
|
|||
self.start_ack_timeout()
|
||||
|
||||
|
||||
class IncomingInFlightMessage(InFlightMessage):
|
||||
pass
|
||||
|
||||
|
||||
class OutgoingInFlightMessage(InFlightMessage):
|
||||
pass
|
||||
|
||||
|
||||
class ProtocolHandler:
|
||||
"""
|
||||
Class implementing the MQTT communication protocol using asyncio features
|
||||
|
@ -220,7 +228,7 @@ class ProtocolHandler:
|
|||
packet = PublishPacket.build(topic, message, packet_id, False, qos, retain)
|
||||
yield from self.outgoing_queue.put(packet)
|
||||
if qos != QOS_0:
|
||||
inflight_message = InFlightMessage(packet, qos, loop=self._loop)
|
||||
inflight_message = OutgoingInFlightMessage(packet, qos, loop=self._loop)
|
||||
inflight_message.sent_publish()
|
||||
self.session.inflight_out[packet_id] = inflight_message
|
||||
ack = yield from inflight_message.wait_acknowledge()
|
||||
|
@ -433,19 +441,19 @@ class ProtocolHandler:
|
|||
self.logger.warn("Received PUBREL for unknown pending subscription with Id: %s" % packet_id)
|
||||
|
||||
@asyncio.coroutine
|
||||
def handle_publish(self, publish : PublishPacket):
|
||||
def handle_publish(self, publish_packet: PublishPacket):
|
||||
inflight_message = None
|
||||
packet_id = publish.variable_header.packet_id
|
||||
qos = (publish.fixed_header.flags >> 1) & 0x03
|
||||
packet_id = publish_packet.variable_header.packet_id
|
||||
qos = (publish_packet.fixed_header.flags >> 1) & 0x03
|
||||
|
||||
if qos == 0:
|
||||
inflight_message = InFlightMessage(publish, qos)
|
||||
inflight_message = IncomingInFlightMessage(publish_packet, qos)
|
||||
yield from self.delivered_message.put(inflight_message)
|
||||
else:
|
||||
if packet_id in self.session.inflight_in:
|
||||
inflight_message = self.session.inflight_in[packet_id]
|
||||
else:
|
||||
inflight_message = InFlightMessage(publish, qos)
|
||||
inflight_message = InFlightMessage(publish_packet, qos)
|
||||
self.session.inflight_in[packet_id] = inflight_message
|
||||
inflight_message.publish()
|
||||
|
||||
|
|
Ładowanie…
Reference in New Issue