kopia lustrzana https://github.com/Yakifo/amqtt
messages for reconnected sessions where clean session is false, need to send retained topic messages
rodzic
341c6c1732
commit
d4d5e1c670
|
@ -38,3 +38,4 @@ coverage.xml
|
||||||
#----- generated files -----
|
#----- generated files -----
|
||||||
*.log
|
*.log
|
||||||
*memray*
|
*memray*
|
||||||
|
.coverage*
|
||||||
|
|
|
@ -498,14 +498,14 @@ class Broker:
|
||||||
self.logger.debug(f"{client_session.client_id} Start messages handling")
|
self.logger.debug(f"{client_session.client_id} Start messages handling")
|
||||||
await handler.start()
|
await handler.start()
|
||||||
|
|
||||||
# publish messages that were retained because the client session was disconnecte
|
# publish messages that were retained because the client session was disconnected
|
||||||
self.logger.debug(f"Offline messages queue size: {client_session.retained_messages.qsize()}")
|
self.logger.debug(f"Offline messages queue size: {client_session.retained_messages.qsize()}")
|
||||||
await self._publish_session_retained_messages(client_session)
|
await self._publish_session_retained_messages(client_session)
|
||||||
|
|
||||||
# publish messages that were marked as retained for a specific
|
# if this is not a new session, there are subscriptions associated with them
|
||||||
# self.logger.debug(f"Publish messages that have been marked as retained.")
|
self.logger.debug(f"Publish retained messages to a pre-existing session's subscriptions.")
|
||||||
# for topic in self._subscriptions.keys():
|
for topic in self._subscriptions.keys():
|
||||||
# await self._publish_retained_messages_for_subscription( (topic, QOS_0), client_session)
|
await self._publish_retained_messages_for_subscription( (topic, QOS_0), client_session)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -14,7 +14,7 @@ logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
async def uptime_coro() -> None:
|
async def uptime_coro() -> None:
|
||||||
client = MQTTClient()
|
client = MQTTClient()
|
||||||
await client.connect("mqtt://test.mosquitto.org/")
|
await client.connect("mqtt://localhost:1883")
|
||||||
|
|
||||||
await client.subscribe(
|
await client.subscribe(
|
||||||
[
|
[
|
||||||
|
|
|
@ -200,8 +200,40 @@ async def test_client_publish_ws():
|
||||||
await broker.shutdown()
|
await broker.shutdown()
|
||||||
|
|
||||||
|
|
||||||
def test_client_subscribe():
|
broker_std_config = {
|
||||||
client_subscribe_main()
|
"listeners": {
|
||||||
|
"default": {
|
||||||
|
"type": "tcp",
|
||||||
|
"bind": "0.0.0.0:1883", }
|
||||||
|
},
|
||||||
|
'sys_interval':1,
|
||||||
|
"auth": {
|
||||||
|
"allow-anonymous": True,
|
||||||
|
"plugins": ["auth_anonymous"]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_client_subscribe():
|
||||||
|
|
||||||
|
# start a secure broker
|
||||||
|
broker = Broker(config=broker_std_config)
|
||||||
|
await broker.start()
|
||||||
|
await asyncio.sleep(1)
|
||||||
|
|
||||||
|
# run the sample
|
||||||
|
client_subscribe_script = Path(__file__).parent.parent / "samples/client_subscribe.py"
|
||||||
|
process = subprocess.Popen(["python", client_subscribe_script], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||||
|
await asyncio.sleep(1)
|
||||||
|
stdout, stderr = process.communicate()
|
||||||
|
logger.debug("-------------------------------------------------")
|
||||||
|
logger.debug(stderr.decode("utf-8"))
|
||||||
|
logger.debug(stdout.decode("utf-8"))
|
||||||
|
assert "ERROR" not in stderr.decode("utf-8")
|
||||||
|
assert "Exception" not in stderr.decode("utf-8")
|
||||||
|
|
||||||
|
await broker.shutdown()
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
|
|
Ładowanie…
Reference in New Issue