kopia lustrzana https://github.com/Yakifo/amqtt
				
				
				
			
		
			
				
	
	
		
			700 wiersze
		
	
	
		
			30 KiB
		
	
	
	
		
			Python
		
	
	
			
		
		
	
	
			700 wiersze
		
	
	
		
			30 KiB
		
	
	
	
		
			Python
		
	
	
| # Copyright (c) 2015 Nicolas JOUANIN
 | |
| #
 | |
| # See the file license.txt for copying permission.
 | |
| import unittest
 | |
| from unittest.mock import patch, call, MagicMock
 | |
| from hbmqtt.broker import *
 | |
| from hbmqtt.mqtt.constants import *
 | |
| from hbmqtt.client import MQTTClient, ConnectException
 | |
| from hbmqtt.mqtt import ConnectPacket, ConnackPacket, PublishPacket, PubrecPacket, \
 | |
|     PubrelPacket, PubcompPacket, DisconnectPacket
 | |
| from hbmqtt.mqtt.connect import ConnectVariableHeader, ConnectPayload
 | |
| 
 | |
# Layout of every log record emitted while the tests run.
formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s"
logging.basicConfig(format=formatter, level=logging.DEBUG)
log = logging.getLogger(__name__)

# Settings of the single TCP listener every test broker exposes.
_default_listener = dict(type='tcp', bind='localhost:1883', max_connections=10)

# Broker configuration shared by all tests in this module.
test_config = dict(
    listeners={'default': _default_listener},
    sys_interval=0,
    auth={'allow-anonymous': True},
)
 | |
| 
 | |
| 
 | |
| #class AsyncMock(MagicMock):
 | |
| #    def __yield from__(self, *args, **kwargs):
 | |
| #            future = asyncio.Future()
 | |
| #            future.set_result(self)
 | |
| #            result = yield from future
 | |
| #            return result
 | |
| 
 | |
class BrokerTest(unittest.TestCase):
    """Tests that start a ``Broker`` built from ``test_config`` and talk to it.

    Each test gets a fresh event loop (``setUp``/``tearDown``).  Test bodies are
    written as generator-based coroutines taking the started broker as their
    only argument and are executed through :meth:`_run_with_broker`, which
    guarantees the broker is shut down even when an assertion fails.
    """

    def setUp(self):
        """Create a dedicated event loop and install it as the current one."""
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)

    def tearDown(self):
        """Close the per-test event loop."""
        self.loop.close()

    def _run_with_broker(self, scenario):
        """Start a broker, run ``scenario(broker)`` on the loop, then clean up.

        :param scenario: coroutine function receiving the started ``Broker``.
        :raises: whatever ``scenario`` (or broker start-up) raises; the
            exception propagates out of ``run_until_complete`` unchanged.
        """
        @asyncio.coroutine
        def runner():
            broker = Broker(test_config, plugin_namespace="hbmqtt.test.plugins")
            yield from broker.start()
            try:
                self.assertTrue(broker.transitions.is_started())
                yield from scenario(broker)
            finally:
                # Scenarios normally shut the broker down themselves so they can
                # assert on the stopped state.  If one failed before doing so,
                # stop the broker here so it does not stay bound for later tests.
                if broker.transitions.is_started():
                    yield from broker.shutdown()

        self.loop.run_until_complete(runner())

    def _assert_single_subscription(self, subs, session, qos):
        """Assert ``subs`` holds exactly one ``(session, qos)`` entry."""
        self.assertEqual(len(subs), 1)
        (registered_session, registered_qos) = subs[0]
        self.assertEqual(registered_session, session)
        self.assertEqual(registered_qos, qos)

    @asyncio.coroutine
    def _client_publish(self, topic, data, qos, retain=False):
        """Connect a throw-away client, publish one message and disconnect.

        :return: the value returned by ``MQTTClient.publish``.
        """
        pub_client = MQTTClient()
        ret = yield from pub_client.connect('mqtt://localhost/')
        self.assertEqual(ret, 0)
        ret = yield from pub_client.publish(topic, data, qos, retain)
        yield from pub_client.disconnect()
        return ret

    @patch('hbmqtt.broker.PluginManager')
    def test_start_stop(self, MockPluginManager):
        """Starting and stopping fires the pre/post lifecycle events."""
        @asyncio.coroutine
        def scenario(broker):
            self.assertDictEqual(broker._sessions, {})
            self.assertIn('default', broker._servers)
            MockPluginManager.assert_has_calls(
                [call().fire_event(EVENT_BROKER_PRE_START),
                 call().fire_event(EVENT_BROKER_POST_START)], any_order=True)
            # Forget the start-up calls so only shutdown events are checked next.
            MockPluginManager.reset_mock()
            yield from broker.shutdown()
            MockPluginManager.assert_has_calls(
                [call().fire_event(EVENT_BROKER_PRE_SHUTDOWN),
                 call().fire_event(EVENT_BROKER_POST_SHUTDOWN)], any_order=True)
            self.assertTrue(broker.transitions.is_stopped())

        self._run_with_broker(scenario)

    @patch('hbmqtt.broker.PluginManager')
    def test_client_connect(self, MockPluginManager):
        """A connecting client gets a session and connect/disconnect events fire."""
        @asyncio.coroutine
        def scenario(broker):
            client = MQTTClient()
            ret = yield from client.connect('mqtt://localhost/')
            self.assertEqual(ret, 0)
            self.assertIn(client.session.client_id, broker._sessions)
            yield from client.disconnect()
            yield from asyncio.sleep(0.1)
            yield from broker.shutdown()
            self.assertTrue(broker.transitions.is_stopped())
            self.assertDictEqual(broker._sessions, {})
            MockPluginManager.assert_has_calls(
                [call().fire_event(EVENT_BROKER_CLIENT_CONNECTED, client_id=client.session.client_id),
                 call().fire_event(EVENT_BROKER_CLIENT_DISCONNECTED, client_id=client.session.client_id)],
                any_order=True)

        self._run_with_broker(scenario)

    @patch('hbmqtt.broker.PluginManager')
    def test_client_connect_will_flag(self, MockPluginManager):
        """A hand-built CONNECT carrying a will message is accepted."""
        @asyncio.coroutine
        def scenario(broker):
            # ``loop=`` is omitted: setUp() installed self.loop as the current
            # loop, and the argument is rejected by asyncio from Python 3.10.
            conn_reader, conn_writer = \
                yield from asyncio.open_connection('localhost', 1883)
            try:
                reader = StreamReaderAdapter(conn_reader)
                writer = StreamWriterAdapter(conn_writer)

                vh = ConnectVariableHeader()
                payload = ConnectPayload()

                vh.keep_alive = 10
                vh.clean_session_flag = False
                vh.will_retain_flag = False
                vh.will_flag = True
                vh.will_qos = QOS_0
                payload.client_id = 'test_id'
                payload.will_message = b'test'
                payload.will_topic = '/topic'
                connect = ConnectPacket(vh=vh, payload=payload)
                yield from connect.to_stream(writer)
                yield from ConnackPacket.from_stream(reader)

                yield from asyncio.sleep(0.1)

                disconnect = DisconnectPacket()
                yield from disconnect.to_stream(writer)

                yield from asyncio.sleep(0.1)
            finally:
                # Release the raw socket whether or not the exchange succeeded.
                conn_writer.close()
            yield from broker.shutdown()
            self.assertTrue(broker.transitions.is_stopped())
            self.assertDictEqual(broker._sessions, {})

        self._run_with_broker(scenario)

    @patch('hbmqtt.broker.PluginManager')
    def test_client_connect_clean_session_false(self, MockPluginManager):
        """An empty client id with cleansession=False is refused with code 0x02."""
        @asyncio.coroutine
        def scenario(broker):
            client = MQTTClient(client_id="", config={'auto_reconnect': False})
            return_code = None
            try:
                yield from client.connect('mqtt://localhost/', cleansession=False)
            except ConnectException as ce:
                return_code = ce.return_code
            self.assertEqual(return_code, 0x02)
            self.assertNotIn(client.session.client_id, broker._sessions)
            yield from client.disconnect()
            yield from asyncio.sleep(0.1)
            yield from broker.shutdown()

        self._run_with_broker(scenario)

    @patch('hbmqtt.broker.PluginManager')
    def test_client_subscribe(self, MockPluginManager):
        """Subscribing registers the session and fires the subscribed event."""
        @asyncio.coroutine
        def scenario(broker):
            client = MQTTClient()
            ret = yield from client.connect('mqtt://localhost/')
            self.assertEqual(ret, 0)
            yield from client.subscribe([('/topic', QOS_0)])

            # The client's subscription must be registered in the broker.
            self.assertIn('/topic', broker._subscriptions)
            subs = broker._subscriptions['/topic']
            self._assert_single_subscription(subs, client.session, QOS_0)

            yield from client.disconnect()
            yield from asyncio.sleep(0.1)
            yield from broker.shutdown()
            self.assertTrue(broker.transitions.is_stopped())
            MockPluginManager.assert_has_calls(
                [call().fire_event(EVENT_BROKER_CLIENT_SUBSCRIBED,
                                   client_id=client.session.client_id,
                                   topic='/topic', qos=QOS_0)], any_order=True)

        self._run_with_broker(scenario)

    @patch('hbmqtt.broker.PluginManager')
    def test_client_subscribe_twice(self, MockPluginManager):
        """Subscribing twice to the same topic keeps a single registration."""
        @asyncio.coroutine
        def scenario(broker):
            client = MQTTClient()
            ret = yield from client.connect('mqtt://localhost/')
            self.assertEqual(ret, 0)
            yield from client.subscribe([('/topic', QOS_0)])

            # The client's subscription must be registered in the broker.
            self.assertIn('/topic', broker._subscriptions)
            subs = broker._subscriptions['/topic']
            self._assert_single_subscription(subs, client.session, QOS_0)

            yield from client.subscribe([('/topic', QOS_0)])
            # Deliberately re-checks the list object fetched before the second
            # subscribe rather than looking it up again.
            self._assert_single_subscription(subs, client.session, QOS_0)

            yield from client.disconnect()
            yield from asyncio.sleep(0.1)
            yield from broker.shutdown()
            self.assertTrue(broker.transitions.is_stopped())
            MockPluginManager.assert_has_calls(
                [call().fire_event(EVENT_BROKER_CLIENT_SUBSCRIBED,
                                   client_id=client.session.client_id,
                                   topic='/topic', qos=QOS_0)], any_order=True)

        self._run_with_broker(scenario)

    @patch('hbmqtt.broker.PluginManager')
    def test_client_unsubscribe(self, MockPluginManager):
        """Unsubscribing empties the topic's subscriber list and fires the event."""
        @asyncio.coroutine
        def scenario(broker):
            client = MQTTClient()
            ret = yield from client.connect('mqtt://localhost/')
            self.assertEqual(ret, 0)
            yield from client.subscribe([('/topic', QOS_0)])

            # The client's subscription must be registered in the broker.
            self.assertIn('/topic', broker._subscriptions)
            subs = broker._subscriptions['/topic']
            self._assert_single_subscription(subs, client.session, QOS_0)

            yield from client.unsubscribe(['/topic'])
            yield from asyncio.sleep(0.1)
            self.assertEqual(broker._subscriptions['/topic'], [])
            yield from client.disconnect()
            yield from asyncio.sleep(0.1)
            yield from broker.shutdown()
            self.assertTrue(broker.transitions.is_stopped())
            MockPluginManager.assert_has_calls(
                [call().fire_event(EVENT_BROKER_CLIENT_SUBSCRIBED,
                                   client_id=client.session.client_id,
                                   topic='/topic', qos=QOS_0),
                 call().fire_event(EVENT_BROKER_CLIENT_UNSUBSCRIBED,
                                   client_id=client.session.client_id,
                                   topic='/topic')
                 ], any_order=True)

        self._run_with_broker(scenario)

    @patch('hbmqtt.broker.PluginManager')
    def test_client_publish(self, MockPluginManager):
        """A non-retained QoS 0 publish fires the message-received event."""
        @asyncio.coroutine
        def scenario(broker):
            pub_client = MQTTClient()
            ret = yield from pub_client.connect('mqtt://localhost/')
            self.assertEqual(ret, 0)

            ret_message = yield from pub_client.publish('/topic', b'data', QOS_0)
            yield from pub_client.disconnect()
            self.assertEqual(broker._retained_messages, {})

            yield from asyncio.sleep(0.1)
            yield from broker.shutdown()
            self.assertTrue(broker.transitions.is_stopped())
            MockPluginManager.assert_has_calls(
                [call().fire_event(EVENT_BROKER_MESSAGE_RECEIVED,
                                   client_id=pub_client.session.client_id,
                                   message=ret_message),
                 ], any_order=True)

        self._run_with_broker(scenario)

    # NOTE(review): the PluginManager patch is disabled for this test only;
    # confirm whether that is intentional.
    #@patch('hbmqtt.broker.PluginManager')
    def test_client_publish_dup(self):
        """A QoS 2 PUBLISH re-sent with the DUP flag completes the handshake."""
        @asyncio.coroutine
        def scenario(broker):
            # ``loop=`` is omitted: setUp() installed self.loop as the current
            # loop, and the argument is rejected by asyncio from Python 3.10.
            conn_reader, conn_writer = \
                yield from asyncio.open_connection('localhost', 1883)
            try:
                reader = StreamReaderAdapter(conn_reader)
                writer = StreamWriterAdapter(conn_writer)

                vh = ConnectVariableHeader()
                payload = ConnectPayload()

                vh.keep_alive = 10
                vh.clean_session_flag = False
                vh.will_retain_flag = False
                payload.client_id = 'test_id'
                connect = ConnectPacket(vh=vh, payload=payload)
                yield from connect.to_stream(writer)
                yield from ConnackPacket.from_stream(reader)

                publish_1 = PublishPacket.build('/test', b'data', 1, False, QOS_2, False)
                yield from publish_1.to_stream(writer)
                # Read the first PUBREC in the background; its value is not used.
                ensure_future(PubrecPacket.from_stream(reader), loop=self.loop)

                yield from asyncio.sleep(2)

                publish_dup = PublishPacket.build('/test', b'data', 1, True, QOS_2, False)
                yield from publish_dup.to_stream(writer)
                yield from PubrecPacket.from_stream(reader)
                pubrel = PubrelPacket.build(1)
                yield from pubrel.to_stream(writer)
                yield from PubcompPacket.from_stream(reader)

                disconnect = DisconnectPacket()
                yield from disconnect.to_stream(writer)

                yield from asyncio.sleep(0.1)
            finally:
                # Release the raw socket whether or not the exchange succeeded.
                conn_writer.close()
            yield from broker.shutdown()

        self._run_with_broker(scenario)

    @patch('hbmqtt.broker.PluginManager')
    def test_client_publish_invalid_topic(self, MockPluginManager):
        """Publishing to a topic containing a wildcard does not break the broker."""
        @asyncio.coroutine
        def scenario(broker):
            pub_client = MQTTClient()
            ret = yield from pub_client.connect('mqtt://localhost/')
            self.assertEqual(ret, 0)

            yield from pub_client.publish('/+', b'data', QOS_0)
            yield from asyncio.sleep(0.1)
            yield from pub_client.disconnect()

            yield from asyncio.sleep(0.1)
            yield from broker.shutdown()
            self.assertTrue(broker.transitions.is_stopped())

        self._run_with_broker(scenario)

    @patch('hbmqtt.broker.PluginManager')
    def test_client_publish_big(self, MockPluginManager):
        """A 256 KiB QoS 2 payload is received and reported to plugins."""
        @asyncio.coroutine
        def scenario(broker):
            pub_client = MQTTClient()
            ret = yield from pub_client.connect('mqtt://localhost/')
            self.assertEqual(ret, 0)

            ret_message = yield from pub_client.publish('/topic', bytearray(b'\x99' * 256 * 1024), QOS_2)
            yield from pub_client.disconnect()
            self.assertEqual(broker._retained_messages, {})

            yield from asyncio.sleep(0.1)
            yield from broker.shutdown()
            self.assertTrue(broker.transitions.is_stopped())
            MockPluginManager.assert_has_calls(
                [call().fire_event(EVENT_BROKER_MESSAGE_RECEIVED,
                                   client_id=pub_client.session.client_id,
                                   message=ret_message),
                 ], any_order=True)

        self._run_with_broker(scenario)

    @patch('hbmqtt.broker.PluginManager')
    def test_client_publish_retain(self, MockPluginManager):
        """A retained publish is stored in the broker's retained messages."""
        @asyncio.coroutine
        def scenario(broker):
            pub_client = MQTTClient()
            ret = yield from pub_client.connect('mqtt://localhost/')
            self.assertEqual(ret, 0)
            yield from pub_client.publish('/topic', b'data', QOS_0, retain=True)
            yield from pub_client.disconnect()
            yield from asyncio.sleep(0.1)
            self.assertIn('/topic', broker._retained_messages)
            retained_message = broker._retained_messages['/topic']
            self.assertEqual(retained_message.source_session, pub_client.session)
            self.assertEqual(retained_message.topic, '/topic')
            self.assertEqual(retained_message.data, b'data')
            self.assertEqual(retained_message.qos, QOS_0)
            yield from broker.shutdown()
            self.assertTrue(broker.transitions.is_stopped())

        self._run_with_broker(scenario)

    @patch('hbmqtt.broker.PluginManager')
    def test_client_publish_retain_delete(self, MockPluginManager):
        """A retained publish with an empty payload leaves nothing retained."""
        @asyncio.coroutine
        def scenario(broker):
            pub_client = MQTTClient()
            ret = yield from pub_client.connect('mqtt://localhost/')
            self.assertEqual(ret, 0)
            yield from pub_client.publish('/topic', b'', QOS_0, retain=True)
            yield from pub_client.disconnect()
            yield from asyncio.sleep(0.1)
            self.assertNotIn('/topic', broker._retained_messages)
            yield from broker.shutdown()
            self.assertTrue(broker.transitions.is_stopped())

        self._run_with_broker(scenario)

    @patch('hbmqtt.broker.PluginManager')
    def test_client_subscribe_publish(self, MockPluginManager):
        """Messages published at QoS 0/1/2 reach a matching subscriber in order."""
        @asyncio.coroutine
        def scenario(broker):
            sub_client = MQTTClient()
            yield from sub_client.connect('mqtt://localhost')
            ret = yield from sub_client.subscribe([('/qos0', QOS_0), ('/qos1', QOS_1), ('/qos2', QOS_2)])
            self.assertEqual(ret, [QOS_0, QOS_1, QOS_2])

            yield from self._client_publish('/qos0', b'data', QOS_0)
            yield from self._client_publish('/qos1', b'data', QOS_1)
            yield from self._client_publish('/qos2', b'data', QOS_2)
            yield from asyncio.sleep(0.1)
            for qos in [QOS_0, QOS_1, QOS_2]:
                message = yield from sub_client.deliver_message()
                self.assertIsNotNone(message)
                self.assertEqual(message.topic, '/qos%s' % qos)
                self.assertEqual(message.data, b'data')
                self.assertEqual(message.qos, qos)
            yield from sub_client.disconnect()
            yield from asyncio.sleep(0.1)
            yield from broker.shutdown()
            self.assertTrue(broker.transitions.is_stopped())

        self._run_with_broker(scenario)

    @patch('hbmqtt.broker.PluginManager')
    def test_client_subscribe_invalid(self, MockPluginManager):
        """Only the malformed filter ('sport+') is answered with 0x80."""
        @asyncio.coroutine
        def scenario(broker):
            sub_client = MQTTClient()
            yield from sub_client.connect('mqtt://localhost')
            ret = yield from sub_client.subscribe(
                [('+', QOS_0), ('+/tennis/#', QOS_0), ('sport+', QOS_0), ('sport/+/player1', QOS_0)])
            self.assertEqual(ret, [QOS_0, QOS_0, 0x80, QOS_0])

            yield from asyncio.sleep(0.1)
            yield from sub_client.disconnect()
            yield from asyncio.sleep(0.1)
            yield from broker.shutdown()
            self.assertTrue(broker.transitions.is_stopped())

        self._run_with_broker(scenario)

    @patch('hbmqtt.broker.PluginManager')
    def test_client_subscribe_publish_dollar_topic_1(self, MockPluginManager):
        """A '#' subscription receives '/topic' but not '$topic'."""
        @asyncio.coroutine
        def scenario(broker):
            sub_client = MQTTClient()
            yield from sub_client.connect('mqtt://localhost')
            ret = yield from sub_client.subscribe([('#', QOS_0)])
            self.assertEqual(ret, [QOS_0])

            yield from self._client_publish('/topic', b'data', QOS_0)
            message = yield from sub_client.deliver_message()
            self.assertIsNotNone(message)

            yield from self._client_publish('$topic', b'data', QOS_0)
            yield from asyncio.sleep(0.1)
            message = None
            try:
                message = yield from sub_client.deliver_message(timeout=2)
            except asyncio.TimeoutError:
                # Expected: nothing is delivered for the '$'-prefixed topic.
                # Any other exception is a real failure and must propagate.
                pass
            self.assertIsNone(message)
            yield from sub_client.disconnect()
            yield from asyncio.sleep(0.1)
            yield from broker.shutdown()
            self.assertTrue(broker.transitions.is_stopped())

        self._run_with_broker(scenario)

    @patch('hbmqtt.broker.PluginManager')
    def test_client_subscribe_publish_dollar_topic_2(self, MockPluginManager):
        """A '+/monitor/Clients' subscription does not match '$SYS/monitor/Clients'."""
        @asyncio.coroutine
        def scenario(broker):
            sub_client = MQTTClient()
            yield from sub_client.connect('mqtt://localhost')
            ret = yield from sub_client.subscribe([('+/monitor/Clients', QOS_0)])
            self.assertEqual(ret, [QOS_0])

            yield from self._client_publish('/test/monitor/Clients', b'data', QOS_0)
            message = yield from sub_client.deliver_message()
            self.assertIsNotNone(message)

            yield from self._client_publish('$SYS/monitor/Clients', b'data', QOS_0)
            yield from asyncio.sleep(0.1)
            message = None
            try:
                message = yield from sub_client.deliver_message(timeout=2)
            except asyncio.TimeoutError:
                # Expected: nothing is delivered for the '$'-prefixed topic.
                # Any other exception is a real failure and must propagate.
                pass
            self.assertIsNone(message)
            yield from sub_client.disconnect()
            yield from asyncio.sleep(0.1)
            yield from broker.shutdown()
            self.assertTrue(broker.transitions.is_stopped())

        self._run_with_broker(scenario)

    @patch('hbmqtt.broker.PluginManager')
    def test_client_publish_retain_subscribe(self, MockPluginManager):
        """Retained messages published while offline are delivered on reconnect."""
        @asyncio.coroutine
        def scenario(broker):
            sub_client = MQTTClient()
            yield from sub_client.connect('mqtt://localhost', cleansession=False)
            ret = yield from sub_client.subscribe([('/qos0', QOS_0), ('/qos1', QOS_1), ('/qos2', QOS_2)])
            self.assertEqual(ret, [QOS_0, QOS_1, QOS_2])
            yield from sub_client.disconnect()
            yield from asyncio.sleep(0.1)

            yield from self._client_publish('/qos0', b'data', QOS_0, retain=True)
            yield from self._client_publish('/qos1', b'data', QOS_1, retain=True)
            yield from self._client_publish('/qos2', b'data', QOS_2, retain=True)
            yield from sub_client.reconnect()
            for qos in [QOS_0, QOS_1, QOS_2]:
                log.debug("TEST QOS: %d", qos)
                message = yield from sub_client.deliver_message()
                # Assert before touching attributes so a missing message fails
                # the assertion instead of raising AttributeError.
                self.assertIsNotNone(message)
                log.debug("Message: " + repr(message.publish_packet))
                self.assertEqual(message.topic, '/qos%s' % qos)
                self.assertEqual(message.data, b'data')
                self.assertEqual(message.qos, qos)
            yield from sub_client.disconnect()
            yield from asyncio.sleep(0.1)
            yield from broker.shutdown()
            self.assertTrue(broker.transitions.is_stopped())

        self._run_with_broker(scenario)
 |