kopia lustrzana https://github.com/Yakifo/amqtt
Added test for broadcast loop cancellation
rodzic
85e3320da5
commit
604c45ca7a
|
@ -3,7 +3,7 @@
|
|||
# See the file license.txt for copying permission.
|
||||
import asyncio
|
||||
import logging
|
||||
from unittest.mock import call, MagicMock
|
||||
from unittest.mock import call, MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
|
@ -31,6 +31,7 @@ from amqtt.mqtt import (
|
|||
)
|
||||
from amqtt.mqtt.connect import ConnectVariableHeader, ConnectPayload
|
||||
from amqtt.mqtt.constants import QOS_0, QOS_1, QOS_2
|
||||
from amqtt.mqtt.protocol.broker_handler import BrokerProtocolHandler
|
||||
|
||||
|
||||
formatter = (
|
||||
|
@ -625,3 +626,30 @@ def test_matches_single_level_wildcard(broker):
|
|||
"sport/tennis/player2",
|
||||
]:
|
||||
assert broker.matches(good_topic, test_filter)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_broker_broadcast_cancellation(broker):
|
||||
topic = "test"
|
||||
data = b"data"
|
||||
qos = QOS_0
|
||||
|
||||
sub_client = MQTTClient()
|
||||
await sub_client.connect("mqtt://127.0.0.1")
|
||||
await sub_client.subscribe([(topic, qos)])
|
||||
|
||||
with patch.object(
|
||||
BrokerProtocolHandler, "mqtt_publish", side_effect=asyncio.CancelledError
|
||||
) as mocked_mqtt_publish:
|
||||
await _client_publish(topic, data, qos)
|
||||
|
||||
# Second publish triggers the awaiting of first `mqtt_publish` task
|
||||
await _client_publish(topic, data, qos)
|
||||
await asyncio.sleep(0.01)
|
||||
|
||||
mocked_mqtt_publish.assert_awaited()
|
||||
|
||||
# Ensure broadcast loop is still functional and can deliver the message
|
||||
await _client_publish(topic, data, qos)
|
||||
message = await asyncio.wait_for(sub_client.deliver_message(), timeout=1)
|
||||
assert message
|
||||
|
|
Ładowanie…
Reference in New Issue