diff --git a/.gitignore b/.gitignore index 10f365e..77680f3 100644 --- a/.gitignore +++ b/.gitignore @@ -38,3 +38,4 @@ coverage.xml #----- generated files ----- *.log *memray* +.coverage* diff --git a/amqtt/broker.py b/amqtt/broker.py index 7c51a9e..c127a00 100644 --- a/amqtt/broker.py +++ b/amqtt/broker.py @@ -498,14 +498,14 @@ class Broker: self.logger.debug(f"{client_session.client_id} Start messages handling") await handler.start() - # publish messages that were retained because the client session was disconnecte + # publish messages that were retained because the client session was disconnected self.logger.debug(f"Offline messages queue size: {client_session.retained_messages.qsize()}") await self._publish_session_retained_messages(client_session) - # publish messages that were marked as retained for a specific - # self.logger.debug(f"Publish messages that have been marked as retained.") - # for topic in self._subscriptions.keys(): - # await self._publish_retained_messages_for_subscription( (topic, QOS_0), client_session) + # if this is not a new session, there are subscriptions associated with them + self.logger.debug(f"Publish retained messages to a pre-existing session's subscriptions.") + for topic in self._subscriptions.keys(): + await self._publish_retained_messages_for_subscription( (topic, QOS_0), client_session) diff --git a/samples/client_subscribe.py b/samples/client_subscribe.py index 642e66e..ccf8502 100644 --- a/samples/client_subscribe.py +++ b/samples/client_subscribe.py @@ -14,7 +14,7 @@ logger = logging.getLogger(__name__) async def uptime_coro() -> None: client = MQTTClient() - await client.connect("mqtt://test.mosquitto.org/") + await client.connect("mqtt://localhost:1883") await client.subscribe( [ diff --git a/tests/test_samples.py b/tests/test_samples.py index d2798da..d4be968 100644 --- a/tests/test_samples.py +++ b/tests/test_samples.py @@ -200,8 +200,40 @@ async def test_client_publish_ws(): await broker.shutdown() -def test_client_subscribe(): - client_subscribe_main() +broker_std_config = { + "listeners": { + "default": { + "type": "tcp", + "bind": "0.0.0.0:1883", } + }, + 'sys_interval':1, + "auth": { + "allow-anonymous": True, + "plugins": ["auth_anonymous"] + } +} + + +@pytest.mark.asyncio +async def test_client_subscribe(): + + # start a secure broker + broker = Broker(config=broker_std_config) + await broker.start() + await asyncio.sleep(1) + + # run the sample + client_subscribe_script = Path(__file__).parent.parent / "samples/client_subscribe.py" + process = subprocess.Popen(["python", client_subscribe_script], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + await asyncio.sleep(1) + stdout, stderr = process.communicate() + logger.debug("-------------------------------------------------") + logger.debug(stderr.decode("utf-8")) + logger.debug(stdout.decode("utf-8")) + assert "ERROR" not in stderr.decode("utf-8") + assert "Exception" not in stderr.decode("utf-8") + + await broker.shutdown() @pytest.mark.asyncio