# Copyright (c) 2015 Nicolas JOUANIN
#
# See the file license.txt for copying permission.
import unittest
import asyncio

from amqtt.mqtt.packet import CONNECT, MQTTFixedHeader
from amqtt.errors import MQTTException
from amqtt.adapters import BufferReader


class TestMQTTFixedHeaderTest(unittest.TestCase):
    """Round-trip tests for ``MQTTFixedHeader`` decoding and encoding.

    Decoding is exercised through ``MQTTFixedHeader.from_stream`` fed by a
    ``BufferReader``; encoding through ``MQTTFixedHeader.to_bytes``.
    """

    def setUp(self):
        """Create a private event loop for the test and ensure it is closed."""
        self.loop = asyncio.new_event_loop()
        # Close the loop after every test; otherwise each test leaks an
        # event loop (and its file descriptors) and triggers ResourceWarning.
        self.addCleanup(self.loop.close)

    def test_from_bytes(self):
        """A one-byte remaining length (0x7f) decodes to 127."""
        data = b"\x10\x7f"
        stream = BufferReader(data)
        header = self.loop.run_until_complete(MQTTFixedHeader.from_stream(stream))
        self.assertEqual(header.packet_type, CONNECT)
        # All flag bits of the first byte (0x08, 0x06, 0x01) must be clear.
        self.assertFalse(header.flags & 0x08)
        self.assertEqual((header.flags & 0x06) >> 1, 0)
        self.assertFalse(header.flags & 0x01)
        self.assertEqual(header.remaining_length, 127)

    def test_from_bytes_with_length(self):
        """A four-byte remaining length (ff ff ff 7f) decodes to 268435455."""
        data = b"\x10\xff\xff\xff\x7f"
        stream = BufferReader(data)
        header = self.loop.run_until_complete(MQTTFixedHeader.from_stream(stream))
        self.assertEqual(header.packet_type, CONNECT)
        # All flag bits of the first byte (0x08, 0x06, 0x01) must be clear.
        self.assertFalse(header.flags & 0x08)
        self.assertEqual((header.flags & 0x06) >> 1, 0)
        self.assertFalse(header.flags & 0x01)
        self.assertEqual(header.remaining_length, 268435455)

    def test_from_bytes_ko_with_length(self):
        """A remaining length spanning five bytes is rejected with MQTTException."""
        data = b"\x10\xff\xff\xff\xff\x7f"
        stream = BufferReader(data)
        with self.assertRaises(MQTTException):
            self.loop.run_until_complete(MQTTFixedHeader.from_stream(stream))

    def test_to_bytes(self):
        """A CONNECT header with zero flags and zero length encodes to 10 00."""
        header = MQTTFixedHeader(CONNECT, 0x00, 0)
        data = header.to_bytes()
        self.assertEqual(data, b"\x10\x00")

    def test_to_bytes_2(self):
        """A remaining length of 268435455 encodes to the bytes ff ff ff 7f."""
        header = MQTTFixedHeader(CONNECT, 0x00, 268435455)
        data = header.to_bytes()
        self.assertEqual(data, b"\x10\xff\xff\xff\x7f")