Merge pull request #118 from romancardenas/master

Added topic filtering Plugin
pull/8/head
Nicolas 2018-03-21 16:05:02 +01:00 zatwierdzone przez GitHub
commit 53fd32eb8c
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: 4AEE18F83AFDEB23
6 zmienionych plików z 204 dodań i 2 usunięć

Wyświetl plik

@ -465,7 +465,8 @@ class Broker:
subscriptions = subscribe_waiter.result()
return_codes = []
for subscription in subscriptions['topics']:
return_codes.append(self.add_subscription(subscription, client_session))
result = yield from self.add_subscription(subscription, client_session)
return_codes.append(result)
yield from handler.mqtt_acknowledge_subscription(subscriptions['packet_id'], return_codes)
for index, subscription in enumerate(subscriptions['topics']):
if return_codes[index] != 0x80:
@ -559,6 +560,41 @@ class Broker:
# If all plugins returned True, authentication is success
return auth_result
@asyncio.coroutine
def topic_filtering(self, session: Session, topic):
"""
This method call the topic_filtering method on registered plugins to check that the subscription is allowed.
User is considered allowed if all plugins called return True.
Plugins topic_filtering() method are supposed to return :
- True if MQTT client can be subscribed to the topic
- False if MQTT client is not allowed to subscribe to the topic
- None if topic filtering can't be achieved (then plugin result is then ignored)
:param session:
:param listener:
:param topic: Topic in which the client wants to subscribe
:return:
"""
topic_plugins = None
topic_config = self.config.get('topic-check', None)
if topic_config and topic_config.get('enabled', False):
topic_plugins = topic_config.get('plugins', None)
returns = yield from self.plugins_manager.map_plugin_coro(
"topic_filtering",
session=session,
topic=topic,
filter_plugins=topic_plugins)
topic_result = True
if returns:
for plugin in returns:
res = returns[plugin]
if res is False:
topic_result = False
self.logger.debug("Topic filtering failed due to '%s' plugin result: %s" % (plugin.name, res))
else:
self.logger.debug("'%s' plugin result: %s" % (plugin.name, res))
# If all plugins returned True, authentication is success
return topic_result
def retain_message(self, source_session, topic_name, data, qos=None):
if data is not None and data != b'':
# If retained flag set, store the message for further subscriptions
@ -571,6 +607,7 @@ class Broker:
self.logger.debug("Clear retained messages for topic '%s'" % topic_name)
del self._retained_messages[topic_name]
@asyncio.coroutine
def add_subscription(self, subscription, session):
try:
a_filter = subscription[0]
@ -582,7 +619,10 @@ class Broker:
if "/+" not in a_filter and "+/" not in a_filter:
# [MQTT-4.7.1-3] + wildcard character must occupy entire level
return 0x80
# Check if the client is authorised to connect to the topic
permitted = yield from self.topic_filtering(session, topic=a_filter)
if not permitted:
return 0x80
qos = subscription[1]
if 'max-qos' in self.config and qos > self.config['max-qos']:
qos = self.config['max-qos']

Wyświetl plik

@ -0,0 +1,37 @@
import asyncio
class BaseTopicPlugin:
def __init__(self, context):
self.context = context
try:
self.topic_config = self.context.config['topic-check']
except KeyError:
self.context.logger.warning("'topic-check' section not found in context configuration")
def topic_filtering(self, *args, **kwargs):
if not self.topic_config:
# auth config section not found
self.context.logger.warning("'auth' section not found in context configuration")
return False
return True
class TopicTabooPlugin(BaseTopicPlugin):
def __init__(self, context):
super().__init__(context)
self._taboo = ['prohibited', 'top-secret', 'data/classified']
@asyncio.coroutine
def topic_filtering(self, *args, **kwargs):
filter_result = super().topic_filtering(*args, **kwargs)
if filter_result:
session = kwargs.get('session', None)
topic = kwargs.get('topic', None)
if session.username and topic:
if session.username != 'admin' and topic in self._taboo:
return False
return True
else:
return False
return filter_result

Wyświetl plik

@ -0,0 +1,49 @@
import logging
import asyncio
import os
from hbmqtt.broker import Broker
logger = logging.getLogger(__name__)
config = {
'listeners': {
'default': {
'type': 'tcp',
'bind': '0.0.0.0:1883',
},
'ws-mqtt': {
'bind': '127.0.0.1:8080',
'type': 'ws',
'max_connections': 10,
},
},
'sys_interval': 10,
'auth': {
'allow-anonymous': True,
'password-file': os.path.join(os.path.dirname(os.path.realpath(__file__)), "passwd"),
'plugins': [
'auth_file', 'auth_anonymous'
]
},
'topic-check': {
'enabled': True,
'plugins': [
'topic_taboo'
]
}
}
broker = Broker(config)
@asyncio.coroutine
def test_coro():
yield from broker.start()
if __name__ == '__main__':
formatter = "[%(asctime)s] :: %(levelname)s :: %(name)s :: %(message)s"
logging.basicConfig(level=logging.INFO, format=formatter)
asyncio.get_event_loop().run_until_complete(test_coro())
asyncio.get_event_loop().run_forever()

Wyświetl plik

@ -0,0 +1,33 @@
import logging
import asyncio
from hbmqtt.client import MQTTClient, ConnectException
#
# This sample shows how to publish messages to broker using different QOS
# Debug outputs shows the message flows
#
logger = logging.getLogger(__name__)
@asyncio.coroutine
def test_coro():
try:
C = MQTTClient()
yield from C.connect('mqtt://0.0.0.0:1883')
yield from C.publish('data/classified', b'TOP SECRET', qos=0x01)
yield from C.publish('data/memes', b'REAL FUN', qos=0x01)
logger.info("messages published")
yield from C.disconnect()
except ConnectException as ce:
logger.error("Connection failed: %s" % ce)
asyncio.get_event_loop().stop()
if __name__ == '__main__':
formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s"
formatter = "%(message)s"
logging.basicConfig(level=logging.DEBUG, format=formatter)
asyncio.get_event_loop().run_until_complete(test_coro())

Wyświetl plik

@ -0,0 +1,42 @@
import logging
import asyncio
from hbmqtt.client import MQTTClient, ClientException
from hbmqtt.mqtt.constants import QOS_1
#
# This sample shows how to subscbribe a topic and receive data from incoming messages
# It subscribes to '$SYS/broker/uptime' topic and displays the first ten values returned
# by the broker.
#
logger = logging.getLogger(__name__)
@asyncio.coroutine
def uptime_coro():
C = MQTTClient()
yield from C.connect('mqtt://test:test@0.0.0.0:1883')
# Subscribe to '$SYS/broker/uptime' with QOS=1
yield from C.subscribe([
('data/memes', QOS_1), # Topic allowed
('data/classified', QOS_1), # Topic forbidden
])
logger.info("Subscribed")
try:
for i in range(1, 100):
message = yield from C.deliver_message()
packet = message.publish_packet
print("%d: %s => %s" % (i, packet.variable_header.topic_name, str(packet.payload.data)))
yield from C.unsubscribe(['$SYS/broker/uptime', '$SYS/broker/load/#'])
logger.info("UnSubscribed")
yield from C.disconnect()
except ClientException as ce:
logger.error("Client exception: %s" % ce)
if __name__ == '__main__':
formatter = "[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s"
logging.basicConfig(level=logging.INFO, format=formatter)
asyncio.get_event_loop().run_until_complete(uptime_coro())

Wyświetl plik

@ -47,6 +47,7 @@ setup(
'packet_logger_plugin = hbmqtt.plugins.logging:PacketLoggerPlugin',
'auth_anonymous = hbmqtt.plugins.authentication:AnonymousAuthPlugin',
'auth_file = hbmqtt.plugins.authentication:FileAuthPlugin',
'topic_taboo = hbmqtt.plugins.topic_checking:TopicTabooPlugin',
'broker_sys = hbmqtt.plugins.sys.broker:BrokerSysPlugin',
],
'hbmqtt.client.plugins': [