amqtt/hbmqtt/plugins/sys/broker.py

159 wiersze
7.5 KiB
Python

# Copyright (c) 2015 Nicolas JOUANIN
#
# See the file license.txt for copying permission.
from datetime import datetime
import hbmqtt
from hbmqtt.mqtt.packet import PUBLISH
from hbmqtt.codecs import int_to_bytes_str
import asyncio
import sys
from collections import deque
DOLLAR_SYS_ROOT = '$SYS/broker/'
STAT_BYTES_SENT = 'bytes_sent'
STAT_BYTES_RECEIVED = 'bytes_received'
STAT_MSG_SENT = 'messages_sent'
STAT_MSG_RECEIVED = 'messages_received'
STAT_PUBLISH_SENT = 'publish_sent'
STAT_PUBLISH_RECEIVED = 'publish_received'
STAT_START_TIME = 'start_time'
STAT_CLIENTS_MAXIMUM = 'clients_maximum'
STAT_CLIENTS_CONNECTED = 'clients_connected'
STAT_CLIENTS_DISCONNECTED = 'clients_disconnected'
class BrokerSysPlugin:
def __init__(self, context):
self.context = context
# Broker statistics initialization
self._stats = dict()
self._sys_handle = None
def _clear_stats(self):
"""
Initializes broker statistics data structures
"""
for stat in (STAT_BYTES_RECEIVED,
STAT_BYTES_SENT,
STAT_MSG_RECEIVED,
STAT_MSG_SENT,
STAT_CLIENTS_MAXIMUM,
STAT_CLIENTS_CONNECTED,
STAT_CLIENTS_DISCONNECTED,
STAT_PUBLISH_RECEIVED,
STAT_PUBLISH_SENT):
self._stats[stat] = 0
async def _broadcast_sys_topic(self, topic_basename, data):
return (await self.context.broadcast_message(topic_basename, data))
def schedule_broadcast_sys_topic(self, topic_basename, data):
return asyncio.ensure_future(
self._broadcast_sys_topic(DOLLAR_SYS_ROOT + topic_basename, data),
loop=self.context.loop
)
async def on_broker_pre_start(self, *args, **kwargs):
self._clear_stats()
async def on_broker_post_start(self, *args, **kwargs):
self._stats[STAT_START_TIME] = datetime.now()
version = f'HBMQTT version {hbmqtt.__version__}'
self.context.retain_message(DOLLAR_SYS_ROOT + 'version', version.encode())
# Start $SYS topics management
try:
sys_interval = int(self.context.config.get('sys_interval', 0))
if sys_interval > 0:
self.context.logger.debug("Setup $SYS broadcasting every %d secondes" % sys_interval)
self.sys_handle = self.context.loop.call_later(sys_interval, self.broadcast_dollar_sys_topics)
else:
self.context.logger.debug("$SYS disabled")
except KeyError:
pass
# 'sys_internal' config parameter not found
async def on_broker_pre_stop(self, *args, **kwargs):
# Stop $SYS topics broadcasting
if self.sys_handle:
self.sys_handle.cancel()
def broadcast_dollar_sys_topics(self):
"""
Broadcast dynamic $SYS topics updates and reschedule next execution depending on 'sys_interval' config
parameter.
"""
# Update stats
uptime = datetime.now() - self._stats[STAT_START_TIME]
client_connected = self._stats[STAT_CLIENTS_CONNECTED]
client_disconnected = self._stats[STAT_CLIENTS_DISCONNECTED]
inflight_in = 0
inflight_out = 0
messages_stored = 0
for session in self.context.sessions:
inflight_in += session.inflight_in_count
inflight_out += session.inflight_out_count
messages_stored += session.retained_messages_count
messages_stored += len(self.context.retained_messages)
subscriptions_count = 0
for topic in self.context.subscriptions:
subscriptions_count += len(self.context.subscriptions[topic])
# Broadcast updates
tasks = deque()
tasks.append(self.schedule_broadcast_sys_topic('load/bytes/received', int_to_bytes_str(self._stats[STAT_BYTES_RECEIVED])))
tasks.append(self.schedule_broadcast_sys_topic('load/bytes/sent', int_to_bytes_str(self._stats[STAT_BYTES_SENT])))
tasks.append(self.schedule_broadcast_sys_topic('messages/received', int_to_bytes_str(self._stats[STAT_MSG_RECEIVED])))
tasks.append(self.schedule_broadcast_sys_topic('messages/sent', int_to_bytes_str(self._stats[STAT_MSG_SENT])))
tasks.append(self.schedule_broadcast_sys_topic('time', str(datetime.now()).encode('utf-8')))
tasks.append(self.schedule_broadcast_sys_topic('uptime', int_to_bytes_str(int(uptime.total_seconds()))))
tasks.append(self.schedule_broadcast_sys_topic('uptime/formated', str(uptime).encode('utf-8')))
tasks.append(self.schedule_broadcast_sys_topic('clients/connected', int_to_bytes_str(client_connected)))
tasks.append(self.schedule_broadcast_sys_topic('clients/disconnected', int_to_bytes_str(client_disconnected)))
tasks.append(self.schedule_broadcast_sys_topic('clients/maximum', int_to_bytes_str(self._stats[STAT_CLIENTS_MAXIMUM])))
tasks.append(self.schedule_broadcast_sys_topic('clients/total', int_to_bytes_str(client_connected + client_disconnected)))
tasks.append(self.schedule_broadcast_sys_topic('messages/inflight', int_to_bytes_str(inflight_in + inflight_out)))
tasks.append(self.schedule_broadcast_sys_topic('messages/inflight/in', int_to_bytes_str(inflight_in)))
tasks.append(self.schedule_broadcast_sys_topic('messages/inflight/out', int_to_bytes_str(inflight_out)))
tasks.append(self.schedule_broadcast_sys_topic('messages/inflight/stored', int_to_bytes_str(messages_stored)))
tasks.append(self.schedule_broadcast_sys_topic('messages/publish/received', int_to_bytes_str(self._stats[STAT_PUBLISH_RECEIVED])))
tasks.append(self.schedule_broadcast_sys_topic('messages/publish/sent', int_to_bytes_str(self._stats[STAT_PUBLISH_SENT])))
tasks.append(self.schedule_broadcast_sys_topic('messages/retained/count', int_to_bytes_str(len(self.context.retained_messages))))
tasks.append(self.schedule_broadcast_sys_topic('messages/subscriptions/count', int_to_bytes_str(subscriptions_count)))
# Wait until broadcasting tasks end
while tasks and tasks[0].done():
tasks.popleft()
# Reschedule
sys_interval = int(self.context.config['sys_interval'])
self.context.logger.debug("Broadcasting $SYS topics")
self.sys_handle = self.context.loop.call_later(sys_interval, self.broadcast_dollar_sys_topics)
async def on_mqtt_packet_received(self, *args, **kwargs):
packet = kwargs.get('packet')
if packet:
packet_size = packet.bytes_length
self._stats[STAT_BYTES_RECEIVED] += packet_size
self._stats[STAT_MSG_RECEIVED] += 1
if packet.fixed_header.packet_type == PUBLISH:
self._stats[STAT_PUBLISH_RECEIVED] += 1
async def on_mqtt_packet_sent(self, *args, **kwargs):
packet = kwargs.get('packet')
if packet:
packet_size = packet.bytes_length
self._stats[STAT_BYTES_SENT] += packet_size
self._stats[STAT_MSG_SENT] += 1
if packet.fixed_header.packet_type == PUBLISH:
self._stats[STAT_PUBLISH_SENT] += 1
async def on_broker_client_connected(self, *args, **kwargs):
self._stats[STAT_CLIENTS_CONNECTED] += 1
self._stats[STAT_CLIENTS_MAXIMUM] = max(self._stats[STAT_CLIENTS_MAXIMUM], self._stats[STAT_CLIENTS_CONNECTED])
async def on_broker_client_disconnected(self, *args, **kwargs):
self._stats[STAT_CLIENTS_CONNECTED] -= 1
self._stats[STAT_CLIENTS_DISCONNECTED] += 1