diff --git a/docs/references/mqttclient.rst b/docs/references/mqttclient.rst index e4fed06..b33d2df 100644 --- a/docs/references/mqttclient.rst +++ b/docs/references/mqttclient.rst @@ -1,6 +1,147 @@ -MQTTClient API reference -======================== +MQTTClient API +============== -.. autoclass:: hbmqtt.client.MQTTClient - :members: - :undoc-members: \ No newline at end of file +The :class:`~hbmqtt.client.MQTTClient` class implements the client part of MQTT protocol. It can be used to publish and/or subscribe MQTT message on a broker accessible on the network through TCP or websocket protocol, both secured or unsecured. + + +Usage examples +-------------- + +Subscriber +.......... + +The example below shows how to write a simple MQTT client which subscribes a topic and prints every messages received from the broker : + +.. code-block:: python + + @asyncio.coroutine + def uptime_coro(): + C = MQTTClient() + yield from C.connect('mqtt://test.mosquitto.org/') + # Subscribe to '$SYS/broker/uptime' with QOS=1 + # Subscribe to '$SYS/broker/load/#' with QOS=2 + yield from C.subscribe([ + ('$SYS/broker/uptime', QOS_1), + ('$SYS/broker/load/#', QOS_2), + ]) + try: + for i in range(1, 100): + message = yield from C.deliver_message() + packet = message.publish_packet + print("%d: %s => %s" % (i, packet.variable_header.topic_name, str(packet.payload.data))) + yield from C.unsubscribe(['$SYS/broker/uptime', '$SYS/broker/load/#']) + yield from C.disconnect() + except ClientException as ce: + logger.error("Client exception: %s" % ce) + + if __name__ == '__main__': + asyncio.get_event_loop().run_until_complete(uptime_coro()) + +When executed, this script gets the default event loop and asks it to run the ``uptime_coro`` until it completes. +``uptime_coro`` starts by initializing a :class:`~hbmqtt.client.MQTTClient` instance. +The coroutine then call :meth:`~hbmqtt.client.MQTTClient.connect` to connect to the broker, here ``test.mosquitto.org``. +Once connected, the coroutine subscribes to some topics, and then wait for 100 messages. Each message received is simply written to output. +Finally, the coroutine unsubscribes from topics and disconnects from the broker. + +Publisher +......... + +The example below uses the :class:`~hbmqtt.client.MQTTClient` class to implement a publisher. +This test publish 3 messages asynchronously to the broker on a test topic. +For the purposes of the test, each message is published with a different Quality Of Service. +This example also shows to method for publishing message asynchronously. + +.. code-block:: python + + @asyncio.coroutine + def test_coro(): + C = MQTTClient() + yield from C.connect('mqtt://test.mosquitto.org/') + tasks = [ + asyncio.ensure_future(C.publish('a/b', b'TEST MESSAGE WITH QOS_0')), + asyncio.ensure_future(C.publish('a/b', b'TEST MESSAGE WITH QOS_1', qos=QOS_1)), + asyncio.ensure_future(C.publish('a/b', b'TEST MESSAGE WITH QOS_2', qos=QOS_2)), + ] + yield from asyncio.wait(tasks) + logger.info("messages published") + yield from C.disconnect() + + + @asyncio.coroutine + def test_coro2(): + try: + C = MQTTClient() + ret = yield from C.connect('mqtt://test.mosquitto.org:1883/') + message = yield from C.publish('a/b', b'TEST MESSAGE WITH QOS_0', qos=QOS_0) + message = yield from C.publish('a/b', b'TEST MESSAGE WITH QOS_1', qos=QOS_1) + message = yield from C.publish('a/b', b'TEST MESSAGE WITH QOS_2', qos=QOS_2) + #print(message) + logger.info("messages published") + yield from C.disconnect() + except ConnectException as ce: + logger.error("Connection failed: %s" % ce) + asyncio.get_event_loop().stop() + + + if __name__ == '__main__': + formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s" + logging.basicConfig(level=logging.DEBUG, format=formatter) + asyncio.get_event_loop().run_until_complete(test_coro()) + asyncio.get_event_loop().run_until_complete(test_coro2()) + +As usual, the script runs the publish code through the async loop. ``test_coro()`` and ``test_coro()`` are ran in sequence. +Both do the same job. ``test_coro()`` publish 3 messages in sequence. ``test_coro2()`` publishes the same message asynchronously. +The difference appears the looking at the sequence of MQTT messages sent. + +``test_coro()`` achieves: +:: + + hbmqtt/YDYY;NNRpYQSy3?o -out-> PublishPacket(ts=2015-11-11 21:54:48.843901, fixed=MQTTFixedHeader(length=28, flags=0x0), variable=PublishVariableHeader(topic=a/b, packet_id=None), payload=PublishPayload(data="b'TEST MESSAGE WITH QOS_0'")) + hbmqtt/YDYY;NNRpYQSy3?o -out-> PublishPacket(ts=2015-11-11 21:54:48.844152, fixed=MQTTFixedHeader(length=30, flags=0x2), variable=PublishVariableHeader(topic=a/b, packet_id=1), payload=PublishPayload(data="b'TEST MESSAGE WITH QOS_1'")) + hbmqtt/YDYY;NNRpYQSy3?o <-in-- PubackPacket(ts=2015-11-11 21:54:48.979665, fixed=MQTTFixedHeader(length=2, flags=0x0), variable=PacketIdVariableHeader(packet_id=1), payload=None) + hbmqtt/YDYY;NNRpYQSy3?o -out-> PublishPacket(ts=2015-11-11 21:54:48.980886, fixed=MQTTFixedHeader(length=30, flags=0x4), variable=PublishVariableHeader(topic=a/b, packet_id=2), payload=PublishPayload(data="b'TEST MESSAGE WITH QOS_2'")) + hbmqtt/YDYY;NNRpYQSy3?o <-in-- PubrecPacket(ts=2015-11-11 21:54:49.029691, fixed=MQTTFixedHeader(length=2, flags=0x0), variable=PacketIdVariableHeader(packet_id=2), payload=None) + hbmqtt/YDYY;NNRpYQSy3?o -out-> PubrelPacket(ts=2015-11-11 21:54:49.030823, fixed=MQTTFixedHeader(length=2, flags=0x2), variable=PacketIdVariableHeader(packet_id=2), payload=None) + hbmqtt/YDYY;NNRpYQSy3?o <-in-- PubcompPacket(ts=2015-11-11 21:54:49.092514, fixed=MQTTFixedHeader(length=2, flags=0x0), variable=PacketIdVariableHeader(packet_id=2), payload=None)fixed=MQTTFixedHeader(length=2, flags=0x0), variable=PacketIdVariableHeader(packet_id=2), payload=None) + +while ``test_coro2()`` runs: +:: + + hbmqtt/LYRf52W[56SOjW04 -out-> PublishPacket(ts=2015-11-11 21:54:48.466123, fixed=MQTTFixedHeader(length=28, flags=0x0), variable=PublishVariableHeader(topic=a/b, packet_id=None), payload=PublishPayload(data="b'TEST MESSAGE WITH QOS_0'")) + hbmqtt/LYRf52W[56SOjW04 -out-> PublishPacket(ts=2015-11-11 21:54:48.466432, fixed=MQTTFixedHeader(length=30, flags=0x2), variable=PublishVariableHeader(topic=a/b, packet_id=1), payload=PublishPayload(data="b'TEST MESSAGE WITH QOS_1'")) + hbmqtt/LYRf52W[56SOjW04 -out-> PublishPacket(ts=2015-11-11 21:54:48.466695, fixed=MQTTFixedHeader(length=30, flags=0x4), variable=PublishVariableHeader(topic=a/b, packet_id=2), payload=PublishPayload(data="b'TEST MESSAGE WITH QOS_2'")) + hbmqtt/LYRf52W[56SOjW04 <-in-- PubackPacket(ts=2015-11-11 21:54:48.613062, fixed=MQTTFixedHeader(length=2, flags=0x0), variable=PacketIdVariableHeader(packet_id=1), payload=None) + hbmqtt/LYRf52W[56SOjW04 <-in-- PubrecPacket(ts=2015-11-11 21:54:48.661073, fixed=MQTTFixedHeader(length=2, flags=0x0), variable=PacketIdVariableHeader(packet_id=2), payload=None) + hbmqtt/LYRf52W[56SOjW04 -out-> PubrelPacket(ts=2015-11-11 21:54:48.661925, fixed=MQTTFixedHeader(length=2, flags=0x2), variable=PacketIdVariableHeader(packet_id=2), payload=None) + hbmqtt/LYRf52W[56SOjW04 <-in-- PubcompPacket(ts=2015-11-11 21:54:48.713107, fixed=MQTTFixedHeader(length=2, flags=0x0), variable=PacketIdVariableHeader(packet_id=2), payload=None) + +Both coroutines have the same results except that``test_coro2()`` manages messages flow in parallel which may be more efficient. + +Reference +--------- + +MQTTClient API +.............. + +.. automodule:: hbmqtt.client + + .. autoclass:: MQTTClient + + .. automethod:: connect(uri=None, cleansession=None, cafile=None, capath=None, cadata=None) + + .. automethod:: disconnect() + + .. automethod:: reconnect(cleansession=None) + + .. automethod:: ping() + + .. automethod:: publish() + + .. automethod:: subscribe() + + .. automethod:: unsubscribe() + + .. automethod:: deliver_message() + +MQTTClient configuration +........................ \ No newline at end of file diff --git a/hbmqtt/client.py b/hbmqtt/client.py index 4ab2ee0..b73c3f7 100644 --- a/hbmqtt/client.py +++ b/hbmqtt/client.py @@ -73,12 +73,12 @@ def mqtt_connected(func): class MQTTClient: """ - MQTT client implementation. Create ``MQTTClient`` instances to connect to a broker and send/receive messages using the MQTT protocol + MQTT client implementation. instances provides API for connecting to a broker and send/receive messages using the MQTT protocol - :param client_id: MWTT client ID to use when connecting to the broker. If none, it will generated randomly + :param client_id: MQTT client ID to use when connecting to the broker. If none, it will generated randomly by :func:`hbmqtt.utils.gen_client_id` :param config: Client configuration :param loop: asynio loop to use - :return: + :return: class instance """ def __init__(self, client_id=None, config=None, loop=None): self.logger = logging.getLogger(__name__) @@ -116,12 +116,15 @@ class MQTTClient: capath=None, cadata=None): """ - Connect to a remote broker + Connect to a remote broker. Establish the network connection and send a `CONNECT `_ message. - :param uri: Broker URI connection, conforming to `MQTT URI scheme `_. - :param cleansession: MQTT CONNECT clean session flaf - :param cafile: server certificate authority file - :return: + :param uri: Broker URI connection, conforming to `MQTT URI scheme `_. Uses ``uri`` config attribute by default. + :param cleansession: MQTT CONNECT clean session flag + :param cafile: server certificate authority file (optional, used for secured connection) + :param capath: server certificate authority path (optional, used for secured connection) + :param cadata: server certificate authority data (optional, used for secured connection) + :return: `CONNACK `_ return code + :raise: :class:`hbmqtt.client.ConnectException` if connection fails """ self.session = self._initsession(uri, cleansession, cafile, capath, cadata) self.logger.debug("Connect to: %s" % uri) @@ -139,6 +142,10 @@ class MQTTClient: @mqtt_connected @asyncio.coroutine def disconnect(self): + """ + Disconnect from the connected broker. + This method sends a `DISCONNECT `_ message and closes the network connection. + """ if self.session.transitions.is_connected(): if not self._disconnect_task.done(): self._disconnect_task.cancel() @@ -151,6 +158,15 @@ class MQTTClient: @asyncio.coroutine def reconnect(self, cleansession=None): + """ + Reconnect a previously connectd broker (and which has been disconnected unexpectedly). + Reconnection tries to establish a network connection and send a `CONNECT `_ message. + Retries interval and attempts can be controled with the ``reconnect_max_interval`` and ``reconnect_retries`` configuration parameters. + + :param cleansession: clean session flag used in MQTT CONNECT messages sent for reconnections. + :return: `CONNACK `_ return code + :raise: :class:`hbmqtt.client.ConnectException` if re-connection fails after max retries. + """ if self.session.transitions.is_connected(): self.logger.warn("Client already connected") return CONNECTION_ACCEPTED @@ -189,6 +205,7 @@ class MQTTClient: def ping(self): """ Send a MQTT ping request and wait for response + :return: None """ if self.session.transitions.is_connected():