kopia lustrzana https://github.com/Yakifo/amqtt
519 wiersze
21 KiB
Python
519 wiersze
21 KiB
Python
# Copyright (c) 2015 Nicolas JOUANIN
|
|
#
|
|
# See the file license.txt for copying permission.
|
|
|
|
import asyncio
|
|
import logging
|
|
import ssl
|
|
import copy
|
|
from urllib.parse import urlparse, urlunparse
|
|
from functools import wraps
|
|
|
|
from hbmqtt.utils import not_in_dict_or_none
|
|
from hbmqtt.session import Session
|
|
from hbmqtt.mqtt.connack import CONNECTION_ACCEPTED
|
|
from hbmqtt.mqtt.protocol.client_handler import ClientProtocolHandler
|
|
from hbmqtt.adapters import StreamReaderAdapter, StreamWriterAdapter, WebSocketsReader, WebSocketsWriter
|
|
from hbmqtt.plugins.manager import PluginManager, BaseContext
|
|
from hbmqtt.mqtt.protocol.handler import ProtocolHandlerException
|
|
from hbmqtt.mqtt.constants import QOS_0, QOS_1, QOS_2
|
|
import websockets
|
|
from websockets.uri import InvalidURI
|
|
from websockets.exceptions import InvalidHandshake
|
|
from collections import deque
|
|
|
|
|
|
_defaults = {
|
|
'keep_alive': 10,
|
|
'ping_delay': 1,
|
|
'default_qos': 0,
|
|
'default_retain': False,
|
|
'auto_reconnect': True,
|
|
'reconnect_max_interval': 10,
|
|
'reconnect_retries': 2,
|
|
}
|
|
|
|
|
|
class ClientException(Exception):
|
|
pass
|
|
|
|
|
|
class ConnectException(ClientException):
|
|
pass
|
|
|
|
|
|
class ClientContext(BaseContext):
|
|
"""
|
|
ClientContext is used as the context passed to plugins interacting with the client.
|
|
It act as an adapter to client services from plugins
|
|
"""
|
|
def __init__(self):
|
|
super().__init__()
|
|
self.config = None
|
|
|
|
|
|
base_logger = logging.getLogger(__name__)
|
|
|
|
|
|
def mqtt_connected(func):
|
|
"""
|
|
MQTTClient coroutines decorator which will wait until connection before calling the decorated method.
|
|
:param func: coroutine to be called once connected
|
|
:return: coroutine result
|
|
"""
|
|
@asyncio.coroutine
|
|
@wraps(func)
|
|
def wrapper(self, *args, **kwargs):
|
|
if not self._connected_state.is_set():
|
|
base_logger.warning("Client not connected, waiting for it")
|
|
asyncio.wait([self._connected_state.wait(), self._no_more_connections.wait()], return_when=asyncio.FIRST_COMPLETED)
|
|
if self._no_more_connections.is_set():
|
|
raise ClientException("Will not reconnect")
|
|
return (yield from func(self, *args, **kwargs))
|
|
return wrapper
|
|
|
|
|
|
class MQTTClient:
|
|
"""
|
|
MQTT client implementation.
|
|
|
|
MQTTClient instances provides API for connecting to a broker and send/receive messages using the MQTT protocol.
|
|
|
|
:param client_id: MQTT client ID to use when connecting to the broker. If none, it will generated randomly by :func:`hbmqtt.utils.gen_client_id`
|
|
:param config: Client configuration
|
|
:param loop: asynio loop to use
|
|
:return: class instance
|
|
"""
|
|
|
|
def __init__(self, client_id=None, config=None, loop=None):
|
|
self.logger = logging.getLogger(__name__)
|
|
self.config = copy.deepcopy(_defaults)
|
|
if config is not None:
|
|
self.config.update(config)
|
|
if client_id is not None:
|
|
self.client_id = client_id
|
|
else:
|
|
from hbmqtt.utils import gen_client_id
|
|
self.client_id = gen_client_id()
|
|
self.logger.debug("Using generated client ID : %s" % self.client_id)
|
|
|
|
if loop is not None:
|
|
self._loop = loop
|
|
else:
|
|
self._loop = asyncio.get_event_loop()
|
|
self.session = None
|
|
self._handler = None
|
|
self._disconnect_task = None
|
|
self._connected_state = asyncio.Event(loop=self._loop)
|
|
self._no_more_connections = asyncio.Event(loop=self._loop)
|
|
|
|
# Init plugins manager
|
|
context = ClientContext()
|
|
context.config = self.config
|
|
self.plugins_manager = PluginManager('hbmqtt.client.plugins', context)
|
|
self.client_tasks = deque()
|
|
|
|
@asyncio.coroutine
|
|
def connect(self,
|
|
uri=None,
|
|
cleansession=None,
|
|
cafile=None,
|
|
capath=None,
|
|
cadata=None):
|
|
"""
|
|
Connect to a remote broker.
|
|
|
|
At first, a network connection is established with the server using the given protocol (``mqtt``, ``mqtts``, ``ws`` or ``wss``). Once the socket is connected, a `CONNECT <http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718028>`_ message is sent with the requested informations.
|
|
|
|
This method is a *coroutine*.
|
|
|
|
:param uri: Broker URI connection, conforming to `MQTT URI scheme <https://github.com/mqtt/mqtt.github.io/wiki/URI-Scheme>`_. Uses ``uri`` config attribute by default.
|
|
:param cleansession: MQTT CONNECT clean session flag
|
|
:param cafile: server certificate authority file (optional, used for secured connection)
|
|
:param capath: server certificate authority path (optional, used for secured connection)
|
|
:param cadata: server certificate authority data (optional, used for secured connection)
|
|
:return: `CONNACK <http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718033>`_ return code
|
|
:raise: :class:`hbmqtt.client.ConnectException` if connection fails
|
|
"""
|
|
|
|
self.session = self._initsession(uri, cleansession, cafile, capath, cadata)
|
|
self.logger.debug("Connect to: %s" % uri)
|
|
|
|
try:
|
|
return (yield from self._do_connect())
|
|
except BaseException as be:
|
|
self.logger.warning("Connection failed: %r" % be)
|
|
auto_reconnect = self.config.get('auto_reconnect', False)
|
|
if not auto_reconnect:
|
|
raise
|
|
else:
|
|
return (yield from self.reconnect())
|
|
|
|
@asyncio.coroutine
|
|
def disconnect(self):
|
|
"""
|
|
Disconnect from the connected broker.
|
|
|
|
This method sends a `DISCONNECT <http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718090>`_ message and closes the network socket.
|
|
|
|
This method is a *coroutine*.
|
|
"""
|
|
|
|
if self.session.transitions.is_connected():
|
|
if not self._disconnect_task.done():
|
|
self._disconnect_task.cancel()
|
|
yield from self._handler.mqtt_disconnect()
|
|
self._connected_state.clear()
|
|
yield from self._handler.stop()
|
|
self.session.transitions.disconnect()
|
|
else:
|
|
self.logger.warning("Client session is not currently connected, ignoring call")
|
|
|
|
@asyncio.coroutine
|
|
def reconnect(self, cleansession=None):
|
|
"""
|
|
Reconnect a previously connected broker.
|
|
|
|
Reconnection tries to establish a network connection and send a `CONNECT <http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718028>`_ message.
|
|
Retries interval and attempts can be controled with the ``reconnect_max_interval`` and ``reconnect_retries`` configuration parameters.
|
|
|
|
This method is a *coroutine*.
|
|
|
|
:param cleansession: clean session flag used in MQTT CONNECT messages sent for reconnections.
|
|
:return: `CONNACK <http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718033>`_ return code
|
|
:raise: :class:`hbmqtt.client.ConnectException` if re-connection fails after max retries.
|
|
"""
|
|
|
|
if self.session.transitions.is_connected():
|
|
self.logger.warning("Client already connected")
|
|
return CONNECTION_ACCEPTED
|
|
|
|
if cleansession:
|
|
self.session.clean_session = cleansession
|
|
self.logger.debug("Reconnecting with session parameters: %s" % self.session)
|
|
reconnect_max_interval = self.config.get('reconnect_max_interval', 10)
|
|
reconnect_retries = self.config.get('reconnect_retries', 5)
|
|
nb_attempt = 1
|
|
yield from asyncio.sleep(1, loop=self._loop)
|
|
while True:
|
|
try:
|
|
self.logger.debug("Reconnect attempt %d ..." % nb_attempt)
|
|
return (yield from self._do_connect())
|
|
except BaseException as e:
|
|
self.logger.warning("Reconnection attempt failed: %r" % e)
|
|
if nb_attempt > reconnect_retries:
|
|
self.logger.error("Maximum number of connection attempts reached. Reconnection aborted")
|
|
raise ConnectException("Too many connection attempts failed")
|
|
exp = 2 ** nb_attempt
|
|
delay = exp if exp < reconnect_max_interval else reconnect_max_interval
|
|
self.logger.debug("Waiting %d second before next attempt" % delay)
|
|
yield from asyncio.sleep(delay, loop=self._loop)
|
|
nb_attempt += 1
|
|
|
|
@asyncio.coroutine
|
|
def _do_connect(self):
|
|
return_code = yield from self._connect_coro()
|
|
self._disconnect_task = asyncio.ensure_future(self.handle_connection_close(), loop=self._loop)
|
|
return return_code
|
|
|
|
@mqtt_connected
|
|
@asyncio.coroutine
|
|
def ping(self):
|
|
"""
|
|
Ping the broker.
|
|
|
|
Send a MQTT `PINGREQ <http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718081>`_ message for response.
|
|
|
|
This method is a *coroutine*.
|
|
"""
|
|
|
|
if self.session.transitions.is_connected():
|
|
yield from self._handler.mqtt_ping()
|
|
else:
|
|
self.logger.warning("MQTT PING request incompatible with current session state '%s'" %
|
|
self.session.transitions.state)
|
|
|
|
@mqtt_connected
|
|
@asyncio.coroutine
|
|
def publish(self, topic, message, qos=None, retain=None):
|
|
"""
|
|
Publish a message to the broker.
|
|
|
|
Send a MQTT `PUBLISH <http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718037>`_ message and wait for acknowledgment depending on Quality Of Service
|
|
|
|
This method is a *coroutine*.
|
|
|
|
:param topic: topic name to which message data is published
|
|
:param message: payload message (as bytes) to send.
|
|
:param qos: requested publish quality of service : QOS_0, QOS_1 or QOS_2. Defaults to ``default_qos`` config parameter or QOS_0.
|
|
:param retain: retain flag. Defaults to ``default_retain`` config parameter or False.
|
|
"""
|
|
|
|
def get_retain_and_qos():
|
|
if qos:
|
|
assert qos in (QOS_0, QOS_1, QOS_2)
|
|
_qos = qos
|
|
else:
|
|
_qos = self.config['default_qos']
|
|
try:
|
|
_qos = self.config['topics'][topic]['qos']
|
|
except KeyError:
|
|
pass
|
|
if retain:
|
|
_retain = retain
|
|
else:
|
|
_retain = self.config['default_retain']
|
|
try:
|
|
_retain = self.config['topics'][topic]['retain']
|
|
except KeyError:
|
|
pass
|
|
return _qos, _retain
|
|
(app_qos, app_retain) = get_retain_and_qos()
|
|
return (yield from self._handler.mqtt_publish(topic, message, app_qos, app_retain))
|
|
|
|
@mqtt_connected
|
|
@asyncio.coroutine
|
|
def subscribe(self, topics):
|
|
"""
|
|
Subscribe to some topics.
|
|
|
|
Send a MQTT `SUBSCRIBE <http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718063>`_ message and wait for broker acknowledgment.
|
|
|
|
This method is a *coroutine*.
|
|
|
|
:param topics: array of topics pattern to subscribe with associated QoS.
|
|
:return: `SUBACK <http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718068>`_ message return code.
|
|
|
|
Example of ``topics`` argument expected structure:
|
|
::
|
|
|
|
[
|
|
('$SYS/broker/uptime', QOS_1),
|
|
('$SYS/broker/load/#', QOS_2),
|
|
]
|
|
"""
|
|
return (yield from self._handler.mqtt_subscribe(topics, self.session.next_packet_id))
|
|
|
|
@mqtt_connected
|
|
@asyncio.coroutine
|
|
def unsubscribe(self, topics):
|
|
"""
|
|
Unsubscribe from some topics.
|
|
|
|
Send a MQTT `UNSUBSCRIBE <http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718072>`_ message and wait for broker `UNSUBACK <http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718077>`_ message.
|
|
|
|
This method is a *coroutine*.
|
|
|
|
:param topics: array of topics to unsubscribe from.
|
|
|
|
Example of ``topics`` argument expected structure:
|
|
::
|
|
|
|
['$SYS/broker/uptime', '$SYS/broker/load/#']
|
|
"""
|
|
yield from self._handler.mqtt_unsubscribe(topics, self.session.next_packet_id)
|
|
|
|
@asyncio.coroutine
|
|
def deliver_message(self, timeout=None):
|
|
"""
|
|
Deliver next received message.
|
|
|
|
Deliver next message received from the broker. If no message is available, this methods waits until next message arrives or ``timeout`` occurs.
|
|
|
|
This method is a *coroutine*.
|
|
|
|
:param timeout: maximum number of seconds to wait before returning. If timeout is not specified or None, there is no limit to the wait time until next message arrives.
|
|
:return: instance of :class:`hbmqtt.session.ApplicationMessage` containing received message information flow.
|
|
:raises: :class:`asyncio.TimeoutError` if timeout occurs before a message is delivered
|
|
"""
|
|
deliver_task = asyncio.ensure_future(self._handler.mqtt_deliver_next_message(), loop=self._loop)
|
|
self.client_tasks.append(deliver_task)
|
|
self.logger.debug("Waiting message delivery")
|
|
done, pending = yield from asyncio.wait([deliver_task], loop=self._loop, return_when=asyncio.FIRST_EXCEPTION, timeout=timeout)
|
|
if deliver_task in done:
|
|
if deliver_task.exception() is not None:
|
|
# deliver_task raised an exception, pass it on to our caller
|
|
raise deliver_task.exception()
|
|
self.client_tasks.pop()
|
|
return deliver_task.result()
|
|
else:
|
|
#timeout occured before message received
|
|
deliver_task.cancel()
|
|
raise asyncio.TimeoutError
|
|
|
|
@asyncio.coroutine
|
|
def _connect_coro(self):
|
|
kwargs = dict()
|
|
|
|
# Decode URI attributes
|
|
uri_attributes = urlparse(self.session.broker_uri)
|
|
scheme = uri_attributes.scheme
|
|
secure = True if scheme in ('mqtts', 'wss') else False
|
|
self.session.username = self.session.username if self.session.username else uri_attributes.username
|
|
self.session.password = self.session.password if self.session.password else uri_attributes.password
|
|
self.session.remote_address = uri_attributes.hostname
|
|
self.session.remote_port = uri_attributes.port
|
|
if scheme in ('mqtt', 'mqtts') and not self.session.remote_port:
|
|
self.session.remote_port = 8883 if scheme == 'mqtts' else 1883
|
|
if scheme in ('ws', 'wss') and not self.session.remote_port:
|
|
self.session.remote_port = 443 if scheme == 'wss' else 80
|
|
if scheme in ('ws', 'wss'):
|
|
# Rewrite URI to conform to https://tools.ietf.org/html/rfc6455#section-3
|
|
uri = (scheme, self.session.remote_address + ":" + str(self.session.remote_port), uri_attributes[2],
|
|
uri_attributes[3], uri_attributes[4], uri_attributes[5])
|
|
self.session.broker_uri = urlunparse(uri)
|
|
# Init protocol handler
|
|
#if not self._handler:
|
|
self._handler = ClientProtocolHandler(self.plugins_manager, loop=self._loop)
|
|
|
|
if secure:
|
|
sc = ssl.create_default_context(
|
|
ssl.Purpose.SERVER_AUTH,
|
|
cafile=self.session.cafile,
|
|
capath=self.session.capath,
|
|
cadata=self.session.cadata)
|
|
if 'certfile' in self.config and 'keyfile' in self.config:
|
|
sc.load_cert_chain(self.config['certfile'], self.config['keyfile'])
|
|
if 'check_hostname' in self.config and isinstance(self.config['check_hostname'], bool):
|
|
sc.check_hostname = self.config['check_hostname']
|
|
kwargs['ssl'] = sc
|
|
|
|
try:
|
|
reader = None
|
|
writer = None
|
|
self._connected_state.clear()
|
|
# Open connection
|
|
if scheme in ('mqtt', 'mqtts'):
|
|
conn_reader, conn_writer = \
|
|
yield from asyncio.open_connection(
|
|
self.session.remote_address,
|
|
self.session.remote_port, loop=self._loop, **kwargs)
|
|
reader = StreamReaderAdapter(conn_reader)
|
|
writer = StreamWriterAdapter(conn_writer)
|
|
elif scheme in ('ws', 'wss'):
|
|
websocket = yield from websockets.connect(
|
|
self.session.broker_uri,
|
|
subprotocols=['mqtt'],
|
|
loop=self._loop,
|
|
**kwargs)
|
|
reader = WebSocketsReader(websocket)
|
|
writer = WebSocketsWriter(websocket)
|
|
# Start MQTT protocol
|
|
self._handler.attach(self.session, reader, writer)
|
|
return_code = yield from self._handler.mqtt_connect()
|
|
if return_code is not CONNECTION_ACCEPTED:
|
|
self.session.transitions.disconnect()
|
|
self.logger.warning("Connection rejected with code '%s'" % return_code)
|
|
exc = ConnectException("Connection rejected by broker")
|
|
exc.return_code = return_code
|
|
raise exc
|
|
else:
|
|
# Handle MQTT protocol
|
|
yield from self._handler.start()
|
|
self.session.transitions.connect()
|
|
self._connected_state.set()
|
|
self.logger.debug("connected to %s:%s" % (self.session.remote_address, self.session.remote_port))
|
|
return return_code
|
|
except InvalidURI as iuri:
|
|
self.logger.warning("connection failed: invalid URI '%s'" % self.session.broker_uri)
|
|
self.session.transitions.disconnect()
|
|
raise ConnectException("connection failed: invalid URI '%s'" % self.session.broker_uri, iuri)
|
|
except InvalidHandshake as ihs:
|
|
self.logger.warning("connection failed: invalid websocket handshake")
|
|
self.session.transitions.disconnect()
|
|
raise ConnectException("connection failed: invalid websocket handshake", ihs)
|
|
except (ProtocolHandlerException, ConnectionError, OSError) as e:
|
|
self.logger.warning("MQTT connection failed: %r" % e)
|
|
self.session.transitions.disconnect()
|
|
raise ConnectException(e)
|
|
|
|
@asyncio.coroutine
|
|
def handle_connection_close(self):
|
|
|
|
def cancel_tasks():
|
|
self._no_more_connections.set()
|
|
while self.client_tasks:
|
|
task = self.client_tasks.popleft()
|
|
if not task.done():
|
|
task.set_exception(ClientException("Connection lost"))
|
|
|
|
self.logger.debug("Watch broker disconnection")
|
|
# Wait for disconnection from broker (like connection lost)
|
|
yield from self._handler.wait_disconnect()
|
|
self.logger.warning("Disconnected from broker")
|
|
|
|
# Block client API
|
|
self._connected_state.clear()
|
|
|
|
# stop an clean handler
|
|
#yield from self._handler.stop()
|
|
self._handler.detach()
|
|
self.session.transitions.disconnect()
|
|
|
|
if self.config.get('auto_reconnect', False):
|
|
# Try reconnection
|
|
self.logger.debug("Auto-reconnecting")
|
|
try:
|
|
yield from self.reconnect()
|
|
except ConnectException:
|
|
# Cancel client pending tasks
|
|
cancel_tasks()
|
|
else:
|
|
# Cancel client pending tasks
|
|
cancel_tasks()
|
|
|
|
def _initsession(
|
|
self,
|
|
uri=None,
|
|
cleansession=None,
|
|
cafile=None,
|
|
capath=None,
|
|
cadata=None) -> Session:
|
|
# Load config
|
|
broker_conf = self.config.get('broker', dict()).copy()
|
|
if uri:
|
|
broker_conf['uri'] = uri
|
|
if cafile:
|
|
broker_conf['cafile'] = cafile
|
|
elif 'cafile' not in broker_conf:
|
|
broker_conf['cafile'] = None
|
|
if capath:
|
|
broker_conf['capath'] = capath
|
|
elif 'capath' not in broker_conf:
|
|
broker_conf['capath'] = None
|
|
if cadata:
|
|
broker_conf['cadata'] = cadata
|
|
elif 'cadata' not in broker_conf:
|
|
broker_conf['cadata'] = None
|
|
|
|
if cleansession is not None:
|
|
broker_conf['cleansession'] = cleansession
|
|
|
|
for key in ['uri']:
|
|
if not_in_dict_or_none(broker_conf, key):
|
|
raise ClientException("Missing connection parameter '%s'" % key)
|
|
|
|
s = Session()
|
|
s.broker_uri = uri
|
|
s.client_id = self.client_id
|
|
s.cafile = broker_conf['cafile']
|
|
s.capath = broker_conf['capath']
|
|
s.cadata = broker_conf['cadata']
|
|
if cleansession is not None:
|
|
s.clean_session = cleansession
|
|
else:
|
|
s.clean_session = self.config.get('cleansession', True)
|
|
s.keep_alive = self.config['keep_alive'] - self.config['ping_delay']
|
|
if 'will' in self.config:
|
|
s.will_flag = True
|
|
s.will_retain = self.config['will']['retain']
|
|
s.will_topic = self.config['will']['topic']
|
|
s.will_message = self.config['will']['message']
|
|
s.will_qos = self.config['will']['qos']
|
|
else:
|
|
s.will_flag = False
|
|
s.will_retain = False
|
|
s.will_topic = None
|
|
s.will_message = None
|
|
return s
|