From bb76dbeec99a05c73eff947d4e7eedfbef4280ab Mon Sep 17 00:00:00 2001 From: Andrew Mirsky Date: Wed, 25 Jun 2025 22:23:20 -0400 Subject: [PATCH] fixing config parameters for MQTTClient and updating docs to match. refining sample scripts to rely less on accessing external broker for successful completion --- .gitignore | 1 + amqtt/client.py | 6 ++- docs/references/client_config.md | 7 +++ samples/client_keepalive.py | 2 +- samples/client_publish_ssl.py | 10 +++- samples/client_publish_ws.py | 2 +- tests/test_samples.py | 85 +++++++++++++++++++++++++++----- 7 files changed, 96 insertions(+), 17 deletions(-) diff --git a/.gitignore b/.gitignore index dd30c86..1d775a3 100644 --- a/.gitignore +++ b/.gitignore @@ -3,6 +3,7 @@ __pycache__ node_modules .vite +*.pem #------- Environment Files ------- .python-version diff --git a/amqtt/client.py b/amqtt/client.py index 5154a98..dcb61d6 100644 --- a/amqtt/client.py +++ b/amqtt/client.py @@ -463,10 +463,12 @@ class MQTTClient: capath=self.session.capath, cadata=self.session.cadata, ) - if "certfile" in self.config and "keyfile" in self.config: - sc.load_cert_chain(self.config["certfile"], self.config["keyfile"]) + + if "certfile" in self.config: + sc.load_verify_locations(cafile=self.config["certfile"]) if "check_hostname" in self.config and isinstance(self.config["check_hostname"], bool): sc.check_hostname = self.config["check_hostname"] + sc.verify_mode = ssl.CERT_REQUIRED kwargs["ssl"] = sc try: diff --git a/docs/references/client_config.md b/docs/references/client_config.md index 24985d1..146b8dd 100644 --- a/docs/references/client_config.md +++ b/docs/references/client_config.md @@ -63,8 +63,15 @@ TLS certificates used to verify the broker's authenticity. - `cafile` *(string)*: Path to a file of concatenated CA certificates in PEM format. See [Certificates](https://docs.python.org/3/library/ssl.html#ssl-certificates) for more info. - `capath` *(string)*: Path to a directory containing several CA certificates in PEM format, following an [OpenSSL specific layout](https://docs.openssl.org/master/man3/SSL_CTX_load_verify_locations/). - `cadata` *(string)*: Either an ASCII string of one or more PEM-encoded certificates or a bytes-like object of DER-encoded certificates. +- +- +### `certfile` *(string)* +Path to a single file in PEM format containing the certificate as well as any number of CA certificates needed to establish the server certificate's authenticity. +### `check_hostname` *(bool)* + +Bypass ssl host certificate verification, allowing self-signed certificates ## Default Configuration diff --git a/samples/client_keepalive.py b/samples/client_keepalive.py index 1bb960c..6f02f17 100644 --- a/samples/client_keepalive.py +++ b/samples/client_keepalive.py @@ -21,7 +21,7 @@ async def main() -> None: try: await client.connect("mqtt://test.mosquitto.org:1883/") logger.info("client connected") - await asyncio.sleep(18) + await asyncio.sleep(15) except CancelledError: pass diff --git a/samples/client_publish_ssl.py b/samples/client_publish_ssl.py index 60c3acc..528c347 100644 --- a/samples/client_publish_ssl.py +++ b/samples/client_publish_ssl.py @@ -7,6 +7,9 @@ from amqtt.mqtt.constants import QOS_1, QOS_2 """ This sample shows how to publish messages to secure broker. + +Use `openssl req -x509 -nodes -days 365 -newkey rsa:2048 -keyout key.pem -out cert.pem -subj "/CN=localhost"` to +generate a self-signed certificate for the broker to use. """ logger = logging.getLogger(__name__) @@ -17,7 +20,10 @@ config = { "message": "Dead or alive", "qos": QOS_1, "retain": True, - } + }, + "auto_reconnect": False, + "check_hostname": False, + "certfile": "cert.pem", } client = MQTTClient(config=config) @@ -25,7 +31,7 @@ client = MQTTClient(config=config) async def test_coro() -> None: - await client.connect("mqtts://broker.hivemq.com:8883") + await client.connect("mqtts://localhost:8883") tasks = [ asyncio.ensure_future(client.publish("a/b", b"TEST MESSAGE WITH QOS_0")), asyncio.ensure_future(client.publish("a/b", b"TEST MESSAGE WITH QOS_1", qos=QOS_1)), diff --git a/samples/client_publish_ws.py b/samples/client_publish_ws.py index f9d96c9..e02c73e 100644 --- a/samples/client_publish_ws.py +++ b/samples/client_publish_ws.py @@ -22,7 +22,7 @@ client = MQTTClient(config=config) async def test_coro() -> None: - await client.connect("wss://test.mosquitto.org:8081/") + await client.connect("ws://localhost:8080/") tasks = [ asyncio.ensure_future(client.publish("a/b", b"TEST MESSAGE WITH QOS_0")), asyncio.ensure_future(client.publish("a/b", b"TEST MESSAGE WITH QOS_1", qos=QOS_1)), diff --git a/tests/test_samples.py b/tests/test_samples.py index 1932bc5..8a6e37e 100644 --- a/tests/test_samples.py +++ b/tests/test_samples.py @@ -8,8 +8,6 @@ import pytest from amqtt.broker import Broker from samples.client_publish import __main__ as client_publish_main -from samples.client_publish_ssl import __main__ as client_publish_ssl_main -from samples.client_publish_ws import __main__ as client_publish_ws_main from samples.client_subscribe import __main__ as client_subscribe_main from samples.client_keepalive import __main__ as client_keepalive_main from samples.broker_acl import config as broker_acl_config @@ -35,8 +33,9 @@ async def test_broker_acl(): async def test_broker_simple(): broker_simple_script = Path(__file__).parent.parent / "samples/broker_simple.py" process = subprocess.Popen(["python", broker_simple_script], stdout=subprocess.PIPE, stderr=subprocess.PIPE) - # Send the interrupt signal await asyncio.sleep(5) + + # Send the interrupt signal process.send_signal(signal.SIGINT) stdout, stderr = process.communicate() logger.debug(stderr.decode("utf-8")) @@ -50,8 +49,9 @@ async def test_broker_simple(): async def test_broker_start(): broker_start_script = Path(__file__).parent.parent / "samples/broker_start.py" process = subprocess.Popen(["python", broker_start_script], stdout=subprocess.PIPE, stderr=subprocess.PIPE) - # Send the interrupt signal await asyncio.sleep(5) + + # Send the interrupt signal to stop broker process.send_signal(signal.SIGINT) stdout, stderr = process.communicate() logger.debug(stderr.decode("utf-8")) @@ -64,8 +64,9 @@ async def test_broker_start(): async def test_broker_taboo(): broker_taboo_script = Path(__file__).parent.parent / "samples/broker_taboo.py" process = subprocess.Popen(["python", broker_taboo_script], stdout=subprocess.PIPE, stderr=subprocess.PIPE) - # Send the interrupt signal await asyncio.sleep(5) + + # Send the interrupt signal to stop broker process.send_signal(signal.SIGINT) stdout, stderr = process.communicate() logger.debug(stderr.decode("utf-8")) @@ -82,9 +83,43 @@ def test_client_keepalive(): def test_client_publish(): client_publish_main() +broker_ssl_config = { + "listeners": { + "default": { + "type": "tcp", + "bind": "0.0.0.0:8883", + "ssl": True, + "certfile": "cert.pem", + "keyfile": "key.pem", + } + }, + "auth": { + "allow-anonymous": True, + "plugins": ["auth_anonymous"] + } +} -def test_client_publish_ssl(): - client_publish_ssl_main() +@pytest.mark.asyncio +async def test_client_publish_ssl(): + + # generate a self-signed certificate for this test + cmd = 'openssl req -x509 -nodes -days 365 -newkey rsa:2048 -keyout key.pem -out cert.pem -subj "/CN=localhost"' + subprocess.run(cmd, shell=True, capture_output=True, text=True) + + # start a secure broker + broker = Broker(config=broker_ssl_config) + await broker.start() + await asyncio.sleep(2) + # run the sample + client_publish_ssl_script = Path(__file__).parent.parent / "samples/client_publish_ssl.py" + process = subprocess.Popen(["python", client_publish_ssl_script], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + await asyncio.sleep(2) + stdout, stderr = process.communicate() + + assert "ERROR" not in stderr.decode("utf-8") + assert "Exception" not in stderr.decode("utf-8") + + await broker.shutdown() @pytest.mark.asyncio @@ -92,12 +127,13 @@ async def test_client_publish_acl(): broker = Broker() await broker.start() + await asyncio.sleep(2) broker_simple_script = Path(__file__).parent.parent / "samples/client_publish_acl.py" process = subprocess.Popen(["python", broker_simple_script], stdout=subprocess.PIPE, stderr=subprocess.PIPE) # Send the interrupt signal - await asyncio.sleep(5) - process.send_signal(signal.SIGINT) + await asyncio.sleep(2) + stdout, stderr = process.communicate() logger.debug(stderr.decode("utf-8")) assert "ERROR" not in stderr.decode("utf-8") @@ -105,9 +141,36 @@ async def test_client_publish_acl(): await broker.shutdown() +broker_ws_config = { + "listeners": { + "default": { + "type": "ws", + "bind": "0.0.0.0:8080", + } + }, + "auth": { + "allow-anonymous": True, + "plugins": ["auth_anonymous"] + } +} -def test_client_publish_ws(): - client_publish_ws_main() +@pytest.mark.asyncio +async def test_client_publish_ws(): + # start a secure broker + broker = Broker(config=broker_ws_config) + await broker.start() + await asyncio.sleep(2) + # run the sample + + client_publish_ssl_script = Path(__file__).parent.parent / "samples/client_publish_ws.py" + process = subprocess.Popen(["python", client_publish_ssl_script], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + await asyncio.sleep(2) + stdout, stderr = process.communicate() + + assert "ERROR" not in stderr.decode("utf-8") + assert "Exception" not in stderr.decode("utf-8") + + await broker.shutdown() def test_client_subscribe():