kopia lustrzana https://github.com/Yakifo/amqtt
provide plugin manager context so that it identifies auth and topic check plugins correctly
rodzic
ee7250c720
commit
03bf1c34d9
|
@ -84,8 +84,11 @@ class TestPluginManager(unittest.TestCase):
|
|||
assert plugin.test_close_flag
|
||||
|
||||
def test_plugin_auth_coro(self) -> None:
|
||||
# provide context that activates auth plugins
|
||||
context = BaseContext()
|
||||
context.config = {'auth':{}}
|
||||
|
||||
manager = PluginManager("amqtt.test.plugins", context=None)
|
||||
manager = PluginManager("amqtt.test.plugins", context=context)
|
||||
self.loop.run_until_complete(manager.map_plugin_auth(session=Session()))
|
||||
self.loop.run_until_complete(asyncio.sleep(0.5))
|
||||
plugin = manager.get_plugin("EventTestPlugin")
|
||||
|
@ -93,8 +96,11 @@ class TestPluginManager(unittest.TestCase):
|
|||
assert plugin.test_auth_flag
|
||||
|
||||
def test_plugin_topic_coro(self) -> None:
|
||||
# provide context that activates topic check plugins
|
||||
context = BaseContext()
|
||||
context.config = {'topic-check':{}}
|
||||
|
||||
manager = PluginManager("amqtt.test.plugins", context=None)
|
||||
manager = PluginManager("amqtt.test.plugins", context=context)
|
||||
self.loop.run_until_complete(manager.map_plugin_topic(session=Session(), topic="test", action=Action.PUBLISH))
|
||||
self.loop.run_until_complete(asyncio.sleep(0.5))
|
||||
plugin = manager.get_plugin("EventTestPlugin")
|
||||
|
|
Ładowanie…
Reference in New Issue