2015-07-07 20:48:53 +00:00
|
|
|
# Copyright (c) 2015 Nicolas JOUANIN
|
|
|
|
#
|
|
|
|
# See the file license.txt for copying permission.
|
|
|
|
import logging
|
2015-08-02 21:37:49 +00:00
|
|
|
import ssl
|
|
|
|
import websockets
|
2015-08-06 20:44:37 +00:00
|
|
|
import asyncio
|
2015-08-09 21:07:55 +00:00
|
|
|
from datetime import datetime
|
2015-07-07 20:48:53 +00:00
|
|
|
|
2015-08-06 20:44:37 +00:00
|
|
|
from functools import partial
|
2015-07-07 20:48:53 +00:00
|
|
|
from transitions import Machine, MachineError
|
2015-07-08 19:54:10 +00:00
|
|
|
from hbmqtt.session import Session
|
2015-07-12 20:35:56 +00:00
|
|
|
from hbmqtt.mqtt.protocol.broker_handler import BrokerProtocolHandler
|
2015-07-10 20:55:22 +00:00
|
|
|
from hbmqtt.mqtt.connect import ConnectPacket
|
2015-08-01 20:16:39 +00:00
|
|
|
from hbmqtt.mqtt.connack import *
|
2015-07-10 20:55:22 +00:00
|
|
|
from hbmqtt.errors import HBMQTTException
|
|
|
|
from hbmqtt.utils import format_client_message, gen_client_id
|
2015-08-10 19:30:03 +00:00
|
|
|
from hbmqtt.mqtt.packet import PUBLISH
|
2015-08-09 21:07:55 +00:00
|
|
|
from hbmqtt.codecs import int_to_bytes_str
|
2015-08-02 21:37:49 +00:00
|
|
|
from hbmqtt.adapters import (
|
|
|
|
StreamReaderAdapter,
|
|
|
|
StreamWriterAdapter,
|
|
|
|
ReaderAdapter,
|
|
|
|
WriterAdapter,
|
|
|
|
WebSocketsReader,
|
|
|
|
WebSocketsWriter)
|
2015-07-07 20:48:53 +00:00
|
|
|
|
|
|
|
|
|
|
|
_defaults = {
|
2015-07-27 13:38:38 +00:00
|
|
|
'timeout-disconnect-delay': 2,
|
|
|
|
'publish-retry-delay': 5,
|
2015-07-07 20:48:53 +00:00
|
|
|
}
|
|
|
|
|
2015-08-09 21:07:55 +00:00
|
|
|
DOLLAR_SYS_ROOT = '$SYS/broker/'
|
|
|
|
STAT_BYTES_SENT = 'bytes_sent'
|
|
|
|
STAT_BYTES_RECEIVED = 'bytes_received'
|
|
|
|
STAT_MSG_SENT = 'messages_sent'
|
|
|
|
STAT_MSG_RECEIVED = 'messages_received'
|
2015-08-10 19:30:03 +00:00
|
|
|
STAT_PUBLISH_SENT = 'publish_sent'
|
|
|
|
STAT_PUBLISH_RECEIVED = 'publish_received'
|
2015-08-09 21:07:55 +00:00
|
|
|
STAT_UPTIME = 'uptime'
|
2015-08-10 19:30:03 +00:00
|
|
|
STAT_CLIENTS_MAXIMUM = 'clients_maximum'
|
2015-07-07 20:48:53 +00:00
|
|
|
|
|
|
|
class BrokerException(BaseException):
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
2015-07-15 19:00:12 +00:00
|
|
|
class Subscription:
|
|
|
|
def __init__(self, session, qos):
|
|
|
|
self.session = session
|
|
|
|
self.qos = qos
|
|
|
|
|
2015-07-27 12:02:52 +00:00
|
|
|
def __repr__(self):
|
|
|
|
return type(self).__name__ + '(client_id={0}, qos={1!r})'.format(self.session.client_id, self.qos)
|
|
|
|
|
2015-07-15 19:00:12 +00:00
|
|
|
|
2015-07-12 20:35:56 +00:00
|
|
|
class RetainedApplicationMessage:
|
2015-07-15 19:00:12 +00:00
|
|
|
def __init__(self, source_session, topic, data, qos=None):
|
2015-07-12 20:35:56 +00:00
|
|
|
self.source_session = source_session
|
|
|
|
self.topic = topic
|
|
|
|
self.data = data
|
2015-07-14 12:19:46 +00:00
|
|
|
self.qos = qos
|
2015-07-12 20:35:56 +00:00
|
|
|
|
|
|
|
|
2015-08-06 20:44:37 +00:00
|
|
|
class Server:
|
|
|
|
def __init__(self, listener_name, server_instance, max_connections=-1, loop=None):
|
|
|
|
self.logger = logging.getLogger(__name__)
|
|
|
|
self.instance = server_instance
|
|
|
|
self.conn_count = 0
|
|
|
|
self.listener_name = listener_name
|
|
|
|
if loop is not None:
|
|
|
|
self._loop = loop
|
|
|
|
else:
|
|
|
|
self._loop = asyncio.get_event_loop()
|
|
|
|
|
|
|
|
self.max_connections = max_connections
|
|
|
|
if self.max_connections > 0:
|
|
|
|
self.semaphore = asyncio.Semaphore(self.max_connections, loop=self._loop)
|
|
|
|
else:
|
|
|
|
self.semaphore = None
|
|
|
|
|
|
|
|
@asyncio.coroutine
|
|
|
|
def acquire_connection(self):
|
|
|
|
if self.semaphore:
|
|
|
|
yield from self.semaphore.acquire()
|
|
|
|
self.conn_count += 1
|
|
|
|
if self.max_connections > 0:
|
|
|
|
self.logger.debug("Listener '%s': %d/%d connections acquired" %
|
|
|
|
(self.listener_name, self.conn_count, self.max_connections))
|
|
|
|
|
|
|
|
def release_connection(self):
|
|
|
|
if self.semaphore:
|
|
|
|
self.semaphore.release()
|
|
|
|
self.conn_count -= 1
|
|
|
|
if self.max_connections > 0:
|
|
|
|
self.logger.debug("Listener '%s': %d/%d connections acquired" %
|
|
|
|
(self.listener_name, self.conn_count, self.max_connections))
|
|
|
|
|
|
|
|
@asyncio.coroutine
|
|
|
|
def close_instance(self):
|
|
|
|
if self.instance:
|
|
|
|
self.instance.close()
|
|
|
|
yield self.instance.wait_closed()
|
|
|
|
|
|
|
|
|
2015-07-07 20:48:53 +00:00
|
|
|
class Broker:
|
|
|
|
states = ['new', 'starting', 'started', 'not_started', 'stopping', 'stopped', 'not_stopped', 'stopped']
|
|
|
|
|
|
|
|
def __init__(self, config=None, loop=None):
|
2015-07-27 21:11:54 +00:00
|
|
|
"""
|
|
|
|
|
|
|
|
:param config: Example Yaml config
|
|
|
|
listeners:
|
|
|
|
- default: #Mandatory
|
|
|
|
max-connections: 50000
|
|
|
|
type: tcp
|
|
|
|
- my-tcp-1:
|
|
|
|
bind: 127.0.0.1:1883
|
|
|
|
- my-tcp-2:
|
|
|
|
bind: 1.2.3.4:1883
|
|
|
|
max-connections: 1000
|
|
|
|
- my-tcp-ssl-1:
|
2015-08-02 21:37:49 +00:00
|
|
|
bind: 127.0.0.1:8883
|
2015-07-27 21:11:54 +00:00
|
|
|
ssl: on
|
|
|
|
cafile: /some/cafile
|
2015-08-02 21:37:49 +00:00
|
|
|
capath: /some/folder
|
|
|
|
capath: certificate data
|
2015-07-27 21:11:54 +00:00
|
|
|
certfile: /some/certfile
|
2015-08-02 21:37:49 +00:00
|
|
|
keyfile: /some/key
|
2015-07-27 21:11:54 +00:00
|
|
|
- my-ws-1:
|
|
|
|
bind: 0.0.0.0:8080
|
|
|
|
type: ws
|
|
|
|
timeout-disconnect-delay: 2
|
|
|
|
publish-retry-delay: 5
|
|
|
|
|
|
|
|
:param loop:
|
|
|
|
:return:
|
|
|
|
"""
|
2015-07-07 20:48:53 +00:00
|
|
|
self.logger = logging.getLogger(__name__)
|
|
|
|
self.config = _defaults
|
|
|
|
if config is not None:
|
|
|
|
self.config.update(config)
|
2015-08-02 21:37:49 +00:00
|
|
|
self._build_listeners_config(self.config)
|
2015-07-07 20:48:53 +00:00
|
|
|
|
|
|
|
if loop is not None:
|
|
|
|
self._loop = loop
|
|
|
|
else:
|
|
|
|
self._loop = asyncio.get_event_loop()
|
|
|
|
|
2015-08-06 20:44:37 +00:00
|
|
|
self._servers = dict()
|
2015-07-07 20:48:53 +00:00
|
|
|
self._init_states()
|
2015-07-10 20:55:22 +00:00
|
|
|
self._sessions = dict()
|
2015-07-15 19:00:12 +00:00
|
|
|
self._subscriptions = dict()
|
|
|
|
self._global_retained_messages = dict()
|
2015-07-07 20:48:53 +00:00
|
|
|
|
2015-08-09 21:07:55 +00:00
|
|
|
# Broker statistics initialization
|
|
|
|
self._stats = dict()
|
|
|
|
|
|
|
|
# $SYS tree task handle
|
|
|
|
self.sys_handle = None
|
|
|
|
|
2015-07-27 21:11:54 +00:00
|
|
|
def _build_listeners_config(self, broker_config):
|
2015-08-02 21:37:49 +00:00
|
|
|
self.listeners_config = dict()
|
2015-07-27 21:11:54 +00:00
|
|
|
try:
|
|
|
|
listeners_config = broker_config['listeners']
|
2015-08-02 21:37:49 +00:00
|
|
|
defaults = listeners_config['default']
|
|
|
|
for listener in listeners_config:
|
|
|
|
if listener != 'default':
|
|
|
|
config = dict(defaults)
|
|
|
|
config.update(listeners_config[listener])
|
|
|
|
self.listeners_config[listener] = config
|
|
|
|
except KeyError as ke:
|
|
|
|
raise BrokerException("Listener config not found invalid: %s" % ke)
|
2015-07-27 21:11:54 +00:00
|
|
|
|
2015-07-07 20:48:53 +00:00
|
|
|
def _init_states(self):
|
2015-08-11 19:07:38 +00:00
|
|
|
self.transitions = Machine(states=Broker.states, initial='new')
|
|
|
|
self.transitions.add_transition(trigger='start', source='new', dest='starting')
|
|
|
|
self.transitions.add_transition(trigger='starting_fail', source='starting', dest='not_started')
|
|
|
|
self.transitions.add_transition(trigger='starting_success', source='starting', dest='started')
|
|
|
|
self.transitions.add_transition(trigger='shutdown', source='started', dest='stopping')
|
|
|
|
self.transitions.add_transition(trigger='stopping_success', source='stopping', dest='stopped')
|
|
|
|
self.transitions.add_transition(trigger='stopping_failure', source='stopping', dest='not_stopped')
|
|
|
|
self.transitions.add_transition(trigger='start', source='stopped', dest='starting')
|
2015-07-07 20:48:53 +00:00
|
|
|
|
|
|
|
@asyncio.coroutine
|
|
|
|
def start(self):
|
|
|
|
try:
|
2015-08-11 19:07:38 +00:00
|
|
|
self.transitions.start()
|
2015-07-07 20:48:53 +00:00
|
|
|
self.logger.debug("Broker starting")
|
|
|
|
except MachineError as me:
|
|
|
|
self.logger.debug("Invalid method call at this moment: %s" % me)
|
|
|
|
raise BrokerException("Broker instance can't be started: %s" % me)
|
|
|
|
|
2015-08-09 21:07:55 +00:00
|
|
|
# Clear broker stats
|
|
|
|
self._clear_stats()
|
2015-07-07 20:48:53 +00:00
|
|
|
try:
|
2015-08-09 21:07:55 +00:00
|
|
|
# Start network listeners
|
2015-08-02 21:37:49 +00:00
|
|
|
for listener_name in self.listeners_config:
|
|
|
|
listener = self.listeners_config[listener_name]
|
|
|
|
self.logger.info("Binding listener '%s' to %s" % (listener_name, listener['bind']))
|
|
|
|
|
2015-08-09 21:07:55 +00:00
|
|
|
# Max connections
|
2015-08-06 20:44:37 +00:00
|
|
|
try:
|
|
|
|
max_connections = listener['max_connections']
|
|
|
|
except KeyError:
|
|
|
|
max_connections = -1
|
|
|
|
|
2015-08-02 21:37:49 +00:00
|
|
|
# SSL Context
|
|
|
|
sc = None
|
|
|
|
if 'ssl' in listener and listener['ssl'].upper() == 'ON':
|
|
|
|
try:
|
|
|
|
sc = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
|
|
|
|
sc.load_cert_chain(listener['certfile'], listener['keyfile'])
|
|
|
|
sc.verify_mode = ssl.CERT_OPTIONAL
|
|
|
|
except KeyError as ke:
|
|
|
|
raise BrokerException("'certfile' or 'keyfile' configuration parameter missing: %s" % ke)
|
|
|
|
except FileNotFoundError as fnfe:
|
|
|
|
raise BrokerException("Can't read cert files '%s' or '%s' : %s" %
|
2015-08-09 21:07:55 +00:00
|
|
|
(listener['certfile'], listener['keyfile'], fnfe))
|
2015-08-02 21:37:49 +00:00
|
|
|
|
|
|
|
if listener['type'] == 'tcp':
|
|
|
|
address, port = listener['bind'].split(':')
|
2015-08-06 20:44:37 +00:00
|
|
|
cb_partial = partial(self.stream_connected, listener_name=listener_name)
|
|
|
|
instance = yield from asyncio.start_server(cb_partial,
|
2015-08-09 21:07:55 +00:00
|
|
|
address,
|
|
|
|
port,
|
|
|
|
ssl=sc,
|
|
|
|
loop=self._loop)
|
2015-08-06 20:44:37 +00:00
|
|
|
self._servers[listener_name] = Server(listener_name, instance, max_connections, self._loop)
|
2015-08-02 21:37:49 +00:00
|
|
|
elif listener['type'] == 'ws':
|
|
|
|
address, port = listener['bind'].split(':')
|
2015-08-06 20:44:37 +00:00
|
|
|
cb_partial = partial(self.ws_connected, listener_name=listener_name)
|
|
|
|
instance = yield from websockets.serve(cb_partial, address, port, ssl=sc, loop=self._loop)
|
|
|
|
self._servers[listener_name] = Server(listener_name, instance, max_connections, self._loop)
|
2015-08-09 21:07:55 +00:00
|
|
|
|
|
|
|
# Start $SYS topics management
|
|
|
|
try:
|
2015-08-13 19:30:09 +00:00
|
|
|
sys_interval = int(self.config.get('sys_interval', 0))
|
|
|
|
if sys_interval > 0:
|
2015-08-09 21:07:55 +00:00
|
|
|
self.logger.debug("Setup $SYS broadcasting every %d secondes" % sys_interval)
|
2015-08-13 19:30:09 +00:00
|
|
|
self._init_dollar_sys()
|
2015-08-09 21:07:55 +00:00
|
|
|
self.sys_handle = self._loop.call_later(sys_interval, self.broadcast_dollar_sys_topics)
|
|
|
|
except KeyError:
|
|
|
|
pass
|
|
|
|
# 'sys_internal' config parameter not found
|
|
|
|
|
2015-08-11 19:07:38 +00:00
|
|
|
self.transitions.starting_success()
|
2015-08-02 21:37:49 +00:00
|
|
|
self.logger.debug("Broker started")
|
2015-07-07 20:48:53 +00:00
|
|
|
except Exception as e:
|
|
|
|
self.logger.error("Broker startup failed: %s" % e)
|
2015-08-11 19:07:38 +00:00
|
|
|
self.transitions.starting_fail()
|
2015-07-07 20:48:53 +00:00
|
|
|
raise BrokerException("Broker instance can't be started: %s" % e)
|
|
|
|
|
|
|
|
@asyncio.coroutine
|
|
|
|
def shutdown(self):
|
|
|
|
try:
|
2015-08-11 19:07:38 +00:00
|
|
|
self.transitions.shutdown()
|
2015-07-07 20:48:53 +00:00
|
|
|
except MachineError as me:
|
|
|
|
self.logger.debug("Invalid method call at this moment: %s" % me)
|
|
|
|
raise BrokerException("Broker instance can't be stopped: %s" % me)
|
2015-08-09 21:07:55 +00:00
|
|
|
|
|
|
|
# Stop $SYS topics broadcasting
|
|
|
|
if self.sys_handle:
|
|
|
|
self.sys_handle.cancel()
|
|
|
|
|
2015-08-06 20:44:37 +00:00
|
|
|
for listener_name in self._servers:
|
|
|
|
server = self._servers[listener_name]
|
|
|
|
yield from server.close_instance()
|
2015-07-07 20:48:53 +00:00
|
|
|
self.logger.debug("Broker closing")
|
|
|
|
self.logger.info("Broker closed")
|
2015-08-11 19:07:38 +00:00
|
|
|
self.transitions.stopping_success()
|
2015-07-07 20:48:53 +00:00
|
|
|
|
2015-08-09 21:07:55 +00:00
|
|
|
def _clear_stats(self):
|
|
|
|
"""
|
|
|
|
Initializes broker statistics data structures
|
|
|
|
"""
|
|
|
|
for stat in (STAT_BYTES_RECEIVED,
|
|
|
|
STAT_BYTES_SENT,
|
|
|
|
STAT_MSG_RECEIVED,
|
|
|
|
STAT_MSG_SENT,
|
2015-08-10 19:30:03 +00:00
|
|
|
STAT_CLIENTS_MAXIMUM,
|
|
|
|
STAT_PUBLISH_RECEIVED,
|
|
|
|
STAT_PUBLISH_SENT):
|
2015-08-09 21:07:55 +00:00
|
|
|
self._stats[stat] = 0
|
|
|
|
self._stats[STAT_UPTIME] = datetime.now()
|
|
|
|
|
|
|
|
def _init_dollar_sys(self):
|
|
|
|
"""
|
|
|
|
Initializes and publish $SYS static topics
|
|
|
|
"""
|
|
|
|
from hbmqtt.version import get_version
|
|
|
|
self.sys_handle = None
|
|
|
|
version = 'HBMQTT version ' + get_version()
|
|
|
|
self.retain_message(None, DOLLAR_SYS_ROOT + 'version', version.encode())
|
|
|
|
|
|
|
|
def broadcast_dollar_sys_topics(self):
|
|
|
|
"""
|
|
|
|
Broadcast dynamic $SYS topics updates and reschedule next execution depending on 'sys_interval' config
|
|
|
|
parameter.
|
|
|
|
"""
|
|
|
|
|
|
|
|
# Update stats
|
|
|
|
uptime = datetime.now() - self._stats[STAT_UPTIME]
|
2015-08-12 18:21:06 +00:00
|
|
|
client_connected = sum(1 for k, session in self._sessions.items() if session.transitions.state == 'connected')
|
2015-08-10 19:30:03 +00:00
|
|
|
if client_connected > self._stats[STAT_CLIENTS_MAXIMUM]:
|
|
|
|
self._stats[STAT_CLIENTS_MAXIMUM] = client_connected
|
2015-08-12 18:21:06 +00:00
|
|
|
client_disconnected = sum(1 for k, session in self._sessions.items() if session.transitions.state == 'disconnected')
|
2015-08-10 19:30:03 +00:00
|
|
|
inflight_in = 0
|
|
|
|
inflight_out = 0
|
|
|
|
messages_stored = 0
|
|
|
|
for k, session in self._sessions.items():
|
|
|
|
inflight_in += session.inflight_in_count
|
|
|
|
inflight_out += session.inflight_out_count
|
|
|
|
messages_stored += session.retained_messages_count
|
|
|
|
messages_stored += len(self._global_retained_messages)
|
|
|
|
subscriptions_count = 0
|
|
|
|
for topic in self._subscriptions:
|
|
|
|
subscriptions_count += len(self._subscriptions[topic])
|
|
|
|
|
2015-08-09 21:07:55 +00:00
|
|
|
# Broadcast updates
|
|
|
|
tasks = [
|
|
|
|
self._broadcast_sys_topic('load/bytes/received', int_to_bytes_str(self._stats[STAT_BYTES_RECEIVED])),
|
|
|
|
self._broadcast_sys_topic('load/bytes/sent', int_to_bytes_str(self._stats[STAT_BYTES_SENT])),
|
|
|
|
self._broadcast_sys_topic('messages/received', int_to_bytes_str(self._stats[STAT_MSG_RECEIVED])),
|
|
|
|
self._broadcast_sys_topic('messages/sent', int_to_bytes_str(self._stats[STAT_MSG_SENT])),
|
|
|
|
self._broadcast_sys_topic('time', str(datetime.now()).encode('utf-8')),
|
|
|
|
self._broadcast_sys_topic('uptime', int_to_bytes_str(int(uptime.total_seconds()))),
|
|
|
|
self._broadcast_sys_topic('uptime/formated', str(uptime).encode('utf-8')),
|
|
|
|
self._broadcast_sys_topic('clients/connected', int_to_bytes_str(client_connected)),
|
|
|
|
self._broadcast_sys_topic('clients/disconnected', int_to_bytes_str(client_disconnected)),
|
2015-08-10 19:30:03 +00:00
|
|
|
self._broadcast_sys_topic('clients/maximum', int_to_bytes_str(self._stats[STAT_CLIENTS_MAXIMUM])),
|
|
|
|
self._broadcast_sys_topic('clients/total', int_to_bytes_str(client_connected + client_disconnected)),
|
|
|
|
self._broadcast_sys_topic('messages/inflight', int_to_bytes_str(inflight_in + inflight_out)),
|
|
|
|
self._broadcast_sys_topic('messages/inflight/in', int_to_bytes_str(inflight_in)),
|
|
|
|
self._broadcast_sys_topic('messages/inflight/out', int_to_bytes_str(inflight_out)),
|
|
|
|
self._broadcast_sys_topic('messages/inflight/stored', int_to_bytes_str(messages_stored)),
|
|
|
|
self._broadcast_sys_topic('messages/publish/received', int_to_bytes_str(self._stats[STAT_PUBLISH_RECEIVED])),
|
|
|
|
self._broadcast_sys_topic('messages/publish/sent', int_to_bytes_str(self._stats[STAT_PUBLISH_SENT])),
|
|
|
|
self._broadcast_sys_topic('messages/retained/count', int_to_bytes_str(len(self._global_retained_messages))),
|
|
|
|
self._broadcast_sys_topic('messages/subscriptions/count', int_to_bytes_str(subscriptions_count)),
|
2015-08-09 21:07:55 +00:00
|
|
|
]
|
|
|
|
|
|
|
|
# Wait until broadcasting tasks end
|
|
|
|
if len(tasks) > 0:
|
|
|
|
asyncio.wait(tasks)
|
|
|
|
# Reschedule
|
|
|
|
sys_interval = int(self.config['sys_interval'])
|
|
|
|
self.logger.debug("Broadcasting $SYS topics")
|
|
|
|
self.sys_handle = self._loop.call_later(sys_interval, self.broadcast_dollar_sys_topics)
|
|
|
|
|
|
|
|
def _broadcast_sys_topic(self, topic_basename, data):
|
|
|
|
return self._internal_message_broadcast(DOLLAR_SYS_ROOT + topic_basename, data)
|
|
|
|
|
|
|
|
def _internal_message_broadcast(self, topic, data):
|
|
|
|
return asyncio.Task(self.broadcast_application_message(None, topic, data))
|
|
|
|
|
2015-08-02 21:37:49 +00:00
|
|
|
@asyncio.coroutine
|
2015-08-06 20:44:37 +00:00
|
|
|
def ws_connected(self, websocket, uri, listener_name):
|
|
|
|
yield from self.client_connected(listener_name, WebSocketsReader(websocket), WebSocketsWriter(websocket))
|
2015-08-02 21:37:49 +00:00
|
|
|
|
2015-07-07 20:48:53 +00:00
|
|
|
@asyncio.coroutine
|
2015-08-06 20:44:37 +00:00
|
|
|
def stream_connected(self, reader, writer, listener_name):
|
|
|
|
yield from self.client_connected(listener_name, StreamReaderAdapter(reader), StreamWriterAdapter(writer))
|
2015-08-01 20:16:39 +00:00
|
|
|
|
|
|
|
@asyncio.coroutine
|
2015-08-06 20:44:37 +00:00
|
|
|
def client_connected(self, listener_name, reader: ReaderAdapter, writer: WriterAdapter):
|
|
|
|
# Wait for connection available
|
|
|
|
server = self._servers[listener_name]
|
|
|
|
yield from server.acquire_connection()
|
|
|
|
|
2015-08-01 20:16:39 +00:00
|
|
|
remote_address, remote_port = writer.get_peer_info()
|
2015-08-06 20:44:37 +00:00
|
|
|
self.logger.debug("Connection from %s:%d on listener '%s'" % (remote_address, remote_port, listener_name))
|
2015-07-10 20:55:22 +00:00
|
|
|
|
|
|
|
# Wait for first packet and expect a CONNECT
|
|
|
|
connect = None
|
|
|
|
try:
|
|
|
|
connect = yield from ConnectPacket.from_stream(reader)
|
2015-07-12 20:35:56 +00:00
|
|
|
self.logger.debug(" <-in-- " + repr(connect))
|
2015-07-10 20:55:22 +00:00
|
|
|
self.check_connect(connect)
|
|
|
|
except HBMQTTException as exc:
|
2015-07-11 18:52:34 +00:00
|
|
|
self.logger.warn("[MQTT-3.1.0-1] %s: Can't read first packet an CONNECT: %s" %
|
|
|
|
(format_client_message(address=remote_address, port=remote_port), exc))
|
2015-08-01 20:16:39 +00:00
|
|
|
yield from writer.close()
|
2015-07-12 20:35:56 +00:00
|
|
|
self.logger.debug("Connection closed")
|
2015-07-10 20:55:22 +00:00
|
|
|
return
|
|
|
|
except BrokerException as be:
|
2015-07-11 18:52:34 +00:00
|
|
|
self.logger.error('Invalid connection from %s : %s' %
|
|
|
|
(format_client_message(address=remote_address, port=remote_port), be))
|
2015-08-01 20:16:39 +00:00
|
|
|
yield from writer.close()
|
2015-07-12 20:35:56 +00:00
|
|
|
self.logger.debug("Connection closed")
|
2015-07-10 20:55:22 +00:00
|
|
|
return
|
|
|
|
|
2015-07-11 18:52:34 +00:00
|
|
|
connack = None
|
2015-07-10 20:55:22 +00:00
|
|
|
if connect.variable_header.proto_level != 4:
|
|
|
|
# only MQTT 3.1.1 supported
|
2015-07-11 18:52:34 +00:00
|
|
|
self.logger.error('Invalid protocol from %s: %d' %
|
|
|
|
(format_client_message(address=remote_address, port=remote_port),
|
|
|
|
connect.variable_header.protocol_level))
|
2015-08-01 20:16:39 +00:00
|
|
|
connack = ConnackPacket.build(0, UNACCEPTABLE_PROTOCOL_VERSION) # [MQTT-3.2.2-4] session_parent=0
|
2015-07-11 18:52:34 +00:00
|
|
|
elif connect.variable_header.username_flag and connect.payload.username is None:
|
|
|
|
self.logger.error('Invalid username from %s' %
|
|
|
|
(format_client_message(address=remote_address, port=remote_port)))
|
2015-08-01 20:16:39 +00:00
|
|
|
connack = ConnackPacket.build(0, BAD_USERNAME_PASSWORD) # [MQTT-3.2.2-4] session_parent=0
|
2015-07-11 18:52:34 +00:00
|
|
|
elif connect.variable_header.password_flag and connect.payload.password is None:
|
2015-07-10 20:55:22 +00:00
|
|
|
self.logger.error('Invalid password %s' % (format_client_message(address=remote_address, port=remote_port)))
|
2015-08-01 20:16:39 +00:00
|
|
|
connack = ConnackPacket.build(0, BAD_USERNAME_PASSWORD) # [MQTT-3.2.2-4] session_parent=0
|
2015-08-09 15:48:47 +00:00
|
|
|
elif connect.variable_header.clean_session_flag is False and connect.payload.client_id is None:
|
2015-07-11 18:52:34 +00:00
|
|
|
self.logger.error('[MQTT-3.1.3-8] [MQTT-3.1.3-9] %s: No client Id provided (cleansession=0)' %
|
|
|
|
format_client_message(address=remote_address, port=remote_port))
|
2015-08-01 20:16:39 +00:00
|
|
|
connack = ConnackPacket.build(0, IDENTIFIER_REJECTED)
|
2015-07-11 18:52:34 +00:00
|
|
|
self.logger.debug(" -out-> " + repr(connack))
|
|
|
|
if connack is not None:
|
2015-07-10 20:55:22 +00:00
|
|
|
self.logger.debug(" -out-> " + repr(connack))
|
|
|
|
yield from connack.to_stream(writer)
|
2015-08-01 20:16:39 +00:00
|
|
|
yield from writer.close()
|
2015-07-10 20:55:22 +00:00
|
|
|
return
|
|
|
|
|
2015-07-12 20:35:56 +00:00
|
|
|
client_session = None
|
2015-07-14 12:19:46 +00:00
|
|
|
self.logger.debug("Clean session={0}".format(connect.variable_header.clean_session_flag))
|
|
|
|
self.logger.debug("known sessions={0}".format(self._sessions))
|
2015-08-09 21:00:30 +00:00
|
|
|
client_id = connect.payload.client_id
|
2015-07-10 20:55:22 +00:00
|
|
|
if connect.variable_header.clean_session_flag:
|
2015-08-09 21:00:30 +00:00
|
|
|
# Delete existing session and create a new one
|
2015-07-27 12:02:52 +00:00
|
|
|
if client_id is not None:
|
|
|
|
self.delete_session(client_id)
|
2015-07-12 20:35:56 +00:00
|
|
|
client_session = Session()
|
|
|
|
client_session.parent = 0
|
2015-08-09 21:00:30 +00:00
|
|
|
client_session.client_id = client_id
|
2015-07-12 20:35:56 +00:00
|
|
|
self._sessions[client_id] = client_session
|
2015-07-10 20:55:22 +00:00
|
|
|
else:
|
|
|
|
# Get session from cache
|
|
|
|
if client_id in self._sessions:
|
2015-07-14 12:19:46 +00:00
|
|
|
self.logger.debug("Found old session %s" % repr(self._sessions[client_id]))
|
2015-07-12 20:35:56 +00:00
|
|
|
client_session = self._sessions[client_id]
|
|
|
|
client_session.parent = 1
|
2015-07-10 20:55:22 +00:00
|
|
|
else:
|
2015-07-12 20:35:56 +00:00
|
|
|
client_session = Session()
|
2015-08-09 21:00:30 +00:00
|
|
|
client_session.client_id = client_id
|
|
|
|
self._sessions[client_id] = client_session
|
2015-07-12 20:35:56 +00:00
|
|
|
client_session.parent = 0
|
2015-07-10 20:55:22 +00:00
|
|
|
|
2015-07-12 20:35:56 +00:00
|
|
|
if client_session.client_id is None:
|
2015-07-10 20:55:22 +00:00
|
|
|
# Generate client ID
|
2015-07-12 20:35:56 +00:00
|
|
|
client_session.client_id = gen_client_id()
|
|
|
|
client_session.remote_address = remote_address
|
|
|
|
client_session.remote_port = remote_port
|
|
|
|
client_session.clean_session = connect.variable_header.clean_session_flag
|
|
|
|
client_session.will_flag = connect.variable_header.will_flag
|
|
|
|
client_session.will_retain = connect.variable_header.will_retain_flag
|
|
|
|
client_session.will_qos = connect.variable_header.will_qos
|
|
|
|
client_session.will_topic = connect.payload.will_topic
|
|
|
|
client_session.will_message = connect.payload.will_message
|
|
|
|
client_session.username = connect.payload.username
|
|
|
|
client_session.password = connect.payload.password
|
|
|
|
client_session.client_id = connect.payload.client_id
|
|
|
|
if connect.variable_header.keep_alive > 0:
|
|
|
|
client_session.keep_alive = connect.variable_header.keep_alive + self.config['timeout-disconnect-delay']
|
|
|
|
else:
|
|
|
|
client_session.keep_alive = 0
|
2015-07-27 13:38:38 +00:00
|
|
|
client_session.publish_retry_delay = self.config['publish-retry-delay']
|
2015-07-12 20:35:56 +00:00
|
|
|
|
|
|
|
client_session.reader = reader
|
|
|
|
client_session.writer = writer
|
|
|
|
|
|
|
|
if self.authenticate(client_session):
|
2015-08-01 20:16:39 +00:00
|
|
|
connack = ConnackPacket.build(client_session.parent, CONNECTION_ACCEPTED)
|
2015-07-12 20:35:56 +00:00
|
|
|
self.logger.info('%s : connection accepted' % format_client_message(session=client_session))
|
2015-07-10 20:55:22 +00:00
|
|
|
self.logger.debug(" -out-> " + repr(connack))
|
|
|
|
yield from connack.to_stream(writer)
|
|
|
|
else:
|
2015-08-01 20:16:39 +00:00
|
|
|
connack = ConnackPacket.build(client_session.parent, NOT_AUTHORIZED)
|
2015-07-12 20:35:56 +00:00
|
|
|
self.logger.info('%s : connection refused' % format_client_message(session=client_session))
|
2015-07-10 20:55:22 +00:00
|
|
|
self.logger.debug(" -out-> " + repr(connack))
|
|
|
|
yield from connack.to_stream(writer)
|
2015-08-01 20:16:39 +00:00
|
|
|
yield from writer.close()
|
2015-07-10 20:55:22 +00:00
|
|
|
return
|
|
|
|
|
2015-08-11 19:07:38 +00:00
|
|
|
client_session.transitions.connect()
|
2015-08-09 21:07:55 +00:00
|
|
|
handler = self._init_handler(reader, writer, client_session)
|
2015-07-24 19:47:05 +00:00
|
|
|
self.logger.debug("%s Start messages handling" % client_session.client_id)
|
2015-07-08 20:47:33 +00:00
|
|
|
yield from handler.start()
|
2015-07-27 12:02:52 +00:00
|
|
|
self.logger.debug("Retained messages queue size: %d" % client_session.retained_messages.qsize())
|
2015-07-12 20:35:56 +00:00
|
|
|
yield from self.publish_session_retained_messages(client_session)
|
2015-07-24 19:47:05 +00:00
|
|
|
self.logger.debug("%s Wait for disconnect" % client_session.client_id)
|
2015-07-11 20:22:33 +00:00
|
|
|
|
|
|
|
connected = True
|
|
|
|
wait_disconnect = asyncio.Task(handler.wait_disconnect())
|
|
|
|
wait_subscription = asyncio.Task(handler.get_next_pending_subscription())
|
2015-07-13 20:07:12 +00:00
|
|
|
wait_unsubscription = asyncio.Task(handler.get_next_pending_unsubscription())
|
2015-07-12 20:35:56 +00:00
|
|
|
wait_deliver = asyncio.Task(handler.mqtt_deliver_next_message())
|
2015-07-11 20:22:33 +00:00
|
|
|
while connected:
|
2015-07-13 20:07:12 +00:00
|
|
|
done, pending = yield from asyncio.wait(
|
|
|
|
[wait_disconnect, wait_subscription, wait_unsubscription, wait_deliver],
|
|
|
|
return_when=asyncio.FIRST_COMPLETED)
|
2015-07-11 20:22:33 +00:00
|
|
|
if wait_disconnect in done:
|
2015-07-26 20:53:11 +00:00
|
|
|
result = wait_disconnect.result()
|
|
|
|
self.logger.debug("%s Result from wait_diconnect: %s" % (client_session.client_id, result))
|
|
|
|
if result is None:
|
|
|
|
self.logger.debug("Will flag: %s" % client_session.will_flag)
|
2015-08-09 21:07:55 +00:00
|
|
|
# Connection closed anormally, send will message
|
2015-07-26 20:53:11 +00:00
|
|
|
if client_session.will_flag:
|
|
|
|
self.logger.debug("Client %s disconnected abnormally, sending will message" %
|
|
|
|
format_client_message(client_session))
|
|
|
|
yield from self.broadcast_application_message(
|
|
|
|
client_session, client_session.will_topic,
|
|
|
|
client_session.will_message,
|
|
|
|
client_session.will_qos)
|
|
|
|
if client_session.will_retain:
|
|
|
|
self.retain_message(client_session,
|
|
|
|
client_session.will_topic,
|
|
|
|
client_session.will_message,
|
|
|
|
client_session.will_qos)
|
|
|
|
connected = False
|
2015-07-14 12:19:46 +00:00
|
|
|
if wait_unsubscription in done:
|
2015-07-25 19:13:17 +00:00
|
|
|
self.logger.debug("%s handling unsubscription" % client_session.client_id)
|
2015-07-13 20:07:12 +00:00
|
|
|
unsubscription = wait_unsubscription.result()
|
2015-07-15 19:00:12 +00:00
|
|
|
for topic in unsubscription['topics']:
|
2015-07-13 20:07:12 +00:00
|
|
|
self.del_subscription(topic, client_session)
|
2015-07-15 19:00:12 +00:00
|
|
|
yield from handler.mqtt_acknowledge_unsubscription(unsubscription['packet_id'])
|
2015-07-13 20:07:12 +00:00
|
|
|
wait_unsubscription = asyncio.Task(handler.get_next_pending_unsubscription())
|
2015-07-14 12:19:46 +00:00
|
|
|
if wait_subscription in done:
|
2015-07-25 19:13:17 +00:00
|
|
|
self.logger.debug("%s handling subscription" % client_session.client_id)
|
2015-07-15 19:00:12 +00:00
|
|
|
subscriptions = wait_subscription.result()
|
2015-07-11 20:22:33 +00:00
|
|
|
return_codes = []
|
2015-07-15 19:00:12 +00:00
|
|
|
for subscription in subscriptions['topics']:
|
|
|
|
return_codes.append(self.add_subscription(subscription, client_session))
|
|
|
|
yield from handler.mqtt_acknowledge_subscription(subscriptions['packet_id'], return_codes)
|
|
|
|
for index, subscription in enumerate(subscriptions['topics']):
|
2015-07-13 20:07:12 +00:00
|
|
|
if return_codes[index] != 0x80:
|
2015-07-15 19:00:12 +00:00
|
|
|
yield from self.publish_retained_messages_for_subscription(subscription, client_session)
|
2015-07-11 20:22:33 +00:00
|
|
|
wait_subscription = asyncio.Task(handler.get_next_pending_subscription())
|
2015-07-15 19:00:12 +00:00
|
|
|
self.logger.debug(repr(self._subscriptions))
|
2015-07-14 12:19:46 +00:00
|
|
|
if wait_deliver in done:
|
2015-07-25 19:13:17 +00:00
|
|
|
self.logger.debug("%s handling message delivery" % client_session.client_id)
|
2015-07-26 19:21:35 +00:00
|
|
|
publish_packet = wait_deliver.result()
|
|
|
|
packet_id = publish_packet.variable_header.packet_id
|
2015-07-12 20:35:56 +00:00
|
|
|
topic_name = publish_packet.variable_header.topic_name
|
|
|
|
data = publish_packet.payload.data
|
2015-07-15 21:02:36 +00:00
|
|
|
yield from self.broadcast_application_message(client_session, topic_name, data)
|
2015-07-12 20:35:56 +00:00
|
|
|
if publish_packet.retain_flag:
|
2015-07-17 20:06:49 +00:00
|
|
|
self.retain_message(client_session, topic_name, data)
|
2015-07-26 19:21:35 +00:00
|
|
|
# Acknowledge message delivery
|
|
|
|
yield from handler.mqtt_acknowledge_delivery(packet_id)
|
2015-07-12 20:35:56 +00:00
|
|
|
wait_deliver = asyncio.Task(handler.mqtt_deliver_next_message())
|
2015-07-14 12:19:46 +00:00
|
|
|
wait_subscription.cancel()
|
|
|
|
wait_unsubscription.cancel()
|
|
|
|
wait_deliver.cancel()
|
2015-07-11 20:22:33 +00:00
|
|
|
|
2015-07-24 19:47:05 +00:00
|
|
|
self.logger.debug("%s Client disconnecting" % client_session.client_id)
|
2015-08-09 21:07:55 +00:00
|
|
|
yield from self._stop_handler(handler)
|
2015-08-11 19:07:38 +00:00
|
|
|
client_session.transitions.disconnect()
|
2015-08-09 21:07:55 +00:00
|
|
|
yield from writer.close()
|
|
|
|
self.logger.debug("%s Session disconnected" % client_session.client_id)
|
|
|
|
server.release_connection()
|
|
|
|
|
|
|
|
def _init_handler(self, reader, writer, session):
|
|
|
|
"""
|
|
|
|
Create a BrokerProtocolHandler and attach to a session
|
|
|
|
:return:
|
|
|
|
"""
|
|
|
|
handler = BrokerProtocolHandler(reader, writer, self._loop)
|
|
|
|
handler.attach_to_session(session)
|
|
|
|
handler.on_packet_received.connect(self.sys_handle_packet_received)
|
|
|
|
handler.on_packet_sent.connect(self.sys_handle_packet_sent)
|
|
|
|
return handler
|
|
|
|
|
|
|
|
@asyncio.coroutine
|
|
|
|
def _stop_handler(self, handler):
|
|
|
|
"""
|
|
|
|
Stop a running handler and detach if from the session
|
|
|
|
:param handler:
|
|
|
|
:return:
|
|
|
|
"""
|
2015-07-12 20:35:56 +00:00
|
|
|
try:
|
|
|
|
yield from handler.stop()
|
|
|
|
except Exception as e:
|
|
|
|
self.logger.error(e)
|
|
|
|
finally:
|
|
|
|
handler.detach_from_session()
|
2015-07-10 20:55:22 +00:00
|
|
|
|
|
|
|
@asyncio.coroutine
|
|
|
|
def check_connect(self, connect: ConnectPacket):
|
|
|
|
if connect.payload.client_id is None:
|
|
|
|
raise BrokerException('[[MQTT-3.1.3-3]] : Client identifier must be present' )
|
|
|
|
|
|
|
|
if connect.variable_header.will_flag:
|
|
|
|
if connect.payload.will_topic is None or connect.payload.will_message is None:
|
|
|
|
raise BrokerException('will flag set, but will topic/message not present in payload')
|
|
|
|
|
|
|
|
if connect.variable_header.reserved_flag:
|
|
|
|
raise BrokerException('[MQTT-3.1.2-3] CONNECT reserved flag must be set to 0')
|
|
|
|
|
|
|
|
def authenticate(self, session: Session):
|
|
|
|
# TODO : Handle client authentication here
|
2015-07-11 18:52:34 +00:00
|
|
|
return True
|
2015-07-11 20:22:33 +00:00
|
|
|
|
2015-07-17 20:06:49 +00:00
|
|
|
def retain_message(self, source_session, topic_name, data, qos=None):
|
|
|
|
if data is not None and data != b'':
|
|
|
|
# If retained flag set, store the message for further subscriptions
|
2015-08-09 21:07:55 +00:00
|
|
|
self.logger.debug("Retaining message on topic %s" % topic_name)
|
2015-07-17 20:06:49 +00:00
|
|
|
retained_message = RetainedApplicationMessage(source_session, topic_name, data, qos)
|
|
|
|
self._global_retained_messages[topic_name] = retained_message
|
|
|
|
else:
|
|
|
|
# [MQTT-3.3.1-10]
|
2015-08-09 21:07:55 +00:00
|
|
|
self.logger.debug("Clear retained messages for topic '%s'" % topic_name)
|
2015-07-17 20:06:49 +00:00
|
|
|
del self._global_retained_messages[topic_name]
|
|
|
|
|
2015-07-15 19:00:12 +00:00
|
|
|
def add_subscription(self, subscription, session):
|
2015-07-12 20:35:56 +00:00
|
|
|
import re
|
2015-07-15 21:02:36 +00:00
|
|
|
wildcard_pattern = re.compile('.*?/?\+/?.*?')
|
2015-07-11 20:22:33 +00:00
|
|
|
try:
|
2015-07-15 19:00:12 +00:00
|
|
|
a_filter = subscription['filter']
|
|
|
|
if '#' in a_filter and not a_filter.endswith('#'):
|
2015-07-12 20:35:56 +00:00
|
|
|
# [MQTT-4.7.1-2] Wildcard character '#' is only allowed as last character in filter
|
|
|
|
return 0x80
|
2015-07-15 21:02:36 +00:00
|
|
|
if '+' in a_filter and not wildcard_pattern.match(a_filter):
|
2015-07-12 20:35:56 +00:00
|
|
|
# [MQTT-4.7.1-3] + wildcard character must occupy entire level
|
|
|
|
return 0x80
|
|
|
|
|
2015-07-15 19:00:12 +00:00
|
|
|
qos = subscription['qos']
|
2015-07-11 20:22:33 +00:00
|
|
|
if 'max-qos' in self.config and qos > self.config['max-qos']:
|
|
|
|
qos = self.config['max-qos']
|
2015-07-15 19:00:12 +00:00
|
|
|
if a_filter not in self._subscriptions:
|
|
|
|
self._subscriptions[a_filter] = []
|
|
|
|
already_subscribed = next(
|
|
|
|
(s for s in self._subscriptions[a_filter] if s.session.client_id == session.client_id), None)
|
|
|
|
if not already_subscribed:
|
|
|
|
self._subscriptions[a_filter].append(Subscription(session, qos))
|
|
|
|
else:
|
|
|
|
self.logger.debug("Client %s has already subscribed to %s" % (format_client_message(session=session), a_filter))
|
2015-07-11 20:22:33 +00:00
|
|
|
return qos
|
|
|
|
except KeyError:
|
|
|
|
return 0x80
|
2015-07-12 20:35:56 +00:00
|
|
|
|
2015-07-13 20:07:12 +00:00
|
|
|
def del_subscription(self, a_filter, session):
|
|
|
|
try:
|
2015-07-15 19:00:12 +00:00
|
|
|
subscriptions = self._subscriptions[a_filter]
|
|
|
|
for index, subscription in enumerate(subscriptions):
|
|
|
|
if subscription.session.client_id == session.client_id:
|
2015-07-13 20:07:12 +00:00
|
|
|
self.logger.debug("Removing subscription on topic '%s' for client %s" %
|
|
|
|
(a_filter, format_client_message(session=session)))
|
2015-07-15 19:00:12 +00:00
|
|
|
subscriptions.pop(index)
|
2015-07-13 20:07:12 +00:00
|
|
|
except KeyError:
|
|
|
|
# Unsubscribe topic not found in current subscribed topics
|
|
|
|
pass
|
|
|
|
|
2015-08-09 15:48:47 +00:00
|
|
|
def matches(self, topic, a_filter):
|
2015-07-12 20:35:56 +00:00
|
|
|
import re
|
2015-08-09 15:48:47 +00:00
|
|
|
match_pattern = re.compile(a_filter.replace('#', '.*').replace('$', '\$').replace('+', '[\$\s\w\d]+'))
|
2015-07-12 20:35:56 +00:00
|
|
|
if match_pattern.match(topic):
|
|
|
|
return True
|
|
|
|
else:
|
|
|
|
return False
|
|
|
|
|
|
|
|
@asyncio.coroutine
|
2015-07-15 21:02:36 +00:00
|
|
|
def broadcast_application_message(self, source_session, topic, data, force_qos=None):
|
2015-08-10 20:05:03 +00:00
|
|
|
#self.logger.debug("Broadcasting message from %s on topic %s" %
|
|
|
|
# (format_client_message(session=source_session), topic)
|
|
|
|
# )
|
2015-07-12 20:35:56 +00:00
|
|
|
publish_tasks = []
|
2015-07-15 21:02:36 +00:00
|
|
|
try:
|
|
|
|
for k_filter in self._subscriptions:
|
|
|
|
if self.matches(topic, k_filter):
|
|
|
|
subscriptions = self._subscriptions[k_filter]
|
|
|
|
for subscription in subscriptions:
|
|
|
|
target_session = subscription.session
|
|
|
|
qos = subscription.qos
|
|
|
|
if force_qos is not None:
|
|
|
|
qos = force_qos
|
|
|
|
if target_session.machine.state == 'connected':
|
|
|
|
self.logger.debug("broadcasting application message from %s on topic '%s' to %s" %
|
|
|
|
(format_client_message(session=source_session),
|
|
|
|
topic, format_client_message(session=target_session)))
|
|
|
|
handler = subscription.session.handler
|
|
|
|
publish_tasks.append(
|
2015-07-25 21:21:25 +00:00
|
|
|
asyncio.Task(handler.mqtt_publish(topic, data, qos, retain=False))
|
2015-07-15 21:02:36 +00:00
|
|
|
)
|
|
|
|
else:
|
|
|
|
self.logger.debug("retaining application message from %s on topic '%s' to client '%s'" %
|
|
|
|
(format_client_message(session=source_session),
|
|
|
|
topic, format_client_message(session=target_session)))
|
|
|
|
retained_message = RetainedApplicationMessage(source_session, topic, data, qos)
|
|
|
|
publish_tasks.append(
|
|
|
|
asyncio.Task(target_session.retained_messages.put(retained_message))
|
|
|
|
)
|
2015-07-27 12:02:52 +00:00
|
|
|
|
2015-07-15 21:02:36 +00:00
|
|
|
if len(publish_tasks) > 0:
|
|
|
|
asyncio.wait(publish_tasks)
|
|
|
|
except Exception as e:
|
|
|
|
self.logger.warn("Message broadcasting failed: %s", e)
|
2015-08-10 20:05:03 +00:00
|
|
|
#self.logger.debug("End Broadcasting message from %s on topic %s" %
|
|
|
|
# (format_client_message(session=source_session), topic)
|
|
|
|
# )
|
2015-07-12 20:35:56 +00:00
|
|
|
|
|
|
|
@asyncio.coroutine
|
|
|
|
def publish_session_retained_messages(self, session):
|
2015-07-15 19:00:12 +00:00
|
|
|
self.logger.debug("Publishing %d messages retained for session %s" %
|
|
|
|
(session.retained_messages.qsize(), format_client_message(session=session))
|
|
|
|
)
|
2015-07-14 12:19:46 +00:00
|
|
|
publish_tasks = []
|
2015-07-12 20:35:56 +00:00
|
|
|
while not session.retained_messages.empty():
|
|
|
|
retained = yield from session.retained_messages.get()
|
2015-07-14 12:19:46 +00:00
|
|
|
publish_tasks.append(asyncio.Task(
|
|
|
|
session.handler.mqtt_publish(
|
2015-07-26 20:53:11 +00:00
|
|
|
retained.topic, retained.data, retained.qos, True)))
|
2015-07-14 12:19:46 +00:00
|
|
|
if len(publish_tasks) > 0:
|
|
|
|
asyncio.wait(publish_tasks)
|
2015-07-12 20:35:56 +00:00
|
|
|
|
|
|
|
@asyncio.coroutine
|
|
|
|
def publish_retained_messages_for_subscription(self, subscription, session):
|
|
|
|
self.logger.debug("Begin broadcasting messages retained due to subscription on '%s' from %s" %
|
|
|
|
(subscription['filter'], format_client_message(session=session)))
|
|
|
|
publish_tasks = []
|
|
|
|
for d_topic in self._global_retained_messages:
|
|
|
|
self.logger.debug("matching : %s %s" % (d_topic, subscription['filter']))
|
|
|
|
if self.matches(d_topic, subscription['filter']):
|
|
|
|
self.logger.debug("%s and %s match" % (d_topic, subscription['filter']))
|
|
|
|
retained = self._global_retained_messages[d_topic]
|
|
|
|
publish_tasks.append(asyncio.Task(
|
|
|
|
session.handler.mqtt_publish(
|
2015-07-25 21:21:25 +00:00
|
|
|
retained.topic, retained.data, subscription['qos'], True)))
|
2015-07-12 20:35:56 +00:00
|
|
|
if len(publish_tasks) > 0:
|
|
|
|
asyncio.wait(publish_tasks)
|
|
|
|
self.logger.debug("End broadcasting messages retained due to subscription on '%s' from %s" %
|
|
|
|
(subscription['filter'], format_client_message(session=session)))
|
2015-07-27 12:02:52 +00:00
|
|
|
|
|
|
|
def delete_session(self, client_id):
|
|
|
|
"""
|
|
|
|
Delete an existing session data, for example due to clean session set in CONNECT
|
|
|
|
:param client_id:
|
|
|
|
:return:
|
|
|
|
"""
|
|
|
|
try:
|
|
|
|
session = self._sessions[client_id]
|
|
|
|
except KeyError:
|
|
|
|
session = None
|
|
|
|
if session is None:
|
|
|
|
self.logger.warn("Delete session : session %s doesn't exist" % client_id)
|
|
|
|
return
|
|
|
|
|
|
|
|
# Delete subscriptions
|
|
|
|
self.logger.debug("deleting session %s subscriptions" % repr(session))
|
|
|
|
nb_sub = 0
|
2015-08-09 21:07:55 +00:00
|
|
|
for a_filter in self._subscriptions:
|
|
|
|
self.del_subscription(a_filter, session)
|
2015-07-27 12:02:52 +00:00
|
|
|
nb_sub += 1
|
|
|
|
self.logger.debug("%d subscriptions deleted" % nb_sub)
|
|
|
|
|
|
|
|
self.logger.debug("deleting existing session %s" % repr(self._sessions[client_id]))
|
|
|
|
del self._sessions[client_id]
|
2015-08-09 21:07:55 +00:00
|
|
|
|
|
|
|
def sys_handle_packet_received(self, packet):
|
|
|
|
if packet:
|
|
|
|
packet_size = packet.bytes_length
|
|
|
|
self._stats[STAT_BYTES_RECEIVED] += packet_size
|
|
|
|
self._stats[STAT_MSG_RECEIVED] += 1
|
2015-08-10 19:30:03 +00:00
|
|
|
if packet.fixed_header.packet_type == PUBLISH:
|
|
|
|
self._stats[STAT_PUBLISH_RECEIVED] += 1
|
2015-08-09 21:07:55 +00:00
|
|
|
|
|
|
|
def sys_handle_packet_sent(self, packet):
|
|
|
|
if packet:
|
|
|
|
packet_size = packet.bytes_length
|
|
|
|
self._stats[STAT_BYTES_SENT] += packet_size
|
|
|
|
self._stats[STAT_MSG_SENT] += 1
|
2015-08-10 19:30:03 +00:00
|
|
|
if packet.fixed_header.packet_type == PUBLISH:
|
|
|
|
self._stats[STAT_PUBLISH_SENT] += 1
|