diff --git a/amqtt/codecs_amqtt.py b/amqtt/codecs_amqtt.py index 3e425a3..1db2d9c 100644 --- a/amqtt/codecs_amqtt.py +++ b/amqtt/codecs_amqtt.py @@ -1,4 +1,5 @@ import asyncio +from decimal import ROUND_HALF_UP, Decimal from struct import pack, unpack from amqtt.adapters import ReaderAdapter @@ -139,3 +140,10 @@ def int_to_bytes_str(value: int) -> bytes: :return: bytes array. """ return str(value).encode("utf-8") + + +def float_to_bytes_str(value: float, places:int=3) -> bytes: + """Convert an float value to a bytes array containing the numeric character.""" + quant = Decimal(f"0.{''.join(['0' for i in range(places-1)])}1") + rounded = Decimal(value).quantize(quant, rounding=ROUND_HALF_UP) + return str(rounded).encode("utf-8") diff --git a/amqtt/plugins/sys/broker.py b/amqtt/plugins/sys/broker.py index 99d3166..b0ba07f 100644 --- a/amqtt/plugins/sys/broker.py +++ b/amqtt/plugins/sys/broker.py @@ -1,6 +1,8 @@ import asyncio from collections import deque # pylint: disable=C0412 -from typing import SupportsIndex, SupportsInt, TypeAlias # pylint: disable=C0412 +from typing import Any, SupportsIndex, SupportsInt, TypeAlias # pylint: disable=C0412 + +import psutil from amqtt.plugins.base import BasePlugin from amqtt.session import Session @@ -26,7 +28,7 @@ except ImportError: import amqtt from amqtt.broker import BrokerContext -from amqtt.codecs_amqtt import int_to_bytes_str +from amqtt.codecs_amqtt import float_to_bytes_str, int_to_bytes_str from amqtt.mqtt.packet import PUBLISH, MQTTFixedHeader, MQTTPacket, MQTTPayload, MQTTVariableHeader DOLLAR_SYS_ROOT = "$SYS/broker/" @@ -40,17 +42,35 @@ STAT_START_TIME = "start_time" STAT_CLIENTS_MAXIMUM = "clients_maximum" STAT_CLIENTS_CONNECTED = "clients_connected" STAT_CLIENTS_DISCONNECTED = "clients_disconnected" +MEMORY_USAGE_MAXIMUM = "memory_maximum" +CPU_USAGE_MAXIMUM = "cpu_usage_maximum" +CPU_USAGE_LAST = "cpu_usage_last" PACKET: TypeAlias = MQTTPacket[MQTTVariableHeader, MQTTPayload[MQTTVariableHeader], MQTTFixedHeader] +def val_to_bytes_str(value: Any) -> bytes: + """Convert an int, float or string to byte string.""" + match value: + case int(): + return int_to_bytes_str(value) + case float(): + return float_to_bytes_str(value) + case str(): + return value.encode("utf-8") + case _: + msg = f"Unsupported type {type(value)}" + raise NotImplementedError(msg) + + class BrokerSysPlugin(BasePlugin[BrokerContext]): def __init__(self, context: BrokerContext) -> None: super().__init__(context) # Broker statistics initialization self._stats: dict[str, int] = {} self._sys_handle: asyncio.Handle | None = None + self._current_process = psutil.Process() def _clear_stats(self) -> None: """Initialize broker statistics data structures.""" @@ -64,6 +84,8 @@ class BrokerSysPlugin(BasePlugin[BrokerContext]): STAT_CLIENTS_DISCONNECTED, STAT_PUBLISH_RECEIVED, STAT_PUBLISH_SENT, + MEMORY_USAGE_MAXIMUM, + CPU_USAGE_MAXIMUM ): self._stats[stat] = 0 @@ -127,6 +149,14 @@ class BrokerSysPlugin(BasePlugin[BrokerContext]): messages_stored += session.retained_messages_count messages_stored += len(self.context.retained_messages) subscriptions_count = sum(len(sub) for sub in self.context.subscriptions.values()) + self._stats[STAT_CLIENTS_MAXIMUM] = client_connected + + cpu_usage = self._current_process.cpu_percent(interval=0) + self._stats[CPU_USAGE_MAXIMUM] = max(self._stats[CPU_USAGE_MAXIMUM], cpu_usage) + + mem_info_usage = self._current_process.memory_full_info() + mem_size = mem_info_usage.rss / (1024 ** 2) + self._stats[MEMORY_USAGE_MAXIMUM] = max(self._stats[MEMORY_USAGE_MAXIMUM], mem_size) # Broadcast updates tasks: deque[asyncio.Task[None]] = deque() @@ -150,9 +180,13 @@ class BrokerSysPlugin(BasePlugin[BrokerContext]): "messages/publish/sent": self._stats[STAT_PUBLISH_SENT], "messages/retained/count": len(self.context.retained_messages), "messages/subscriptions/count": subscriptions_count, + "heap/size": mem_size, + "heap/maximum": self._stats[MEMORY_USAGE_MAXIMUM], + "cpu/percent": cpu_usage, + "cpu/maximum": self._stats[CPU_USAGE_MAXIMUM], } for stat_name, stat_value in stats.items(): - data: bytes = int_to_bytes_str(stat_value) if isinstance(stat_value, int) else stat_value.encode("utf-8") + data: bytes = val_to_bytes_str(stat_value) tasks.append(self.schedule_broadcast_sys_topic(stat_name, data)) # Wait until broadcasting tasks end diff --git a/pyproject.toml b/pyproject.toml index 6af59e7..6edc91f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -33,6 +33,7 @@ dependencies = [ "passlib==1.7.4", # https://pypi.org/project/passlib "PyYAML==6.0.2", # https://pypi.org/project/PyYAML "typer==0.15.4", + "psutil>=7.0.0", ] [dependency-groups] diff --git a/tests/plugins/test_sys.py b/tests/plugins/test_sys.py index c01d322..563a313 100644 --- a/tests/plugins/test_sys.py +++ b/tests/plugins/test_sys.py @@ -31,7 +31,11 @@ all_sys_topics = [ '$SYS/broker/messages/publish/received', '$SYS/broker/messages/publish/sent', '$SYS/broker/messages/retained/count', - '$SYS/broker/messages/subscriptions/count' + '$SYS/broker/messages/subscriptions/count', + '$SYS/broker/heap/size', + '$SYS/broker/heap/maximum', + '$SYS/broker/cpu/percent', + '$SYS/broker/cpu/maximum', ] diff --git a/uv.lock b/uv.lock index 7c853df..8471502 100644 --- a/uv.lock +++ b/uv.lock @@ -13,6 +13,7 @@ version = "0.11.1rc0" source = { editable = "." } dependencies = [ { name = "passlib" }, + { name = "psutil" }, { name = "pyyaml" }, { name = "transitions" }, { name = "typer" }, @@ -67,6 +68,7 @@ docs = [ requires-dist = [ { name = "coveralls", marker = "extra == 'ci'", specifier = "==4.0.1" }, { name = "passlib", specifier = "==1.7.4" }, + { name = "psutil", specifier = ">=7.0.0" }, { name = "pyyaml", specifier = "==6.0.2" }, { name = "transitions", specifier = "==0.9.2" }, { name = "typer", specifier = "==0.15.4" },