Merge pull request #236 from ajmirsky/refine_sample_tests

improve refining sample script testing
pull/238/head^2
Andrew Mirsky 2025-06-26 13:46:02 -04:00 zatwierdzone przez GitHub
commit 1813628f19
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: B5690EEEBB952194
9 zmienionych plików z 137 dodań i 44 usunięć

1
.gitignore vendored
Wyświetl plik

@ -3,6 +3,7 @@
__pycache__ __pycache__
node_modules node_modules
.vite .vite
*.pem
#------- Environment Files ------- #------- Environment Files -------
.python-version .python-version

Wyświetl plik

@ -465,10 +465,12 @@ class MQTTClient:
capath=self.session.capath, capath=self.session.capath,
cadata=self.session.cadata, cadata=self.session.cadata,
) )
if "certfile" in self.config and "keyfile" in self.config:
sc.load_cert_chain(self.config["certfile"], self.config["keyfile"]) if "certfile" in self.config:
sc.load_verify_locations(cafile=self.config["certfile"])
if "check_hostname" in self.config and isinstance(self.config["check_hostname"], bool): if "check_hostname" in self.config and isinstance(self.config["check_hostname"], bool):
sc.check_hostname = self.config["check_hostname"] sc.check_hostname = self.config["check_hostname"]
sc.verify_mode = ssl.CERT_REQUIRED
kwargs["ssl"] = sc kwargs["ssl"] = sc
try: try:

Wyświetl plik

@ -68,8 +68,15 @@ TLS certificates used to verify the broker's authenticity.
- `cafile` *(string)*: Path to a file of concatenated CA certificates in PEM format. See [Certificates](https://docs.python.org/3/library/ssl.html#ssl-certificates) for more info. - `cafile` *(string)*: Path to a file of concatenated CA certificates in PEM format. See [Certificates](https://docs.python.org/3/library/ssl.html#ssl-certificates) for more info.
- `capath` *(string)*: Path to a directory containing several CA certificates in PEM format, following an [OpenSSL specific layout](https://docs.openssl.org/master/man3/SSL_CTX_load_verify_locations/). - `capath` *(string)*: Path to a directory containing several CA certificates in PEM format, following an [OpenSSL specific layout](https://docs.openssl.org/master/man3/SSL_CTX_load_verify_locations/).
- `cadata` *(string)*: Either an ASCII string of one or more PEM-encoded certificates or a bytes-like object of DER-encoded certificates. - `cadata` *(string)*: Either an ASCII string of one or more PEM-encoded certificates or a bytes-like object of DER-encoded certificates.
-
-
### `certfile` *(string)*
Path to a single file in PEM format containing the certificate as well as any number of CA certificates needed to establish the server certificate's authenticity.
### `check_hostname` *(bool)*
Bypass ssl host certificate verification, allowing self-signed certificates
## Default Configuration ## Default Configuration

Wyświetl plik

@ -21,7 +21,7 @@ async def main() -> None:
try: try:
await client.connect("mqtt://test.mosquitto.org:1883/") await client.connect("mqtt://test.mosquitto.org:1883/")
logger.info("client connected") logger.info("client connected")
await asyncio.sleep(18) await asyncio.sleep(15)
except CancelledError: except CancelledError:
pass pass
@ -32,21 +32,8 @@ def __main__():
formatter = "[%(asctime)s] :: %(levelname)s :: %(name)s :: %(message)s" formatter = "[%(asctime)s] :: %(levelname)s :: %(name)s :: %(message)s"
logging.basicConfig(level=logging.INFO, format=formatter) logging.basicConfig(level=logging.INFO, format=formatter)
asyncio.run(main())
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
task = loop.create_task(main())
try:
loop.run_until_complete(task)
except KeyboardInterrupt:
logger.info("KeyboardInterrupt received. Stopping client...")
task.cancel()
loop.run_until_complete(task) # Ensure task finishes cleanup
finally:
loop.close()
if __name__ == "__main__": if __name__ == "__main__":
__main__() __main__()

Wyświetl plik

@ -14,7 +14,7 @@ config = {
"will": { "will": {
"topic": "/will/client", "topic": "/will/client",
"message": b"Dead or alive", "message": b"Dead or alive",
"qos": 0x01, "qos": QOS_1,
"retain": True, "retain": True,
}, },
} }
@ -22,7 +22,7 @@ config = {
async def test_coro1() -> None: async def test_coro1() -> None:
client = MQTTClient() client = MQTTClient()
await client.connect("mqtt://test.mosquitto.org/") await client.connect("mqtt://localhost:1883/")
tasks = [ tasks = [
asyncio.ensure_future(client.publish("a/b", b"TEST MESSAGE WITH QOS_0")), asyncio.ensure_future(client.publish("a/b", b"TEST MESSAGE WITH QOS_0")),
asyncio.ensure_future(client.publish("a/b", b"TEST MESSAGE WITH QOS_1", qos=QOS_1)), asyncio.ensure_future(client.publish("a/b", b"TEST MESSAGE WITH QOS_1", qos=QOS_1)),
@ -35,15 +35,15 @@ async def test_coro1() -> None:
async def test_coro2() -> None: async def test_coro2() -> None:
try: try:
client = MQTTClient() client = MQTTClient(config={'auto_reconnect': False, 'connection_timeout': 1})
await client.connect("mqtt://test.mosquitto.org:1883/") await client.connect("mqtt://localhost:1884/")
await client.publish("a/b", b"TEST MESSAGE WITH QOS_0", qos=0x00) await client.publish("a/b", b"TEST MESSAGE WITH QOS_0", qos=0x00)
await client.publish("a/b", b"TEST MESSAGE WITH QOS_1", qos=0x01) await client.publish("a/b", b"TEST MESSAGE WITH QOS_1", qos=0x01)
await client.publish("a/b", b"TEST MESSAGE WITH QOS_2", qos=0x02) await client.publish("a/b", b"TEST MESSAGE WITH QOS_2", qos=0x02)
logger.info("test_coro2 messages published") logger.info("test_coro2 messages published")
await client.disconnect() await client.disconnect()
except ConnectError: except ConnectError:
logger.exception(f"Connection failed", exc_info=True) logger.info(f"Connection failed", exc_info=True)
def __main__(): def __main__():

Wyświetl plik

@ -7,6 +7,9 @@ from amqtt.mqtt.constants import QOS_1, QOS_2
""" """
This sample shows how to publish messages to secure broker. This sample shows how to publish messages to secure broker.
Use `openssl req -x509 -nodes -days 365 -newkey rsa:2048 -keyout key.pem -out cert.pem -subj "/CN=localhost"` to
generate a self-signed certificate for the broker to use.
""" """
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -17,7 +20,10 @@ config = {
"message": "Dead or alive", "message": "Dead or alive",
"qos": QOS_1, "qos": QOS_1,
"retain": True, "retain": True,
} },
"auto_reconnect": False,
"check_hostname": False,
"certfile": "cert.pem",
} }
client = MQTTClient(config=config) client = MQTTClient(config=config)
@ -25,7 +31,7 @@ client = MQTTClient(config=config)
async def test_coro() -> None: async def test_coro() -> None:
await client.connect("mqtts://broker.hivemq.com:8883") await client.connect("mqtts://localhost:8883")
tasks = [ tasks = [
asyncio.ensure_future(client.publish("a/b", b"TEST MESSAGE WITH QOS_0")), asyncio.ensure_future(client.publish("a/b", b"TEST MESSAGE WITH QOS_0")),
asyncio.ensure_future(client.publish("a/b", b"TEST MESSAGE WITH QOS_1", qos=QOS_1)), asyncio.ensure_future(client.publish("a/b", b"TEST MESSAGE WITH QOS_1", qos=QOS_1)),

Wyświetl plik

@ -22,7 +22,7 @@ client = MQTTClient(config=config)
async def test_coro() -> None: async def test_coro() -> None:
await client.connect("wss://test.mosquitto.org:8081/") await client.connect("ws://localhost:8080/")
tasks = [ tasks = [
asyncio.ensure_future(client.publish("a/b", b"TEST MESSAGE WITH QOS_0")), asyncio.ensure_future(client.publish("a/b", b"TEST MESSAGE WITH QOS_0")),
asyncio.ensure_future(client.publish("a/b", b"TEST MESSAGE WITH QOS_1", qos=QOS_1)), asyncio.ensure_future(client.publish("a/b", b"TEST MESSAGE WITH QOS_1", qos=QOS_1)),

Wyświetl plik

@ -132,7 +132,7 @@ async def test_plugin_exception_while_loading() -> None:
_ = Broker(plugin_namespace='tests.mock_plugins', config=config) _ = Broker(plugin_namespace='tests.mock_plugins', config=config)
class TestAllEventsPlugin(BasePlugin[BaseContext]): class AllEventsPlugin(BasePlugin[BaseContext]):
"""A plugin to verify all events get sent to plugins.""" """A plugin to verify all events get sent to plugins."""
def __init__(self, context: BaseContext) -> None: def __init__(self, context: BaseContext) -> None:
super().__init__(context) super().__init__(context)
@ -161,7 +161,7 @@ async def test_all_plugin_events():
match group: match group:
case 'tests.mock_plugins': case 'tests.mock_plugins':
return [ return [
EntryPoint(name='TestAllEventsPlugin', group='tests.mock_plugins', value='tests.plugins.test_plugins:TestAllEventsPlugin'), EntryPoint(name='AllEventsPlugin', group='tests.mock_plugins', value='tests.plugins.test_plugins:AllEventsPlugin'),
] ]
case _: case _:
return list() return list()
@ -192,7 +192,7 @@ async def test_all_plugin_events():
await asyncio.sleep(1) await asyncio.sleep(1)
# get the plugin so it doesn't get gc on shutdown # get the plugin so it doesn't get gc on shutdown
test_plugin = broker.plugins_manager.get_plugin('TestAllEventsPlugin') test_plugin = broker.plugins_manager.get_plugin('AllEventsPlugin')
await broker.shutdown() await broker.shutdown()
await asyncio.sleep(1) await asyncio.sleep(1)

Wyświetl plik

@ -8,8 +8,6 @@ import pytest
from amqtt.broker import Broker from amqtt.broker import Broker
from samples.client_publish import __main__ as client_publish_main from samples.client_publish import __main__ as client_publish_main
from samples.client_publish_ssl import __main__ as client_publish_ssl_main
from samples.client_publish_ws import __main__ as client_publish_ws_main
from samples.client_subscribe import __main__ as client_subscribe_main from samples.client_subscribe import __main__ as client_subscribe_main
from samples.client_keepalive import __main__ as client_keepalive_main from samples.client_keepalive import __main__ as client_keepalive_main
from samples.broker_acl import config as broker_acl_config from samples.broker_acl import config as broker_acl_config
@ -35,8 +33,9 @@ async def test_broker_acl():
async def test_broker_simple(): async def test_broker_simple():
broker_simple_script = Path(__file__).parent.parent / "samples/broker_simple.py" broker_simple_script = Path(__file__).parent.parent / "samples/broker_simple.py"
process = subprocess.Popen(["python", broker_simple_script], stdout=subprocess.PIPE, stderr=subprocess.PIPE) process = subprocess.Popen(["python", broker_simple_script], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# Send the interrupt signal
await asyncio.sleep(5) await asyncio.sleep(5)
# Send the interrupt signal
process.send_signal(signal.SIGINT) process.send_signal(signal.SIGINT)
stdout, stderr = process.communicate() stdout, stderr = process.communicate()
logger.debug(stderr.decode("utf-8")) logger.debug(stderr.decode("utf-8"))
@ -50,8 +49,9 @@ async def test_broker_simple():
async def test_broker_start(): async def test_broker_start():
broker_start_script = Path(__file__).parent.parent / "samples/broker_start.py" broker_start_script = Path(__file__).parent.parent / "samples/broker_start.py"
process = subprocess.Popen(["python", broker_start_script], stdout=subprocess.PIPE, stderr=subprocess.PIPE) process = subprocess.Popen(["python", broker_start_script], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# Send the interrupt signal
await asyncio.sleep(5) await asyncio.sleep(5)
# Send the interrupt signal to stop broker
process.send_signal(signal.SIGINT) process.send_signal(signal.SIGINT)
stdout, stderr = process.communicate() stdout, stderr = process.communicate()
logger.debug(stderr.decode("utf-8")) logger.debug(stderr.decode("utf-8"))
@ -64,8 +64,9 @@ async def test_broker_start():
async def test_broker_taboo(): async def test_broker_taboo():
broker_taboo_script = Path(__file__).parent.parent / "samples/broker_taboo.py" broker_taboo_script = Path(__file__).parent.parent / "samples/broker_taboo.py"
process = subprocess.Popen(["python", broker_taboo_script], stdout=subprocess.PIPE, stderr=subprocess.PIPE) process = subprocess.Popen(["python", broker_taboo_script], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# Send the interrupt signal
await asyncio.sleep(5) await asyncio.sleep(5)
# Send the interrupt signal to stop broker
process.send_signal(signal.SIGINT) process.send_signal(signal.SIGINT)
stdout, stderr = process.communicate() stdout, stderr = process.communicate()
logger.debug(stderr.decode("utf-8")) logger.debug(stderr.decode("utf-8"))
@ -74,17 +75,78 @@ async def test_broker_taboo():
assert "Exception" not in stderr.decode("utf-8") assert "Exception" not in stderr.decode("utf-8")
@pytest.mark.timeout(20) @pytest.mark.timeout(25)
def test_client_keepalive(): @pytest.mark.asyncio
client_keepalive_main() async def test_client_keepalive():
broker = Broker()
await broker.start()
await asyncio.sleep(2)
keep_alive_script = Path(__file__).parent.parent / "samples/client_keepalive.py"
process = subprocess.Popen(["python", keep_alive_script], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
await asyncio.sleep(2)
stdout, stderr = process.communicate()
assert "ERROR" not in stderr.decode("utf-8")
assert "Exception" not in stderr.decode("utf-8")
await broker.shutdown()
def test_client_publish(): @pytest.mark.asyncio
client_publish_main() async def test_client_publish():
broker = Broker()
await broker.start()
await asyncio.sleep(2)
client_publish = Path(__file__).parent.parent / "samples/client_publish.py"
process = subprocess.Popen(["python", client_publish], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
await asyncio.sleep(2)
def test_client_publish_ssl(): stdout, stderr = process.communicate()
client_publish_ssl_main() assert "ERROR" not in stderr.decode("utf-8")
assert "Exception" not in stderr.decode("utf-8")
await broker.shutdown()
broker_ssl_config = {
"listeners": {
"default": {
"type": "tcp",
"bind": "0.0.0.0:8883",
"ssl": True,
"certfile": "cert.pem",
"keyfile": "key.pem",
}
},
"auth": {
"allow-anonymous": True,
"plugins": ["auth_anonymous"]
}
}
@pytest.mark.asyncio
async def test_client_publish_ssl():
# generate a self-signed certificate for this test
cmd = 'openssl req -x509 -nodes -days 365 -newkey rsa:2048 -keyout key.pem -out cert.pem -subj "/CN=localhost"'
subprocess.run(cmd, shell=True, capture_output=True, text=True)
# start a secure broker
broker = Broker(config=broker_ssl_config)
await broker.start()
await asyncio.sleep(2)
# run the sample
client_publish_ssl_script = Path(__file__).parent.parent / "samples/client_publish_ssl.py"
process = subprocess.Popen(["python", client_publish_ssl_script], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
await asyncio.sleep(2)
stdout, stderr = process.communicate()
assert "ERROR" not in stderr.decode("utf-8")
assert "Exception" not in stderr.decode("utf-8")
await broker.shutdown()
@pytest.mark.asyncio @pytest.mark.asyncio
@ -92,12 +154,13 @@ async def test_client_publish_acl():
broker = Broker() broker = Broker()
await broker.start() await broker.start()
await asyncio.sleep(2)
broker_simple_script = Path(__file__).parent.parent / "samples/client_publish_acl.py" broker_simple_script = Path(__file__).parent.parent / "samples/client_publish_acl.py"
process = subprocess.Popen(["python", broker_simple_script], stdout=subprocess.PIPE, stderr=subprocess.PIPE) process = subprocess.Popen(["python", broker_simple_script], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# Send the interrupt signal # Send the interrupt signal
await asyncio.sleep(5) await asyncio.sleep(2)
process.send_signal(signal.SIGINT)
stdout, stderr = process.communicate() stdout, stderr = process.communicate()
logger.debug(stderr.decode("utf-8")) logger.debug(stderr.decode("utf-8"))
assert "ERROR" not in stderr.decode("utf-8") assert "ERROR" not in stderr.decode("utf-8")
@ -105,9 +168,36 @@ async def test_client_publish_acl():
await broker.shutdown() await broker.shutdown()
broker_ws_config = {
"listeners": {
"default": {
"type": "ws",
"bind": "0.0.0.0:8080",
}
},
"auth": {
"allow-anonymous": True,
"plugins": ["auth_anonymous"]
}
}
def test_client_publish_ws(): @pytest.mark.asyncio
client_publish_ws_main() async def test_client_publish_ws():
# start a secure broker
broker = Broker(config=broker_ws_config)
await broker.start()
await asyncio.sleep(2)
# run the sample
client_publish_ssl_script = Path(__file__).parent.parent / "samples/client_publish_ws.py"
process = subprocess.Popen(["python", client_publish_ssl_script], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
await asyncio.sleep(2)
stdout, stderr = process.communicate()
assert "ERROR" not in stderr.decode("utf-8")
assert "Exception" not in stderr.decode("utf-8")
await broker.shutdown()
def test_client_subscribe(): def test_client_subscribe():