diff --git a/hbmqtt/broker.py b/hbmqtt/broker.py index afb5643..150afa5 100644 --- a/hbmqtt/broker.py +++ b/hbmqtt/broker.py @@ -5,6 +5,7 @@ import logging import ssl import websockets import asyncio +from datetime import datetime from functools import partial from transitions import Machine, MachineError @@ -14,6 +15,8 @@ from hbmqtt.mqtt.connect import ConnectPacket from hbmqtt.mqtt.connack import * from hbmqtt.errors import HBMQTTException from hbmqtt.utils import format_client_message, gen_client_id +from hbmqtt.mqtt.packet import PUBLISH +from hbmqtt.codecs import int_to_bytes_str from hbmqtt.adapters import ( StreamReaderAdapter, StreamWriterAdapter, @@ -28,6 +31,15 @@ _defaults = { 'publish-retry-delay': 5, } +DOLLAR_SYS_ROOT = '$SYS/broker/' +STAT_BYTES_SENT = 'bytes_sent' +STAT_BYTES_RECEIVED = 'bytes_received' +STAT_MSG_SENT = 'messages_sent' +STAT_MSG_RECEIVED = 'messages_received' +STAT_PUBLISH_SENT = 'publish_sent' +STAT_PUBLISH_RECEIVED = 'publish_received' +STAT_UPTIME = 'uptime' +STAT_CLIENTS_MAXIMUM = 'clients_maximum' class BrokerException(BaseException): pass @@ -141,6 +153,12 @@ class Broker: self._subscriptions = dict() self._global_retained_messages = dict() + # Broker statistics initialization + self._stats = dict() + + # $SYS tree task handle + self.sys_handle = None + def _build_listeners_config(self, broker_config): self.listeners_config = dict() try: @@ -173,12 +191,15 @@ class Broker: self.logger.debug("Invalid method call at this moment: %s" % me) raise BrokerException("Broker instance can't be started: %s" % me) + # Clear broker stats + self._clear_stats() try: + # Start network listeners for listener_name in self.listeners_config: listener = self.listeners_config[listener_name] self.logger.info("Binding listener '%s' to %s" % (listener_name, listener['bind'])) - #Max connections + # Max connections try: max_connections = listener['max_connections'] except KeyError: @@ -195,22 +216,34 @@ class Broker: raise BrokerException("'certfile' or 'keyfile' configuration parameter missing: %s" % ke) except FileNotFoundError as fnfe: raise BrokerException("Can't read cert files '%s' or '%s' : %s" % - (listener['certfile'], listener['keyfile'], fnfe)) + (listener['certfile'], listener['keyfile'], fnfe)) if listener['type'] == 'tcp': address, port = listener['bind'].split(':') cb_partial = partial(self.stream_connected, listener_name=listener_name) instance = yield from asyncio.start_server(cb_partial, - address, - port, - ssl=sc, - loop=self._loop) + address, + port, + ssl=sc, + loop=self._loop) self._servers[listener_name] = Server(listener_name, instance, max_connections, self._loop) elif listener['type'] == 'ws': address, port = listener['bind'].split(':') cb_partial = partial(self.ws_connected, listener_name=listener_name) instance = yield from websockets.serve(cb_partial, address, port, ssl=sc, loop=self._loop) self._servers[listener_name] = Server(listener_name, instance, max_connections, self._loop) + + # Start $SYS topics management + self._init_dollar_sys() + try: + sys_interval = int(self.config['sys_interval']) + if sys_interval: + self.logger.debug("Setup $SYS broadcasting every %d secondes" % sys_interval) + self.sys_handle = self._loop.call_later(sys_interval, self.broadcast_dollar_sys_topics) + except KeyError: + pass + # 'sys_internal' config parameter not found + self.machine.starting_success() self.logger.debug("Broker started") except Exception as e: @@ -225,6 +258,11 @@ class Broker: except MachineError as me: self.logger.debug("Invalid method call at this moment: %s" % me) raise BrokerException("Broker instance can't be stopped: %s" % me) + + # Stop $SYS topics broadcasting + if self.sys_handle: + self.sys_handle.cancel() + for listener_name in self._servers: server = self._servers[listener_name] yield from server.close_instance() @@ -232,6 +270,90 @@ class Broker: self.logger.info("Broker closed") self.machine.stopping_success() + def _clear_stats(self): + """ + Initializes broker statistics data structures + """ + for stat in (STAT_BYTES_RECEIVED, + STAT_BYTES_SENT, + STAT_MSG_RECEIVED, + STAT_MSG_SENT, + STAT_CLIENTS_MAXIMUM, + STAT_PUBLISH_RECEIVED, + STAT_PUBLISH_SENT): + self._stats[stat] = 0 + self._stats[STAT_UPTIME] = datetime.now() + + def _init_dollar_sys(self): + """ + Initializes and publish $SYS static topics + """ + from hbmqtt.version import get_version + self.sys_handle = None + version = 'HBMQTT version ' + get_version() + self.retain_message(None, DOLLAR_SYS_ROOT + 'version', version.encode()) + + def broadcast_dollar_sys_topics(self): + """ + Broadcast dynamic $SYS topics updates and reschedule next execution depending on 'sys_interval' config + parameter. + """ + + # Update stats + uptime = datetime.now() - self._stats[STAT_UPTIME] + client_connected = sum(1 for k, session in self._sessions.items() if session.machine.state == 'connected') + if client_connected > self._stats[STAT_CLIENTS_MAXIMUM]: + self._stats[STAT_CLIENTS_MAXIMUM] = client_connected + client_disconnected = sum(1 for k, session in self._sessions.items() if session.machine.state == 'disconnected') + inflight_in = 0 + inflight_out = 0 + messages_stored = 0 + for k, session in self._sessions.items(): + inflight_in += session.inflight_in_count + inflight_out += session.inflight_out_count + messages_stored += session.retained_messages_count + messages_stored += len(self._global_retained_messages) + subscriptions_count = 0 + for topic in self._subscriptions: + subscriptions_count += len(self._subscriptions[topic]) + + # Broadcast updates + tasks = [ + self._broadcast_sys_topic('load/bytes/received', int_to_bytes_str(self._stats[STAT_BYTES_RECEIVED])), + self._broadcast_sys_topic('load/bytes/sent', int_to_bytes_str(self._stats[STAT_BYTES_SENT])), + self._broadcast_sys_topic('messages/received', int_to_bytes_str(self._stats[STAT_MSG_RECEIVED])), + self._broadcast_sys_topic('messages/sent', int_to_bytes_str(self._stats[STAT_MSG_SENT])), + self._broadcast_sys_topic('time', str(datetime.now()).encode('utf-8')), + self._broadcast_sys_topic('uptime', int_to_bytes_str(int(uptime.total_seconds()))), + self._broadcast_sys_topic('uptime/formated', str(uptime).encode('utf-8')), + self._broadcast_sys_topic('clients/connected', int_to_bytes_str(client_connected)), + self._broadcast_sys_topic('clients/disconnected', int_to_bytes_str(client_disconnected)), + self._broadcast_sys_topic('clients/maximum', int_to_bytes_str(self._stats[STAT_CLIENTS_MAXIMUM])), + self._broadcast_sys_topic('clients/total', int_to_bytes_str(client_connected + client_disconnected)), + self._broadcast_sys_topic('messages/inflight', int_to_bytes_str(inflight_in + inflight_out)), + self._broadcast_sys_topic('messages/inflight/in', int_to_bytes_str(inflight_in)), + self._broadcast_sys_topic('messages/inflight/out', int_to_bytes_str(inflight_out)), + self._broadcast_sys_topic('messages/inflight/stored', int_to_bytes_str(messages_stored)), + self._broadcast_sys_topic('messages/publish/received', int_to_bytes_str(self._stats[STAT_PUBLISH_RECEIVED])), + self._broadcast_sys_topic('messages/publish/sent', int_to_bytes_str(self._stats[STAT_PUBLISH_SENT])), + self._broadcast_sys_topic('messages/retained/count', int_to_bytes_str(len(self._global_retained_messages))), + self._broadcast_sys_topic('messages/subscriptions/count', int_to_bytes_str(subscriptions_count)), + ] + + # Wait until broadcasting tasks end + if len(tasks) > 0: + asyncio.wait(tasks) + # Reschedule + sys_interval = int(self.config['sys_interval']) + self.logger.debug("Broadcasting $SYS topics") + self.sys_handle = self._loop.call_later(sys_interval, self.broadcast_dollar_sys_topics) + + def _broadcast_sys_topic(self, topic_basename, data): + return self._internal_message_broadcast(DOLLAR_SYS_ROOT + topic_basename, data) + + def _internal_message_broadcast(self, topic, data): + return asyncio.Task(self.broadcast_application_message(None, topic, data)) + @asyncio.coroutine def ws_connected(self, websocket, uri, listener_name): yield from self.client_connected(listener_name, WebSocketsReader(websocket), WebSocketsWriter(websocket)) @@ -282,7 +404,7 @@ class Broker: elif connect.variable_header.password_flag and connect.payload.password is None: self.logger.error('Invalid password %s' % (format_client_message(address=remote_address, port=remote_port))) connack = ConnackPacket.build(0, BAD_USERNAME_PASSWORD) # [MQTT-3.2.2-4] session_parent=0 - elif connect.variable_header.clean_session_flag == False and connect.payload.client_id is None: + elif connect.variable_header.clean_session_flag is False and connect.payload.client_id is None: self.logger.error('[MQTT-3.1.3-8] [MQTT-3.1.3-9] %s: No client Id provided (cleansession=0)' % format_client_message(address=remote_address, port=remote_port)) connack = ConnackPacket.build(0, IDENTIFIER_REJECTED) @@ -296,22 +418,25 @@ class Broker: client_session = None self.logger.debug("Clean session={0}".format(connect.variable_header.clean_session_flag)) self.logger.debug("known sessions={0}".format(self._sessions)) + client_id = connect.payload.client_id if connect.variable_header.clean_session_flag: - client_id = connect.payload.client_id + # Delete existing session and create a new one if client_id is not None: self.delete_session(client_id) client_session = Session() client_session.parent = 0 + client_session.client_id = client_id self._sessions[client_id] = client_session else: # Get session from cache - client_id = connect.payload.client_id if client_id in self._sessions: self.logger.debug("Found old session %s" % repr(self._sessions[client_id])) client_session = self._sessions[client_id] client_session.parent = 1 else: client_session = Session() + client_session.client_id = client_id + self._sessions[client_id] = client_session client_session.parent = 0 if client_session.client_id is None: @@ -351,8 +476,7 @@ class Broker: return client_session.machine.connect() - handler = BrokerProtocolHandler(reader, writer, self._loop) - handler.attach_to_session(client_session) + handler = self._init_handler(reader, writer, client_session) self.logger.debug("%s Start messages handling" % client_session.client_id) yield from handler.start() self.logger.debug("Retained messages queue size: %d" % client_session.retained_messages.qsize()) @@ -373,7 +497,7 @@ class Broker: self.logger.debug("%s Result from wait_diconnect: %s" % (client_session.client_id, result)) if result is None: self.logger.debug("Will flag: %s" % client_session.will_flag) - #Connection closed anormally, send will message + # Connection closed anormally, send will message if client_session.will_flag: self.logger.debug("Client %s disconnected abnormally, sending will message" % format_client_message(client_session)) @@ -423,17 +547,36 @@ class Broker: wait_deliver.cancel() self.logger.debug("%s Client disconnecting" % client_session.client_id) + yield from self._stop_handler(handler) + client_session.machine.disconnect() + yield from writer.close() + self.logger.debug("%s Session disconnected" % client_session.client_id) + server.release_connection() + + def _init_handler(self, reader, writer, session): + """ + Create a BrokerProtocolHandler and attach to a session + :return: + """ + handler = BrokerProtocolHandler(reader, writer, self._loop) + handler.attach_to_session(session) + handler.on_packet_received.connect(self.sys_handle_packet_received) + handler.on_packet_sent.connect(self.sys_handle_packet_sent) + return handler + + @asyncio.coroutine + def _stop_handler(self, handler): + """ + Stop a running handler and detach if from the session + :param handler: + :return: + """ try: yield from handler.stop() except Exception as e: self.logger.error(e) finally: handler.detach_from_session() - handler = None - client_session.machine.disconnect() - yield from writer.close() - self.logger.debug("%s Session disconnected" % client_session.client_id) - server.release_connection() @asyncio.coroutine def check_connect(self, connect: ConnectPacket): @@ -454,17 +597,16 @@ class Broker: def retain_message(self, source_session, topic_name, data, qos=None): if data is not None and data != b'': # If retained flag set, store the message for further subscriptions - self.logger.debug("%s Retaining message on topic %s" % (source_session.client_id, topic_name)) + self.logger.debug("Retaining message on topic %s" % topic_name) retained_message = RetainedApplicationMessage(source_session, topic_name, data, qos) self._global_retained_messages[topic_name] = retained_message else: # [MQTT-3.3.1-10] - self.logger.debug("%s Clear retained messages for topic '%s'" % (source_session.client_id, topic_name)) + self.logger.debug("Clear retained messages for topic '%s'" % topic_name) del self._global_retained_messages[topic_name] def add_subscription(self, subscription, session): import re - #wildcard_pattern = re.compile('(/.+?\+)|(/\+.+?)|(/.+?\+.+?)') wildcard_pattern = re.compile('.*?/?\+/?.*?') try: a_filter = subscription['filter'] @@ -502,9 +644,9 @@ class Broker: # Unsubscribe topic not found in current subscribed topics pass - def matches(self, topic, filter): + def matches(self, topic, a_filter): import re - match_pattern = re.compile(filter.replace('#', '.*').replace('+', '[\s\w\d]+')) + match_pattern = re.compile(a_filter.replace('#', '.*').replace('$', '\$').replace('+', '[\$\s\w\d]+')) if match_pattern.match(topic): return True else: @@ -515,7 +657,6 @@ class Broker: self.logger.debug("Broadcasting message from %s on topic %s" % (format_client_message(session=source_session), topic) ) - self.logger.debug("Current subscriptions: %s" % repr(self._subscriptions)) publish_tasks = [] try: for k_filter in self._subscriptions: @@ -603,10 +744,26 @@ class Broker: # Delete subscriptions self.logger.debug("deleting session %s subscriptions" % repr(session)) nb_sub = 0 - for filter in self._subscriptions: - self.del_subscription(filter, session) + for a_filter in self._subscriptions: + self.del_subscription(a_filter, session) nb_sub += 1 self.logger.debug("%d subscriptions deleted" % nb_sub) self.logger.debug("deleting existing session %s" % repr(self._sessions[client_id])) del self._sessions[client_id] + + def sys_handle_packet_received(self, packet): + if packet: + packet_size = packet.bytes_length + self._stats[STAT_BYTES_RECEIVED] += packet_size + self._stats[STAT_MSG_RECEIVED] += 1 + if packet.fixed_header.packet_type == PUBLISH: + self._stats[STAT_PUBLISH_RECEIVED] += 1 + + def sys_handle_packet_sent(self, packet): + if packet: + packet_size = packet.bytes_length + self._stats[STAT_BYTES_SENT] += packet_size + self._stats[STAT_MSG_SENT] += 1 + if packet.fixed_header.packet_type == PUBLISH: + self._stats[STAT_PUBLISH_SENT] += 1 diff --git a/hbmqtt/client.py b/hbmqtt/client.py index 7022904..3082d5d 100644 --- a/hbmqtt/client.py +++ b/hbmqtt/client.py @@ -308,9 +308,9 @@ class MQTTClient: s.capath = broker_conf['capath'] s.cadata = broker_conf['cadata'] if cleansession is not None: - s.cleansession = cleansession + s.clean_session = cleansession else: - s.cleansession = self.config.get('cleansession', True) + s.clean_session = self.config.get('cleansession', True) s.keep_alive = self.config['keep_alive'] - self.config['ping_delay'] if 'will' in self.config: s.will_flag = True diff --git a/hbmqtt/codecs.py b/hbmqtt/codecs.py index b62fa3a..46a0446 100644 --- a/hbmqtt/codecs.py +++ b/hbmqtt/codecs.py @@ -102,3 +102,13 @@ def decode_packet_id(reader) -> int: """ packet_id_bytes = yield from read_or_raise(reader, 2) return bytes_to_int(packet_id_bytes) + + +def int_to_bytes_str(value: int) -> bytes: + """ + Converts a int value to a bytes array containing the numeric character. + Ex: 123 -> b'123' + :param value: int value to convert + :return: bytes array + """ + return str(value).encode('utf-8') diff --git a/hbmqtt/mqtt/packet.py b/hbmqtt/mqtt/packet.py index 18ea334..7d3f0b0 100644 --- a/hbmqtt/mqtt/packet.py +++ b/hbmqtt/mqtt/packet.py @@ -1,7 +1,7 @@ # Copyright (c) 2015 Nicolas JOUANIN # # See the file license.txt for copying permission. -from hbmqtt.errors import CodecException, MQTTException, HBMQTTException +from hbmqtt.errors import CodecException, MQTTException from hbmqtt.codecs import * from hbmqtt.adapters import ReaderAdapter, WriterAdapter import abc @@ -171,7 +171,7 @@ class MQTTPacket: writer.write(self.to_bytes()) yield from writer.drain() - def to_bytes(self): + def to_bytes(self) -> bytes: if self.variable_header: variable_header_bytes = self.variable_header.to_bytes() else: @@ -208,6 +208,10 @@ class MQTTPacket: else: return cls(fixed_header, variable_header, payload) + @property + def bytes_length(self): + return len(self.to_bytes()) + def __repr__(self): return type(self).__name__ + '(fixed={0!r}, variable={1!r}, payload={2!r})'.\ format(self.fixed_header, self.variable_header, self.payload) diff --git a/hbmqtt/mqtt/protocol/handler.py b/hbmqtt/mqtt/protocol/handler.py index 4e48994..552a5bb 100644 --- a/hbmqtt/mqtt/protocol/handler.py +++ b/hbmqtt/mqtt/protocol/handler.py @@ -2,6 +2,7 @@ # # See the file license.txt for copying permission. import logging +from blinker import Signal from hbmqtt.mqtt import packet_class from hbmqtt.mqtt.packet import * from hbmqtt.mqtt.connack import ConnackPacket @@ -29,6 +30,9 @@ class ProtocolHandler: Class implementing the MQTT communication protocol using asyncio features """ + on_packet_sent = Signal() + on_packet_received = Signal() + def __init__(self, reader: ReaderAdapter, writer: WriterAdapter, loop=None): self.logger = logging.getLogger(__name__) self.session = None @@ -97,9 +101,12 @@ class ProtocolHandler: @asyncio.coroutine def mqtt_publish(self, topic, message, qos, retain): - packet_id = self.session.next_packet_id - if packet_id in self.session.outgoing_msg: - self.logger.warn("%s A message with the same packet ID is already in flight" % self.session.client_id) + if qos: + packet_id = self.session.next_packet_id + if packet_id in self.session.outgoing_msg: + self.logger.warn("%s A message with the same packet ID is already in flight" % self.session.client_id) + else: + packet_id = None packet = PublishPacket.build(topic, message, packet_id, False, qos, retain) yield from self.outgoing_queue.put(packet) if qos != QOS_0: @@ -148,6 +155,7 @@ class ProtocolHandler: cls = packet_class(fixed_header) packet = yield from cls.from_stream(self.reader, fixed_header=fixed_header) self.logger.debug("%s <-in-- %s" % (self.session.client_id, repr(packet))) + self._loop.call_soon(self.on_packet_received.send, packet) task = None if packet.fixed_header.packet_type == CONNACK: @@ -213,7 +221,7 @@ class ProtocolHandler: break yield from packet.to_stream(self.writer) self.logger.debug("%s -out-> %s" % (self.session.client_id, repr(packet))) - yield from self.writer.drain() + self._loop.call_soon(self.on_packet_sent.send, packet) except asyncio.TimeoutError as ce: self.logger.debug("%s Output queue get timeout" % self.session.client_id) if self._running: @@ -234,6 +242,7 @@ class ProtocolHandler: break yield from packet.to_stream(self.session.writer) self.logger.debug("%s -out-> %s" % (self.session.client_id, repr(packet))) + self._loop.call_soon(self.on_packet_sent, packet) except asyncio.QueueEmpty: break except Exception as e: diff --git a/hbmqtt/session.py b/hbmqtt/session.py index 9da3acc..17e1c9f 100644 --- a/hbmqtt/session.py +++ b/hbmqtt/session.py @@ -57,5 +57,17 @@ class Session: self._packet_id += 1 return self._packet_id + @property + def inflight_in_count(self): + return len(self.incoming_msg) + + @property + def inflight_out_count(self): + return len(self.outgoing_msg) + + @property + def retained_messages_count(self): + return self.retained_messages.qsize() + def __repr__(self): return type(self).__name__ + '(clientId={0}, state={1})'.format(self.client_id, self.machine.state) diff --git a/hbmqtt/utils.py b/hbmqtt/utils.py index 65ea6cb..728980c 100644 --- a/hbmqtt/utils.py +++ b/hbmqtt/utils.py @@ -16,11 +16,13 @@ def not_in_dict_or_none(dict, key): return False -def format_client_message(session=None, address=None, port=None, id=None): +def format_client_message(session=None, address=None, port=None): if session: return "(client @=%s:%d id=%s)" % (session.remote_address, session.remote_port, session.client_id) + elif address is not None and port is not None: + return "(client @=%s:%d)" % (address, port) else: - return "(client @=%s:%d id=%s)" % (address, port, id) + return "(unknown client)" def gen_client_id(): diff --git a/samples/broker_start.py b/samples/broker_start.py index 1b3151e..4fd204a 100644 --- a/samples/broker_start.py +++ b/samples/broker_start.py @@ -17,7 +17,8 @@ config = { 'bind': '127.0.0.1:8080', 'type': 'ws' }, - } + }, + 'sys_interval': 10, } broker = Broker(config)