kopia lustrzana https://github.com/Yakifo/amqtt
rodzic
0e11d545d8
commit
f298a4a543
|
@ -548,8 +548,9 @@ class Broker:
|
|||
self._retained_messages[topic_name] = retained_message
|
||||
else:
|
||||
# [MQTT-3.3.1-10]
|
||||
self.logger.debug("Clear retained messages for topic '%s'" % topic_name)
|
||||
del self._retained_messages[topic_name]
|
||||
if topic_name in self._retained_messages:
|
||||
self.logger.debug("Clear retained messages for topic '%s'" % topic_name)
|
||||
del self._retained_messages[topic_name]
|
||||
|
||||
def add_subscription(self, subscription, session):
|
||||
import re
|
||||
|
|
|
@ -435,6 +435,33 @@ class BrokerTest(unittest.TestCase):
|
|||
if future.exception():
|
||||
raise future.exception()
|
||||
|
||||
@patch('hbmqtt.broker.PluginManager')
|
||||
def test_client_publish_retain_delete(self, MockPluginManager):
|
||||
@asyncio.coroutine
|
||||
def test_coro():
|
||||
try:
|
||||
broker = Broker(test_config, plugin_namespace="hbmqtt.test.plugins")
|
||||
yield from broker.start()
|
||||
self.assertTrue(broker.transitions.is_started())
|
||||
|
||||
pub_client = MQTTClient()
|
||||
ret = yield from pub_client.connect('mqtt://localhost/')
|
||||
self.assertEqual(ret, 0)
|
||||
ret_message = yield from pub_client.publish('/topic', b'', QOS_0, retain=True)
|
||||
yield from pub_client.disconnect()
|
||||
yield from asyncio.sleep(0.1)
|
||||
self.assertNotIn('/topic', broker._retained_messages)
|
||||
yield from broker.shutdown()
|
||||
self.assertTrue(broker.transitions.is_stopped())
|
||||
future.set_result(True)
|
||||
except Exception as ae:
|
||||
future.set_exception(ae)
|
||||
|
||||
future = asyncio.Future(loop=self.loop)
|
||||
self.loop.run_until_complete(test_coro())
|
||||
if future.exception():
|
||||
raise future.exception()
|
||||
|
||||
@patch('hbmqtt.broker.PluginManager')
|
||||
def test_client_subscribe_publish(self, MockPluginManager):
|
||||
@asyncio.coroutine
|
||||
|
|
Ładowanie…
Reference in New Issue