Merge branch 'feature/dollar_sys' into develop

HBMQTT-11
pull/8/head
Nicolas Jouanin 2015-08-10 21:31:45 +02:00
commit b1cb963ece
8 zmienionych plików z 231 dodań i 36 usunięć

Wyświetl plik

@ -5,6 +5,7 @@ import logging
import ssl
import websockets
import asyncio
from datetime import datetime
from functools import partial
from transitions import Machine, MachineError
@ -14,6 +15,8 @@ from hbmqtt.mqtt.connect import ConnectPacket
from hbmqtt.mqtt.connack import *
from hbmqtt.errors import HBMQTTException
from hbmqtt.utils import format_client_message, gen_client_id
from hbmqtt.mqtt.packet import PUBLISH
from hbmqtt.codecs import int_to_bytes_str
from hbmqtt.adapters import (
StreamReaderAdapter,
StreamWriterAdapter,
@ -28,6 +31,15 @@ _defaults = {
'publish-retry-delay': 5,
}
DOLLAR_SYS_ROOT = '$SYS/broker/'
STAT_BYTES_SENT = 'bytes_sent'
STAT_BYTES_RECEIVED = 'bytes_received'
STAT_MSG_SENT = 'messages_sent'
STAT_MSG_RECEIVED = 'messages_received'
STAT_PUBLISH_SENT = 'publish_sent'
STAT_PUBLISH_RECEIVED = 'publish_received'
STAT_UPTIME = 'uptime'
STAT_CLIENTS_MAXIMUM = 'clients_maximum'
class BrokerException(BaseException):
pass
@ -141,6 +153,12 @@ class Broker:
self._subscriptions = dict()
self._global_retained_messages = dict()
# Broker statistics initialization
self._stats = dict()
# $SYS tree task handle
self.sys_handle = None
def _build_listeners_config(self, broker_config):
self.listeners_config = dict()
try:
@ -173,12 +191,15 @@ class Broker:
self.logger.debug("Invalid method call at this moment: %s" % me)
raise BrokerException("Broker instance can't be started: %s" % me)
# Clear broker stats
self._clear_stats()
try:
# Start network listeners
for listener_name in self.listeners_config:
listener = self.listeners_config[listener_name]
self.logger.info("Binding listener '%s' to %s" % (listener_name, listener['bind']))
#Max connections
# Max connections
try:
max_connections = listener['max_connections']
except KeyError:
@ -195,22 +216,34 @@ class Broker:
raise BrokerException("'certfile' or 'keyfile' configuration parameter missing: %s" % ke)
except FileNotFoundError as fnfe:
raise BrokerException("Can't read cert files '%s' or '%s' : %s" %
(listener['certfile'], listener['keyfile'], fnfe))
(listener['certfile'], listener['keyfile'], fnfe))
if listener['type'] == 'tcp':
address, port = listener['bind'].split(':')
cb_partial = partial(self.stream_connected, listener_name=listener_name)
instance = yield from asyncio.start_server(cb_partial,
address,
port,
ssl=sc,
loop=self._loop)
address,
port,
ssl=sc,
loop=self._loop)
self._servers[listener_name] = Server(listener_name, instance, max_connections, self._loop)
elif listener['type'] == 'ws':
address, port = listener['bind'].split(':')
cb_partial = partial(self.ws_connected, listener_name=listener_name)
instance = yield from websockets.serve(cb_partial, address, port, ssl=sc, loop=self._loop)
self._servers[listener_name] = Server(listener_name, instance, max_connections, self._loop)
# Start $SYS topics management
self._init_dollar_sys()
try:
sys_interval = int(self.config['sys_interval'])
if sys_interval:
self.logger.debug("Setup $SYS broadcasting every %d secondes" % sys_interval)
self.sys_handle = self._loop.call_later(sys_interval, self.broadcast_dollar_sys_topics)
except KeyError:
pass
# 'sys_internal' config parameter not found
self.machine.starting_success()
self.logger.debug("Broker started")
except Exception as e:
@ -225,6 +258,11 @@ class Broker:
except MachineError as me:
self.logger.debug("Invalid method call at this moment: %s" % me)
raise BrokerException("Broker instance can't be stopped: %s" % me)
# Stop $SYS topics broadcasting
if self.sys_handle:
self.sys_handle.cancel()
for listener_name in self._servers:
server = self._servers[listener_name]
yield from server.close_instance()
@ -232,6 +270,90 @@ class Broker:
self.logger.info("Broker closed")
self.machine.stopping_success()
def _clear_stats(self):
"""
Initializes broker statistics data structures
"""
for stat in (STAT_BYTES_RECEIVED,
STAT_BYTES_SENT,
STAT_MSG_RECEIVED,
STAT_MSG_SENT,
STAT_CLIENTS_MAXIMUM,
STAT_PUBLISH_RECEIVED,
STAT_PUBLISH_SENT):
self._stats[stat] = 0
self._stats[STAT_UPTIME] = datetime.now()
def _init_dollar_sys(self):
"""
Initializes and publish $SYS static topics
"""
from hbmqtt.version import get_version
self.sys_handle = None
version = 'HBMQTT version ' + get_version()
self.retain_message(None, DOLLAR_SYS_ROOT + 'version', version.encode())
def broadcast_dollar_sys_topics(self):
"""
Broadcast dynamic $SYS topics updates and reschedule next execution depending on 'sys_interval' config
parameter.
"""
# Update stats
uptime = datetime.now() - self._stats[STAT_UPTIME]
client_connected = sum(1 for k, session in self._sessions.items() if session.machine.state == 'connected')
if client_connected > self._stats[STAT_CLIENTS_MAXIMUM]:
self._stats[STAT_CLIENTS_MAXIMUM] = client_connected
client_disconnected = sum(1 for k, session in self._sessions.items() if session.machine.state == 'disconnected')
inflight_in = 0
inflight_out = 0
messages_stored = 0
for k, session in self._sessions.items():
inflight_in += session.inflight_in_count
inflight_out += session.inflight_out_count
messages_stored += session.retained_messages_count
messages_stored += len(self._global_retained_messages)
subscriptions_count = 0
for topic in self._subscriptions:
subscriptions_count += len(self._subscriptions[topic])
# Broadcast updates
tasks = [
self._broadcast_sys_topic('load/bytes/received', int_to_bytes_str(self._stats[STAT_BYTES_RECEIVED])),
self._broadcast_sys_topic('load/bytes/sent', int_to_bytes_str(self._stats[STAT_BYTES_SENT])),
self._broadcast_sys_topic('messages/received', int_to_bytes_str(self._stats[STAT_MSG_RECEIVED])),
self._broadcast_sys_topic('messages/sent', int_to_bytes_str(self._stats[STAT_MSG_SENT])),
self._broadcast_sys_topic('time', str(datetime.now()).encode('utf-8')),
self._broadcast_sys_topic('uptime', int_to_bytes_str(int(uptime.total_seconds()))),
self._broadcast_sys_topic('uptime/formated', str(uptime).encode('utf-8')),
self._broadcast_sys_topic('clients/connected', int_to_bytes_str(client_connected)),
self._broadcast_sys_topic('clients/disconnected', int_to_bytes_str(client_disconnected)),
self._broadcast_sys_topic('clients/maximum', int_to_bytes_str(self._stats[STAT_CLIENTS_MAXIMUM])),
self._broadcast_sys_topic('clients/total', int_to_bytes_str(client_connected + client_disconnected)),
self._broadcast_sys_topic('messages/inflight', int_to_bytes_str(inflight_in + inflight_out)),
self._broadcast_sys_topic('messages/inflight/in', int_to_bytes_str(inflight_in)),
self._broadcast_sys_topic('messages/inflight/out', int_to_bytes_str(inflight_out)),
self._broadcast_sys_topic('messages/inflight/stored', int_to_bytes_str(messages_stored)),
self._broadcast_sys_topic('messages/publish/received', int_to_bytes_str(self._stats[STAT_PUBLISH_RECEIVED])),
self._broadcast_sys_topic('messages/publish/sent', int_to_bytes_str(self._stats[STAT_PUBLISH_SENT])),
self._broadcast_sys_topic('messages/retained/count', int_to_bytes_str(len(self._global_retained_messages))),
self._broadcast_sys_topic('messages/subscriptions/count', int_to_bytes_str(subscriptions_count)),
]
# Wait until broadcasting tasks end
if len(tasks) > 0:
asyncio.wait(tasks)
# Reschedule
sys_interval = int(self.config['sys_interval'])
self.logger.debug("Broadcasting $SYS topics")
self.sys_handle = self._loop.call_later(sys_interval, self.broadcast_dollar_sys_topics)
def _broadcast_sys_topic(self, topic_basename, data):
return self._internal_message_broadcast(DOLLAR_SYS_ROOT + topic_basename, data)
def _internal_message_broadcast(self, topic, data):
return asyncio.Task(self.broadcast_application_message(None, topic, data))
@asyncio.coroutine
def ws_connected(self, websocket, uri, listener_name):
yield from self.client_connected(listener_name, WebSocketsReader(websocket), WebSocketsWriter(websocket))
@ -282,7 +404,7 @@ class Broker:
elif connect.variable_header.password_flag and connect.payload.password is None:
self.logger.error('Invalid password %s' % (format_client_message(address=remote_address, port=remote_port)))
connack = ConnackPacket.build(0, BAD_USERNAME_PASSWORD) # [MQTT-3.2.2-4] session_parent=0
elif connect.variable_header.clean_session_flag == False and connect.payload.client_id is None:
elif connect.variable_header.clean_session_flag is False and connect.payload.client_id is None:
self.logger.error('[MQTT-3.1.3-8] [MQTT-3.1.3-9] %s: No client Id provided (cleansession=0)' %
format_client_message(address=remote_address, port=remote_port))
connack = ConnackPacket.build(0, IDENTIFIER_REJECTED)
@ -296,22 +418,25 @@ class Broker:
client_session = None
self.logger.debug("Clean session={0}".format(connect.variable_header.clean_session_flag))
self.logger.debug("known sessions={0}".format(self._sessions))
client_id = connect.payload.client_id
if connect.variable_header.clean_session_flag:
client_id = connect.payload.client_id
# Delete existing session and create a new one
if client_id is not None:
self.delete_session(client_id)
client_session = Session()
client_session.parent = 0
client_session.client_id = client_id
self._sessions[client_id] = client_session
else:
# Get session from cache
client_id = connect.payload.client_id
if client_id in self._sessions:
self.logger.debug("Found old session %s" % repr(self._sessions[client_id]))
client_session = self._sessions[client_id]
client_session.parent = 1
else:
client_session = Session()
client_session.client_id = client_id
self._sessions[client_id] = client_session
client_session.parent = 0
if client_session.client_id is None:
@ -351,8 +476,7 @@ class Broker:
return
client_session.machine.connect()
handler = BrokerProtocolHandler(reader, writer, self._loop)
handler.attach_to_session(client_session)
handler = self._init_handler(reader, writer, client_session)
self.logger.debug("%s Start messages handling" % client_session.client_id)
yield from handler.start()
self.logger.debug("Retained messages queue size: %d" % client_session.retained_messages.qsize())
@ -373,7 +497,7 @@ class Broker:
self.logger.debug("%s Result from wait_diconnect: %s" % (client_session.client_id, result))
if result is None:
self.logger.debug("Will flag: %s" % client_session.will_flag)
#Connection closed anormally, send will message
# Connection closed anormally, send will message
if client_session.will_flag:
self.logger.debug("Client %s disconnected abnormally, sending will message" %
format_client_message(client_session))
@ -423,17 +547,36 @@ class Broker:
wait_deliver.cancel()
self.logger.debug("%s Client disconnecting" % client_session.client_id)
yield from self._stop_handler(handler)
client_session.machine.disconnect()
yield from writer.close()
self.logger.debug("%s Session disconnected" % client_session.client_id)
server.release_connection()
def _init_handler(self, reader, writer, session):
"""
Create a BrokerProtocolHandler and attach to a session
:return:
"""
handler = BrokerProtocolHandler(reader, writer, self._loop)
handler.attach_to_session(session)
handler.on_packet_received.connect(self.sys_handle_packet_received)
handler.on_packet_sent.connect(self.sys_handle_packet_sent)
return handler
@asyncio.coroutine
def _stop_handler(self, handler):
"""
Stop a running handler and detach if from the session
:param handler:
:return:
"""
try:
yield from handler.stop()
except Exception as e:
self.logger.error(e)
finally:
handler.detach_from_session()
handler = None
client_session.machine.disconnect()
yield from writer.close()
self.logger.debug("%s Session disconnected" % client_session.client_id)
server.release_connection()
@asyncio.coroutine
def check_connect(self, connect: ConnectPacket):
@ -454,17 +597,16 @@ class Broker:
def retain_message(self, source_session, topic_name, data, qos=None):
if data is not None and data != b'':
# If retained flag set, store the message for further subscriptions
self.logger.debug("%s Retaining message on topic %s" % (source_session.client_id, topic_name))
self.logger.debug("Retaining message on topic %s" % topic_name)
retained_message = RetainedApplicationMessage(source_session, topic_name, data, qos)
self._global_retained_messages[topic_name] = retained_message
else:
# [MQTT-3.3.1-10]
self.logger.debug("%s Clear retained messages for topic '%s'" % (source_session.client_id, topic_name))
self.logger.debug("Clear retained messages for topic '%s'" % topic_name)
del self._global_retained_messages[topic_name]
def add_subscription(self, subscription, session):
import re
#wildcard_pattern = re.compile('(/.+?\+)|(/\+.+?)|(/.+?\+.+?)')
wildcard_pattern = re.compile('.*?/?\+/?.*?')
try:
a_filter = subscription['filter']
@ -502,9 +644,9 @@ class Broker:
# Unsubscribe topic not found in current subscribed topics
pass
def matches(self, topic, filter):
def matches(self, topic, a_filter):
import re
match_pattern = re.compile(filter.replace('#', '.*').replace('+', '[\s\w\d]+'))
match_pattern = re.compile(a_filter.replace('#', '.*').replace('$', '\$').replace('+', '[\$\s\w\d]+'))
if match_pattern.match(topic):
return True
else:
@ -515,7 +657,6 @@ class Broker:
self.logger.debug("Broadcasting message from %s on topic %s" %
(format_client_message(session=source_session), topic)
)
self.logger.debug("Current subscriptions: %s" % repr(self._subscriptions))
publish_tasks = []
try:
for k_filter in self._subscriptions:
@ -603,10 +744,26 @@ class Broker:
# Delete subscriptions
self.logger.debug("deleting session %s subscriptions" % repr(session))
nb_sub = 0
for filter in self._subscriptions:
self.del_subscription(filter, session)
for a_filter in self._subscriptions:
self.del_subscription(a_filter, session)
nb_sub += 1
self.logger.debug("%d subscriptions deleted" % nb_sub)
self.logger.debug("deleting existing session %s" % repr(self._sessions[client_id]))
del self._sessions[client_id]
def sys_handle_packet_received(self, packet):
if packet:
packet_size = packet.bytes_length
self._stats[STAT_BYTES_RECEIVED] += packet_size
self._stats[STAT_MSG_RECEIVED] += 1
if packet.fixed_header.packet_type == PUBLISH:
self._stats[STAT_PUBLISH_RECEIVED] += 1
def sys_handle_packet_sent(self, packet):
if packet:
packet_size = packet.bytes_length
self._stats[STAT_BYTES_SENT] += packet_size
self._stats[STAT_MSG_SENT] += 1
if packet.fixed_header.packet_type == PUBLISH:
self._stats[STAT_PUBLISH_SENT] += 1

Wyświetl plik

@ -308,9 +308,9 @@ class MQTTClient:
s.capath = broker_conf['capath']
s.cadata = broker_conf['cadata']
if cleansession is not None:
s.cleansession = cleansession
s.clean_session = cleansession
else:
s.cleansession = self.config.get('cleansession', True)
s.clean_session = self.config.get('cleansession', True)
s.keep_alive = self.config['keep_alive'] - self.config['ping_delay']
if 'will' in self.config:
s.will_flag = True

Wyświetl plik

@ -102,3 +102,13 @@ def decode_packet_id(reader) -> int:
"""
packet_id_bytes = yield from read_or_raise(reader, 2)
return bytes_to_int(packet_id_bytes)
def int_to_bytes_str(value: int) -> bytes:
"""
Converts a int value to a bytes array containing the numeric character.
Ex: 123 -> b'123'
:param value: int value to convert
:return: bytes array
"""
return str(value).encode('utf-8')

Wyświetl plik

@ -1,7 +1,7 @@
# Copyright (c) 2015 Nicolas JOUANIN
#
# See the file license.txt for copying permission.
from hbmqtt.errors import CodecException, MQTTException, HBMQTTException
from hbmqtt.errors import CodecException, MQTTException
from hbmqtt.codecs import *
from hbmqtt.adapters import ReaderAdapter, WriterAdapter
import abc
@ -171,7 +171,7 @@ class MQTTPacket:
writer.write(self.to_bytes())
yield from writer.drain()
def to_bytes(self):
def to_bytes(self) -> bytes:
if self.variable_header:
variable_header_bytes = self.variable_header.to_bytes()
else:
@ -208,6 +208,10 @@ class MQTTPacket:
else:
return cls(fixed_header, variable_header, payload)
@property
def bytes_length(self):
return len(self.to_bytes())
def __repr__(self):
return type(self).__name__ + '(fixed={0!r}, variable={1!r}, payload={2!r})'.\
format(self.fixed_header, self.variable_header, self.payload)

Wyświetl plik

@ -2,6 +2,7 @@
#
# See the file license.txt for copying permission.
import logging
from blinker import Signal
from hbmqtt.mqtt import packet_class
from hbmqtt.mqtt.packet import *
from hbmqtt.mqtt.connack import ConnackPacket
@ -29,6 +30,9 @@ class ProtocolHandler:
Class implementing the MQTT communication protocol using asyncio features
"""
on_packet_sent = Signal()
on_packet_received = Signal()
def __init__(self, reader: ReaderAdapter, writer: WriterAdapter, loop=None):
self.logger = logging.getLogger(__name__)
self.session = None
@ -97,9 +101,12 @@ class ProtocolHandler:
@asyncio.coroutine
def mqtt_publish(self, topic, message, qos, retain):
packet_id = self.session.next_packet_id
if packet_id in self.session.outgoing_msg:
self.logger.warn("%s A message with the same packet ID is already in flight" % self.session.client_id)
if qos:
packet_id = self.session.next_packet_id
if packet_id in self.session.outgoing_msg:
self.logger.warn("%s A message with the same packet ID is already in flight" % self.session.client_id)
else:
packet_id = None
packet = PublishPacket.build(topic, message, packet_id, False, qos, retain)
yield from self.outgoing_queue.put(packet)
if qos != QOS_0:
@ -148,6 +155,7 @@ class ProtocolHandler:
cls = packet_class(fixed_header)
packet = yield from cls.from_stream(self.reader, fixed_header=fixed_header)
self.logger.debug("%s <-in-- %s" % (self.session.client_id, repr(packet)))
self._loop.call_soon(self.on_packet_received.send, packet)
task = None
if packet.fixed_header.packet_type == CONNACK:
@ -213,7 +221,7 @@ class ProtocolHandler:
break
yield from packet.to_stream(self.writer)
self.logger.debug("%s -out-> %s" % (self.session.client_id, repr(packet)))
yield from self.writer.drain()
self._loop.call_soon(self.on_packet_sent.send, packet)
except asyncio.TimeoutError as ce:
self.logger.debug("%s Output queue get timeout" % self.session.client_id)
if self._running:
@ -234,6 +242,7 @@ class ProtocolHandler:
break
yield from packet.to_stream(self.session.writer)
self.logger.debug("%s -out-> %s" % (self.session.client_id, repr(packet)))
self._loop.call_soon(self.on_packet_sent, packet)
except asyncio.QueueEmpty:
break
except Exception as e:

Wyświetl plik

@ -57,5 +57,17 @@ class Session:
self._packet_id += 1
return self._packet_id
@property
def inflight_in_count(self):
return len(self.incoming_msg)
@property
def inflight_out_count(self):
return len(self.outgoing_msg)
@property
def retained_messages_count(self):
return self.retained_messages.qsize()
def __repr__(self):
return type(self).__name__ + '(clientId={0}, state={1})'.format(self.client_id, self.machine.state)

Wyświetl plik

@ -16,11 +16,13 @@ def not_in_dict_or_none(dict, key):
return False
def format_client_message(session=None, address=None, port=None, id=None):
def format_client_message(session=None, address=None, port=None):
if session:
return "(client @=%s:%d id=%s)" % (session.remote_address, session.remote_port, session.client_id)
elif address is not None and port is not None:
return "(client @=%s:%d)" % (address, port)
else:
return "(client @=%s:%d id=%s)" % (address, port, id)
return "(unknown client)"
def gen_client_id():

Wyświetl plik

@ -17,7 +17,8 @@ config = {
'bind': '127.0.0.1:8080',
'type': 'ws'
},
}
},
'sys_interval': 10,
}
broker = Broker(config)