kopia lustrzana https://github.com/Yakifo/amqtt
Improve documentation
rodzic
06597542ba
commit
82985b6f1c
|
@ -1,4 +1,95 @@
|
||||||
Broker API reference
|
Broker API reference
|
||||||
====================
|
====================
|
||||||
|
|
||||||
TBD
|
The :class:`~hbmqtt.broker.Broker` class provides a complete MQTT 3.1.1 broker implementation. This class allows Python developers to embed a MQTT broker in their own applications.
|
||||||
|
|
||||||
|
Usage example
|
||||||
|
-------------
|
||||||
|
|
||||||
|
The following example shows how to start a broker using the default configuration:
|
||||||
|
|
||||||
|
.. code-block:: python
|
||||||
|
|
||||||
|
import logging
|
||||||
|
import asyncio
|
||||||
|
import os
|
||||||
|
from hbmqtt.broker import Broker
|
||||||
|
|
||||||
|
@asyncio.coroutine
|
||||||
|
def test_coro():
|
||||||
|
broker = Broker()
|
||||||
|
yield from broker.start()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
formatter = "[%(asctime)s] :: %(levelname)s :: %(name)s :: %(message)s"
|
||||||
|
logging.basicConfig(level=logging.INFO, format=formatter)
|
||||||
|
asyncio.get_event_loop().run_until_complete(broker_coro())
|
||||||
|
asyncio.get_event_loop().run_forever()
|
||||||
|
|
||||||
|
When executed, this script gets the default event loop and asks it to run the ``broker_coro`` until it completes.
|
||||||
|
``broker_coro`` creates :class:`~hbmqtt.broker.Broker` instance and then :meth:`~hbmqtt.broker.Broker.start` the broker for serving.
|
||||||
|
Once completed, the loop is ran forever, making this script never stop ...
|
||||||
|
|
||||||
|
Reference
|
||||||
|
---------
|
||||||
|
|
||||||
|
Broker API
|
||||||
|
..........
|
||||||
|
|
||||||
|
.. automodule:: hbmqtt.broker
|
||||||
|
|
||||||
|
.. autoclass:: Broker
|
||||||
|
|
||||||
|
.. automethod:: start
|
||||||
|
.. automethod:: shutdown
|
||||||
|
|
||||||
|
Broker configuration
|
||||||
|
....................
|
||||||
|
|
||||||
|
The :class:`~hbmqtt.broker.Broker` ``__init__`` method accepts a ``config`` parameter which allow to setup some behaviour and defaults settings. This argument must be a Python dict object. For convinience, it is presented below as a YAML file [1]_.
|
||||||
|
|
||||||
|
.. code-block:: python
|
||||||
|
|
||||||
|
listeners:
|
||||||
|
default:
|
||||||
|
max-connections: 50000
|
||||||
|
type: tcp
|
||||||
|
my-tcp-1:
|
||||||
|
bind: 127.0.0.1:1883
|
||||||
|
my-tcp-2:
|
||||||
|
bind: 1.2.3.4:1883
|
||||||
|
max-connections: 1000
|
||||||
|
my-tcp-ssl-1:
|
||||||
|
bind: 127.0.0.1:8883
|
||||||
|
ssl: on
|
||||||
|
cafile: /some/cafile
|
||||||
|
capath: /some/folder
|
||||||
|
capath: certificate data
|
||||||
|
certfile: /some/certfile
|
||||||
|
keyfile: /some/key
|
||||||
|
my-ws-1:
|
||||||
|
bind: 0.0.0.0:8080
|
||||||
|
type: ws
|
||||||
|
timeout-disconnect-delay: 2
|
||||||
|
auth:
|
||||||
|
plugins: ['auth.anonymous'] #List of plugins to activate for authentication among all registered plugins
|
||||||
|
allow-anonymous: true / false
|
||||||
|
password-file: /some/passwd_file
|
||||||
|
|
||||||
|
The ``listeners`` section allows to define network listeners which must be started by the :class:`~hbmqtt.broker.Broker`. Several listeners can be setup. ``default`` subsection defines common attributes for all listeners. Each listener can have the following settings:
|
||||||
|
|
||||||
|
* ``bind``: IP address and port binding.
|
||||||
|
* ``max-connections``: Set maximum number of active connection for the listener. ``0`` means no limit.
|
||||||
|
* ``type``: transport protocol type; can be ``tcp`` for classic TCP listener or ``ws`` for MQTT over websocket.
|
||||||
|
* ``ssl`` enables (``on``) or disable secured connection over the transport protocol.
|
||||||
|
* ``cafile``, ``cadata``, ``certfile`` and ``keyfile`` : mandatory parameters for SSL secured connections.
|
||||||
|
|
||||||
|
The ``auth`` section setup authentication behaviour:
|
||||||
|
|
||||||
|
* ``plugins``: defines the list of activated plugins. Note the plugins must be defined in the ``hbmqtt.broker.plugins`` `entry point <https://pythonhosted.org/setuptools/setuptools.html#dynamic-discovery-of-services-and-plugins>`_.
|
||||||
|
* ``allow-anonymous`` : used by the internal :class:`hbmqtt.plugins.authentication.AnonymousAuthPlugin` plugin. This parameter enables (``on``) or disable anonymous connection, ie. connection without username.
|
||||||
|
* ``password-file`` : used by the internal :class:`hbmqtt.plugins.authentication.FileAuthPlugin` plugin. This parameter gives to path of the password file to load for authenticating users.
|
||||||
|
|
||||||
|
|
||||||
|
.. [1] See `PyYAML <http://pyyaml.org/wiki/PyYAMLDocumentation>`_ for loading YAML files as Python dict.
|
||||||
|
|
|
@ -15,7 +15,7 @@ ApplicationMessage
|
||||||
:members:
|
:members:
|
||||||
|
|
||||||
.. autoclass:: IncomingApplicationMessage
|
.. autoclass:: IncomingApplicationMessage
|
||||||
:members:
|
:show-inheritance:
|
||||||
|
|
||||||
.. autoclass:: OutgoingApplicationMessage
|
.. autoclass:: OutgoingApplicationMessage
|
||||||
:members:
|
:show-inheritance:
|
||||||
|
|
|
@ -138,43 +138,17 @@ class BrokerContext(BaseContext):
|
||||||
|
|
||||||
|
|
||||||
class Broker:
|
class Broker:
|
||||||
|
"""
|
||||||
|
MQTT 3.1.1 compliant broker implementation
|
||||||
|
|
||||||
|
:param config: Example Yaml config
|
||||||
|
:param loop: asyncio loop to use. Defaults to ``asyncio.get_event_loop()`` if none is given
|
||||||
|
:param plugin_namespace: Plugin namespace to use when loading plugin entry_points. Defaults to ``hbmqtt.broker.plugins``
|
||||||
|
|
||||||
|
"""
|
||||||
states = ['new', 'starting', 'started', 'not_started', 'stopping', 'stopped', 'not_stopped', 'stopped']
|
states = ['new', 'starting', 'started', 'not_started', 'stopping', 'stopped', 'not_stopped', 'stopped']
|
||||||
|
|
||||||
def __init__(self, config=None, loop=None, plugin_namespace=None):
|
def __init__(self, config=None, loop=None, plugin_namespace=None):
|
||||||
"""
|
|
||||||
|
|
||||||
:param config: Example Yaml config
|
|
||||||
listeners:
|
|
||||||
default: #Mandatory
|
|
||||||
max-connections: 50000
|
|
||||||
type: tcp
|
|
||||||
my-tcp-1:
|
|
||||||
bind: 127.0.0.1:1883
|
|
||||||
my-tcp-2:
|
|
||||||
bind: 1.2.3.4:1883
|
|
||||||
max-connections: 1000
|
|
||||||
my-tcp-ssl-1:
|
|
||||||
bind: 127.0.0.1:8883
|
|
||||||
ssl: on
|
|
||||||
cafile: /some/cafile
|
|
||||||
capath: /some/folder
|
|
||||||
capath: certificate data
|
|
||||||
certfile: /some/certfile
|
|
||||||
keyfile: /some/key
|
|
||||||
my-ws-1:
|
|
||||||
bind: 0.0.0.0:8080
|
|
||||||
type: ws
|
|
||||||
timeout-disconnect-delay: 2
|
|
||||||
auth:
|
|
||||||
plugins: ['auth.anonymous'] #List of plugins to activate for authentication among all registered plugins
|
|
||||||
allow-anonymous: true / false
|
|
||||||
password-file: /some/passwd_file
|
|
||||||
persistence:
|
|
||||||
plugin: 'persistence-sqlite'
|
|
||||||
|
|
||||||
:param loop:
|
|
||||||
:return:
|
|
||||||
"""
|
|
||||||
self.logger = logging.getLogger(__name__)
|
self.logger = logging.getLogger(__name__)
|
||||||
self.config = _defaults
|
self.config = _defaults
|
||||||
if config is not None:
|
if config is not None:
|
||||||
|
@ -228,6 +202,13 @@ class Broker:
|
||||||
|
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def start(self):
|
def start(self):
|
||||||
|
"""
|
||||||
|
Start the broker to serve with the given configuration
|
||||||
|
|
||||||
|
Start method opens network sockets and will start listening for incoming connections.
|
||||||
|
|
||||||
|
This method is a *coroutine*.
|
||||||
|
"""
|
||||||
try:
|
try:
|
||||||
self._sessions = dict()
|
self._sessions = dict()
|
||||||
self._subscriptions = dict()
|
self._subscriptions = dict()
|
||||||
|
@ -295,6 +276,11 @@ class Broker:
|
||||||
|
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def shutdown(self):
|
def shutdown(self):
|
||||||
|
"""
|
||||||
|
Stop broker instance.
|
||||||
|
|
||||||
|
Closes all connected session, stop listening on network socket and free resources.
|
||||||
|
"""
|
||||||
try:
|
try:
|
||||||
self._sessions = dict()
|
self._sessions = dict()
|
||||||
self._subscriptions = dict()
|
self._subscriptions = dict()
|
||||||
|
|
|
@ -125,6 +125,8 @@ class MQTTClient:
|
||||||
|
|
||||||
At first, a network connection is established with the server using the given protocol (``mqtt``, ``mqtts``, ``ws`` or ``wss``). Once the socket is connected, a `CONNECT <http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718028>`_ message is sent with the requested informations.
|
At first, a network connection is established with the server using the given protocol (``mqtt``, ``mqtts``, ``ws`` or ``wss``). Once the socket is connected, a `CONNECT <http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718028>`_ message is sent with the requested informations.
|
||||||
|
|
||||||
|
This method is a *coroutine*.
|
||||||
|
|
||||||
:param uri: Broker URI connection, conforming to `MQTT URI scheme <https://github.com/mqtt/mqtt.github.io/wiki/URI-Scheme>`_. Uses ``uri`` config attribute by default.
|
:param uri: Broker URI connection, conforming to `MQTT URI scheme <https://github.com/mqtt/mqtt.github.io/wiki/URI-Scheme>`_. Uses ``uri`` config attribute by default.
|
||||||
:param cleansession: MQTT CONNECT clean session flag
|
:param cleansession: MQTT CONNECT clean session flag
|
||||||
:param cafile: server certificate authority file (optional, used for secured connection)
|
:param cafile: server certificate authority file (optional, used for secured connection)
|
||||||
|
@ -154,6 +156,8 @@ class MQTTClient:
|
||||||
Disconnect from the connected broker.
|
Disconnect from the connected broker.
|
||||||
|
|
||||||
This method sends a `DISCONNECT <http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718090>`_ message and closes the network socket.
|
This method sends a `DISCONNECT <http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718090>`_ message and closes the network socket.
|
||||||
|
|
||||||
|
This method is a *coroutine*.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
if self.session.transitions.is_connected():
|
if self.session.transitions.is_connected():
|
||||||
|
@ -174,6 +178,8 @@ class MQTTClient:
|
||||||
Reconnection tries to establish a network connection and send a `CONNECT <http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718028>`_ message.
|
Reconnection tries to establish a network connection and send a `CONNECT <http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718028>`_ message.
|
||||||
Retries interval and attempts can be controled with the ``reconnect_max_interval`` and ``reconnect_retries`` configuration parameters.
|
Retries interval and attempts can be controled with the ``reconnect_max_interval`` and ``reconnect_retries`` configuration parameters.
|
||||||
|
|
||||||
|
This method is a *coroutine*.
|
||||||
|
|
||||||
:param cleansession: clean session flag used in MQTT CONNECT messages sent for reconnections.
|
:param cleansession: clean session flag used in MQTT CONNECT messages sent for reconnections.
|
||||||
:return: `CONNACK <http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718033>`_ return code
|
:return: `CONNACK <http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718033>`_ return code
|
||||||
:raise: :class:`hbmqtt.client.ConnectException` if re-connection fails after max retries.
|
:raise: :class:`hbmqtt.client.ConnectException` if re-connection fails after max retries.
|
||||||
|
@ -219,6 +225,8 @@ class MQTTClient:
|
||||||
Ping the broker.
|
Ping the broker.
|
||||||
|
|
||||||
Send a MQTT `PINGREQ <http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718081>`_ message for response.
|
Send a MQTT `PINGREQ <http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718081>`_ message for response.
|
||||||
|
|
||||||
|
This method is a *coroutine*.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
if self.session.transitions.is_connected():
|
if self.session.transitions.is_connected():
|
||||||
|
@ -235,6 +243,8 @@ class MQTTClient:
|
||||||
|
|
||||||
Send a MQTT `PUBLISH <http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718037>`_ message and wait for acknowledgment depending on Quality Of Service
|
Send a MQTT `PUBLISH <http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718037>`_ message and wait for acknowledgment depending on Quality Of Service
|
||||||
|
|
||||||
|
This method is a *coroutine*.
|
||||||
|
|
||||||
:param topic: topic name to which message data is published
|
:param topic: topic name to which message data is published
|
||||||
:param message: payload message (as bytes) to send.
|
:param message: payload message (as bytes) to send.
|
||||||
:param qos: requested publish quality of service : QOS_0, QOS_1 or QOS_2. Defaults to ``default_qos`` config parameter or QOS_0.
|
:param qos: requested publish quality of service : QOS_0, QOS_1 or QOS_2. Defaults to ``default_qos`` config parameter or QOS_0.
|
||||||
|
@ -271,6 +281,8 @@ class MQTTClient:
|
||||||
|
|
||||||
Send a MQTT `SUBSCRIBE <http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718063>`_ message and wait for broker acknowledgment.
|
Send a MQTT `SUBSCRIBE <http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718063>`_ message and wait for broker acknowledgment.
|
||||||
|
|
||||||
|
This method is a *coroutine*.
|
||||||
|
|
||||||
:param topics: array of topics pattern to subscribe with associated QoS.
|
:param topics: array of topics pattern to subscribe with associated QoS.
|
||||||
:return: `SUBACK <http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718068>`_ message return code.
|
:return: `SUBACK <http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718068>`_ message return code.
|
||||||
|
|
||||||
|
@ -292,6 +304,8 @@ class MQTTClient:
|
||||||
|
|
||||||
Send a MQTT `UNSUBSCRIBE <http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718072>`_ message and wait for broker `UNSUBACK <http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718077>`_ message.
|
Send a MQTT `UNSUBSCRIBE <http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718072>`_ message and wait for broker `UNSUBACK <http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718077>`_ message.
|
||||||
|
|
||||||
|
This method is a *coroutine*.
|
||||||
|
|
||||||
:param topics: array of topics to unsubscribe from.
|
:param topics: array of topics to unsubscribe from.
|
||||||
|
|
||||||
Example of ``topics`` argument expected structure:
|
Example of ``topics`` argument expected structure:
|
||||||
|
@ -308,6 +322,8 @@ class MQTTClient:
|
||||||
|
|
||||||
Deliver next message received from the broker. If no message is available, this methods waits until next message arrives or ``timeout`` occurs.
|
Deliver next message received from the broker. If no message is available, this methods waits until next message arrives or ``timeout`` occurs.
|
||||||
|
|
||||||
|
This method is a *coroutine*.
|
||||||
|
|
||||||
:param timeout: maximum number of seconds to wait before returning. If timeout is not specified or None, there is no limit to the wait time until next message arrives.
|
:param timeout: maximum number of seconds to wait before returning. If timeout is not specified or None, there is no limit to the wait time until next message arrives.
|
||||||
:return: instance of :class:`hbmqtt.session.ApplicationMessage` containing received message information flow.
|
:return: instance of :class:`hbmqtt.session.ApplicationMessage` containing received message information flow.
|
||||||
"""
|
"""
|
||||||
|
|
|
@ -20,23 +20,41 @@ class ApplicationMessage:
|
||||||
def __init__(self, packet_id, topic, qos, data, retain):
|
def __init__(self, packet_id, topic, qos, data, retain):
|
||||||
self.packet_id = packet_id
|
self.packet_id = packet_id
|
||||||
""" Publish message `packet identifier <http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718025>`_"""
|
""" Publish message `packet identifier <http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718025>`_"""
|
||||||
|
|
||||||
self.topic = topic
|
self.topic = topic
|
||||||
""" Publish message topic"""
|
""" Publish message topic"""
|
||||||
|
|
||||||
self.qos = qos
|
self.qos = qos
|
||||||
""" Publish message Quality of Service"""
|
""" Publish message Quality of Service"""
|
||||||
|
|
||||||
self.data = data
|
self.data = data
|
||||||
""" Publish message payload data"""
|
""" Publish message payload data"""
|
||||||
|
|
||||||
self.retain = retain
|
self.retain = retain
|
||||||
""" Publish message retain flag"""
|
""" Publish message retain flag"""
|
||||||
|
|
||||||
self.publish_packet = None
|
self.publish_packet = None
|
||||||
""" :class:`hbmqtt.mqtt.publish.PublishPacket` instance corresponding to the `PUBLISH <http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718037>`_ packet in the messages flow. ``None`` if the PUBLISH packet has not already been received or sent."""
|
""" :class:`hbmqtt.mqtt.publish.PublishPacket` instance corresponding to the `PUBLISH <http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718037>`_ packet in the messages flow. ``None`` if the PUBLISH packet has not already been received or sent."""
|
||||||
|
|
||||||
self.puback_packet = None
|
self.puback_packet = None
|
||||||
""" :class:`hbmqtt.mqtt.puback.PubackPacket` instance corresponding to the `PUBACK <http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718043>`_ packet in the messages flow. ``None`` if QoS != QOS_1 or if the PUBACK packet has not already been received or sent."""
|
""" :class:`hbmqtt.mqtt.puback.PubackPacket` instance corresponding to the `PUBACK <http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718043>`_ packet in the messages flow. ``None`` if QoS != QOS_1 or if the PUBACK packet has not already been received or sent."""
|
||||||
|
|
||||||
self.pubrec_packet = None
|
self.pubrec_packet = None
|
||||||
|
""" :class:`hbmqtt.mqtt.puback.PubrecPacket` instance corresponding to the `PUBREC <http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718048>`_ packet in the messages flow. ``None`` if QoS != QOS_2 or if the PUBREC packet has not already been received or sent."""
|
||||||
|
|
||||||
self.pubrel_packet = None
|
self.pubrel_packet = None
|
||||||
|
""" :class:`hbmqtt.mqtt.puback.PubrelPacket` instance corresponding to the `PUBREL <http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718053>`_ packet in the messages flow. ``None`` if QoS != QOS_2 or if the PUBREL packet has not already been received or sent."""
|
||||||
|
|
||||||
self.pubcomp_packet = None
|
self.pubcomp_packet = None
|
||||||
|
""" :class:`hbmqtt.mqtt.puback.PubrelPacket` instance corresponding to the `PUBCOMP <http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718058>`_ packet in the messages flow. ``None`` if QoS != QOS_2 or if the PUBCOMP packet has not already been received or sent."""
|
||||||
|
|
||||||
def build_publish_packet(self, dup=False):
|
def build_publish_packet(self, dup=False):
|
||||||
|
"""
|
||||||
|
Build :class:`hbmqtt.mqtt.publish.PublishPacket` from attributes
|
||||||
|
|
||||||
|
:param dup: force dup flag
|
||||||
|
:return: :class:`hbmqtt.mqtt.publish.PublishPacket` built from ApplicationMessage instance attributes
|
||||||
|
"""
|
||||||
return PublishPacket.build(self.topic, self.data, self.packet_id, dup, self.qos, self.retain)
|
return PublishPacket.build(self.topic, self.data, self.packet_id, dup, self.qos, self.retain)
|
||||||
|
|
||||||
def __eq__(self, other):
|
def __eq__(self, other):
|
||||||
|
|
Ładowanie…
Reference in New Issue