kopia lustrzana https://github.com/Yakifo/amqtt
rodzic
360d5b03c7
commit
5dc5293063
|
@ -458,6 +458,9 @@ class Broker:
|
||||||
if self.logger.isEnabledFor(logging.DEBUG):
|
if self.logger.isEnabledFor(logging.DEBUG):
|
||||||
self.logger.debug("%s handling message delivery" % client_session.client_id)
|
self.logger.debug("%s handling message delivery" % client_session.client_id)
|
||||||
app_message = wait_deliver.result()
|
app_message = wait_deliver.result()
|
||||||
|
if "#" in app_message.topic or "+" in app_message.topic:
|
||||||
|
self.logger.warn("[MQTT-3.3.2-2] - %s invalid TOPIC sent in PUBLISH message, closing connection" % client_session.client_id)
|
||||||
|
break
|
||||||
yield from self.plugins_manager.fire_event(EVENT_BROKER_MESSAGE_RECEIVED,
|
yield from self.plugins_manager.fire_event(EVENT_BROKER_MESSAGE_RECEIVED,
|
||||||
client_id=client_session.client_id,
|
client_id=client_session.client_id,
|
||||||
message=app_message)
|
message=app_message)
|
||||||
|
|
|
@ -259,6 +259,34 @@ class BrokerTest(unittest.TestCase):
|
||||||
if future.exception():
|
if future.exception():
|
||||||
raise future.exception()
|
raise future.exception()
|
||||||
|
|
||||||
|
@patch('hbmqtt.broker.PluginManager')
|
||||||
|
def test_client_publish_invalid_topic(self, MockPluginManager):
|
||||||
|
@asyncio.coroutine
|
||||||
|
def test_coro():
|
||||||
|
try:
|
||||||
|
broker = Broker(test_config, plugin_namespace="hbmqtt.test.plugins")
|
||||||
|
yield from broker.start()
|
||||||
|
self.assertTrue(broker.transitions.is_started())
|
||||||
|
pub_client = MQTTClient()
|
||||||
|
ret = yield from pub_client.connect('mqtt://localhost/')
|
||||||
|
self.assertEqual(ret, 0)
|
||||||
|
|
||||||
|
ret_message = yield from pub_client.publish('/+', b'data', QOS_0)
|
||||||
|
yield from asyncio.sleep(0.1)
|
||||||
|
yield from pub_client.disconnect()
|
||||||
|
|
||||||
|
yield from asyncio.sleep(0.1)
|
||||||
|
yield from broker.shutdown()
|
||||||
|
self.assertTrue(broker.transitions.is_stopped())
|
||||||
|
future.set_result(True)
|
||||||
|
except Exception as ae:
|
||||||
|
future.set_exception(ae)
|
||||||
|
|
||||||
|
future = asyncio.Future(loop=self.loop)
|
||||||
|
self.loop.run_until_complete(test_coro())
|
||||||
|
if future.exception():
|
||||||
|
raise future.exception()
|
||||||
|
|
||||||
@patch('hbmqtt.broker.PluginManager')
|
@patch('hbmqtt.broker.PluginManager')
|
||||||
def test_client_publish_big(self, MockPluginManager):
|
def test_client_publish_big(self, MockPluginManager):
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
|
|
Ładowanie…
Reference in New Issue