Add publish retain test

pull/8/head
Nico 2015-10-10 15:01:17 +02:00
rodzic a456d9cf21
commit 6b7b962b16
3 zmienionych plików z 88 dodań i 19 usunięć

Wyświetl plik

@ -57,6 +57,7 @@ EVENT_BROKER_CLIENT_CONNECTED = 'broker_client_connected'
EVENT_BROKER_CLIENT_DISCONNECTED = 'broker_client_disconnected'
EVENT_BROKER_CLIENT_SUBSCRIBED = 'broker_client_subscribed'
EVENT_BROKER_CLIENT_UNSUBSCRIBED = 'broker_client_unsubscribed'
EVENT_BROKER_MESSAGE_RECEIVED = 'broker_message_received'
class BrokerException(BaseException):
@ -176,7 +177,7 @@ class Broker:
self._init_states()
self._sessions = dict()
self._subscriptions = dict()
self._global_retained_messages = dict()
self._retained_messages = dict()
# Broker statistics initialization
self._stats = dict()
@ -220,7 +221,7 @@ class Broker:
try:
self._sessions = dict()
self._subscriptions = dict()
self._global_retained_messages = dict()
self._retained_messages = dict()
self.transitions.start()
self.logger.debug("Broker starting")
except MachineError as me:
@ -295,7 +296,7 @@ class Broker:
try:
self._sessions = dict()
self._subscriptions = dict()
self._global_retained_messages = dict()
self._retained_messages = dict()
self.transitions.shutdown()
except MachineError as me:
self.logger.debug("Invalid method call at this moment: %s" % me)
@ -358,7 +359,7 @@ class Broker:
inflight_in += session.inflight_in_count
inflight_out += session.inflight_out_count
messages_stored += session.retained_messages_count
messages_stored += len(self._global_retained_messages)
messages_stored += len(self._retained_messages)
subscriptions_count = 0
for topic in self._subscriptions:
subscriptions_count += len(self._subscriptions[topic])
@ -382,7 +383,7 @@ class Broker:
self._broadcast_sys_topic('messages/inflight/stored', int_to_bytes_str(messages_stored)),
self._broadcast_sys_topic('messages/publish/received', int_to_bytes_str(self._stats[STAT_PUBLISH_RECEIVED])),
self._broadcast_sys_topic('messages/publish/sent', int_to_bytes_str(self._stats[STAT_PUBLISH_SENT])),
self._broadcast_sys_topic('messages/retained/count', int_to_bytes_str(len(self._global_retained_messages))),
self._broadcast_sys_topic('messages/retained/count', int_to_bytes_str(len(self._retained_messages))),
self._broadcast_sys_topic('messages/subscriptions/count', int_to_bytes_str(subscriptions_count)),
]
@ -531,15 +532,13 @@ class Broker:
self.logger.debug(repr(self._subscriptions))
if wait_deliver in done:
self.logger.debug("%s handling message delivery" % client_session.client_id)
publish_packet = wait_deliver.result()
packet_id = publish_packet.variable_header.packet_id
topic_name = publish_packet.variable_header.topic_name
data = publish_packet.payload.data
yield from self.broadcast_application_message(client_session, topic_name, data)
if publish_packet.retain_flag:
self.retain_message(client_session, topic_name, data)
# Acknowledge message delivery
yield from handler.mqtt_acknowledge_delivery(packet_id)
app_message = wait_deliver.result()
yield from self.plugins_manager.fire_event(EVENT_BROKER_MESSAGE_RECEIVED,
client_id=client_session.client_id,
message=app_message)
yield from self.broadcast_application_message(client_session, app_message.topic, app_message.data)
if app_message.publish_packet.retain_flag:
self.retain_message(client_session, app_message.topic, app_message.data, app_message.qos)
wait_deliver = asyncio.Task(handler.mqtt_deliver_next_message(), loop=self._loop)
disconnect_waiter.cancel()
subscribe_waiter.cancel()
@ -615,11 +614,11 @@ class Broker:
# If retained flag set, store the message for further subscriptions
self.logger.debug("Retaining message on topic %s" % topic_name)
retained_message = RetainedApplicationMessage(source_session, topic_name, data, qos)
self._global_retained_messages[topic_name] = retained_message
self._retained_messages[topic_name] = retained_message
else:
# [MQTT-3.3.1-10]
self.logger.debug("Clear retained messages for topic '%s'" % topic_name)
del self._global_retained_messages[topic_name]
del self._retained_messages[topic_name]
def add_subscription(self, subscription, session):
import re
@ -688,7 +687,7 @@ class Broker:
self.logger.debug("broadcasting application message from %s on topic '%s' to %s" %
(format_client_message(session=source_session),
topic, format_client_message(session=target_session)))
handler = _get_handler(target_session)
handler = self._get_handler(target_session)
publish_tasks.append(
asyncio.Task(handler.mqtt_publish(topic, data, qos, retain=False), loop=self._loop)
)
@ -728,11 +727,11 @@ class Broker:
self.logger.debug("Begin broadcasting messages retained due to subscription on '%s' from %s" %
(subscription[0], format_client_message(session=session)))
publish_tasks = []
for d_topic in self._global_retained_messages:
for d_topic in self._retained_messages:
self.logger.debug("matching : %s %s" % (d_topic, subscription[0]))
if self.matches(d_topic, subscription[0]):
self.logger.debug("%s and %s match" % (d_topic, subscription[0]))
retained = self._global_retained_messages[d_topic]
retained = self._retained_messages[d_topic]
publish_tasks.append(asyncio.Task(
session.handler.mqtt_publish(
retained.topic, retained.data, subscription[1], True), loop=self._loop))

Wyświetl plik

@ -27,6 +27,8 @@ class ApplicationMessage:
def build_publish_packet(self, dup=False):
return PublishPacket.build(self.topic, self.data, self.packet_id, dup, self.qos, self.retain)
def __eq__(self, other):
return self.packet_id == other.packet_id
class IncomingApplicationMessage(ApplicationMessage):
pass

Wyświetl plik

@ -214,3 +214,71 @@ class BrokerTest(unittest.TestCase):
self.loop.run_until_complete(test_coro())
if future.exception():
raise future.exception()
@patch('hbmqtt.broker.PluginManager')
def test_client_publish(self, MockPluginManager):
def test_coro():
try:
broker = Broker(test_config, plugin_namespace="hbmqtt.test.plugins")
yield from broker.start()
self.assertTrue(broker.transitions.is_started())
pub_client = MQTTClient()
ret = yield from pub_client.connect('mqtt://localhost/')
self.assertEqual(ret, 0)
ret_message = yield from pub_client.publish('/topic', b'data', QOS_0)
yield from pub_client.disconnect()
self.assertEquals(broker._retained_messages, {})
yield from asyncio.sleep(0.1)
yield from broker.shutdown()
self.assertTrue(broker.transitions.is_stopped())
MockPluginManager.assert_has_calls(
[call().fire_event(EVENT_BROKER_MESSAGE_RECEIVED,
client_id=pub_client.session.client_id,
message=ret_message),
], any_order=True)
future.set_result(True)
except Exception as ae:
future.set_exception(ae)
future = asyncio.Future(loop=self.loop)
self.loop.run_until_complete(test_coro())
if future.exception():
raise future.exception()
@patch('hbmqtt.broker.PluginManager')
def test_client_publish_retain(self, MockPluginManager):
def test_coro():
try:
broker = Broker(test_config, plugin_namespace="hbmqtt.test.plugins")
yield from broker.start()
self.assertTrue(broker.transitions.is_started())
pub_client = MQTTClient()
ret = yield from pub_client.connect('mqtt://localhost/')
self.assertEqual(ret, 0)
ret_message = yield from pub_client.publish('/topic', b'data', QOS_0, retain=True)
yield from pub_client.disconnect()
yield from asyncio.sleep(0.1)
self.assertIn('/topic', broker._retained_messages)
retained_message = broker._retained_messages['/topic']
self.assertEquals(retained_message.source_session, pub_client.session)
self.assertEquals(retained_message.topic, '/topic')
self.assertEquals(retained_message.data, b'data')
self.assertEquals(retained_message.qos, QOS_0)
yield from broker.shutdown()
self.assertTrue(broker.transitions.is_stopped())
MockPluginManager.assert_has_calls(
[call().fire_event(EVENT_BROKER_MESSAGE_RECEIVED,
client_id=pub_client.session.client_id,
message=ret_message),
], any_order=True)
future.set_result(True)
except Exception as ae:
future.set_exception(ae)
future = asyncio.Future(loop=self.loop)
self.loop.run_until_complete(test_coro())
if future.exception():
raise future.exception()