From 7c289dc46c4a64c936b0c45ff39eb0fdd6ed0670 Mon Sep 17 00:00:00 2001 From: Nicolas Jouanin Date: Wed, 17 Jun 2015 22:06:11 +0200 Subject: [PATCH] Add subscribe message --- hbmqtt/mqtt/subscribe.py | 52 ++++++++++++++++++++++++++++++++++++ tests/mqtt/test_subscribe.py | 34 +++++++++++++++++++++++ 2 files changed, 86 insertions(+) create mode 100644 hbmqtt/mqtt/subscribe.py create mode 100644 tests/mqtt/test_subscribe.py diff --git a/hbmqtt/mqtt/subscribe.py b/hbmqtt/mqtt/subscribe.py new file mode 100644 index 0000000..772e5b4 --- /dev/null +++ b/hbmqtt/mqtt/subscribe.py @@ -0,0 +1,52 @@ +# Copyright (c) 2015 Nicolas JOUANIN +# +# See the file license.txt for copying permission. +from hbmqtt.mqtt.packet import MQTTPacket, MQTTFixedHeader, PacketType, PacketIdVariableHeader, MQTTPayload, MQTTVariableHeader +from hbmqtt.errors import HBMQTTException, MQTTException +from hbmqtt.codecs import * + + +class SubscribePayload(MQTTPayload): + def __init__(self, topics=[]): + super().__init__() + self.topics = topics + + def to_bytes(self, fixed_header: MQTTFixedHeader, variable_header: MQTTVariableHeader): + out = b'' + for topic in self.topics: + out += encode_string(topic['filter']) + out += int_to_bytes(topic['qos'], 1) + return out + + @classmethod + @asyncio.coroutine + def from_stream(cls, reader: asyncio.StreamReader, fixed_header: MQTTFixedHeader, + variable_header: MQTTVariableHeader): + topics = [] + while True: + try: + topic = yield from decode_string(reader) + qos_byte = yield from read_or_raise(reader, 1) + qos = bytes_to_int(qos_byte) + topics.append({'filter': topic, 'qos': qos}) + print(topic) + except NoDataException: + break + return cls(topics) + + +class SubscribePacket(MQTTPacket): + VARIABLE_HEADER = PacketIdVariableHeader + PAYLOAD = SubscribePayload + + def __init__(self, fixed: MQTTFixedHeader=None, variable_header: PacketIdVariableHeader=None, payload=None): + if fixed is None: + header = MQTTFixedHeader(PacketType.SUBSCRIBE, 0x00) + else: + if fixed.packet_type is not PacketType.SUBSCRIBE: + raise HBMQTTException("Invalid fixed packet type %s for SubscribePacket init" % fixed.packet_type) + header = fixed + + super().__init__(header) + self.variable_header = variable_header + self.payload = payload diff --git a/tests/mqtt/test_subscribe.py b/tests/mqtt/test_subscribe.py new file mode 100644 index 0000000..aced706 --- /dev/null +++ b/tests/mqtt/test_subscribe.py @@ -0,0 +1,34 @@ +# Copyright (c) 2015 Nicolas JOUANIN +# +# See the file license.txt for copying permission. +import unittest + +from hbmqtt.mqtt.subscribe import SubscribePacket, SubscribePayload +from hbmqtt.mqtt.packet import PacketIdVariableHeader +from hbmqtt.codecs import * + +class SubscribePacketTest(unittest.TestCase): + def setUp(self): + self.loop = asyncio.new_event_loop() + + def test_from_stream(self): + data = b'\x80\x0e\x00\x0a\x00\x03a/b\x01\x00\x03c/d\x02' + stream = asyncio.StreamReader(loop=self.loop) + stream.feed_data(data) + stream.feed_eof() + message = self.loop.run_until_complete(SubscribePacket.from_stream(stream)) + self.assertEqual(message.payload.topics[0]['filter'], 'a/b') + self.assertEqual(message.payload.topics[0]['qos'], 0x01) + self.assertEqual(message.payload.topics[1]['filter'], 'c/d') + self.assertEqual(message.payload.topics[1]['qos'], 0x02) + + def test_to_stream(self): + variable_header = PacketIdVariableHeader(10) + payload = SubscribePayload( + [ + {'filter': 'a/b', 'qos': 0x01}, + {'filter': 'c/d', 'qos': 0x02} + ]) + publish = SubscribePacket(variable_header=variable_header, payload=payload) + out = publish.to_bytes() + self.assertEqual(out, b'\x80\x0e\x00\x0a\x00\x03a/b\x01\x00\x03c/d\x02') \ No newline at end of file