diff --git a/hbmqtt/plugins/__init__.py b/hbmqtt/plugins/__init__.py new file mode 100644 index 0000000..e1bd617 --- /dev/null +++ b/hbmqtt/plugins/__init__.py @@ -0,0 +1 @@ +__author__ = 'nico' diff --git a/hbmqtt/plugins/manager.py b/hbmqtt/plugins/manager.py new file mode 100644 index 0000000..201a323 --- /dev/null +++ b/hbmqtt/plugins/manager.py @@ -0,0 +1,130 @@ +# Copyright (c) 2015 Nicolas JOUANIN +# +# See the file license.txt for copying permission. + +import pkg_resources +import logging +import asyncio + +from collections import namedtuple + +Plugin = namedtuple('Plugin', ['name', 'ep', 'object']) + + +class PluginManager: + """ + Wraps setuptools Entry point mechanism to provide a basic plugin system. + Plugins are loaded for a given namespace (group). + This plugin manager uses coroutines to run plugin call asynchrounously in an event queue + """ + def __init__(self, namespace, context, loop=None): + if loop is not None: + self._loop = loop + else: + self._loop = asyncio.get_event_loop() + + self.logger = logging.getLogger(__name__) + self.context = context + self._plugins = [] + self._load_plugins(namespace) + + @property + def app_context(self): + return self.context + + def _load_plugins(self, namespace): + self.logger.info("Loading plugins for namespace %s" % namespace) + for ep in pkg_resources.iter_entry_points(group=namespace): + plugin = self._load_plugin(ep) + self._plugins.append(plugin) + self.logger.info(" Plugin %s ready" % plugin.ep.name) + + def _load_plugin(self, ep: pkg_resources.EntryPoint): + try: + self.logger.debug(" Loading plugin %s" % ep) + plugin = ep.load(require=True) + self.logger.debug(" Initializing plugin %s" % ep) + obj = plugin(self) + return Plugin(ep.name, ep, obj) + except ImportError as ie: + self.logger.warn("Plugin %r import failed: %s" % (ep, ie)) + except pkg_resources.UnknownExtra as ue: + self.logger.warn("Plugin %r dependencies resolution failed: %s" % (ep, ue)) + + def get_plugin(self, name): + """ + Get a plugin by its name from the plugins loaded for the current namespace + :param name: + :return: + """ + for p in self._plugins: + if p.name == name: + return p + return None + + @property + def plugins(self): + """ + Get the loaded plugins list + :return: + """ + return self._plugins + + def _schedule_coro(self, coro): + return asyncio.Task(coro, loop=self._loop) + + @asyncio.coroutine + def fire_event(self, event_name, *args, **kwargs): + """ + Fire an event to plugins. + PluginManager schedule async calls for each plugin on method called "on_" + event_name + For example, on_connect will be called on event 'connect' + :param event_name: + :param args: + :param kwargs: + :return: + """ + tasks = [] + event_method_name = "on_" + event_name + for plugin in self._plugins: + event_method = getattr(plugin.object, event_method_name, None) + if event_method: + tasks.append(self._schedule_coro(event_method(*args, **kwargs))) + yield from asyncio.wait(tasks, loop=self._loop) + + @asyncio.coroutine + def map(self, coro, *args, **kwargs): + """ + Schedule a given coroutine call for each plugin. + The coro called get the Plugin instance as first argument of its method call + :param coro: + :param args: + :param kwargs: + :return: + """ + tasks = [] + for plugin in self._plugins: + coro_instance = coro(plugin, *args, **kwargs) + if coro_instance: + tasks.append(self._schedule_coro(coro_instance)) + ret = yield from asyncio.gather(*tasks, loop=self._loop) + return ret + + @staticmethod + def _get_coro(plugin, coro_name, *args, **kwargs): + try: + return getattr(plugin.object, coro_name, None)(*args, **kwargs) + except TypeError as te: + # Plugin doesn't implement coro_name + return None + + @asyncio.coroutine + def map_plugin_coro(self, coro_name, *args, **kwargs): + """ + Call a plugin declared by plugin by its name + :param coro_name: + :param args: + :param kwargs: + :return: + """ + return (yield from self.map(self._get_coro, coro_name, *args, **kwargs)) diff --git a/setup.py b/setup.py index def2f43..b2c8927 100644 --- a/setup.py +++ b/setup.py @@ -30,5 +30,11 @@ setup( 'Programming Language :: Python :: 3.4', 'Topic :: Communications', 'Topic :: Internet' - ] + ], + entry_points = { + 'hbmqtt.test.plugins': [ + 'test_plugin = tests.plugins.test_manager:TestPlugin', + 'event_plugin = tests.plugins.test_manager:EventTestPlugin' + ] + } ) \ No newline at end of file diff --git a/tests/plugins/__init__.py b/tests/plugins/__init__.py new file mode 100644 index 0000000..e1bd617 --- /dev/null +++ b/tests/plugins/__init__.py @@ -0,0 +1 @@ +__author__ = 'nico' diff --git a/tests/plugins/test_manager.py b/tests/plugins/test_manager.py new file mode 100644 index 0000000..e713eec --- /dev/null +++ b/tests/plugins/test_manager.py @@ -0,0 +1,58 @@ +# Copyright (c) 2015 Nicolas JOUANIN +# +# See the file license.txt for copying permission. +import unittest +import logging +import asyncio +from hbmqtt.plugins.manager import PluginManager + +formatter = "[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s" +logging.basicConfig(level=logging.DEBUG, format=formatter) + + +class TestPlugin: + def __init__(self, manager): + pass + + +class EventTestPlugin: + def __init__(self, manager: PluginManager): + self.test_flag = False + self.coro_flag = False + + @asyncio.coroutine + def on_test(self): + self.test_flag = True + + @asyncio.coroutine + def test_coro(self): + self.coro_flag = True + + +class TestPluginManager(unittest.TestCase): + def setUp(self): + self.loop = asyncio.new_event_loop() + + def test_load_plugin(self): + manager = PluginManager("hbmqtt.test.plugins", context=None) + self.assertTrue(len(manager._plugins) > 0) + + def test_fire_event(self): + @asyncio.coroutine + def fire_event(): + yield from manager.fire_event("test") + + manager = PluginManager("hbmqtt.test.plugins", context=None, loop=self.loop) + self.loop.run_until_complete(fire_event()) + plugin = manager.get_plugin("event_plugin") + self.assertTrue(plugin.object.test_flag) + + def test_map_coro(self): + @asyncio.coroutine + def call_coro(): + yield from manager.map_plugin_coro('test_coro') + + manager = PluginManager("hbmqtt.test.plugins", context=None, loop=self.loop) + self.loop.run_until_complete(call_coro()) + plugin = manager.get_plugin("event_plugin") + self.assertTrue(plugin.object.test_coro)