kopia lustrzana https://github.com/Yakifo/amqtt
Fix #28
rodzic
dd04b44a57
commit
360d5b03c7
|
|
@ -608,7 +608,7 @@ class Broker:
|
|||
|
||||
def matches(self, topic, a_filter):
|
||||
import re
|
||||
match_pattern = re.compile(a_filter.replace('#', '.*').replace('$', '\$').replace('+', '[\$\s\w\d]+'))
|
||||
match_pattern = re.compile(a_filter.replace('#', '.*').replace('$', '\$').replace('+', '[/\$\s\w\d]+'))
|
||||
if match_pattern.match(topic):
|
||||
return True
|
||||
else:
|
||||
|
|
@ -625,7 +625,9 @@ class Broker:
|
|||
if self.logger.isEnabledFor(logging.DEBUG):
|
||||
self.logger.debug("broadcasting %r" % broadcast)
|
||||
for k_filter in self._subscriptions:
|
||||
if self.matches(broadcast['topic'], k_filter):
|
||||
if broadcast['topic'].startswith("$") and (k_filter.startswith("+") or k_filter.startswith("#")):
|
||||
self.logger.debug("[MQTT-4.7.2-1] - ignoring brodcasting $ topic to subscriptions starting with + or #")
|
||||
elif self.matches(broadcast['topic'], k_filter):
|
||||
subscriptions = self._subscriptions[k_filter]
|
||||
for (target_session, qos) in subscriptions:
|
||||
if 'qos' in broadcast:
|
||||
|
|
|
|||
|
|
@ -360,6 +360,82 @@ class BrokerTest(unittest.TestCase):
|
|||
if future.exception():
|
||||
raise future.exception()
|
||||
|
||||
@patch('hbmqtt.broker.PluginManager')
|
||||
def test_client_subscribe_publish_dollar_topic_1(self, MockPluginManager):
|
||||
@asyncio.coroutine
|
||||
def test_coro():
|
||||
try:
|
||||
broker = Broker(test_config, plugin_namespace="hbmqtt.test.plugins")
|
||||
yield from broker.start()
|
||||
self.assertTrue(broker.transitions.is_started())
|
||||
sub_client = MQTTClient()
|
||||
yield from sub_client.connect('mqtt://localhost')
|
||||
ret = yield from sub_client.subscribe([('#', QOS_0)])
|
||||
self.assertEquals(ret, [QOS_0])
|
||||
|
||||
yield from self._client_publish('/topic', b'data', QOS_0)
|
||||
message = yield from sub_client.deliver_message()
|
||||
self.assertIsNotNone(message)
|
||||
|
||||
yield from self._client_publish('$topic', b'data', QOS_0)
|
||||
yield from asyncio.sleep(0.1)
|
||||
message = None
|
||||
try:
|
||||
message = yield from sub_client.deliver_message(timeout=2)
|
||||
except Exception as e:
|
||||
pass
|
||||
self.assertIsNone(message)
|
||||
yield from sub_client.disconnect()
|
||||
yield from asyncio.sleep(0.1)
|
||||
yield from broker.shutdown()
|
||||
self.assertTrue(broker.transitions.is_stopped())
|
||||
future.set_result(True)
|
||||
except Exception as ae:
|
||||
future.set_exception(ae)
|
||||
|
||||
future = asyncio.Future(loop=self.loop)
|
||||
self.loop.run_until_complete(test_coro())
|
||||
if future.exception():
|
||||
raise future.exception()
|
||||
|
||||
@patch('hbmqtt.broker.PluginManager')
|
||||
def test_client_subscribe_publish_dollar_topic_2(self, MockPluginManager):
|
||||
@asyncio.coroutine
|
||||
def test_coro():
|
||||
try:
|
||||
broker = Broker(test_config, plugin_namespace="hbmqtt.test.plugins")
|
||||
yield from broker.start()
|
||||
self.assertTrue(broker.transitions.is_started())
|
||||
sub_client = MQTTClient()
|
||||
yield from sub_client.connect('mqtt://localhost')
|
||||
ret = yield from sub_client.subscribe([('+/monitor/Clients', QOS_0)])
|
||||
self.assertEquals(ret, [QOS_0])
|
||||
|
||||
yield from self._client_publish('/test/monitor/Clients', b'data', QOS_0)
|
||||
message = yield from sub_client.deliver_message()
|
||||
self.assertIsNotNone(message)
|
||||
|
||||
yield from self._client_publish('$SYS/monitor/Clients', b'data', QOS_0)
|
||||
yield from asyncio.sleep(0.1)
|
||||
message = None
|
||||
try:
|
||||
message = yield from sub_client.deliver_message(timeout=2)
|
||||
except Exception as e:
|
||||
pass
|
||||
self.assertIsNone(message)
|
||||
yield from sub_client.disconnect()
|
||||
yield from asyncio.sleep(0.1)
|
||||
yield from broker.shutdown()
|
||||
self.assertTrue(broker.transitions.is_stopped())
|
||||
future.set_result(True)
|
||||
except Exception as ae:
|
||||
future.set_exception(ae)
|
||||
|
||||
future = asyncio.Future(loop=self.loop)
|
||||
self.loop.run_until_complete(test_coro())
|
||||
if future.exception():
|
||||
raise future.exception()
|
||||
|
||||
@patch('hbmqtt.broker.PluginManager')
|
||||
def test_client_publish_retain_subscribe(self, MockPluginManager):
|
||||
@asyncio.coroutine
|
||||
|
|
|
|||
Ładowanie…
Reference in New Issue