kopia lustrzana https://github.com/Yakifo/amqtt
337 wiersze
11 KiB
Python
337 wiersze
11 KiB
Python
import asyncio
|
|
import logging
|
|
import signal
|
|
import subprocess
|
|
|
|
from multiprocessing import Process
|
|
from pathlib import Path
|
|
|
|
from typer.testing import CliRunner
|
|
|
|
from samples.http_server_integration import main as http_server_main
|
|
from samples.unix_sockets import app as unix_sockets_app
|
|
|
|
import pytest
|
|
|
|
from amqtt.broker import Broker
|
|
from amqtt.client import MQTTClient
|
|
from samples.broker_acl import config as broker_acl_config
|
|
from samples.broker_taboo import config as broker_taboo_config
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_broker_acl():
|
|
broker_acl_script = Path(__file__).parent.parent / "samples/broker_acl.py"
|
|
process = subprocess.Popen(["python", broker_acl_script], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
|
# Send the interrupt signal
|
|
await asyncio.sleep(2)
|
|
process.send_signal(signal.SIGINT)
|
|
stdout, stderr = process.communicate()
|
|
logger.debug(stderr.decode("utf-8"))
|
|
assert "Broker closed" in stderr.decode("utf-8")
|
|
assert "ERROR" not in stderr.decode("utf-8")
|
|
assert "Exception" not in stderr.decode("utf-8")
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_broker_simple():
|
|
broker_simple_script = Path(__file__).parent.parent / "samples/broker_simple.py"
|
|
process = subprocess.Popen(["python", broker_simple_script], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
|
await asyncio.sleep(2)
|
|
|
|
# Send the interrupt signal
|
|
process.send_signal(signal.SIGINT)
|
|
stdout, stderr = process.communicate()
|
|
logger.debug(stderr.decode("utf-8"))
|
|
has_broker_closed = "Broker closed" in stderr.decode("utf-8")
|
|
has_loop_stopped = "Broadcast loop stopped by exception" in stderr.decode("utf-8")
|
|
|
|
assert has_broker_closed or has_loop_stopped, "Broker didn't close correctly."
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_broker_start():
|
|
broker_start_script = Path(__file__).parent.parent / "samples/broker_start.py"
|
|
process = subprocess.Popen(["python", broker_start_script], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
|
await asyncio.sleep(2)
|
|
|
|
# Send the interrupt signal to stop broker
|
|
process.send_signal(signal.SIGINT)
|
|
stdout, stderr = process.communicate()
|
|
logger.debug(stderr.decode("utf-8"))
|
|
assert "Broker closed" in stderr.decode("utf-8")
|
|
assert "ERROR" not in stderr.decode("utf-8")
|
|
assert "Exception" not in stderr.decode("utf-8")
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_broker_taboo():
|
|
broker_taboo_script = Path(__file__).parent.parent / "samples/broker_taboo.py"
|
|
process = subprocess.Popen(["python", broker_taboo_script], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
|
await asyncio.sleep(2)
|
|
|
|
# Send the interrupt signal to stop broker
|
|
process.send_signal(signal.SIGINT)
|
|
stdout, stderr = process.communicate()
|
|
logger.debug(stderr.decode("utf-8"))
|
|
assert "INFO :: amqtt.broker :: Broker closed" in stderr.decode("utf-8")
|
|
assert "ERROR" not in stderr.decode("utf-8")
|
|
assert "Exception" not in stderr.decode("utf-8")
|
|
|
|
|
|
@pytest.mark.timeout(25)
|
|
@pytest.mark.asyncio
|
|
async def test_client_keepalive():
|
|
|
|
broker = Broker()
|
|
await broker.start()
|
|
await asyncio.sleep(2)
|
|
|
|
keep_alive_script = Path(__file__).parent.parent / "samples/client_keepalive.py"
|
|
process = subprocess.Popen(["python", keep_alive_script], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
|
await asyncio.sleep(1)
|
|
|
|
stdout, stderr = process.communicate()
|
|
assert "ERROR" not in stderr.decode("utf-8")
|
|
assert "Exception" not in stderr.decode("utf-8")
|
|
|
|
await broker.shutdown()
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_client_publish():
|
|
broker = Broker()
|
|
await broker.start()
|
|
await asyncio.sleep(2)
|
|
|
|
client_publish = Path(__file__).parent.parent / "samples/client_publish.py"
|
|
process = subprocess.Popen(["python", client_publish], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
|
await asyncio.sleep(2)
|
|
|
|
stdout, stderr = process.communicate()
|
|
assert "ERROR" not in stderr.decode("utf-8")
|
|
assert "Exception" not in stderr.decode("utf-8")
|
|
|
|
await broker.shutdown()
|
|
|
|
|
|
@pytest.fixture
|
|
def broker_ssl_config(rsa_keys):
|
|
certfile, keyfile = rsa_keys
|
|
return {
|
|
"listeners": {
|
|
"default": {
|
|
"type": "tcp",
|
|
"bind": "0.0.0.0:8883",
|
|
"ssl": True,
|
|
"certfile": certfile,
|
|
"keyfile": keyfile,
|
|
}
|
|
},
|
|
"auth": {
|
|
"allow-anonymous": True,
|
|
"plugins": ["auth_anonymous"]
|
|
}
|
|
}
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_client_publish_ssl(broker_ssl_config, rsa_keys):
|
|
certfile, _ = rsa_keys
|
|
# generate a self-signed certificate for this test
|
|
|
|
# start a secure broker
|
|
broker = Broker(config=broker_ssl_config)
|
|
await broker.start()
|
|
await asyncio.sleep(2)
|
|
# run the sample
|
|
client_publish_ssl_script = Path(__file__).parent.parent / "samples/client_publish_ssl.py"
|
|
process = subprocess.Popen(["python", client_publish_ssl_script, '--cert', certfile], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
|
await asyncio.sleep(2)
|
|
stdout, stderr = process.communicate()
|
|
|
|
assert "ERROR" not in stderr.decode("utf-8")
|
|
assert "Exception" not in stderr.decode("utf-8")
|
|
|
|
await broker.shutdown()
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_client_publish_acl():
|
|
|
|
broker = Broker()
|
|
await broker.start()
|
|
await asyncio.sleep(2)
|
|
|
|
broker_simple_script = Path(__file__).parent.parent / "samples/client_publish_acl.py"
|
|
process = subprocess.Popen(["python", broker_simple_script], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
|
# Send the interrupt signal
|
|
await asyncio.sleep(2)
|
|
|
|
stdout, stderr = process.communicate()
|
|
logger.debug(stderr.decode("utf-8"))
|
|
assert "ERROR" not in stderr.decode("utf-8")
|
|
assert "Exception" not in stderr.decode("utf-8")
|
|
|
|
await broker.shutdown()
|
|
|
|
broker_ws_config = {
|
|
"listeners": {
|
|
"default": {
|
|
"type": "ws",
|
|
"bind": "0.0.0.0:8080",
|
|
}
|
|
},
|
|
"auth": {
|
|
"allow-anonymous": True,
|
|
"plugins": ["auth_anonymous"]
|
|
}
|
|
}
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_client_publish_ws():
|
|
# start a secure broker
|
|
broker = Broker(config=broker_ws_config)
|
|
await broker.start()
|
|
await asyncio.sleep(2)
|
|
# run the sample
|
|
|
|
client_publish_ssl_script = Path(__file__).parent.parent / "samples/client_publish_ws.py"
|
|
process = subprocess.Popen(["python", client_publish_ssl_script], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
|
await asyncio.sleep(2)
|
|
stdout, stderr = process.communicate()
|
|
|
|
assert "ERROR" not in stderr.decode("utf-8")
|
|
assert "Exception" not in stderr.decode("utf-8")
|
|
|
|
await broker.shutdown()
|
|
|
|
|
|
broker_std_config = {
|
|
"listeners": {
|
|
"default": {
|
|
"type": "tcp",
|
|
"bind": "0.0.0.0:1883",
|
|
}
|
|
},
|
|
'sys_interval':2,
|
|
"auth": {
|
|
"allow-anonymous": True,
|
|
"plugins": ["auth_anonymous"]
|
|
}
|
|
}
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_client_subscribe():
|
|
|
|
# start a standard broker
|
|
broker = Broker(config=broker_std_config)
|
|
await broker.start()
|
|
await asyncio.sleep(1)
|
|
|
|
# run the sample
|
|
client_subscribe_script = Path(__file__).parent.parent / "samples/client_subscribe.py"
|
|
|
|
process = await asyncio.create_subprocess_shell(
|
|
" ".join(["python", str(client_subscribe_script)]),
|
|
stdout=asyncio.subprocess.PIPE,
|
|
stderr=asyncio.subprocess.PIPE
|
|
)
|
|
|
|
stdout, stderr = await process.communicate()
|
|
|
|
assert "ERROR" not in stdout.decode("utf-8")
|
|
assert "Exception" not in stdout.decode("utf-8")
|
|
assert "ERROR" not in stderr.decode("utf-8")
|
|
assert "Exception" not in stderr.decode("utf-8")
|
|
|
|
await broker.shutdown()
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_client_subscribe_plugin_acl():
|
|
broker = Broker(config=broker_acl_config)
|
|
await broker.start()
|
|
|
|
broker_simple_script = Path(__file__).parent.parent / "samples/client_subscribe_acl.py"
|
|
process = subprocess.Popen(["python", broker_simple_script], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
|
# Send the interrupt signal
|
|
await asyncio.sleep(2)
|
|
process.send_signal(signal.SIGINT)
|
|
stdout, stderr = process.communicate()
|
|
logger.debug(stderr.decode("utf-8"))
|
|
assert "Subscribed results: [128, 1, 128, 1, 128, 1]" in stderr.decode("utf-8")
|
|
assert "ERROR" not in stderr.decode("utf-8")
|
|
assert "Exception" not in stderr.decode("utf-8")
|
|
|
|
await broker.shutdown()
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_client_subscribe_plugin_taboo():
|
|
broker = Broker(config=broker_taboo_config)
|
|
await broker.start()
|
|
|
|
broker_simple_script = Path(__file__).parent.parent / "samples/client_subscribe_acl.py"
|
|
process = subprocess.Popen(["python", broker_simple_script], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
|
# Send the interrupt signal
|
|
await asyncio.sleep(2)
|
|
process.send_signal(signal.SIGINT)
|
|
stdout, stderr = process.communicate()
|
|
logger.debug(stderr.decode("utf-8"))
|
|
assert "Subscribed results: [1, 1, 128, 1, 1, 1]" in stderr.decode("utf-8")
|
|
assert "ERROR" not in stderr.decode("utf-8")
|
|
assert "Exception" not in stderr.decode("utf-8")
|
|
|
|
await broker.shutdown()
|
|
|
|
|
|
@pytest.fixture
|
|
def external_http_server():
|
|
p = Process(target=http_server_main)
|
|
p.start()
|
|
yield p
|
|
p.terminate()
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_external_http_server(external_http_server):
|
|
|
|
await asyncio.sleep(1)
|
|
client = MQTTClient(config={'auto_reconnect': False})
|
|
await client.connect("ws://127.0.0.1:8080/mqtt")
|
|
assert client.session is not None
|
|
await client.publish("my/topic", b'test message')
|
|
await client.disconnect()
|
|
# Send the interrupt signal
|
|
await asyncio.sleep(1)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_unix_connection():
|
|
|
|
unix_socket_script = Path(__file__).parent.parent / "samples/unix_sockets.py"
|
|
broker_process = subprocess.Popen(["python", unix_socket_script, "broker", "-s", "/tmp/mqtt"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
|
# start the broker
|
|
await asyncio.sleep(1)
|
|
|
|
# start the client
|
|
client_process = subprocess.Popen(["python", unix_socket_script, "client", "-s", "/tmp/mqtt"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
|
|
|
await asyncio.sleep(3)
|
|
|
|
# stop the client (ctrl-c)
|
|
client_process.send_signal(signal.SIGINT)
|
|
_ = client_process.communicate()
|
|
|
|
# stop the broker (ctrl-c)
|
|
broker_process.send_signal(signal.SIGINT)
|
|
broker_stdout, broker_stderr = broker_process.communicate()
|
|
|
|
logger.debug(broker_stderr.decode("utf-8"))
|
|
|
|
# verify that the broker received client connected/disconnected
|
|
assert "on_broker_client_connected" in broker_stderr.decode("utf-8")
|
|
assert "on_broker_client_disconnected" in broker_stderr.decode("utf-8")
|