Merge branch 'main' into alt_plugin_config

pull/240/head
Andrew Mirsky 2025-06-27 13:59:39 -04:00
commit fb0404321d
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: A98E67635CDF2C39
42 zmienionych plików z 758 dodań i 272 usunięć

1
.gitignore vendored
Wyświetl plik

@ -3,6 +3,7 @@
__pycache__
node_modules
.vite
*.pem
#------- Environment Files -------
.python-version

Wyświetl plik

@ -1,13 +1,13 @@
[![MIT licensed](https://img.shields.io/github/license/Yakifo/amqtt?style=plastic)](https://amqtt.readthedocs.io/en/latest/)
[![CI](https://github.com/Yakifo/amqtt/actions/workflows/ci.yml/badge.svg?branch=rc)](https://github.com/Yakifo/amqtt/actions/workflows/ci.yml)
[![CodeQL](https://github.com/Yakifo/amqtt/actions/workflows/codeql-analysis.yml/badge.svg)](https://github.com/Yakifo/amqtt/actions/workflows/codeql-analysis.yml)
[![Documentation Status](https://img.shields.io/readthedocs/amqtt?style=plastic&logo=readthedocs)](https://amqtt.readthedocs.io/en/latest/)
[![](https://dcbadge.limes.pink/api/server/https://discord.gg/S3sP6dDaF3?style=plastic)](https://discord.gg/S3sP6dDaF3)
[![Read the Docs](https://img.shields.io/readthedocs/amqtt/v0.11.0?style=plastic&logo=readthedocs)](https://amqtt.readthedocs.io/)
[![Discord](https://dcbadge.limes.pink/api/server/https://discord.gg/S3sP6dDaF3?style=plastic)](https://discord.gg/S3sP6dDaF3)
![Python Version](https://img.shields.io/pypi/pyversions/amqtt?style=plastic&logo=python&logoColor=yellow)
![Python Wheel](https://img.shields.io/pypi/wheel/amqtt?style=plastic)
[![PyPI](https://img.shields.io/pypi/v/amqtt?style=plastic&logo=python&logoColor=yellow)](https://pypi.org/project/amqtt/)
![docs/assets/amqtt.svg](docs/assets/amqtt.svg)
![docs/assets/amqtt.svg](https://raw.githubusercontent.com/Yakifo/amqtt/refs/tags/v0.11.0/docs/assets/amqtt.svg)
`aMQTT` is an open source [MQTT](http://www.mqtt.org) broker and client[^1], natively implemented with Python's [asyncio](https://docs.python.org/3/library/asyncio.html).
@ -50,13 +50,14 @@ Bug reports, patches and suggestions welcome! Just [open an issue](https://githu
## Python Version Compatibility
| Version | hbmqtt compatibility | Supported Python Versions | PyPi Release |
| ------- | -------------------- | ------------------------- | ------------ |
| 0.10.x | yes [^2] | 3.7 - 3.9 | 0.10.1 |
| 0.11.x | no [^3] | 3.10 - 3.13 | 0.11.0 |
| Version | hbmqtt compatibility | Supported Python Versions |
| ------- | -------------------- | ------------------------- |
| 0.10.x | yes [^2] | 3.7 - 3.9 |
| 0.11.x | no [^3] | 3.10 - 3.13 |
For a full feature roadmap, see ...
[^1]: Forked from [HBMQTT](https://github.com/beerfactory/hbmqtt) after it was deprecated by the original author.
[^2]: drop-in replacement
[^3]: module renamed and small API differences

Wyświetl plik

@ -1,3 +1,3 @@
"""INIT."""
__version__ = "0.11.0"
__version__ = "0.11.1"

Wyświetl plik

@ -3,7 +3,6 @@ from asyncio import CancelledError, futures
from collections import deque
from collections.abc import Generator
import copy
from enum import Enum
from functools import partial
import logging
from pathlib import Path
@ -23,6 +22,7 @@ from amqtt.adapters import (
WebSocketsWriter,
WriterAdapter,
)
from amqtt.contexts import Action, BaseContext
from amqtt.errors import AMQTTError, BrokerError, MQTTError, NoDataError
from amqtt.mqtt.protocol.broker_handler import BrokerProtocolHandler
from amqtt.session import ApplicationMessage, OutgoingApplicationMessage, Session
@ -30,7 +30,6 @@ from amqtt.utils import format_client_message, gen_client_id, read_yaml_config
from .events import BrokerEvents
from .mqtt.disconnect import DisconnectPacket
from .plugins.contexts import BaseContext
from .plugins.manager import PluginManager
_CONFIG_LISTENER: TypeAlias = dict[str, int | bool | dict[str, Any]]
@ -45,13 +44,6 @@ DEFAULT_PORTS = {"tcp": 1883, "ws": 8883}
AMQTT_MAGIC_VALUE_RET_SUBSCRIBED = 0x80
class Action(Enum):
"""Actions issued by the broker."""
SUBSCRIBE = "subscribe"
PUBLISH = "publish"
class RetainedApplicationMessage(ApplicationMessage):
__slots__ = ("data", "qos", "source_session", "topic")

Wyświetl plik

@ -19,11 +19,11 @@ from amqtt.adapters import (
WebSocketsReader,
WebSocketsWriter,
)
from amqtt.contexts import BaseContext
from amqtt.errors import ClientError, ConnectError, ProtocolHandlerError
from amqtt.mqtt.connack import CONNECTION_ACCEPTED
from amqtt.mqtt.constants import QOS_0, QOS_1, QOS_2
from amqtt.mqtt.protocol.client_handler import ClientProtocolHandler
from amqtt.plugins.contexts import BaseContext
from amqtt.plugins.manager import PluginManager
from amqtt.session import ApplicationMessage, OutgoingApplicationMessage, Session
from amqtt.utils import gen_client_id, read_yaml_config
@ -457,6 +457,8 @@ class MQTTClient:
# if not self._handler:
self._handler = ClientProtocolHandler(self.plugins_manager)
connection_timeout = self.config.get("connection_timeout", None)
if secure:
sc = ssl.create_default_context(
ssl.Purpose.SERVER_AUTH,
@ -464,10 +466,12 @@ class MQTTClient:
capath=self.session.capath,
cadata=self.session.cadata,
)
if "certfile" in self.config and "keyfile" in self.config:
sc.load_cert_chain(self.config["certfile"], self.config["keyfile"])
if "certfile" in self.config:
sc.load_verify_locations(cafile=self.config["certfile"])
if "check_hostname" in self.config and isinstance(self.config["check_hostname"], bool):
sc.check_hostname = self.config["check_hostname"]
sc.verify_mode = ssl.CERT_REQUIRED
kwargs["ssl"] = sc
try:
@ -477,21 +481,24 @@ class MQTTClient:
# Open connection
if scheme in ("mqtt", "mqtts"):
conn_reader, conn_writer = await asyncio.open_connection(
self.session.remote_address,
self.session.remote_port,
**kwargs,
)
conn_reader, conn_writer = await asyncio.wait_for(
asyncio.open_connection(
self.session.remote_address,
self.session.remote_port,
**kwargs,
), timeout=connection_timeout)
reader = StreamReaderAdapter(conn_reader)
writer = StreamWriterAdapter(conn_writer)
elif scheme in ("ws", "wss") and self.session.broker_uri:
websocket: ClientConnection = await websockets.connect(
self.session.broker_uri,
subprotocols=[websockets.Subprotocol("mqtt")],
additional_headers=self.additional_headers,
**kwargs,
)
websocket: ClientConnection = await asyncio.wait_for(
websockets.connect(
self.session.broker_uri,
subprotocols=[websockets.Subprotocol("mqtt")],
additional_headers=self.additional_headers,
**kwargs,
), timeout=connection_timeout)
reader = WebSocketsReader(websocket)
writer = WebSocketsWriter(websocket)
elif not self.session.broker_uri:

Wyświetl plik

@ -1,4 +1,5 @@
import asyncio
from decimal import ROUND_HALF_UP, Decimal
from struct import pack, unpack
from amqtt.adapters import ReaderAdapter
@ -139,3 +140,10 @@ def int_to_bytes_str(value: int) -> bytes:
:return: bytes array.
"""
return str(value).encode("utf-8")
def float_to_bytes_str(value: float, places:int=3) -> bytes:
"""Convert an float value to a bytes array containing the numeric character."""
quant = Decimal(f"0.{''.join(['0' for i in range(places-1)])}1")
rounded = Decimal(value).quantize(quant, rounding=ROUND_HALF_UP)
return str(rounded).encode("utf-8")

Wyświetl plik

@ -1,3 +1,4 @@
from enum import Enum
import logging
from typing import TYPE_CHECKING, Any
@ -11,3 +12,10 @@ class BaseContext:
self.loop: asyncio.AbstractEventLoop | None = None
self.logger: logging.Logger = _LOGGER
self.config: dict[str, Any] | None = None
class Action(Enum):
"""Actions issued by the broker."""
SUBSCRIBE = "subscribe"
PUBLISH = "publish"

Wyświetl plik

@ -20,6 +20,7 @@ import logging
from typing import Generic, TypeVar, cast
from amqtt.adapters import ReaderAdapter, WriterAdapter
from amqtt.contexts import BaseContext
from amqtt.errors import AMQTTError, MQTTError, NoDataError, ProtocolHandlerError
from amqtt.events import MQTTEvents
from amqtt.mqtt import packet_class
@ -57,7 +58,6 @@ from amqtt.mqtt.suback import SubackPacket
from amqtt.mqtt.subscribe import SubscribePacket
from amqtt.mqtt.unsuback import UnsubackPacket
from amqtt.mqtt.unsubscribe import UnsubscribePacket
from amqtt.plugins.contexts import BaseContext
from amqtt.plugins.manager import PluginManager
from amqtt.session import INCOMING, OUTGOING, ApplicationMessage, IncomingApplicationMessage, OutgoingApplicationMessage, Session
@ -154,7 +154,7 @@ class ProtocolHandler(Generic[C]):
except asyncio.CancelledError:
# canceling the task is the expected result
self.logger.debug("Writer close was cancelled.")
except TimeoutError:
except asyncio.TimeoutError:
self.logger.debug("Writer close operation timed out.", exc_info=True)
except OSError:
self.logger.debug("Writer close failed due to I/O error.", exc_info=True)
@ -322,7 +322,7 @@ class ProtocolHandler(Generic[C]):
self._puback_waiters[app_message.packet_id] = waiter
try:
app_message.puback_packet = await asyncio.wait_for(waiter, timeout=5)
except TimeoutError:
except asyncio.TimeoutError:
msg = f"Timeout waiting for PUBACK for packet ID {app_message.packet_id}"
self.logger.warning(msg)
raise TimeoutError(msg) from None
@ -529,7 +529,7 @@ class ProtocolHandler(Generic[C]):
except asyncio.CancelledError:
self.logger.debug("Task cancelled, reader loop ending")
break
except TimeoutError:
except asyncio.TimeoutError:
self.logger.debug(f"{self.session.client_id} Input stream read timeout")
self.handle_read_timeout()
except NoDataError:

Wyświetl plik

@ -1,14 +1,10 @@
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Generic, Optional, TypeVar
from typing import Any, Generic, TypeVar
from amqtt.plugins.contexts import BaseContext
from amqtt.contexts import Action, BaseContext
from amqtt.session import Session
C = TypeVar("C", bound=BaseContext)
if TYPE_CHECKING:
from amqtt.broker import Action
class BasePlugin(Generic[C]):
"""The base from which all plugins should inherit."""
@ -30,31 +26,6 @@ class BasePlugin(Generic[C]):
async def close(self) -> None:
"""Override if plugin needs to clean up resources upon shutdown."""
@dataclass
class Config:
"""Override to define the configuration and defaults for plugin."""
class BaseAuthPlugin(BasePlugin[BaseContext]):
"""Base class for authentication plugins."""
def __init__(self, context: BaseContext) -> None:
super().__init__(context)
self.auth_config: dict[str, Any] | None = self._get_config_section("auth")
async def authenticate(self, *, session: Session) -> bool | None:
"""Logic for session authentication.
Args:
session: amqtt.session.Session
Returns:
- `True` if user is authentication succeed, `False` if user authentication fails
- `None` if authentication can't be achieved (then plugin result is then ignored)
"""
return bool(self.auth_config)
class BaseTopicPlugin(BasePlugin[BaseContext]):
"""Base class for topic plugins."""
@ -63,9 +34,11 @@ class BaseTopicPlugin(BasePlugin[BaseContext]):
super().__init__(context)
self.topic_config: dict[str, Any] | None = self._get_config_section("topic-check")
if self.topic_config is None:
self.context.logger.warning("'topic-check' section not found in context configuration")
async def topic_filtering(
self, *, session: Session | None = None, topic: str | None = None, action: Optional["Action"] = None
self, *, session: Session | None = None, topic: str | None = None, action: Action | None = None
) -> bool:
"""Logic for filtering out topics.
@ -78,4 +51,36 @@ class BaseTopicPlugin(BasePlugin[BaseContext]):
bool: `True` if topic is allowed, `False` otherwise
"""
return bool(self.topic_config)
if not self.topic_config:
# auth config section not found
self.context.logger.warning("'topic-check' section not found in context configuration")
return False
return True
class BaseAuthPlugin(BasePlugin[BaseContext]):
"""Base class for authentication plugins."""
def __init__(self, context: BaseContext) -> None:
super().__init__(context)
self.auth_config: dict[str, Any] | None = self._get_config_section("auth")
if not self.auth_config:
self.context.logger.warning("'auth' section not found in context configuration")
async def authenticate(self, *, session: Session) -> bool | None:
"""Logic for session authentication.
Args:
session: amqtt.session.Session
Returns:
- `True` if user is authentication succeed, `False` if user authentication fails
- `None` if authentication can't be achieved (then plugin result is then ignored)
"""
if not self.auth_config:
# auth config section not found
self.context.logger.warning("'auth' section not found in context configuration")
return False
return True

Wyświetl plik

@ -3,11 +3,11 @@ from functools import partial
import logging
from typing import Any, TypeAlias
from amqtt.contexts import BaseContext
from amqtt.events import BrokerEvents
from amqtt.mqtt import MQTTPacket
from amqtt.mqtt.packet import MQTTFixedHeader, MQTTPayload, MQTTVariableHeader
from amqtt.plugins.base import BasePlugin
from amqtt.plugins.contexts import BaseContext
from amqtt.session import Session
PACKET: TypeAlias = MQTTPacket[MQTTVariableHeader, MQTTPayload[MQTTVariableHeader], MQTTFixedHeader]

Wyświetl plik

@ -8,20 +8,17 @@ import copy
from importlib.metadata import EntryPoint, EntryPoints, entry_points
from inspect import iscoroutinefunction
import logging
from typing import TYPE_CHECKING, Any, Generic, NamedTuple, Optional, TypeAlias, TypeVar, cast
from typing import Any, Generic, NamedTuple, Optional, TypeAlias, TypeVar, cast
from dacite import Config as DaciteConfig, DaciteError, from_dict
from amqtt.contexts import Action, BaseContext
from amqtt.errors import PluginCoroError, PluginImportError, PluginInitError, PluginLoadError
from amqtt.events import BrokerEvents, Events, MQTTEvents
from amqtt.plugins.base import BaseAuthPlugin, BasePlugin, BaseTopicPlugin
from amqtt.plugins.contexts import BaseContext
from amqtt.session import Session
from amqtt.utils import import_string
if TYPE_CHECKING:
from amqtt.broker import Action
class Plugin(NamedTuple):
name: str
@ -109,8 +106,8 @@ class PluginManager(Generic[C]):
topic_filter_list = []
if self.app_context.config and "auth" in self.app_context.config:
auth_filter_list = self.app_context.config["auth"].get("plugins", [])
if self.app_context.config and "topic" in self.app_context.config:
topic_filter_list = self.app_context.config["topic"].get("plugins", [])
if self.app_context.config and "topic-check" in self.app_context.config:
topic_filter_list = self.app_context.config["topic-check"].get("plugins", [])
ep: EntryPoints | list[EntryPoint] = []
if hasattr(entry_points(), "select"):

Wyświetl plik

@ -2,7 +2,7 @@ import json
import sqlite3
from typing import Any
from amqtt.plugins.contexts import BaseContext
from amqtt.contexts import BaseContext
from amqtt.session import Session

Wyświetl plik

@ -1,6 +1,8 @@
import asyncio
from collections import deque # pylint: disable=C0412
from typing import SupportsIndex, SupportsInt, TypeAlias # pylint: disable=C0412
from typing import Any, SupportsIndex, SupportsInt, TypeAlias # pylint: disable=C0412
import psutil
from amqtt.plugins.base import BasePlugin
from amqtt.session import Session
@ -26,7 +28,7 @@ except ImportError:
import amqtt
from amqtt.broker import BrokerContext
from amqtt.codecs_amqtt import int_to_bytes_str
from amqtt.codecs_amqtt import float_to_bytes_str, int_to_bytes_str
from amqtt.mqtt.packet import PUBLISH, MQTTFixedHeader, MQTTPacket, MQTTPayload, MQTTVariableHeader
DOLLAR_SYS_ROOT = "$SYS/broker/"
@ -40,17 +42,35 @@ STAT_START_TIME = "start_time"
STAT_CLIENTS_MAXIMUM = "clients_maximum"
STAT_CLIENTS_CONNECTED = "clients_connected"
STAT_CLIENTS_DISCONNECTED = "clients_disconnected"
MEMORY_USAGE_MAXIMUM = "memory_maximum"
CPU_USAGE_MAXIMUM = "cpu_usage_maximum"
CPU_USAGE_LAST = "cpu_usage_last"
PACKET: TypeAlias = MQTTPacket[MQTTVariableHeader, MQTTPayload[MQTTVariableHeader], MQTTFixedHeader]
def val_to_bytes_str(value: Any) -> bytes:
"""Convert an int, float or string to byte string."""
match value:
case int():
return int_to_bytes_str(value)
case float():
return float_to_bytes_str(value)
case str():
return value.encode("utf-8")
case _:
msg = f"Unsupported type {type(value)}"
raise NotImplementedError(msg)
class BrokerSysPlugin(BasePlugin[BrokerContext]):
def __init__(self, context: BrokerContext) -> None:
super().__init__(context)
# Broker statistics initialization
self._stats: dict[str, int] = {}
self._sys_handle: asyncio.Handle | None = None
self._current_process = psutil.Process()
def _clear_stats(self) -> None:
"""Initialize broker statistics data structures."""
@ -64,6 +84,8 @@ class BrokerSysPlugin(BasePlugin[BrokerContext]):
STAT_CLIENTS_DISCONNECTED,
STAT_PUBLISH_RECEIVED,
STAT_PUBLISH_SENT,
MEMORY_USAGE_MAXIMUM,
CPU_USAGE_MAXIMUM
):
self._stats[stat] = 0
@ -127,6 +149,14 @@ class BrokerSysPlugin(BasePlugin[BrokerContext]):
messages_stored += session.retained_messages_count
messages_stored += len(self.context.retained_messages)
subscriptions_count = sum(len(sub) for sub in self.context.subscriptions.values())
self._stats[STAT_CLIENTS_MAXIMUM] = client_connected
cpu_usage = self._current_process.cpu_percent(interval=0)
self._stats[CPU_USAGE_MAXIMUM] = max(self._stats[CPU_USAGE_MAXIMUM], cpu_usage)
mem_info_usage = self._current_process.memory_full_info()
mem_size = mem_info_usage.rss / (1024 ** 2)
self._stats[MEMORY_USAGE_MAXIMUM] = max(self._stats[MEMORY_USAGE_MAXIMUM], mem_size)
# Broadcast updates
tasks: deque[asyncio.Task[None]] = deque()
@ -150,9 +180,13 @@ class BrokerSysPlugin(BasePlugin[BrokerContext]):
"messages/publish/sent": self._stats[STAT_PUBLISH_SENT],
"messages/retained/count": len(self.context.retained_messages),
"messages/subscriptions/count": subscriptions_count,
"heap/size": mem_size,
"heap/maximum": self._stats[MEMORY_USAGE_MAXIMUM],
"cpu/percent": cpu_usage,
"cpu/maximum": self._stats[CPU_USAGE_MAXIMUM],
}
for stat_name, stat_value in stats.items():
data: bytes = int_to_bytes_str(stat_value) if isinstance(stat_value, int) else stat_value.encode("utf-8")
data: bytes = val_to_bytes_str(stat_value)
tasks.append(self.schedule_broadcast_sys_topic(stat_name, data))
# Wait until broadcasting tasks end

Wyświetl plik

@ -1,8 +1,7 @@
from typing import Any
from amqtt.broker import Action
from amqtt.contexts import Action, BaseContext
from amqtt.plugins.base import BaseTopicPlugin
from amqtt.plugins.contexts import BaseContext
from amqtt.session import Session

Wyświetl plik

@ -1,5 +1,12 @@
# Changelog
## 0.11.1
- [PR #226](https://github.com/Yakifo/amqtt/pull/226) Consolidate super classes for plugins
- [PR #227](https://github.com/Yakifo/amqtt/pull/227) Update sample files
- [PR #229](https://github.com/Yakifo/amqtt/pull/229) & [PR #228](https://github.com/Yakifo/amqtt/pull/228) Broken pypi and test.amqtt.io links
- [PR #232](https://github.com/Yakifo/amqtt/pull/234) $SYS additions for cpu & mem.
## 0.11.0
- upgrades to support python 3.10, 3.11, 3.12 and 3.13

Wyświetl plik

@ -58,7 +58,7 @@ auth:
These plugins should subclass from `BaseAuthPlugin` and implement the `authenticate` method.
::: amqtt.plugins.authentication.BaseAuthPlugin
::: amqtt.plugins.base.BaseAuthPlugin
## Topic Filter Plugins
@ -75,4 +75,4 @@ topic-check:
These plugins should subclass from `BaseTopicPlugin` and implement the `topic_filtering` method.
::: amqtt.plugins.topic_checking.BaseTopicPlugin
::: amqtt.plugins.base.BaseTopicPlugin

Wyświetl plik

@ -0,0 +1,8 @@
{% extends "base.html" %}
{% block outdated %}
You're not viewing the latest version.
<a href="{{ '../' ~ base_url }}">
<strong>Click here to go to latest.</strong>
</a>
{% endblock %}

Wyświetl plik

@ -25,6 +25,11 @@ Default retain value to messages published. Defaults to `false`.
Enable or disable auto-reconnect if connection with the broker is interrupted. Defaults to `false`.
### `connect_timeout` *(int)*
If specified, the number of seconds before a connection times out
### `reconnect_retries` *(int)*
Maximum reconnection retries. Defaults to `2`. Negative value will cause client to reconnect infinitely.
@ -63,8 +68,15 @@ TLS certificates used to verify the broker's authenticity.
- `cafile` *(string)*: Path to a file of concatenated CA certificates in PEM format. See [Certificates](https://docs.python.org/3/library/ssl.html#ssl-certificates) for more info.
- `capath` *(string)*: Path to a directory containing several CA certificates in PEM format, following an [OpenSSL specific layout](https://docs.openssl.org/master/man3/SSL_CTX_load_verify_locations/).
- `cadata` *(string)*: Either an ASCII string of one or more PEM-encoded certificates or a bytes-like object of DER-encoded certificates.
-
-
### `certfile` *(string)*
Path to a single file in PEM format containing the certificate as well as any number of CA certificates needed to establish the server certificate's authenticity.
### `check_hostname` *(bool)*
Bypass ssl host certificate verification, allowing self-signed certificates
## Default Configuration

Wyświetl plik

@ -20,6 +20,7 @@ export default function MainGrid() {
const [received, setReceived] = useState<DataPoint[]>([]);
const [bytesIn, setBytesIn] = useState<DataPoint[]>([]);
const [bytesOut, setBytesOut] = useState<DataPoint[]>([]);
const [clientsConnected, setClientsConnected] = useState<DataPoint[]>([]);
const [serverStart, setServerStart] = useState<string>('');
const [serverUptime, setServerUptime] = useState<string>('');
@ -63,6 +64,7 @@ export default function MainGrid() {
mqttSubscribe('$SYS/broker/load/bytes/#');
mqttSubscribe('$SYS/broker/uptime/formatted');
mqttSubscribe('$SYS/broker/uptime');
mqttSubscribe('$SYS/broker/clients/connected');
}
}, [isConnected, mqttSubscribe]);
@ -97,6 +99,13 @@ export default function MainGrid() {
value: d
}
setBytesOut(bytesOut => [...bytesOut, newPoint]);
} else if (payload.topic === '$SYS/broker/clients/connected') {
const newPoint: DataPoint = {
timestamp: new Date().toISOString(),
value: d
}
setClientsConnected(clientsConnected => [...clientsConnected, newPoint]);
} else if (payload.topic === '$SYS/broker/uptime/formatted') {
const dt = new Date(d + "Z");
setServerStart(dt.toLocaleString());
@ -138,7 +147,7 @@ export default function MainGrid() {
<td style={{width: 250}}>
<p style={{textAlign: 'left'}}>
<FontAwesomeIcon icon={faGithub} size="xl"/> github: <a
href="https://github.com/Yakofo/amqtt">Yakifo/amqtt</a>
href="https://github.com/Yakifo/amqtt">Yakifo/amqtt</a>
</p>
<p style={{textAlign: 'left'}}>
<FontAwesomeIcon icon={faPython} size="xl"/> PyPi: <a
@ -237,6 +246,9 @@ export default function MainGrid() {
<Grid size={{xs: 12, md: 6}}>
<SessionsChart title={'Bytes In'} label={'Bytes'} data={bytesIn} isConnected={isConnected}/>
</Grid>
<Grid size={{xs: 12, md: 6}}>
<SessionsChart title={'Clients Connected'} label={''} data={clientsConnected} isConnected={isConnected}/>
</Grid>
</Grid>
<Grid container spacing={2} columns={12}>

Wyświetl plik

@ -20,7 +20,7 @@ classifiers = [
"Programming Language :: Python :: 3.13"
]
version = "0.11.0"
version = "0.11.1"
requires-python = ">=3.10.0"
readme = "README.md"
license = { text = "MIT" }
@ -34,6 +34,7 @@ dependencies = [
"PyYAML==6.0.2", # https://pypi.org/project/PyYAML
"typer==0.15.4",
"dacite>=1.9.2",
"psutil>=7.0.0",
]
[dependency-groups]

Wyświetl plik

@ -4,6 +4,10 @@ import os
from amqtt.broker import Broker
"""
This sample shows how to run a broker with the topic check acl plugin
"""
logger = logging.getLogger(__name__)
config = {
@ -38,15 +42,43 @@ config = {
},
}
broker = Broker(config)
async def main_loop():
broker = Broker(config)
try:
await broker.start()
while True:
await asyncio.sleep(1)
except asyncio.CancelledError:
await broker.shutdown()
async def test_coro() -> None:
await broker.start()
async def main() -> None:
t = asyncio.create_task(main_loop())
try:
await t
except asyncio.CancelledError:
pass
if __name__ == "__main__":
def __main__():
formatter = "[%(asctime)s] :: %(levelname)s :: %(name)s :: %(message)s"
logging.basicConfig(level=logging.INFO, format=formatter)
asyncio.get_event_loop().run_until_complete(test_coro())
asyncio.get_event_loop().run_forever()
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
task = loop.create_task(main())
try:
loop.run_until_complete(task)
except KeyboardInterrupt:
logger.info("KeyboardInterrupt received. Stopping server...")
task.cancel()
loop.run_until_complete(task) # Ensure task finishes cleanup
finally:
logger.info("Server stopped.")
loop.close()
if __name__ == "__main__":
__main__()

Wyświetl plik

@ -1,21 +1,31 @@
import asyncio
import logging
from asyncio import CancelledError
from amqtt.broker import Broker
broker = Broker()
"""
This sample shows how to run a broker
"""
formatter = "[%(asctime)s] :: %(levelname)s :: %(name)s :: %(message)s"
logging.basicConfig(level=logging.INFO, format=formatter)
async def test_coro() -> None:
await broker.start()
async def run_server() -> None:
broker = Broker()
try:
await broker.start()
while True:
await asyncio.sleep(1)
except CancelledError:
await broker.shutdown()
def __main__():
try:
asyncio.run(run_server())
except KeyboardInterrupt:
print("Server exiting...")
if __name__ == "__main__":
formatter = "[%(asctime)s] :: %(levelname)s :: %(name)s :: %(message)s"
logging.basicConfig(level=logging.INFO, format=formatter)
asyncio.get_event_loop().run_until_complete(test_coro())
try:
asyncio.get_event_loop().run_forever()
except KeyboardInterrupt:
asyncio.get_event_loop().run_until_complete(broker.shutdown())
__main__()

Wyświetl plik

@ -4,6 +4,10 @@ import os
from amqtt.broker import Broker
"""
This sample shows how to run a broker without stacktraces on keyboard interrupt
"""
logger = logging.getLogger(__name__)
config = {
@ -30,19 +34,41 @@ config = {
"topic-check": {"enabled": False},
}
broker = Broker(config)
async def main_loop():
broker = Broker(config)
try:
await broker.start()
while True:
await asyncio.sleep(1)
except asyncio.CancelledError:
await broker.shutdown()
async def main():
t = asyncio.create_task(main_loop())
try:
await t
except asyncio.CancelledError:
pass
async def test_coro() -> None:
await broker.start()
# await asyncio.sleep(5)
# await broker.shutdown()
def __main__():
formatter = "[%(asctime)s] :: %(levelname)s :: %(name)s :: %(message)s"
logging.basicConfig(level=logging.INFO, format=formatter)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
task = loop.create_task(main())
try:
loop.run_until_complete(task)
except KeyboardInterrupt:
logger.info("KeyboardInterrupt received. Stopping server...")
task.cancel()
loop.run_until_complete(task) # Ensure task finishes cleanup
finally:
logger.info("Server stopped.")
loop.close()
if __name__ == "__main__":
formatter = "[%(asctime)s] :: %(levelname)s :: %(name)s :: %(message)s"
# formatter = "%(asctime)s :: %(levelname)s :: %(message)s"
logging.basicConfig(level=logging.INFO, format=formatter)
asyncio.get_event_loop().run_until_complete(test_coro())
asyncio.get_event_loop().run_forever()
__main__()

Wyświetl plik

@ -4,6 +4,10 @@ import os
from amqtt.broker import Broker
"""
This sample shows how to run a broker with the topic check taboo plugin
"""
logger = logging.getLogger(__name__)
config = {
@ -30,15 +34,44 @@ config = {
"topic-check": {"enabled": True, "plugins": ["topic_taboo"]},
}
broker = Broker(config)
async def main_loop():
broker = Broker(config)
try:
await broker.start()
while True:
await asyncio.sleep(1)
except asyncio.CancelledError:
await broker.shutdown()
async def test_coro() -> None:
await broker.start()
async def main():
t = asyncio.create_task(main_loop())
try:
await t
except asyncio.CancelledError:
pass
if __name__ == "__main__":
def __main__():
formatter = "[%(asctime)s] :: %(levelname)s :: %(name)s :: %(message)s"
logging.basicConfig(level=logging.INFO, format=formatter)
asyncio.get_event_loop().run_until_complete(test_coro())
asyncio.get_event_loop().run_forever()
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
task = loop.create_task(main())
try:
loop.run_until_complete(task)
except KeyboardInterrupt:
logger.info("KeyboardInterrupt received. Stopping server...")
task.cancel()
loop.run_until_complete(task) # Ensure task finishes cleanup
finally:
logger.info("Server stopped.")
loop.close()
if __name__ == "__main__":
__main__()

Wyświetl plik

@ -1,13 +1,12 @@
import asyncio
import logging
from asyncio import CancelledError
from amqtt.client import MQTTClient
#
# This sample shows a client running idle.
# Meanwhile, keepalive is managed through PING messages sent every 5 seconds
#
"""
This sample shows how to run an idle client
"""
logger = logging.getLogger(__name__)
@ -15,17 +14,26 @@ config = {
"keep_alive": 5,
"ping_delay": 1,
}
C = MQTTClient(config=config)
async def main() -> None:
client = MQTTClient(config=config)
try:
await client.connect("mqtt://test.mosquitto.org:1883/")
logger.info("client connected")
await asyncio.sleep(15)
except CancelledError:
pass
await client.disconnect()
async def test_coro() -> None:
await C.connect("mqtt://test.mosquitto.org:1883/")
await asyncio.sleep(18)
def __main__():
await C.disconnect()
formatter = "[%(asctime)s] :: %(levelname)s :: %(name)s :: %(message)s"
logging.basicConfig(level=logging.INFO, format=formatter)
asyncio.run(main())
if __name__ == "__main__":
formatter = "[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s"
logging.basicConfig(level=logging.DEBUG, format=formatter)
asyncio.get_event_loop().run_until_complete(test_coro())
__main__()

Wyświetl plik

@ -4,10 +4,9 @@ import logging
from amqtt.client import ConnectError, MQTTClient
from amqtt.mqtt.constants import QOS_1, QOS_2
#
# This sample shows how to publish messages to broker using different QOS
# Debug outputs shows the message flows
#
"""
This sample shows how to publish messages to broker using different QOS
"""
logger = logging.getLogger(__name__)
@ -15,42 +14,45 @@ config = {
"will": {
"topic": "/will/client",
"message": b"Dead or alive",
"qos": 0x01,
"qos": QOS_1,
"retain": True,
},
}
async def test_coro() -> None:
C = MQTTClient()
await C.connect("mqtt://test.mosquitto.org/")
async def test_coro1() -> None:
client = MQTTClient()
await client.connect("mqtt://localhost:1883/")
tasks = [
asyncio.ensure_future(C.publish("a/b", b"TEST MESSAGE WITH QOS_0")),
asyncio.ensure_future(C.publish("a/b", b"TEST MESSAGE WITH QOS_1", qos=QOS_1)),
asyncio.ensure_future(C.publish("a/b", b"TEST MESSAGE WITH QOS_2", qos=QOS_2)),
asyncio.ensure_future(client.publish("a/b", b"TEST MESSAGE WITH QOS_0")),
asyncio.ensure_future(client.publish("a/b", b"TEST MESSAGE WITH QOS_1", qos=QOS_1)),
asyncio.ensure_future(client.publish("a/b", b"TEST MESSAGE WITH QOS_2", qos=QOS_2)),
]
await asyncio.wait(tasks)
logger.info("messages published")
await C.disconnect()
logger.info("test_coro1 messages published")
await client.disconnect()
async def test_coro2() -> None:
try:
C = MQTTClient()
await C.connect("mqtt://test.mosquitto.org:1883/")
await C.publish("a/b", b"TEST MESSAGE WITH QOS_0", qos=0x00)
await C.publish("a/b", b"TEST MESSAGE WITH QOS_1", qos=0x01)
await C.publish("a/b", b"TEST MESSAGE WITH QOS_2", qos=0x02)
logger.info("messages published")
await C.disconnect()
client = MQTTClient(config={'auto_reconnect': False, 'connection_timeout': 1})
await client.connect("mqtt://localhost:1884/")
await client.publish("a/b", b"TEST MESSAGE WITH QOS_0", qos=0x00)
await client.publish("a/b", b"TEST MESSAGE WITH QOS_1", qos=0x01)
await client.publish("a/b", b"TEST MESSAGE WITH QOS_2", qos=0x02)
logger.info("test_coro2 messages published")
await client.disconnect()
except ConnectError:
logger.exception(f"Connection failed", exc_info=True)
asyncio.get_event_loop().stop()
logger.info(f"Connection failed", exc_info=True)
def __main__():
formatter = "[%(asctime)s] :: %(levelname)s :: %(name)s :: %(message)s"
logging.basicConfig(level=logging.INFO, format=formatter)
asyncio.run(test_coro1())
asyncio.run(test_coro2())
if __name__ == "__main__":
formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s"
formatter = "%(message)s"
logging.basicConfig(level=logging.DEBUG, format=formatter)
asyncio.get_event_loop().run_until_complete(test_coro())
asyncio.get_event_loop().run_until_complete(test_coro2())
__main__()

Wyświetl plik

@ -2,37 +2,41 @@ import asyncio
import logging
from amqtt.client import ConnectError, MQTTClient
from amqtt.mqtt.constants import QOS_1
#
# This sample shows how to publish messages to broker using different QOS
# Debug outputs shows the message flows
#
"""
This sample shows how to publish messages to broker running either `samples/broker_acl.py`
or `samples/broker_taboo.py`.
"""
logger = logging.getLogger(__name__)
async def test_coro() -> None:
try:
C = MQTTClient()
await C.connect("mqtt://0.0.0.0:1883")
await C.publish("data/classified", b"TOP SECRET", qos=0x01)
await C.publish("data/memes", b"REAL FUN", qos=0x01)
await C.publish("repositories/amqtt/master", b"NEW STABLE RELEASE", qos=0x01)
await C.publish(
client = MQTTClient()
await client.connect("mqtt://0.0.0.0:1883")
await client.publish("data/classified", b"TOP SECRET", qos=QOS_1)
await client.publish("data/memes", b"REAL FUN", qos=QOS_1)
await client.publish("repositories/amqtt/master", b"NEW STABLE RELEASE", qos=QOS_1)
await client.publish(
"repositories/amqtt/devel",
b"THIS NEEDS TO BE CHECKED",
qos=0x01,
qos=QOS_1,
)
await C.publish("calendar/amqtt/releases", b"NEW RELEASE", qos=0x01)
await client.publish("calendar/amqtt/releases", b"NEW RELEASE", qos=QOS_1)
logger.info("messages published")
await C.disconnect()
await client.disconnect()
except ConnectError as ce:
logger.exception("Connection failed")
asyncio.get_event_loop().stop()
logger.exception("ERROR: Connection failed")
def __main__():
formatter = "[%(asctime)s] :: %(levelname)s :: %(name)s :: %(message)s"
logging.basicConfig(level=logging.INFO, format=formatter)
asyncio.run(test_coro())
if __name__ == "__main__":
formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s"
formatter = "%(message)s"
logging.basicConfig(level=logging.DEBUG, format=formatter)
asyncio.get_event_loop().run_until_complete(test_coro())
__main__()

Wyświetl plik

@ -1,41 +1,51 @@
import asyncio
import logging
from pathlib import Path
from amqtt.client import MQTTClient
from amqtt.mqtt.constants import QOS_1, QOS_2
#
# This sample shows how to publish messages to broker using different QOS
# Debug outputs shows the message flows
#
"""
This sample shows how to publish messages to secure broker.
Use `openssl req -x509 -nodes -days 365 -newkey rsa:2048 -keyout key.pem -out cert.pem -subj "/CN=localhost"` to
generate a self-signed certificate for the broker to use.
"""
logger = logging.getLogger(__name__)
config = {
"will": {
"topic": "/will/client",
"message": b"Dead or alive",
"qos": 0x01,
"message": "Dead or alive",
"qos": QOS_1,
"retain": True,
},
"auto_reconnect": False,
"check_hostname": False,
"certfile": "cert.pem",
}
C = MQTTClient(config=config)
# C = MQTTClient()
client = MQTTClient(config=config)
async def test_coro() -> None:
await C.connect("mqtts://test.mosquitto.org/", cafile="mosquitto.org.crt")
await client.connect("mqtts://localhost:8883")
tasks = [
asyncio.ensure_future(C.publish("a/b", b"TEST MESSAGE WITH QOS_0")),
asyncio.ensure_future(C.publish("a/b", b"TEST MESSAGE WITH QOS_1", qos=QOS_1)),
asyncio.ensure_future(C.publish("a/b", b"TEST MESSAGE WITH QOS_2", qos=QOS_2)),
asyncio.ensure_future(client.publish("a/b", b"TEST MESSAGE WITH QOS_0")),
asyncio.ensure_future(client.publish("a/b", b"TEST MESSAGE WITH QOS_1", qos=QOS_1)),
asyncio.ensure_future(client.publish("a/b", b"TEST MESSAGE WITH QOS_2", qos=QOS_2)),
]
await asyncio.wait(tasks)
logger.info("messages published")
await C.disconnect()
await client.disconnect()
if __name__ == "__main__":
def __main__():
formatter = "[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s"
logging.basicConfig(level=logging.DEBUG, format=formatter)
asyncio.get_event_loop().run_until_complete(test_coro())
asyncio.run(test_coro())
if __name__ == "__main__":
__main__()

Wyświetl plik

@ -4,39 +4,39 @@ import logging
from amqtt.client import MQTTClient
from amqtt.mqtt.constants import QOS_1, QOS_2
#
# This sample shows how to publish messages to broker using different QOS
# Debug outputs shows the message flows
#
"""
This sample shows how to publish messages to secure websocket broker
"""
logger = logging.getLogger(__name__)
config = {
"will": {
"topic": "/will/client",
"message": b"Dead or alive",
"qos": 0x01,
"message": "Dead or alive",
"qos": QOS_1,
"retain": True,
},
"capath": ".",
}
}
C = MQTTClient(config=config)
# C = MQTTClient()
client = MQTTClient(config=config)
async def test_coro() -> None:
await C.connect("wss://test.mosquitto.org:8081/", cafile="mosquitto.org.crt")
await client.connect("ws://localhost:8080/")
tasks = [
asyncio.ensure_future(C.publish("a/b", b"TEST MESSAGE WITH QOS_0")),
asyncio.ensure_future(C.publish("a/b", b"TEST MESSAGE WITH QOS_1", qos=QOS_1)),
asyncio.ensure_future(C.publish("a/b", b"TEST MESSAGE WITH QOS_2", qos=QOS_2)),
asyncio.ensure_future(client.publish("a/b", b"TEST MESSAGE WITH QOS_0")),
asyncio.ensure_future(client.publish("a/b", b"TEST MESSAGE WITH QOS_1", qos=QOS_1)),
asyncio.ensure_future(client.publish("a/b", b"TEST MESSAGE WITH QOS_2", qos=QOS_2)),
]
await asyncio.wait(tasks)
logger.info("messages published")
await C.disconnect()
await client.disconnect()
if __name__ == "__main__":
def __main__():
formatter = "[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s"
logging.basicConfig(level=logging.DEBUG, format=formatter)
asyncio.get_event_loop().run_until_complete(test_coro())
asyncio.run(test_coro())
if __name__ == "__main__":
__main__()

Wyświetl plik

@ -4,20 +4,19 @@ import logging
from amqtt.client import ClientError, MQTTClient
from amqtt.mqtt.constants import QOS_1, QOS_2
#
# This sample shows how to subscbribe a topic and receive data from incoming messages
# It subscribes to '$SYS/broker/uptime' topic and displays the first ten values returned
# by the broker.
#
"""
This sample shows how to subscribe to different $SYS topics and how to receive incoming messages
"""
logger = logging.getLogger(__name__)
async def uptime_coro() -> None:
C = MQTTClient()
await C.connect("mqtt://test.mosquitto.org/")
# Subscribe to '$SYS/broker/uptime' with QOS=1
await C.subscribe(
client = MQTTClient()
await client.connect("mqtt://test.mosquitto.org/")
await client.subscribe(
[
("$SYS/broker/uptime", QOS_1),
("$SYS/broker/load/#", QOS_2),
@ -25,16 +24,20 @@ async def uptime_coro() -> None:
)
logger.info("Subscribed")
try:
for _i in range(1, 100):
await C.deliver_message()
await C.unsubscribe(["$SYS/broker/uptime", "$SYS/broker/load/#"])
for _i in range(1, 10):
if msg := await client.deliver_message():
logger.info(f"{msg.topic} >> {msg.data.decode()}")
await client.unsubscribe(["$SYS/broker/uptime", "$SYS/broker/load/#"])
logger.info("UnSubscribed")
await C.disconnect()
await client.disconnect()
except ClientError:
logger.exception("Client exception")
if __name__ == "__main__":
def __main__():
formatter = "[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s"
logging.basicConfig(level=logging.INFO, format=formatter)
asyncio.get_event_loop().run_until_complete(uptime_coro())
asyncio.run(uptime_coro())
if __name__ == "__main__":
__main__()

Wyświetl plik

@ -4,41 +4,45 @@ import logging
from amqtt.client import ClientError, MQTTClient
from amqtt.mqtt.constants import QOS_1
#
# This sample shows how to subscbribe a topic and receive data from incoming messages
# It subscribes to '$SYS/broker/uptime' topic and displays the first ten values returned
# by the broker.
#
"""
Run `samples/broker_acl.py` or `samples/broker_taboo.py`
This sample shows how to subscribe to different topics, some of which are allowed.
"""
logger = logging.getLogger(__name__)
async def uptime_coro() -> None:
C = MQTTClient()
await C.connect("mqtt://test:test@0.0.0.0:1883")
# await C.connect('mqtt://0.0.0.0:1883')
# Subscribe to '$SYS/broker/uptime' with QOS=1
await C.subscribe(
client = MQTTClient()
await client.connect("mqtt://test:test@0.0.0.0:1883")
result = await client.subscribe(
[
("$SYS/#", QOS_1), # Topic forbidden when running `broker_acl.py`
("data/memes", QOS_1), # Topic allowed
("data/classified", QOS_1), # Topic forbidden
("repositories/amqtt/master", QOS_1), # Topic allowed
("repositories/amqtt/devel", QOS_1), # Topic forbidden
("repositories/amqtt/devel", QOS_1), # Topic forbidden when running `broker_acl.py`
("calendar/amqtt/releases", QOS_1), # Topic allowed
],
)
logger.info("Subscribed")
logger.info(f"Subscribed results: {result}")
try:
for _i in range(1, 100):
await C.deliver_message()
await C.unsubscribe(["$SYS/broker/uptime", "$SYS/broker/load/#"])
if msg := await client.deliver_message():
logger.info(f"{msg.topic} >> {msg.data.decode()}")
await client.unsubscribe(["$SYS/#", "data/memes"])
logger.info("UnSubscribed")
await C.disconnect()
await client.disconnect()
except ClientError as ce:
logger.exception("Client exception")
if __name__ == "__main__":
def __main__():
formatter = "[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s"
logging.basicConfig(level=logging.INFO, format=formatter)
asyncio.get_event_loop().run_until_complete(uptime_coro())
if __name__ == "__main__":
__main__()

Wyświetl plik

@ -1,18 +1,24 @@
-----BEGIN CERTIFICATE-----
MIIC8DCCAlmgAwIBAgIJAOD63PlXjJi8MA0GCSqGSIb3DQEBBQUAMIGQMQswCQYD
VQQGEwJHQjEXMBUGA1UECAwOVW5pdGVkIEtpbmdkb20xDjAMBgNVBAcMBURlcmJ5
MRIwEAYDVQQKDAlNb3NxdWl0dG8xCzAJBgNVBAsMAkNBMRYwFAYDVQQDDA1tb3Nx
dWl0dG8ub3JnMR8wHQYJKoZIhvcNAQkBFhByb2dlckBhdGNob28ub3JnMB4XDTEy
MDYyOTIyMTE1OVoXDTIyMDYyNzIyMTE1OVowgZAxCzAJBgNVBAYTAkdCMRcwFQYD
VQQIDA5Vbml0ZWQgS2luZ2RvbTEOMAwGA1UEBwwFRGVyYnkxEjAQBgNVBAoMCU1v
c3F1aXR0bzELMAkGA1UECwwCQ0ExFjAUBgNVBAMMDW1vc3F1aXR0by5vcmcxHzAd
BgkqhkiG9w0BCQEWEHJvZ2VyQGF0Y2hvby5vcmcwgZ8wDQYJKoZIhvcNAQEBBQAD
gY0AMIGJAoGBAMYkLmX7SqOT/jJCZoQ1NWdCrr/pq47m3xxyXcI+FLEmwbE3R9vM
rE6sRbP2S89pfrCt7iuITXPKycpUcIU0mtcT1OqxGBV2lb6RaOT2gC5pxyGaFJ+h
A+GIbdYKO3JprPxSBoRponZJvDGEZuM3N7p3S/lRoi7G5wG5mvUmaE5RAgMBAAGj
UDBOMB0GA1UdDgQWBBTad2QneVztIPQzRRGj6ZHKqJTv5jAfBgNVHSMEGDAWgBTa
d2QneVztIPQzRRGj6ZHKqJTv5jAMBgNVHRMEBTADAQH/MA0GCSqGSIb3DQEBBQUA
A4GBAAqw1rK4NlRUCUBLhEFUQasjP7xfFqlVbE2cRy0Rs4o3KS0JwzQVBwG85xge
REyPOFdGdhBY2P1FNRy0MDr6xr+D2ZOwxs63dG1nnAnWZg7qwoLgpZ4fESPD3PkA
1ZgKJc2zbSQ9fCPxt2W3mdVav66c6fsb7els2W2Iz7gERJSX
MIIEAzCCAuugAwIBAgIUBY1hlCGvdj4NhBXkZ/uLUZNILAwwDQYJKoZIhvcNAQEL
BQAwgZAxCzAJBgNVBAYTAkdCMRcwFQYDVQQIDA5Vbml0ZWQgS2luZ2RvbTEOMAwG
A1UEBwwFRGVyYnkxEjAQBgNVBAoMCU1vc3F1aXR0bzELMAkGA1UECwwCQ0ExFjAU
BgNVBAMMDW1vc3F1aXR0by5vcmcxHzAdBgkqhkiG9w0BCQEWEHJvZ2VyQGF0Y2hv
by5vcmcwHhcNMjAwNjA5MTEwNjM5WhcNMzAwNjA3MTEwNjM5WjCBkDELMAkGA1UE
BhMCR0IxFzAVBgNVBAgMDlVuaXRlZCBLaW5nZG9tMQ4wDAYDVQQHDAVEZXJieTES
MBAGA1UECgwJTW9zcXVpdHRvMQswCQYDVQQLDAJDQTEWMBQGA1UEAwwNbW9zcXVp
dHRvLm9yZzEfMB0GCSqGSIb3DQEJARYQcm9nZXJAYXRjaG9vLm9yZzCCASIwDQYJ
KoZIhvcNAQEBBQADggEPADCCAQoCggEBAME0HKmIzfTOwkKLT3THHe+ObdizamPg
UZmD64Tf3zJdNeYGYn4CEXbyP6fy3tWc8S2boW6dzrH8SdFf9uo320GJA9B7U1FW
Te3xda/Lm3JFfaHjkWw7jBwcauQZjpGINHapHRlpiCZsquAthOgxW9SgDgYlGzEA
s06pkEFiMw+qDfLo/sxFKB6vQlFekMeCymjLCbNwPJyqyhFmPWwio/PDMruBTzPH
3cioBnrJWKXc3OjXdLGFJOfj7pP0j/dr2LH72eSvv3PQQFl90CZPFhrCUcRHSSxo
E6yjGOdnz7f6PveLIB574kQORwt8ePn0yidrTC1ictikED3nHYhMUOUCAwEAAaNT
MFEwHQYDVR0OBBYEFPVV6xBUFPiGKDyo5V3+Hbh4N9YSMB8GA1UdIwQYMBaAFPVV
6xBUFPiGKDyo5V3+Hbh4N9YSMA8GA1UdEwEB/wQFMAMBAf8wDQYJKoZIhvcNAQEL
BQADggEBAGa9kS21N70ThM6/Hj9D7mbVxKLBjVWe2TPsGfbl3rEDfZ+OKRZ2j6AC
6r7jb4TZO3dzF2p6dgbrlU71Y/4K0TdzIjRj3cQ3KSm41JvUQ0hZ/c04iGDg/xWf
+pp58nfPAYwuerruPNWmlStWAXf0UTqRtg4hQDWBuUFDJTuWuuBvEXudz74eh/wK
sMwfu1HFvjy5Z0iMDU8PUDepjVolOCue9ashlS4EB5IECdSR2TItnAIiIwimx839
LdUdRudafMu5T5Xma182OC0/u/xRlEm+tvKGGmfFcN0piqVl8OrSPBgIlb+1IKJE
m/XriWr/Cq4h/JfB7NTsezVslgkBaoU=
-----END CERTIFICATE-----

Wyświetl plik

@ -2,10 +2,8 @@ import logging
from dataclasses import dataclass
from amqtt.broker import Action
from amqtt.plugins.base import BasePlugin, BaseAuthPlugin, BaseTopicPlugin
from amqtt.plugins.contexts import BaseContext
from amqtt.contexts import BaseContext, Action
from amqtt.session import Session

Wyświetl plik

@ -4,7 +4,7 @@ from pathlib import Path
import unittest
from amqtt.plugins.authentication import AnonymousAuthPlugin, FileAuthPlugin
from amqtt.plugins.contexts import BaseContext
from amqtt.contexts import BaseContext
from amqtt.session import Session
formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s"

Wyświetl plik

@ -2,11 +2,11 @@ import asyncio
import logging
import unittest
from amqtt.broker import Action
from amqtt.events import BrokerEvents
from amqtt.plugins.base import BaseAuthPlugin, BaseTopicPlugin
from amqtt.plugins.manager import PluginManager
from amqtt.plugins.contexts import BaseContext
from amqtt.contexts import BaseContext, Action
from amqtt.session import Session
formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s"

Wyświetl plik

@ -4,7 +4,7 @@ from pathlib import Path
import sqlite3
import unittest
from amqtt.plugins.contexts import BaseContext
from amqtt.contexts import BaseContext
from amqtt.plugins.persistence import SQLitePlugin
from amqtt.session import Session

Wyświetl plik

@ -13,11 +13,11 @@ import pytest
import amqtt.plugins
from amqtt.broker import Broker, BrokerContext
from amqtt.client import MQTTClient
from amqtt.errors import PluginError, PluginInitError, PluginImportError
from amqtt.errors import PluginInitError, PluginImportError
from amqtt.events import MQTTEvents, BrokerEvents
from amqtt.mqtt.constants import QOS_0
from amqtt.plugins.base import BasePlugin
from amqtt.plugins.contexts import BaseContext
from amqtt.contexts import BaseContext
_INVALID_METHOD: str = "invalid_foo"
_PLUGIN: str = "Plugin"

Wyświetl plik

@ -31,7 +31,11 @@ all_sys_topics = [
'$SYS/broker/messages/publish/received',
'$SYS/broker/messages/publish/sent',
'$SYS/broker/messages/retained/count',
'$SYS/broker/messages/subscriptions/count'
'$SYS/broker/messages/subscriptions/count',
'$SYS/broker/heap/size',
'$SYS/broker/heap/maximum',
'$SYS/broker/cpu/percent',
'$SYS/broker/cpu/maximum',
]

Wyświetl plik

@ -2,8 +2,9 @@ import logging
import pytest
from amqtt.broker import Action, BrokerContext, Broker
from amqtt.plugins.contexts import BaseContext
from amqtt.broker import BrokerContext, Broker
from amqtt.contexts import BaseContext, Action
from amqtt.plugins.topic_checking import TopicAccessControlListPlugin, TopicTabooPlugin
from amqtt.plugins.base import BaseTopicPlugin
from amqtt.session import Session

Wyświetl plik

@ -446,6 +446,15 @@ async def test_connect_incorrect_scheme():
await client.connect('"mq://someplace')
@pytest.mark.asyncio
@pytest.mark.timeout(3)
async def test_connect_timeout():
config = {"auto_reconnect": False, "connection_timeout": 2}
client = MQTTClient(config=config)
with pytest.raises(ClientError):
await client.connect("mqtt://localhost:8888")
async def test_client_no_auth():
class MockEntryPoints:

Wyświetl plik

@ -0,0 +1,242 @@
import asyncio
import logging
import signal
import subprocess
from pathlib import Path
import pytest
from amqtt.broker import Broker
from samples.client_publish import __main__ as client_publish_main
from samples.client_subscribe import __main__ as client_subscribe_main
from samples.client_keepalive import __main__ as client_keepalive_main
from samples.broker_acl import config as broker_acl_config
from samples.broker_taboo import config as broker_taboo_config
logger = logging.getLogger(__name__)
@pytest.mark.asyncio
async def test_broker_acl():
broker_acl_script = Path(__file__).parent.parent / "samples/broker_acl.py"
process = subprocess.Popen(["python", broker_acl_script], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# Send the interrupt signal
await asyncio.sleep(5)
process.send_signal(signal.SIGINT)
stdout, stderr = process.communicate()
logger.debug(stderr.decode("utf-8"))
assert "Broker closed" in stderr.decode("utf-8")
assert "ERROR" not in stderr.decode("utf-8")
assert "Exception" not in stderr.decode("utf-8")
@pytest.mark.asyncio
async def test_broker_simple():
broker_simple_script = Path(__file__).parent.parent / "samples/broker_simple.py"
process = subprocess.Popen(["python", broker_simple_script], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
await asyncio.sleep(5)
# Send the interrupt signal
process.send_signal(signal.SIGINT)
stdout, stderr = process.communicate()
logger.debug(stderr.decode("utf-8"))
has_broker_closed = "Broker closed" in stderr.decode("utf-8")
has_loop_stopped = "Broadcast loop stopped by exception" in stderr.decode("utf-8")
assert has_broker_closed or has_loop_stopped, "Broker didn't close correctly."
@pytest.mark.asyncio
async def test_broker_start():
broker_start_script = Path(__file__).parent.parent / "samples/broker_start.py"
process = subprocess.Popen(["python", broker_start_script], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
await asyncio.sleep(5)
# Send the interrupt signal to stop broker
process.send_signal(signal.SIGINT)
stdout, stderr = process.communicate()
logger.debug(stderr.decode("utf-8"))
assert "Broker closed" in stderr.decode("utf-8")
assert "ERROR" not in stderr.decode("utf-8")
assert "Exception" not in stderr.decode("utf-8")
@pytest.mark.asyncio
async def test_broker_taboo():
broker_taboo_script = Path(__file__).parent.parent / "samples/broker_taboo.py"
process = subprocess.Popen(["python", broker_taboo_script], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
await asyncio.sleep(5)
# Send the interrupt signal to stop broker
process.send_signal(signal.SIGINT)
stdout, stderr = process.communicate()
logger.debug(stderr.decode("utf-8"))
assert "INFO :: amqtt.broker :: Broker closed" in stderr.decode("utf-8")
assert "ERROR" not in stderr.decode("utf-8")
assert "Exception" not in stderr.decode("utf-8")
@pytest.mark.timeout(25)
@pytest.mark.asyncio
async def test_client_keepalive():
broker = Broker()
await broker.start()
await asyncio.sleep(2)
keep_alive_script = Path(__file__).parent.parent / "samples/client_keepalive.py"
process = subprocess.Popen(["python", keep_alive_script], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
await asyncio.sleep(2)
stdout, stderr = process.communicate()
assert "ERROR" not in stderr.decode("utf-8")
assert "Exception" not in stderr.decode("utf-8")
await broker.shutdown()
@pytest.mark.asyncio
async def test_client_publish():
broker = Broker()
await broker.start()
await asyncio.sleep(2)
client_publish = Path(__file__).parent.parent / "samples/client_publish.py"
process = subprocess.Popen(["python", client_publish], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
await asyncio.sleep(2)
stdout, stderr = process.communicate()
assert "ERROR" not in stderr.decode("utf-8")
assert "Exception" not in stderr.decode("utf-8")
await broker.shutdown()
broker_ssl_config = {
"listeners": {
"default": {
"type": "tcp",
"bind": "0.0.0.0:8883",
"ssl": True,
"certfile": "cert.pem",
"keyfile": "key.pem",
}
},
"auth": {
"allow-anonymous": True,
"plugins": ["auth_anonymous"]
}
}
@pytest.mark.asyncio
async def test_client_publish_ssl():
# generate a self-signed certificate for this test
cmd = 'openssl req -x509 -nodes -days 365 -newkey rsa:2048 -keyout key.pem -out cert.pem -subj "/CN=localhost"'
subprocess.run(cmd, shell=True, capture_output=True, text=True)
# start a secure broker
broker = Broker(config=broker_ssl_config)
await broker.start()
await asyncio.sleep(2)
# run the sample
client_publish_ssl_script = Path(__file__).parent.parent / "samples/client_publish_ssl.py"
process = subprocess.Popen(["python", client_publish_ssl_script], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
await asyncio.sleep(2)
stdout, stderr = process.communicate()
assert "ERROR" not in stderr.decode("utf-8")
assert "Exception" not in stderr.decode("utf-8")
await broker.shutdown()
@pytest.mark.asyncio
async def test_client_publish_acl():
broker = Broker()
await broker.start()
await asyncio.sleep(2)
broker_simple_script = Path(__file__).parent.parent / "samples/client_publish_acl.py"
process = subprocess.Popen(["python", broker_simple_script], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# Send the interrupt signal
await asyncio.sleep(2)
stdout, stderr = process.communicate()
logger.debug(stderr.decode("utf-8"))
assert "ERROR" not in stderr.decode("utf-8")
assert "Exception" not in stderr.decode("utf-8")
await broker.shutdown()
broker_ws_config = {
"listeners": {
"default": {
"type": "ws",
"bind": "0.0.0.0:8080",
}
},
"auth": {
"allow-anonymous": True,
"plugins": ["auth_anonymous"]
}
}
@pytest.mark.asyncio
async def test_client_publish_ws():
# start a secure broker
broker = Broker(config=broker_ws_config)
await broker.start()
await asyncio.sleep(2)
# run the sample
client_publish_ssl_script = Path(__file__).parent.parent / "samples/client_publish_ws.py"
process = subprocess.Popen(["python", client_publish_ssl_script], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
await asyncio.sleep(2)
stdout, stderr = process.communicate()
assert "ERROR" not in stderr.decode("utf-8")
assert "Exception" not in stderr.decode("utf-8")
await broker.shutdown()
def test_client_subscribe():
client_subscribe_main()
@pytest.mark.asyncio
async def test_client_subscribe_plugin_acl():
broker = Broker(config=broker_acl_config)
await broker.start()
broker_simple_script = Path(__file__).parent.parent / "samples/client_subscribe_acl.py"
process = subprocess.Popen(["python", broker_simple_script], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# Send the interrupt signal
await asyncio.sleep(5)
process.send_signal(signal.SIGINT)
stdout, stderr = process.communicate()
logger.debug(stderr.decode("utf-8"))
assert "Subscribed results: [128, 1, 128, 1, 128, 1]" in stderr.decode("utf-8")
assert "ERROR" not in stderr.decode("utf-8")
assert "Exception" not in stderr.decode("utf-8")
await broker.shutdown()
@pytest.mark.asyncio
async def test_client_subscribe_plugin_taboo():
broker = Broker(config=broker_taboo_config)
await broker.start()
broker_simple_script = Path(__file__).parent.parent / "samples/client_subscribe_acl.py"
process = subprocess.Popen(["python", broker_simple_script], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# Send the interrupt signal
await asyncio.sleep(5)
process.send_signal(signal.SIGINT)
stdout, stderr = process.communicate()
logger.debug(stderr.decode("utf-8"))
assert "Subscribed results: [1, 1, 128, 1, 1, 1]" in stderr.decode("utf-8")
assert "ERROR" not in stderr.decode("utf-8")
assert "Exception" not in stderr.decode("utf-8")
await broker.shutdown()

Wyświetl plik

@ -9,11 +9,12 @@ resolution-markers = [
[[package]]
name = "amqtt"
version = "0.11.0"
version = "0.11.1"
source = { editable = "." }
dependencies = [
{ name = "dacite" },
{ name = "passlib" },
{ name = "psutil" },
{ name = "pyyaml" },
{ name = "transitions" },
{ name = "typer" },
@ -69,6 +70,7 @@ requires-dist = [
{ name = "coveralls", marker = "extra == 'ci'", specifier = "==4.0.1" },
{ name = "dacite", specifier = ">=1.9.2" },
{ name = "passlib", specifier = "==1.7.4" },
{ name = "psutil", specifier = ">=7.0.0" },
{ name = "pyyaml", specifier = "==6.0.2" },
{ name = "transitions", specifier = "==0.9.2" },
{ name = "typer", specifier = "==0.15.4" },