using 'n' messages to force amqtt_sub to exit, instead of SIGINT which seems to cause the process to force quit instead of causing a keyboard interupt

pull/192/head
Andrew Mirsky 2025-06-03 15:18:34 -04:00
rodzic dd666811a5
commit 9d3d9bd7e2
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: A98E67635CDF2C39
7 zmienionych plików z 77 dodań i 72 usunięć

Wyświetl plik

@ -70,7 +70,7 @@ jobs:
run: uv sync --locked --dev
- name: Run pytest
run: uv run --frozen pytest tests --cov=./ --cov-report=xml --junitxml=pytest-report.xml
run: uv run --frozen pytest tests/ --cov=./ --cov-report=xml --junitxml=pytest-report.xml
# https://github.com/actions/upload-artifact
- name: Upload test report

Wyświetl plik

@ -16,8 +16,6 @@ repos:
hooks:
- id: ruff
args:
- --fix
- --unsafe-fixes
- --line-length=130
- --exit-non-zero-on-fix

Wyświetl plik

@ -156,7 +156,7 @@ class MQTTClient:
msg = "Future or Task was cancelled"
raise ConnectError(msg) from e
except Exception as e:
self.logger.warning(f"Connection failed: {e}")
self.logger.warning(f"Connection failed: {e!r}")
if not self.config.get("auto_reconnect", False):
raise
return await self.reconnect()
@ -230,9 +230,11 @@ class MQTTClient:
msg = "Future or Task was cancelled"
raise ConnectError(msg) from e
except Exception as e:
self.logger.warning(f"Reconnection attempt failed: {e}", exc_info=True)
self.logger.warning(f"Reconnection attempt failed: {e!r}")
self.logger.debug("", exc_info=True)
if reconnect_retries < nb_attempt: # reconnect_retries >= 0 and
self.logger.exception("Maximum connection attempts reached. Reconnection aborted.")
self.logger.debug("", exc_info=True)
msg = "Too many failed attempts"
raise ConnectError(msg) from e
delay = min(reconnect_max_interval, 2**nb_attempt)
@ -475,6 +477,7 @@ class MQTTClient:
self.session.remote_port,
**kwargs,
)
reader = StreamReaderAdapter(conn_reader)
writer = StreamWriterAdapter(conn_writer)
elif scheme in ("ws", "wss") and self.session.broker_uri:
@ -511,7 +514,7 @@ class MQTTClient:
self.logger.debug(f"Connected to {self.session.remote_address}:{self.session.remote_port}")
except (InvalidURI, InvalidHandshake, ProtocolHandlerError, ConnectionError, OSError) as e:
self.logger.warning(f"Connection failed : {self.session.broker_uri} : {e}")
self.logger.debug(f"Connection failed : {self.session.broker_uri} [{e!r}]")
self.session.transitions.disconnect()
raise ConnectError(e) from e
return return_code

Wyświetl plik

@ -87,7 +87,8 @@ class PluginManager:
obj = plugin(plugin_context)
return Plugin(ep.name, ep, obj)
except ImportError:
self.logger.warning(f"Plugin {ep!r} import failed", exc_info=True)
self.logger.warning(f"Plugin {ep!r} import failed")
self.logger.debug("", exc_info=True)
return None

Wyświetl plik

@ -112,7 +112,7 @@ async def do_pub(
await client.disconnect()
logger.info(f"{client.client_id} Disconnected from broker")
except ConnectError as ce:
logger.fatal(f"Connection to '{url}' failed: {ce!r}")
logger.fatal(f"Connection to '{client.session.broker_uri if client.session else url}' failed")
raise ConnectError from ce
except asyncio.CancelledError as ce:
logger.fatal("Publish canceled due to previous error")

Wyświetl plik

@ -38,7 +38,7 @@ def broker_config():
@pytest.fixture
def broker_config_file(broker_config, tmp_path):
config_path = tmp_path / "config.yaml"
config_path = tmp_path / "broker.yaml"
with config_path.open("w") as f:
yaml.dump(broker_config, f)
return str(config_path)
@ -53,13 +53,12 @@ async def broker(broker_config_file):
stderr=subprocess.PIPE,
)
# Give broker time to start
await asyncio.sleep(1)
await asyncio.sleep(4)
yield proc
proc.terminate()
proc.wait()
@pytest.mark.timeout(12)
def test_cli_help_messages():
"""Test that help messages are displayed correctly."""
env = os.environ.copy()
@ -79,7 +78,7 @@ def test_cli_help_messages():
output = subprocess.check_output([amqtt_pub_path, "--help"], env=env, text=True)
assert "Usage: amqtt_pub" in output
@pytest.mark.timeout(12)
def test_broker_version():
"""Test broker version command."""
output = subprocess.check_output(["amqtt", "--version"])
@ -87,7 +86,6 @@ def test_broker_version():
@pytest.mark.asyncio
@pytest.mark.timeout(12)
async def test_broker_start_stop(broker_config_file):
"""Test broker start and stop with config file."""
proc = subprocess.Popen(
@ -97,7 +95,7 @@ async def test_broker_start_stop(broker_config_file):
)
# Give broker time to start
await asyncio.sleep(1)
# Verify broker is running by connecting a client
client = MQTTClient()
await client.connect("mqtt://127.0.0.1:1884")
@ -109,7 +107,6 @@ async def test_broker_start_stop(broker_config_file):
@pytest.mark.asyncio
@pytest.mark.timeout(12)
async def test_publish_subscribe(broker):
"""Test pub/sub CLI tools with running broker."""
@ -162,7 +159,6 @@ async def test_publish_subscribe(broker):
@pytest.mark.asyncio
@pytest.mark.timeout(12)
async def test_pub_sub_retain(broker):
"""Test various pub/sub will retain options."""
# Test publishing with retain flag
@ -195,32 +191,38 @@ async def test_pub_sub_retain(broker):
@pytest.mark.asyncio
@pytest.mark.timeout(12)
async def test_pub_errors():
async def test_pub_errors(client_config_file):
"""Test error handling in pub/sub tools."""
# Test connection to non-existent broker
pub_proc = subprocess.run(
[
cmd = [
"amqtt_pub",
"--url", "mqtt://127.0.0.1:9999", # Wrong port
"-t", "test/topic",
"-m", "test",
],
capture_output=True,
"-c", client_config_file,
]
proc = await asyncio.create_subprocess_shell(
" ".join(cmd), stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
)
assert pub_proc.returncode != 0, f"publisher error code: {pub_proc.returncode}"
assert "Connection failed" in str(pub_proc.stderr)
stdout, stderr = await proc.communicate()
logger.debug(f"Command: {cmd}")
logger.debug(f"Stdout: {stdout.decode()}")
logger.debug(f"Stderr: {stderr.decode()}")
assert proc.returncode != 0, f"publisher error code: {proc.returncode}"
assert "Connection failed" in str(stderr)
@pytest.mark.asyncio
@pytest.mark.timeout(12)
async def test_sub_errors():
async def test_sub_errors(client_config_file):
# Test invalid URL format
sub_proc = subprocess.run(
[
"amqtt_sub",
"--url", "invalid://url",
"-t", "test/topic",
"-c", client_config_file
],
capture_output=True,
)
@ -234,18 +236,7 @@ def client_config():
"ping_delay": 1,
"default_qos": 0,
"default_retain": False,
"auto_reconnect": True,
"reconnect_max_interval": 5,
"reconnect_retries": 10,
"topics": {
"test": {
"qos": 0
},
"some_topic": {
"qos": 2,
"retain": True
}
},
"auto_reconnect": False,
"will": {
"topic": "test/will/topic",
"message": "client ABC has disconnected",
@ -260,49 +251,56 @@ def client_config():
@pytest.fixture
def client_config_file(client_config, tmp_path):
config_path = tmp_path / "config.yaml"
config_path = tmp_path / "client.yaml"
with config_path.open("w") as f:
yaml.dump(client_config, f)
return str(config_path)
@pytest.mark.timeout(12)
def test_pub_client_config(broker, client_config_file):
pub_proc = subprocess.run(
[
@pytest.mark.asyncio
async def test_pub_client_config(broker, client_config_file):
await asyncio.sleep(1)
cmd = [
"amqtt_pub",
"-t", "test/topic",
"-m", "test",
"-c", client_config_file
],
capture_output=True,
]
proc = await asyncio.create_subprocess_shell(
" ".join(cmd), stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
)
assert pub_proc.returncode == 0, f"publisher error code: {pub_proc.returncode}"
stdout, stderr = await proc.communicate()
logger.debug(f"Command: {cmd}")
logger.debug(f"Stdout: {stdout.decode()}")
logger.debug(f"Stderr: {stderr.decode()}")
assert proc.returncode == 0, f"publisher error code: {proc.returncode}"
@pytest.mark.asyncio
@pytest.mark.timeout(12)
async def test_pub_client_config_will(broker, client_config, client_config_file):
async def test_pub_client_config_will(broker, client_config_file):
# verifying client script functionality of will topic (publisher)
# https://github.com/Yakifo/amqtt/issues/159
await asyncio.sleep(1)
client1 = MQTTClient(client_id="client1")
await client1.connect('mqtt://localhost:1884')
await client1.subscribe([
("test/will/topic", QOS_0)
])
pub_proc = subprocess.run(
[
"amqtt_pub",
cmd = ["amqtt_pub",
"-t", "test/topic",
"-m", "test of regular topic",
"-c", client_config_file
],
capture_output=True,
"-m", "\"test of regular topic\"",
"-c", client_config_file]
proc = await asyncio.create_subprocess_shell(
" ".join(cmd), stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
)
assert pub_proc.returncode == 0, f"publisher error code: {pub_proc.returncode}"
stdout, stderr = await proc.communicate()
logger.debug(f"Command: {cmd}")
logger.debug(f"Stdout: {stdout.decode()}")
logger.debug(f"Stderr: {stderr.decode()}")
message = await client1.deliver_message(timeout_duration=1)
assert message.topic == 'test/will/topic'
@ -310,8 +308,8 @@ async def test_pub_client_config_will(broker, client_config, client_config_file)
await client1.disconnect()
@pytest.mark.asyncio
@pytest.mark.timeout(12)
async def test_sub_client_config_will(broker, client_config_file):
@pytest.mark.timeout(20)
async def test_sub_client_config_will(broker, client_config, client_config_file):
# verifying client script functionality of will topic (subscriber)
# https://github.com/Yakifo/amqtt/issues/159
@ -322,23 +320,29 @@ async def test_sub_client_config_will(broker, client_config_file):
("test/will/topic", QOS_0)
])
sub_proc = subprocess.Popen(
[
"amqtt_sub",
cmd = ["amqtt_sub",
"-t", "test/topic",
"-c", client_config_file
],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
"-c", client_config_file,
"-n", "1"]
proc = await asyncio.create_subprocess_shell(
" ".join(cmd), stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
)
await asyncio.sleep(0.5)
sub_proc.send_signal(signal.SIGINT)
_, _ = sub_proc.communicate()
await asyncio.sleep(2)
assert sub_proc.returncode == 0, f"subscriber error code: {sub_proc.returncode}"
# cause `amqtt_sub` to exit after receiving this one message
await client1.publish("test/topic", b'my test message')
# validate the 'will' message was received correctly
message = await client1.deliver_message(timeout_duration=3)
assert message.topic == 'test/will/topic'
assert message.data == b'client ABC has disconnected'
await client1.disconnect()
stdout, stderr = await proc.communicate()
logger.debug(f"Command: {cmd}")
logger.debug(f"Stdout: {stdout.decode()}")
logger.debug(f"Stderr: {stderr.decode()}")

Wyświetl plik

@ -265,7 +265,6 @@ def client_config():
@pytest.mark.asyncio
@pytest.mark.timeout(12)
async def test_client_publish_will_with_retain(broker_fixture, client_config):
# verifying client functionality of will topic