kopia lustrzana https://github.com/Yakifo/amqtt
Implement core PluginManager
rodzic
d2142644b7
commit
a732a5ee72
|
@ -0,0 +1 @@
|
|||
__author__ = 'nico'
|
|
@ -0,0 +1,130 @@
|
|||
# Copyright (c) 2015 Nicolas JOUANIN
|
||||
#
|
||||
# See the file license.txt for copying permission.
|
||||
|
||||
import pkg_resources
|
||||
import logging
|
||||
import asyncio
|
||||
|
||||
from collections import namedtuple
|
||||
|
||||
Plugin = namedtuple('Plugin', ['name', 'ep', 'object'])
|
||||
|
||||
|
||||
class PluginManager:
|
||||
"""
|
||||
Wraps setuptools Entry point mechanism to provide a basic plugin system.
|
||||
Plugins are loaded for a given namespace (group).
|
||||
This plugin manager uses coroutines to run plugin call asynchrounously in an event queue
|
||||
"""
|
||||
def __init__(self, namespace, context, loop=None):
|
||||
if loop is not None:
|
||||
self._loop = loop
|
||||
else:
|
||||
self._loop = asyncio.get_event_loop()
|
||||
|
||||
self.logger = logging.getLogger(__name__)
|
||||
self.context = context
|
||||
self._plugins = []
|
||||
self._load_plugins(namespace)
|
||||
|
||||
@property
|
||||
def app_context(self):
|
||||
return self.context
|
||||
|
||||
def _load_plugins(self, namespace):
|
||||
self.logger.info("Loading plugins for namespace %s" % namespace)
|
||||
for ep in pkg_resources.iter_entry_points(group=namespace):
|
||||
plugin = self._load_plugin(ep)
|
||||
self._plugins.append(plugin)
|
||||
self.logger.info(" Plugin %s ready" % plugin.ep.name)
|
||||
|
||||
def _load_plugin(self, ep: pkg_resources.EntryPoint):
|
||||
try:
|
||||
self.logger.debug(" Loading plugin %s" % ep)
|
||||
plugin = ep.load(require=True)
|
||||
self.logger.debug(" Initializing plugin %s" % ep)
|
||||
obj = plugin(self)
|
||||
return Plugin(ep.name, ep, obj)
|
||||
except ImportError as ie:
|
||||
self.logger.warn("Plugin %r import failed: %s" % (ep, ie))
|
||||
except pkg_resources.UnknownExtra as ue:
|
||||
self.logger.warn("Plugin %r dependencies resolution failed: %s" % (ep, ue))
|
||||
|
||||
def get_plugin(self, name):
|
||||
"""
|
||||
Get a plugin by its name from the plugins loaded for the current namespace
|
||||
:param name:
|
||||
:return:
|
||||
"""
|
||||
for p in self._plugins:
|
||||
if p.name == name:
|
||||
return p
|
||||
return None
|
||||
|
||||
@property
|
||||
def plugins(self):
|
||||
"""
|
||||
Get the loaded plugins list
|
||||
:return:
|
||||
"""
|
||||
return self._plugins
|
||||
|
||||
def _schedule_coro(self, coro):
|
||||
return asyncio.Task(coro, loop=self._loop)
|
||||
|
||||
@asyncio.coroutine
|
||||
def fire_event(self, event_name, *args, **kwargs):
|
||||
"""
|
||||
Fire an event to plugins.
|
||||
PluginManager schedule async calls for each plugin on method called "on_" + event_name
|
||||
For example, on_connect will be called on event 'connect'
|
||||
:param event_name:
|
||||
:param args:
|
||||
:param kwargs:
|
||||
:return:
|
||||
"""
|
||||
tasks = []
|
||||
event_method_name = "on_" + event_name
|
||||
for plugin in self._plugins:
|
||||
event_method = getattr(plugin.object, event_method_name, None)
|
||||
if event_method:
|
||||
tasks.append(self._schedule_coro(event_method(*args, **kwargs)))
|
||||
yield from asyncio.wait(tasks, loop=self._loop)
|
||||
|
||||
@asyncio.coroutine
|
||||
def map(self, coro, *args, **kwargs):
|
||||
"""
|
||||
Schedule a given coroutine call for each plugin.
|
||||
The coro called get the Plugin instance as first argument of its method call
|
||||
:param coro:
|
||||
:param args:
|
||||
:param kwargs:
|
||||
:return:
|
||||
"""
|
||||
tasks = []
|
||||
for plugin in self._plugins:
|
||||
coro_instance = coro(plugin, *args, **kwargs)
|
||||
if coro_instance:
|
||||
tasks.append(self._schedule_coro(coro_instance))
|
||||
ret = yield from asyncio.gather(*tasks, loop=self._loop)
|
||||
return ret
|
||||
|
||||
@staticmethod
|
||||
def _get_coro(plugin, coro_name, *args, **kwargs):
|
||||
try:
|
||||
return getattr(plugin.object, coro_name, None)(*args, **kwargs)
|
||||
except TypeError as te:
|
||||
# Plugin doesn't implement coro_name
|
||||
return None
|
||||
|
||||
@asyncio.coroutine
|
||||
def map_plugin_coro(self, coro_name, *args, **kwargs):
|
||||
"""
|
||||
Call a plugin declared by plugin by its name
|
||||
:param coro_name:
|
||||
:param args:
|
||||
:param kwargs:
|
||||
:return:
|
||||
"""
|
||||
return (yield from self.map(self._get_coro, coro_name, *args, **kwargs))
|
8
setup.py
8
setup.py
|
@ -30,5 +30,11 @@ setup(
|
|||
'Programming Language :: Python :: 3.4',
|
||||
'Topic :: Communications',
|
||||
'Topic :: Internet'
|
||||
]
|
||||
],
|
||||
entry_points = {
|
||||
'hbmqtt.test.plugins': [
|
||||
'test_plugin = tests.plugins.test_manager:TestPlugin',
|
||||
'event_plugin = tests.plugins.test_manager:EventTestPlugin'
|
||||
]
|
||||
}
|
||||
)
|
|
@ -0,0 +1 @@
|
|||
__author__ = 'nico'
|
|
@ -0,0 +1,58 @@
|
|||
# Copyright (c) 2015 Nicolas JOUANIN
|
||||
#
|
||||
# See the file license.txt for copying permission.
|
||||
import unittest
|
||||
import logging
|
||||
import asyncio
|
||||
from hbmqtt.plugins.manager import PluginManager
|
||||
|
||||
formatter = "[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s"
|
||||
logging.basicConfig(level=logging.DEBUG, format=formatter)
|
||||
|
||||
|
||||
class TestPlugin:
|
||||
def __init__(self, manager):
|
||||
pass
|
||||
|
||||
|
||||
class EventTestPlugin:
|
||||
def __init__(self, manager: PluginManager):
|
||||
self.test_flag = False
|
||||
self.coro_flag = False
|
||||
|
||||
@asyncio.coroutine
|
||||
def on_test(self):
|
||||
self.test_flag = True
|
||||
|
||||
@asyncio.coroutine
|
||||
def test_coro(self):
|
||||
self.coro_flag = True
|
||||
|
||||
|
||||
class TestPluginManager(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.loop = asyncio.new_event_loop()
|
||||
|
||||
def test_load_plugin(self):
|
||||
manager = PluginManager("hbmqtt.test.plugins", context=None)
|
||||
self.assertTrue(len(manager._plugins) > 0)
|
||||
|
||||
def test_fire_event(self):
|
||||
@asyncio.coroutine
|
||||
def fire_event():
|
||||
yield from manager.fire_event("test")
|
||||
|
||||
manager = PluginManager("hbmqtt.test.plugins", context=None, loop=self.loop)
|
||||
self.loop.run_until_complete(fire_event())
|
||||
plugin = manager.get_plugin("event_plugin")
|
||||
self.assertTrue(plugin.object.test_flag)
|
||||
|
||||
def test_map_coro(self):
|
||||
@asyncio.coroutine
|
||||
def call_coro():
|
||||
yield from manager.map_plugin_coro('test_coro')
|
||||
|
||||
manager = PluginManager("hbmqtt.test.plugins", context=None, loop=self.loop)
|
||||
self.loop.run_until_complete(call_coro())
|
||||
plugin = manager.get_plugin("event_plugin")
|
||||
self.assertTrue(plugin.object.test_coro)
|
Ładowanie…
Reference in New Issue