diff --git a/hbmqtt/broker.py b/hbmqtt/broker.py index 0b4a94a..1e89ae9 100644 --- a/hbmqtt/broker.py +++ b/hbmqtt/broker.py @@ -465,7 +465,8 @@ class Broker: subscriptions = subscribe_waiter.result() return_codes = [] for subscription in subscriptions['topics']: - return_codes.append(self.add_subscription(subscription, client_session)) + result = yield from self.add_subscription(subscription, client_session) + return_codes.append(result) yield from handler.mqtt_acknowledge_subscription(subscriptions['packet_id'], return_codes) for index, subscription in enumerate(subscriptions['topics']): if return_codes[index] != 0x80: @@ -559,6 +560,41 @@ class Broker: # If all plugins returned True, authentication is success return auth_result + @asyncio.coroutine + def topic_filtering(self, session: Session, topic): + """ + This method call the topic_filtering method on registered plugins to check that the subscription is allowed. + User is considered allowed if all plugins called return True. + Plugins topic_filtering() method are supposed to return : + - True if MQTT client can be subscribed to the topic + - False if MQTT client is not allowed to subscribe to the topic + - None if topic filtering can't be achieved (then plugin result is then ignored) + :param session: + :param listener: + :param topic: Topic in which the client wants to subscribe + :return: + """ + topic_plugins = None + topic_config = self.config.get('topic-check', None) + if topic_config and topic_config.get('enabled', False): + topic_plugins = topic_config.get('plugins', None) + returns = yield from self.plugins_manager.map_plugin_coro( + "topic_filtering", + session=session, + topic=topic, + filter_plugins=topic_plugins) + topic_result = True + if returns: + for plugin in returns: + res = returns[plugin] + if res is False: + topic_result = False + self.logger.debug("Topic filtering failed due to '%s' plugin result: %s" % (plugin.name, res)) + else: + self.logger.debug("'%s' plugin result: %s" % (plugin.name, res)) + # If all plugins returned True, authentication is success + return topic_result + def retain_message(self, source_session, topic_name, data, qos=None): if data is not None and data != b'': # If retained flag set, store the message for further subscriptions @@ -571,6 +607,7 @@ class Broker: self.logger.debug("Clear retained messages for topic '%s'" % topic_name) del self._retained_messages[topic_name] + @asyncio.coroutine def add_subscription(self, subscription, session): try: a_filter = subscription[0] @@ -582,7 +619,10 @@ class Broker: if "/+" not in a_filter and "+/" not in a_filter: # [MQTT-4.7.1-3] + wildcard character must occupy entire level return 0x80 - + # Check if the client is authorised to connect to the topic + permitted = yield from self.topic_filtering(session, topic=a_filter) + if not permitted: + return 0x80 qos = subscription[1] if 'max-qos' in self.config and qos > self.config['max-qos']: qos = self.config['max-qos'] diff --git a/hbmqtt/plugins/topic_checking.py b/hbmqtt/plugins/topic_checking.py new file mode 100644 index 0000000..19e5e5a --- /dev/null +++ b/hbmqtt/plugins/topic_checking.py @@ -0,0 +1,37 @@ +import asyncio + + +class BaseTopicPlugin: + def __init__(self, context): + self.context = context + try: + self.topic_config = self.context.config['topic-check'] + except KeyError: + self.context.logger.warning("'topic-check' section not found in context configuration") + + def topic_filtering(self, *args, **kwargs): + if not self.topic_config: + # auth config section not found + self.context.logger.warning("'auth' section not found in context configuration") + return False + return True + + +class TopicTabooPlugin(BaseTopicPlugin): + def __init__(self, context): + super().__init__(context) + self._taboo = ['prohibited', 'top-secret', 'data/classified'] + + @asyncio.coroutine + def topic_filtering(self, *args, **kwargs): + filter_result = super().topic_filtering(*args, **kwargs) + if filter_result: + session = kwargs.get('session', None) + topic = kwargs.get('topic', None) + if session.username and topic: + if session.username != 'admin' and topic in self._taboo: + return False + return True + else: + return False + return filter_result diff --git a/samples/broker_taboo.py b/samples/broker_taboo.py new file mode 100644 index 0000000..ba07122 --- /dev/null +++ b/samples/broker_taboo.py @@ -0,0 +1,49 @@ +import logging +import asyncio +import os +from hbmqtt.broker import Broker + +logger = logging.getLogger(__name__) + +config = { + 'listeners': { + 'default': { + 'type': 'tcp', + 'bind': '0.0.0.0:1883', + }, + 'ws-mqtt': { + 'bind': '127.0.0.1:8080', + 'type': 'ws', + 'max_connections': 10, + }, + }, + 'sys_interval': 10, + 'auth': { + 'allow-anonymous': True, + 'password-file': os.path.join(os.path.dirname(os.path.realpath(__file__)), "passwd"), + 'plugins': [ + 'auth_file', 'auth_anonymous' + ] + + }, + 'topic-check': { + 'enabled': True, + 'plugins': [ + 'topic_taboo' + ] + } +} + +broker = Broker(config) + + +@asyncio.coroutine +def test_coro(): + yield from broker.start() + + +if __name__ == '__main__': + formatter = "[%(asctime)s] :: %(levelname)s :: %(name)s :: %(message)s" + logging.basicConfig(level=logging.INFO, format=formatter) + asyncio.get_event_loop().run_until_complete(test_coro()) + asyncio.get_event_loop().run_forever() diff --git a/samples/client_publish_taboo.py b/samples/client_publish_taboo.py new file mode 100644 index 0000000..40e9a4f --- /dev/null +++ b/samples/client_publish_taboo.py @@ -0,0 +1,33 @@ +import logging +import asyncio + +from hbmqtt.client import MQTTClient, ConnectException + + +# +# This sample shows how to publish messages to broker using different QOS +# Debug outputs shows the message flows +# + +logger = logging.getLogger(__name__) + + +@asyncio.coroutine +def test_coro(): + try: + C = MQTTClient() + yield from C.connect('mqtt://0.0.0.0:1883') + yield from C.publish('data/classified', b'TOP SECRET', qos=0x01) + yield from C.publish('data/memes', b'REAL FUN', qos=0x01) + logger.info("messages published") + yield from C.disconnect() + except ConnectException as ce: + logger.error("Connection failed: %s" % ce) + asyncio.get_event_loop().stop() + + +if __name__ == '__main__': + formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s" + formatter = "%(message)s" + logging.basicConfig(level=logging.DEBUG, format=formatter) + asyncio.get_event_loop().run_until_complete(test_coro()) diff --git a/samples/client_subscribe_taboo.py b/samples/client_subscribe_taboo.py new file mode 100644 index 0000000..ff6b15a --- /dev/null +++ b/samples/client_subscribe_taboo.py @@ -0,0 +1,42 @@ +import logging +import asyncio + +from hbmqtt.client import MQTTClient, ClientException +from hbmqtt.mqtt.constants import QOS_1 + + +# +# This sample shows how to subscbribe a topic and receive data from incoming messages +# It subscribes to '$SYS/broker/uptime' topic and displays the first ten values returned +# by the broker. +# + +logger = logging.getLogger(__name__) + + +@asyncio.coroutine +def uptime_coro(): + C = MQTTClient() + yield from C.connect('mqtt://test:test@0.0.0.0:1883') + # Subscribe to '$SYS/broker/uptime' with QOS=1 + yield from C.subscribe([ + ('data/memes', QOS_1), # Topic allowed + ('data/classified', QOS_1), # Topic forbidden + ]) + logger.info("Subscribed") + try: + for i in range(1, 100): + message = yield from C.deliver_message() + packet = message.publish_packet + print("%d: %s => %s" % (i, packet.variable_header.topic_name, str(packet.payload.data))) + yield from C.unsubscribe(['$SYS/broker/uptime', '$SYS/broker/load/#']) + logger.info("UnSubscribed") + yield from C.disconnect() + except ClientException as ce: + logger.error("Client exception: %s" % ce) + + +if __name__ == '__main__': + formatter = "[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s" + logging.basicConfig(level=logging.INFO, format=formatter) + asyncio.get_event_loop().run_until_complete(uptime_coro()) diff --git a/setup.py b/setup.py index 6198f5b..62945a2 100644 --- a/setup.py +++ b/setup.py @@ -47,6 +47,7 @@ setup( 'packet_logger_plugin = hbmqtt.plugins.logging:PacketLoggerPlugin', 'auth_anonymous = hbmqtt.plugins.authentication:AnonymousAuthPlugin', 'auth_file = hbmqtt.plugins.authentication:FileAuthPlugin', + 'topic_taboo = hbmqtt.plugins.topic_checking:TopicTabooPlugin', 'broker_sys = hbmqtt.plugins.sys.broker:BrokerSysPlugin', ], 'hbmqtt.client.plugins': [