From 6da03d99137b5b45bc844a09710b64c723c09caa Mon Sep 17 00:00:00 2001 From: Nico Date: Mon, 12 Oct 2015 21:48:45 +0200 Subject: [PATCH] Make use of ac_timeout parameter --- hbmqtt/mqtt/protocol/handler.py | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/hbmqtt/mqtt/protocol/handler.py b/hbmqtt/mqtt/protocol/handler.py index 450e29f..4f88118 100644 --- a/hbmqtt/mqtt/protocol/handler.py +++ b/hbmqtt/mqtt/protocol/handler.py @@ -162,6 +162,17 @@ class ProtocolHandler: @asyncio.coroutine def mqtt_publish(self, topic, data, qos, retain, ack_timeout=None): + """ + Sends a MQTT publish message and manages messages flows. + This methods doesn't return until the message has been acknowledged by receiver or timeout occur + :param topic: MQTT topic to publish + :param data: data to send on topic + :param qos: quality of service to use for message flow. Can be QOS_0, QOS_1 or QOS_2 + :param retain: retain message flag + :param ack_timeout: acknowledge timeout. If set, this method will return a TimeOut error if the acknowledgment + is not completed before ack_timeout second + :return: ApplicationMessage used during inflight operations + """ if qos in (QOS_1, QOS_2): packet_id = self.session.next_packet_id if packet_id in self.session.inflight_out: @@ -171,7 +182,11 @@ class ProtocolHandler: message = OutgoingApplicationMessage(packet_id, topic, qos, data, retain) # Handle message flow - yield from asyncio.wait_for(self._handle_message_flow(message), 60, loop=self._loop) + if ack_timeout is not None and ack_timeout > 0: + yield from asyncio.wait_for(self._handle_message_flow(message), ack_timeout, loop=self._loop) + else: + yield from self._handle_message_flow(message) + return message @asyncio.coroutine