kopia lustrzana https://github.com/Yakifo/amqtt
Documentation update
rodzic
9ba8a46914
commit
6c0657b438
|
@ -1,6 +1,147 @@
|
|||
MQTTClient API reference
|
||||
========================
|
||||
MQTTClient API
|
||||
==============
|
||||
|
||||
.. autoclass:: hbmqtt.client.MQTTClient
|
||||
:members:
|
||||
:undoc-members:
|
||||
The :class:`~hbmqtt.client.MQTTClient` class implements the client part of MQTT protocol. It can be used to publish and/or subscribe MQTT message on a broker accessible on the network through TCP or websocket protocol, both secured or unsecured.
|
||||
|
||||
|
||||
Usage examples
|
||||
--------------
|
||||
|
||||
Subscriber
|
||||
..........
|
||||
|
||||
The example below shows how to write a simple MQTT client which subscribes a topic and prints every messages received from the broker :
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
@asyncio.coroutine
|
||||
def uptime_coro():
|
||||
C = MQTTClient()
|
||||
yield from C.connect('mqtt://test.mosquitto.org/')
|
||||
# Subscribe to '$SYS/broker/uptime' with QOS=1
|
||||
# Subscribe to '$SYS/broker/load/#' with QOS=2
|
||||
yield from C.subscribe([
|
||||
('$SYS/broker/uptime', QOS_1),
|
||||
('$SYS/broker/load/#', QOS_2),
|
||||
])
|
||||
try:
|
||||
for i in range(1, 100):
|
||||
message = yield from C.deliver_message()
|
||||
packet = message.publish_packet
|
||||
print("%d: %s => %s" % (i, packet.variable_header.topic_name, str(packet.payload.data)))
|
||||
yield from C.unsubscribe(['$SYS/broker/uptime', '$SYS/broker/load/#'])
|
||||
yield from C.disconnect()
|
||||
except ClientException as ce:
|
||||
logger.error("Client exception: %s" % ce)
|
||||
|
||||
if __name__ == '__main__':
|
||||
asyncio.get_event_loop().run_until_complete(uptime_coro())
|
||||
|
||||
When executed, this script gets the default event loop and asks it to run the ``uptime_coro`` until it completes.
|
||||
``uptime_coro`` starts by initializing a :class:`~hbmqtt.client.MQTTClient` instance.
|
||||
The coroutine then call :meth:`~hbmqtt.client.MQTTClient.connect` to connect to the broker, here ``test.mosquitto.org``.
|
||||
Once connected, the coroutine subscribes to some topics, and then wait for 100 messages. Each message received is simply written to output.
|
||||
Finally, the coroutine unsubscribes from topics and disconnects from the broker.
|
||||
|
||||
Publisher
|
||||
.........
|
||||
|
||||
The example below uses the :class:`~hbmqtt.client.MQTTClient` class to implement a publisher.
|
||||
This test publish 3 messages asynchronously to the broker on a test topic.
|
||||
For the purposes of the test, each message is published with a different Quality Of Service.
|
||||
This example also shows to method for publishing message asynchronously.
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
@asyncio.coroutine
|
||||
def test_coro():
|
||||
C = MQTTClient()
|
||||
yield from C.connect('mqtt://test.mosquitto.org/')
|
||||
tasks = [
|
||||
asyncio.ensure_future(C.publish('a/b', b'TEST MESSAGE WITH QOS_0')),
|
||||
asyncio.ensure_future(C.publish('a/b', b'TEST MESSAGE WITH QOS_1', qos=QOS_1)),
|
||||
asyncio.ensure_future(C.publish('a/b', b'TEST MESSAGE WITH QOS_2', qos=QOS_2)),
|
||||
]
|
||||
yield from asyncio.wait(tasks)
|
||||
logger.info("messages published")
|
||||
yield from C.disconnect()
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
def test_coro2():
|
||||
try:
|
||||
C = MQTTClient()
|
||||
ret = yield from C.connect('mqtt://test.mosquitto.org:1883/')
|
||||
message = yield from C.publish('a/b', b'TEST MESSAGE WITH QOS_0', qos=QOS_0)
|
||||
message = yield from C.publish('a/b', b'TEST MESSAGE WITH QOS_1', qos=QOS_1)
|
||||
message = yield from C.publish('a/b', b'TEST MESSAGE WITH QOS_2', qos=QOS_2)
|
||||
#print(message)
|
||||
logger.info("messages published")
|
||||
yield from C.disconnect()
|
||||
except ConnectException as ce:
|
||||
logger.error("Connection failed: %s" % ce)
|
||||
asyncio.get_event_loop().stop()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s"
|
||||
logging.basicConfig(level=logging.DEBUG, format=formatter)
|
||||
asyncio.get_event_loop().run_until_complete(test_coro())
|
||||
asyncio.get_event_loop().run_until_complete(test_coro2())
|
||||
|
||||
As usual, the script runs the publish code through the async loop. ``test_coro()`` and ``test_coro()`` are ran in sequence.
|
||||
Both do the same job. ``test_coro()`` publish 3 messages in sequence. ``test_coro2()`` publishes the same message asynchronously.
|
||||
The difference appears the looking at the sequence of MQTT messages sent.
|
||||
|
||||
``test_coro()`` achieves:
|
||||
::
|
||||
|
||||
hbmqtt/YDYY;NNRpYQSy3?o -out-> PublishPacket(ts=2015-11-11 21:54:48.843901, fixed=MQTTFixedHeader(length=28, flags=0x0), variable=PublishVariableHeader(topic=a/b, packet_id=None), payload=PublishPayload(data="b'TEST MESSAGE WITH QOS_0'"))
|
||||
hbmqtt/YDYY;NNRpYQSy3?o -out-> PublishPacket(ts=2015-11-11 21:54:48.844152, fixed=MQTTFixedHeader(length=30, flags=0x2), variable=PublishVariableHeader(topic=a/b, packet_id=1), payload=PublishPayload(data="b'TEST MESSAGE WITH QOS_1'"))
|
||||
hbmqtt/YDYY;NNRpYQSy3?o <-in-- PubackPacket(ts=2015-11-11 21:54:48.979665, fixed=MQTTFixedHeader(length=2, flags=0x0), variable=PacketIdVariableHeader(packet_id=1), payload=None)
|
||||
hbmqtt/YDYY;NNRpYQSy3?o -out-> PublishPacket(ts=2015-11-11 21:54:48.980886, fixed=MQTTFixedHeader(length=30, flags=0x4), variable=PublishVariableHeader(topic=a/b, packet_id=2), payload=PublishPayload(data="b'TEST MESSAGE WITH QOS_2'"))
|
||||
hbmqtt/YDYY;NNRpYQSy3?o <-in-- PubrecPacket(ts=2015-11-11 21:54:49.029691, fixed=MQTTFixedHeader(length=2, flags=0x0), variable=PacketIdVariableHeader(packet_id=2), payload=None)
|
||||
hbmqtt/YDYY;NNRpYQSy3?o -out-> PubrelPacket(ts=2015-11-11 21:54:49.030823, fixed=MQTTFixedHeader(length=2, flags=0x2), variable=PacketIdVariableHeader(packet_id=2), payload=None)
|
||||
hbmqtt/YDYY;NNRpYQSy3?o <-in-- PubcompPacket(ts=2015-11-11 21:54:49.092514, fixed=MQTTFixedHeader(length=2, flags=0x0), variable=PacketIdVariableHeader(packet_id=2), payload=None)fixed=MQTTFixedHeader(length=2, flags=0x0), variable=PacketIdVariableHeader(packet_id=2), payload=None)
|
||||
|
||||
while ``test_coro2()`` runs:
|
||||
::
|
||||
|
||||
hbmqtt/LYRf52W[56SOjW04 -out-> PublishPacket(ts=2015-11-11 21:54:48.466123, fixed=MQTTFixedHeader(length=28, flags=0x0), variable=PublishVariableHeader(topic=a/b, packet_id=None), payload=PublishPayload(data="b'TEST MESSAGE WITH QOS_0'"))
|
||||
hbmqtt/LYRf52W[56SOjW04 -out-> PublishPacket(ts=2015-11-11 21:54:48.466432, fixed=MQTTFixedHeader(length=30, flags=0x2), variable=PublishVariableHeader(topic=a/b, packet_id=1), payload=PublishPayload(data="b'TEST MESSAGE WITH QOS_1'"))
|
||||
hbmqtt/LYRf52W[56SOjW04 -out-> PublishPacket(ts=2015-11-11 21:54:48.466695, fixed=MQTTFixedHeader(length=30, flags=0x4), variable=PublishVariableHeader(topic=a/b, packet_id=2), payload=PublishPayload(data="b'TEST MESSAGE WITH QOS_2'"))
|
||||
hbmqtt/LYRf52W[56SOjW04 <-in-- PubackPacket(ts=2015-11-11 21:54:48.613062, fixed=MQTTFixedHeader(length=2, flags=0x0), variable=PacketIdVariableHeader(packet_id=1), payload=None)
|
||||
hbmqtt/LYRf52W[56SOjW04 <-in-- PubrecPacket(ts=2015-11-11 21:54:48.661073, fixed=MQTTFixedHeader(length=2, flags=0x0), variable=PacketIdVariableHeader(packet_id=2), payload=None)
|
||||
hbmqtt/LYRf52W[56SOjW04 -out-> PubrelPacket(ts=2015-11-11 21:54:48.661925, fixed=MQTTFixedHeader(length=2, flags=0x2), variable=PacketIdVariableHeader(packet_id=2), payload=None)
|
||||
hbmqtt/LYRf52W[56SOjW04 <-in-- PubcompPacket(ts=2015-11-11 21:54:48.713107, fixed=MQTTFixedHeader(length=2, flags=0x0), variable=PacketIdVariableHeader(packet_id=2), payload=None)
|
||||
|
||||
Both coroutines have the same results except that``test_coro2()`` manages messages flow in parallel which may be more efficient.
|
||||
|
||||
Reference
|
||||
---------
|
||||
|
||||
MQTTClient API
|
||||
..............
|
||||
|
||||
.. automodule:: hbmqtt.client
|
||||
|
||||
.. autoclass:: MQTTClient
|
||||
|
||||
.. automethod:: connect(uri=None, cleansession=None, cafile=None, capath=None, cadata=None)
|
||||
|
||||
.. automethod:: disconnect()
|
||||
|
||||
.. automethod:: reconnect(cleansession=None)
|
||||
|
||||
.. automethod:: ping()
|
||||
|
||||
.. automethod:: publish()
|
||||
|
||||
.. automethod:: subscribe()
|
||||
|
||||
.. automethod:: unsubscribe()
|
||||
|
||||
.. automethod:: deliver_message()
|
||||
|
||||
MQTTClient configuration
|
||||
........................
|
|
@ -73,12 +73,12 @@ def mqtt_connected(func):
|
|||
|
||||
class MQTTClient:
|
||||
"""
|
||||
MQTT client implementation. Create ``MQTTClient`` instances to connect to a broker and send/receive messages using the MQTT protocol
|
||||
MQTT client implementation. instances provides API for connecting to a broker and send/receive messages using the MQTT protocol
|
||||
|
||||
:param client_id: MWTT client ID to use when connecting to the broker. If none, it will generated randomly
|
||||
:param client_id: MQTT client ID to use when connecting to the broker. If none, it will generated randomly by :func:`hbmqtt.utils.gen_client_id`
|
||||
:param config: Client configuration
|
||||
:param loop: asynio loop to use
|
||||
:return:
|
||||
:return: class instance
|
||||
"""
|
||||
def __init__(self, client_id=None, config=None, loop=None):
|
||||
self.logger = logging.getLogger(__name__)
|
||||
|
@ -116,12 +116,15 @@ class MQTTClient:
|
|||
capath=None,
|
||||
cadata=None):
|
||||
"""
|
||||
Connect to a remote broker
|
||||
Connect to a remote broker. Establish the network connection and send a `CONNECT <http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718028>`_ message.
|
||||
|
||||
:param uri: Broker URI connection, conforming to `MQTT URI scheme <https://github.com/mqtt/mqtt.github.io/wiki/URI-Scheme>`_.
|
||||
:param cleansession: MQTT CONNECT clean session flaf
|
||||
:param cafile: server certificate authority file
|
||||
:return:
|
||||
:param uri: Broker URI connection, conforming to `MQTT URI scheme <https://github.com/mqtt/mqtt.github.io/wiki/URI-Scheme>`_. Uses ``uri`` config attribute by default.
|
||||
:param cleansession: MQTT CONNECT clean session flag
|
||||
:param cafile: server certificate authority file (optional, used for secured connection)
|
||||
:param capath: server certificate authority path (optional, used for secured connection)
|
||||
:param cadata: server certificate authority data (optional, used for secured connection)
|
||||
:return: `CONNACK <http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718033>`_ return code
|
||||
:raise: :class:`hbmqtt.client.ConnectException` if connection fails
|
||||
"""
|
||||
self.session = self._initsession(uri, cleansession, cafile, capath, cadata)
|
||||
self.logger.debug("Connect to: %s" % uri)
|
||||
|
@ -139,6 +142,10 @@ class MQTTClient:
|
|||
@mqtt_connected
|
||||
@asyncio.coroutine
|
||||
def disconnect(self):
|
||||
"""
|
||||
Disconnect from the connected broker.
|
||||
This method sends a `DISCONNECT <http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718090>`_ message and closes the network connection.
|
||||
"""
|
||||
if self.session.transitions.is_connected():
|
||||
if not self._disconnect_task.done():
|
||||
self._disconnect_task.cancel()
|
||||
|
@ -151,6 +158,15 @@ class MQTTClient:
|
|||
|
||||
@asyncio.coroutine
|
||||
def reconnect(self, cleansession=None):
|
||||
"""
|
||||
Reconnect a previously connectd broker (and which has been disconnected unexpectedly).
|
||||
Reconnection tries to establish a network connection and send a `CONNECT <http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718028>`_ message.
|
||||
Retries interval and attempts can be controled with the ``reconnect_max_interval`` and ``reconnect_retries`` configuration parameters.
|
||||
|
||||
:param cleansession: clean session flag used in MQTT CONNECT messages sent for reconnections.
|
||||
:return: `CONNACK <http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718033>`_ return code
|
||||
:raise: :class:`hbmqtt.client.ConnectException` if re-connection fails after max retries.
|
||||
"""
|
||||
if self.session.transitions.is_connected():
|
||||
self.logger.warn("Client already connected")
|
||||
return CONNECTION_ACCEPTED
|
||||
|
@ -189,6 +205,7 @@ class MQTTClient:
|
|||
def ping(self):
|
||||
"""
|
||||
Send a MQTT ping request and wait for response
|
||||
|
||||
:return: None
|
||||
"""
|
||||
if self.session.transitions.is_connected():
|
||||
|
|
Ładowanie…
Reference in New Issue