Test connect packet

pull/8/head
Nicolas Jouanin 2015-06-13 15:05:42 +02:00
rodzic 6f9023ffe8
commit 5fa686de6a
2 zmienionych plików z 45 dodań i 50 usunięć

Wyświetl plik

@ -162,13 +162,13 @@ class ConnectPayload(MQTTPayload):
try:
payload.username = yield from decode_string(reader)
except NoDataException:
raise CodecException('username flag set, but username not present in payload')
raise MQTTException('username flag set, but username not present in payload')
if variable_header.password_flag:
try:
payload.password = yield from decode_string(reader)
except NoDataException:
raise CodecException('password flag set, but password not present in payload')
raise MQTTException('password flag set, but password not present in payload')
return payload

Wyświetl plik

@ -4,89 +4,84 @@
import unittest
import asyncio
from hbmqtt.codecs.connect import ConnectCodec, ConnectException
from hbmqtt.messages.packet import PacketType, MQTTHeader, ConnectMessage
from hbmqtt.mqtt.connect import ConnectPacket, ConnectVariableHeader, ConnectPayload
from hbmqtt.mqtt.packet import MQTTFixedHeader, PacketType
from hbmqtt.errors import MQTTException
class TestConnectCodec(unittest.TestCase):
class ConnectPacketTest(unittest.TestCase):
def setUp(self):
self.loop = asyncio.new_event_loop()
def test_decode_ok(self):
data = b'\x00\x04MQTT\x04\xce\x00\x00\x00\x0a0123456789\x00\x09WillTopic\x00\x0bWillMessage\x00\x04user\x00\x08password'
header = MQTTHeader(PacketType.CONNECT, 0x00, len(data))
data = b'\x10\x3e\x00\x04MQTT\x04\xce\x00\x00\x00\x0a0123456789\x00\x09WillTopic\x00\x0bWillMessage\x00\x04user\x00\x08password'
stream = asyncio.StreamReader(loop=self.loop)
stream.feed_data(data)
message = self.loop.run_until_complete(ConnectCodec.decode(header, stream))
self.assertEqual(message.proto_name, "MQTT")
self.assertEqual(message.proto_level, 4)
self.assertTrue(message.is_user_name_flag())
self.assertTrue(message.is_password_flag())
self.assertFalse(message.is_will_retain())
self.assertEqual(message.will_qos(), 1)
self.assertTrue(message.is_will_flag())
self.assertTrue(message.is_clean_session())
self.assertFalse(message.is_reserved_flag())
message = self.loop.run_until_complete(ConnectPacket.from_stream(stream))
self.assertEqual(message.variable_header.proto_name, "MQTT")
self.assertEqual(message.variable_header.proto_level, 4)
self.assertTrue(message.variable_header.username_flag)
self.assertTrue(message.variable_header.password_flag)
self.assertFalse(message.variable_header.will_retain_flag)
self.assertEqual(message.variable_header.will_qos, 1)
self.assertTrue(message.variable_header.will_flag)
self.assertTrue(message.variable_header.clean_session_flag)
self.assertFalse(message.variable_header.reserved_flag)
self.assertEqual(message.payload.client_id, '0123456789')
self.assertEqual(message.payload.will_topic, 'WillTopic')
self.assertEqual(message.payload.will_message, 'WillMessage')
self.assertEqual(message.payload.username, 'user')
self.assertEqual(message.payload.password, 'password')
def test_decode_fail_protocol_name(self):
data = b'\x00\x04TTQM\x04\xce\x00\x00\x00\x0a0123456789\x00\x09WillTopic\x00\x0bWillMessage\x00\x04user\x00\x08password'
header = MQTTHeader(PacketType.CONNECT, 0x00, len(data))
data = b'\x10\x3e\x00\x04TTQM\x04\xce\x00\x00\x00\x0a0123456789\x00\x09WillTopic\x00\x0bWillMessage\x00\x04user\x00\x08password'
stream = asyncio.StreamReader(loop=self.loop)
stream.feed_data(data)
with self.assertRaises(ConnectException):
self.loop.run_until_complete(ConnectCodec.decode(header, stream))
with self.assertRaises(MQTTException):
self.loop.run_until_complete(ConnectPacket.from_stream(stream))
def test_decode_fail_reserved_flag(self):
data = b'\x00\x04MQTT\x04\xcf\x00\x00\x00\x0a0123456789\x00\x09WillTopic\x00\x0bWillMessage\x00\x04user\x00\x08password'
header = MQTTHeader(PacketType.CONNECT, 0x00, len(data))
data = b'\x10\x3e\x00\x04MQTT\x04\xcf\x00\x00\x00\x0a0123456789\x00\x09WillTopic\x00\x0bWillMessage\x00\x04user\x00\x08password'
stream = asyncio.StreamReader(loop=self.loop)
stream.feed_data(data)
with self.assertRaises(ConnectException):
self.loop.run_until_complete(ConnectCodec.decode(header, stream))
with self.assertRaises(MQTTException):
self.loop.run_until_complete(ConnectPacket.from_stream(stream))
def test_decode_fail_miss_clientId(self):
data = b'\x00\x04MQTT\x04\xce\x00\x00'
header = MQTTHeader(PacketType.CONNECT, 0x00, len(data))
data = b'\x10\x0a\x00\x04MQTT\x04\xce\x00\x00'
stream = asyncio.StreamReader(loop=self.loop)
stream.feed_data(data)
stream.feed_eof()
with self.assertRaises(ConnectException):
self.loop.run_until_complete(ConnectCodec.decode(header, stream))
with self.assertRaises(MQTTException):
self.loop.run_until_complete(ConnectPacket.from_stream(stream))
def test_decode_fail_miss_willtopic(self):
data = b'\x00\x04MQTT\x04\xce\x00\x00\x00\x0a0123456789'
header = MQTTHeader(PacketType.CONNECT, 0x00, len(data))
data = b'\x10\x16\x00\x04MQTT\x04\xce\x00\x00\x00\x0a0123456789'
stream = asyncio.StreamReader(loop=self.loop)
stream.feed_data(data)
stream.feed_eof()
with self.assertRaises(ConnectException):
self.loop.run_until_complete(ConnectCodec.decode(header, stream))
with self.assertRaises(MQTTException):
self.loop.run_until_complete(ConnectPacket.from_stream(stream))
def test_decode_fail_miss_username(self):
data = b'\x00\x04MQTT\x04\xce\x00\x00\x00\x0a0123456789\x00\x09WillTopic\x00\x0bWillMessage'
header = MQTTHeader(PacketType.CONNECT, 0x00, len(data))
data = b'\x10\x2e\x00\x04MQTT\x04\xce\x00\x00\x00\x0a0123456789\x00\x09WillTopic\x00\x0bWillMessage'
stream = asyncio.StreamReader(loop=self.loop)
stream.feed_data(data)
stream.feed_eof()
with self.assertRaises(ConnectException):
self.loop.run_until_complete(ConnectCodec.decode(header, stream))
with self.assertRaises(MQTTException):
self.loop.run_until_complete(ConnectPacket.from_stream(stream))
def test_decode_fail_miss_password(self):
data = b'\x00\x04MQTT\x04\xce\x00\x00\x00\x0a0123456789\x00\x09WillTopic\x00\x0bWillMessage\x00\x04user'
header = MQTTHeader(PacketType.CONNECT, 0x00, len(data))
data = b'\x10\x34\x00\x04MQTT\x04\xce\x00\x00\x00\x0a0123456789\x00\x09WillTopic\x00\x0bWillMessage\x00\x04user'
stream = asyncio.StreamReader(loop=self.loop)
stream.feed_data(data)
stream.feed_eof()
with self.assertRaises(ConnectException):
self.loop.run_until_complete(ConnectCodec.decode(header, stream))
with self.assertRaises(MQTTException):
self.loop.run_until_complete(ConnectPacket.from_stream(stream))
def test_encode(self):
header = MQTTHeader(PacketType.CONNECT, 0x00, 0)
message = ConnectMessage(header, 0xce, 0, 'MQTT', 4)
message.client_id = '0123456789'
message.will_topic = 'WillTopic'
message.will_message = 'WillMessage'
message.user_name = 'user'
message.password = 'password'
encoded = yield from ConnectCodec.encode(message)
header = MQTTFixedHeader(PacketType.CONNECT, 0x00, 0)
variable_header = ConnectVariableHeader(0xce, 0, 'MQTT', 4)
payload = ConnectPayload('0123456789', 'WillTopic', 'WillMessage', 'user', 'password')
message = ConnectPacket(header, variable_header, payload)
encoded = message.to_bytes()
self.assertEqual(encoded, b'\x10\x3e\x00\x04MQTT\x04\xce\x00\x00\x00\x0a0123456789\x00\x09WillTopic\x00\x0bWillMessage\x00\x04user\x00\x08password')