kopia lustrzana https://github.com/Yakifo/amqtt
Merge pull request #234 from ajmirsky/issues/232
resolves Yakifo/amqtt#232 : adding system metrics about the brokerpull/237/head
commit
4c21339f7a
|
@ -1,4 +1,5 @@
|
|||
import asyncio
|
||||
from decimal import ROUND_HALF_UP, Decimal
|
||||
from struct import pack, unpack
|
||||
|
||||
from amqtt.adapters import ReaderAdapter
|
||||
|
@ -139,3 +140,10 @@ def int_to_bytes_str(value: int) -> bytes:
|
|||
:return: bytes array.
|
||||
"""
|
||||
return str(value).encode("utf-8")
|
||||
|
||||
|
||||
def float_to_bytes_str(value: float, places:int=3) -> bytes:
|
||||
"""Convert an float value to a bytes array containing the numeric character."""
|
||||
quant = Decimal(f"0.{''.join(['0' for i in range(places-1)])}1")
|
||||
rounded = Decimal(value).quantize(quant, rounding=ROUND_HALF_UP)
|
||||
return str(rounded).encode("utf-8")
|
||||
|
|
|
@ -1,6 +1,8 @@
|
|||
import asyncio
|
||||
from collections import deque # pylint: disable=C0412
|
||||
from typing import SupportsIndex, SupportsInt, TypeAlias # pylint: disable=C0412
|
||||
from typing import Any, SupportsIndex, SupportsInt, TypeAlias # pylint: disable=C0412
|
||||
|
||||
import psutil
|
||||
|
||||
from amqtt.plugins.base import BasePlugin
|
||||
from amqtt.session import Session
|
||||
|
@ -26,7 +28,7 @@ except ImportError:
|
|||
|
||||
import amqtt
|
||||
from amqtt.broker import BrokerContext
|
||||
from amqtt.codecs_amqtt import int_to_bytes_str
|
||||
from amqtt.codecs_amqtt import float_to_bytes_str, int_to_bytes_str
|
||||
from amqtt.mqtt.packet import PUBLISH, MQTTFixedHeader, MQTTPacket, MQTTPayload, MQTTVariableHeader
|
||||
|
||||
DOLLAR_SYS_ROOT = "$SYS/broker/"
|
||||
|
@ -40,17 +42,35 @@ STAT_START_TIME = "start_time"
|
|||
STAT_CLIENTS_MAXIMUM = "clients_maximum"
|
||||
STAT_CLIENTS_CONNECTED = "clients_connected"
|
||||
STAT_CLIENTS_DISCONNECTED = "clients_disconnected"
|
||||
MEMORY_USAGE_MAXIMUM = "memory_maximum"
|
||||
CPU_USAGE_MAXIMUM = "cpu_usage_maximum"
|
||||
CPU_USAGE_LAST = "cpu_usage_last"
|
||||
|
||||
|
||||
PACKET: TypeAlias = MQTTPacket[MQTTVariableHeader, MQTTPayload[MQTTVariableHeader], MQTTFixedHeader]
|
||||
|
||||
|
||||
def val_to_bytes_str(value: Any) -> bytes:
|
||||
"""Convert an int, float or string to byte string."""
|
||||
match value:
|
||||
case int():
|
||||
return int_to_bytes_str(value)
|
||||
case float():
|
||||
return float_to_bytes_str(value)
|
||||
case str():
|
||||
return value.encode("utf-8")
|
||||
case _:
|
||||
msg = f"Unsupported type {type(value)}"
|
||||
raise NotImplementedError(msg)
|
||||
|
||||
|
||||
class BrokerSysPlugin(BasePlugin[BrokerContext]):
|
||||
def __init__(self, context: BrokerContext) -> None:
|
||||
super().__init__(context)
|
||||
# Broker statistics initialization
|
||||
self._stats: dict[str, int] = {}
|
||||
self._sys_handle: asyncio.Handle | None = None
|
||||
self._current_process = psutil.Process()
|
||||
|
||||
def _clear_stats(self) -> None:
|
||||
"""Initialize broker statistics data structures."""
|
||||
|
@ -64,6 +84,8 @@ class BrokerSysPlugin(BasePlugin[BrokerContext]):
|
|||
STAT_CLIENTS_DISCONNECTED,
|
||||
STAT_PUBLISH_RECEIVED,
|
||||
STAT_PUBLISH_SENT,
|
||||
MEMORY_USAGE_MAXIMUM,
|
||||
CPU_USAGE_MAXIMUM
|
||||
):
|
||||
self._stats[stat] = 0
|
||||
|
||||
|
@ -127,6 +149,14 @@ class BrokerSysPlugin(BasePlugin[BrokerContext]):
|
|||
messages_stored += session.retained_messages_count
|
||||
messages_stored += len(self.context.retained_messages)
|
||||
subscriptions_count = sum(len(sub) for sub in self.context.subscriptions.values())
|
||||
self._stats[STAT_CLIENTS_MAXIMUM] = client_connected
|
||||
|
||||
cpu_usage = self._current_process.cpu_percent(interval=0)
|
||||
self._stats[CPU_USAGE_MAXIMUM] = max(self._stats[CPU_USAGE_MAXIMUM], cpu_usage)
|
||||
|
||||
mem_info_usage = self._current_process.memory_full_info()
|
||||
mem_size = mem_info_usage.rss / (1024 ** 2)
|
||||
self._stats[MEMORY_USAGE_MAXIMUM] = max(self._stats[MEMORY_USAGE_MAXIMUM], mem_size)
|
||||
|
||||
# Broadcast updates
|
||||
tasks: deque[asyncio.Task[None]] = deque()
|
||||
|
@ -150,9 +180,13 @@ class BrokerSysPlugin(BasePlugin[BrokerContext]):
|
|||
"messages/publish/sent": self._stats[STAT_PUBLISH_SENT],
|
||||
"messages/retained/count": len(self.context.retained_messages),
|
||||
"messages/subscriptions/count": subscriptions_count,
|
||||
"heap/size": mem_size,
|
||||
"heap/maximum": self._stats[MEMORY_USAGE_MAXIMUM],
|
||||
"cpu/percent": cpu_usage,
|
||||
"cpu/maximum": self._stats[CPU_USAGE_MAXIMUM],
|
||||
}
|
||||
for stat_name, stat_value in stats.items():
|
||||
data: bytes = int_to_bytes_str(stat_value) if isinstance(stat_value, int) else stat_value.encode("utf-8")
|
||||
data: bytes = val_to_bytes_str(stat_value)
|
||||
tasks.append(self.schedule_broadcast_sys_topic(stat_name, data))
|
||||
|
||||
# Wait until broadcasting tasks end
|
||||
|
|
|
@ -33,6 +33,7 @@ dependencies = [
|
|||
"passlib==1.7.4", # https://pypi.org/project/passlib
|
||||
"PyYAML==6.0.2", # https://pypi.org/project/PyYAML
|
||||
"typer==0.15.4",
|
||||
"psutil>=7.0.0",
|
||||
]
|
||||
|
||||
[dependency-groups]
|
||||
|
|
|
@ -31,7 +31,11 @@ all_sys_topics = [
|
|||
'$SYS/broker/messages/publish/received',
|
||||
'$SYS/broker/messages/publish/sent',
|
||||
'$SYS/broker/messages/retained/count',
|
||||
'$SYS/broker/messages/subscriptions/count'
|
||||
'$SYS/broker/messages/subscriptions/count',
|
||||
'$SYS/broker/heap/size',
|
||||
'$SYS/broker/heap/maximum',
|
||||
'$SYS/broker/cpu/percent',
|
||||
'$SYS/broker/cpu/maximum',
|
||||
]
|
||||
|
||||
|
||||
|
|
2
uv.lock
2
uv.lock
|
@ -13,6 +13,7 @@ version = "0.11.1rc0"
|
|||
source = { editable = "." }
|
||||
dependencies = [
|
||||
{ name = "passlib" },
|
||||
{ name = "psutil" },
|
||||
{ name = "pyyaml" },
|
||||
{ name = "transitions" },
|
||||
{ name = "typer" },
|
||||
|
@ -67,6 +68,7 @@ docs = [
|
|||
requires-dist = [
|
||||
{ name = "coveralls", marker = "extra == 'ci'", specifier = "==4.0.1" },
|
||||
{ name = "passlib", specifier = "==1.7.4" },
|
||||
{ name = "psutil", specifier = ">=7.0.0" },
|
||||
{ name = "pyyaml", specifier = "==6.0.2" },
|
||||
{ name = "transitions", specifier = "==0.9.2" },
|
||||
{ name = "typer", specifier = "==0.15.4" },
|
||||
|
|
Ładowanie…
Reference in New Issue