From 4c079f9a66125efac83090ac8ee53fd128ba7f92 Mon Sep 17 00:00:00 2001 From: Nicolas Jouanin Date: Sat, 30 May 2015 14:15:17 +0200 Subject: [PATCH] Make codes methods statics --- hbmqtt/codecs/header.py | 75 +++++++++++++++++++------------------ tests/codecs/test_header.py | 74 +++++++++++------------------------- 2 files changed, 59 insertions(+), 90 deletions(-) diff --git a/hbmqtt/codecs/header.py b/hbmqtt/codecs/header.py index d79f477..40f84ee 100644 --- a/hbmqtt/codecs/header.py +++ b/hbmqtt/codecs/header.py @@ -16,54 +16,55 @@ class FixedHeaderException(CodecException): pass class FixedHeaderCodec: - def __init__(self): - pass - + @staticmethod @asyncio.coroutine - def decode(self, reader) -> FixedHeader: + def decode(reader) -> FixedHeader: """ Decode MQTT message fixed header from stream :param reader: Stream to read :return: FixedHeader instance """ + def get_message_type(byte): + byte_type = (bytes_to_int(byte) & 0xf0) >> 4 + return MessageType(byte_type) + + def get_flags(data): + byte = bytes_to_int(data) + return byte & 0x0f + + @asyncio.coroutine + def decode_remaining_length(): + """ + Decode message length according to MQTT specifications + :return: + """ + multiplier = 1 + value = 0 + length_bytes = b'' + while True: + encoded_byte = yield from read_or_raise(reader, 1) + length_bytes += encoded_byte + int_byte = bytes_to_int(encoded_byte) + value += (int_byte & 0x7f) * multiplier + if (int_byte & 0x80) == 0: + break + else: + multiplier *= 128 + if multiplier > 128*128*128: + raise FixedHeaderException("Invalid remaining length bytes:%s" % bytes_to_hex_str(length_bytes)) + return value + b1 = yield from read_or_raise(reader, 1) - msg_type = FixedHeaderCodec.get_message_type(b1) + msg_type = get_message_type(b1) if msg_type is MessageType.RESERVED_0 or msg_type is MessageType.RESERVED_15: raise FixedHeaderException("Usage of control packet type %s is forbidden" % msg_type) - flags = FixedHeaderCodec.get_flags(b1) + flags = get_flags(b1) - remain_length = yield from self.decode_remaining_length(reader) + remain_length = yield from decode_remaining_length() return FixedHeader(msg_type, flags, remain_length) @staticmethod - def get_message_type(byte): - byte_type = (bytes_to_int(byte) & 0xf0) >> 4 - return MessageType(byte_type) - - @staticmethod - def get_flags(data): - byte = bytes_to_int(data) - return byte & 0x0f - @asyncio.coroutine - def decode_remaining_length(self, reader): - """ - Decode message length according to MQTT specifications - :param reader: - :return: - """ - multiplier = 1 - value = 0 - length_bytes = b'' - while True: - encoded_byte = yield from read_or_raise(reader, 1) - length_bytes += encoded_byte - int_byte = bytes_to_int(encoded_byte) - value += (int_byte & 0x7f) * multiplier - if (int_byte & 0x80) == 0: - break - else: - multiplier *= 128 - if multiplier > 128*128*128: - raise FixedHeaderException("Invalid remaining length bytes:%s" % bytes_to_hex_str(length_bytes)) - return value + def encode(header: FixedHeader, writer): + # To be done + pass diff --git a/tests/codecs/test_header.py b/tests/codecs/test_header.py index 3223311..74ef062 100644 --- a/tests/codecs/test_header.py +++ b/tests/codecs/test_header.py @@ -10,66 +10,34 @@ class TestFixedHeader(unittest.TestCase): def setUp(self): self.loop = asyncio.new_event_loop() - def test_get_message_type(self): - m_type = FixedHeaderCodec.get_message_type(b'\x10') - self.assertEqual(m_type, MessageType.CONNECT) - - def test_get_flags(self): - flags = FixedHeaderCodec.get_flags(b'\x1f') - self.assertTrue(flags & 0x08) - self.assertTrue(flags & 0x04) - self.assertTrue(flags & 0x02) - self.assertTrue(flags & 0x01) - self.assertFalse(flags & 0x10) - - def test_decode_remaining_length1(self): - stream = asyncio.StreamReader(loop=self.loop) - stream.feed_data(b'\x7f') - s = FixedHeaderCodec() - length = self.loop.run_until_complete(s.decode_remaining_length(stream)) - self.assertEqual(length, 127) - - def test_decode_remaining_length2(self): - stream = asyncio.StreamReader(loop=self.loop) - stream.feed_data(b'\xff\x7f') - s = FixedHeaderCodec() - length = self.loop.run_until_complete(s.decode_remaining_length(stream)) - self.assertEqual(length, 16383) - - def test_decode_remaining_length3(self): - stream = asyncio.StreamReader(loop=self.loop) - stream.feed_data(b'\xff\xff\x7f') - s = FixedHeaderCodec() - length = self.loop.run_until_complete(s.decode_remaining_length(stream)) - self.assertEqual(length, 2097151) - - def test_decode_remaining_length4(self): - stream = asyncio.StreamReader(loop=self.loop) - stream.feed_data(b'\xff\xff\xff\x7f') - s = FixedHeaderCodec() - length = self.loop.run_until_complete(s.decode_remaining_length(stream)) - self.assertEqual(length, 268435455) - - def test_decode_remaining_length5(self): - stream = asyncio.StreamReader(loop=self.loop) - stream.feed_data(b'\xff\xff\xff\xff\x7f') - s = FixedHeaderCodec() - with self.assertRaises(FixedHeaderException): - self.loop.run_until_complete(s.decode_remaining_length(stream)) - def test_decode_ok(self): stream = asyncio.StreamReader(loop=self.loop) stream.feed_data(b'\x10\x7f') - s = FixedHeaderCodec() - header = self.loop.run_until_complete(s.decode(stream)) + header = self.loop.run_until_complete(FixedHeaderCodec.decode(stream)) self.assertEqual(header.message_type, MessageType.CONNECT) self.assertFalse(header.flags & 0x08) - self.assertEqual((header.flags & 0x06) >> 1 , 0) + self.assertEqual((header.flags & 0x06) >> 1, 0) self.assertFalse(header.flags & 0x01) + self.assertEqual(header.remaining_length, 127) - def test_decode_ko(self): + def test_decode_ok_with_length(self): + stream = asyncio.StreamReader(loop=self.loop) + stream.feed_data(b'\x10\xff\xff\xff\x7f') + header = self.loop.run_until_complete(FixedHeaderCodec.decode(stream)) + self.assertEqual(header.message_type, MessageType.CONNECT) + self.assertFalse(header.flags & 0x08) + self.assertEqual((header.flags & 0x06) >> 1, 0) + self.assertFalse(header.flags & 0x01) + self.assertEqual(header.remaining_length, 268435455) + + def test_decode_reserved(self): stream = asyncio.StreamReader(loop=self.loop) stream.feed_data(b'\x0f\x7f') - s = FixedHeaderCodec() with self.assertRaises(FixedHeaderException): - self.loop.run_until_complete(s.decode(stream)) + self.loop.run_until_complete(FixedHeaderCodec.decode(stream)) + + def test_decode_ko_with_length(self): + stream = asyncio.StreamReader(loop=self.loop) + stream.feed_data(b'\x10\xff\xff\xff\xff\x7f') + with self.assertRaises(FixedHeaderException): + self.loop.run_until_complete(FixedHeaderCodec.decode(stream))