kopia lustrzana https://github.com/Yakifo/amqtt
Make use of ac_timeout parameter
rodzic
c3a144c6a3
commit
6da03d9913
|
@ -162,6 +162,17 @@ class ProtocolHandler:
|
|||
|
||||
@asyncio.coroutine
|
||||
def mqtt_publish(self, topic, data, qos, retain, ack_timeout=None):
|
||||
"""
|
||||
Sends a MQTT publish message and manages messages flows.
|
||||
This methods doesn't return until the message has been acknowledged by receiver or timeout occur
|
||||
:param topic: MQTT topic to publish
|
||||
:param data: data to send on topic
|
||||
:param qos: quality of service to use for message flow. Can be QOS_0, QOS_1 or QOS_2
|
||||
:param retain: retain message flag
|
||||
:param ack_timeout: acknowledge timeout. If set, this method will return a TimeOut error if the acknowledgment
|
||||
is not completed before ack_timeout second
|
||||
:return: ApplicationMessage used during inflight operations
|
||||
"""
|
||||
if qos in (QOS_1, QOS_2):
|
||||
packet_id = self.session.next_packet_id
|
||||
if packet_id in self.session.inflight_out:
|
||||
|
@ -171,7 +182,11 @@ class ProtocolHandler:
|
|||
|
||||
message = OutgoingApplicationMessage(packet_id, topic, qos, data, retain)
|
||||
# Handle message flow
|
||||
yield from asyncio.wait_for(self._handle_message_flow(message), 60, loop=self._loop)
|
||||
if ack_timeout is not None and ack_timeout > 0:
|
||||
yield from asyncio.wait_for(self._handle_message_flow(message), ack_timeout, loop=self._loop)
|
||||
else:
|
||||
yield from self._handle_message_flow(message)
|
||||
|
||||
return message
|
||||
|
||||
@asyncio.coroutine
|
||||
|
|
Ładowanie…
Reference in New Issue