# amqtt/tests/mqtt/test_connect.py
#
# 188 lines
# 7.5 KiB
# Python

import asyncio
import unittest
import pytest
from amqtt.adapters import BufferReader
from amqtt.errors import AMQTTError
from amqtt.mqtt.connect import ConnectPacket, ConnectPayload, ConnectVariableHeader
from amqtt.mqtt.packet import CONNECT, MQTTFixedHeader, PUBLISH
class ConnectPacketTest(unittest.TestCase):
    """Decoding, encoding and attribute-access tests for ``ConnectPacket``."""

    def setUp(self):
        """Create a dedicated event loop for the test and schedule its closing."""
        self.loop = asyncio.new_event_loop()
        # Without this the loop created for every test is never closed and is
        # leaked; addCleanup also runs when the test itself fails.
        self.addCleanup(self.loop.close)

    def _decode(self, data):
        """Return the ``ConnectPacket`` read from *data* via ``from_stream``."""
        stream = BufferReader(data)
        return self.loop.run_until_complete(ConnectPacket.from_stream(stream))

    def test_decode_ok(self):
        """Decode a packet carrying client id, will, username and password."""
        data = (
            b"\x10\x3e\x00\x04MQTT\x04\xce\x00\x00\x00\x0a0123456789"
            b"\x00\x09WillTopic\x00\x0bWillMessage\x00\x04user\x00\x08password"
        )
        message = self._decode(data)

        assert message.variable_header is not None
        assert message.variable_header.proto_name == "MQTT"
        assert message.variable_header.proto_level == 4
        assert message.variable_header.username_flag
        assert message.variable_header.password_flag
        assert not message.variable_header.will_retain_flag
        assert message.variable_header.will_qos == 1
        assert message.variable_header.will_flag
        assert message.variable_header.clean_session_flag
        assert not message.variable_header.reserved_flag

        assert message.payload is not None
        assert message.payload.client_id == "0123456789"
        assert message.payload.will_topic == "WillTopic"
        assert message.payload.will_message == b"WillMessage"
        assert message.payload.username == "user"
        assert message.payload.password == "password"

    def test_decode_ok_will_flag(self):
        """Decode a packet whose will flag is cleared and has no will fields."""
        data = (
            b"\x10\x26\x00\x04MQTT\x04\xca\x00\x00\x00\x0a0123456789"
            b"\x00\x04user\x00\x08password"
        )
        message = self._decode(data)

        assert message.variable_header is not None
        assert message.variable_header.proto_name == "MQTT"
        assert message.variable_header.proto_level == 4
        assert message.variable_header.username_flag
        assert message.variable_header.password_flag
        assert not message.variable_header.will_retain_flag
        assert message.variable_header.will_qos == 1
        assert not message.variable_header.will_flag
        assert message.variable_header.clean_session_flag
        assert not message.variable_header.reserved_flag

        assert message.payload is not None
        assert message.payload.client_id == "0123456789"
        assert message.payload.will_topic is None
        assert message.payload.will_message is None
        assert message.payload.username == "user"
        assert message.payload.password == "password"

    def test_decode_fail_reserved_flag(self):
        """Decode a packet whose flags byte (0xCF) has the lowest bit set.

        Despite the name, no exception is expected here: the test only checks
        that the decoded header reports ``reserved_flag`` as set.
        """
        data = (
            b"\x10\x3e\x00\x04MQTT\x04\xcf\x00\x00\x00\x0a0123456789"
            b"\x00\x09WillTopic\x00\x0bWillMessage\x00\x04user\x00\x08password"
        )
        message = self._decode(data)

        assert message.variable_header is not None
        assert message.variable_header.reserved_flag

    def test_decode_fail_miss_clientId(self):
        """Decode data that ends right after the variable header.

        No exception is expected; the decoded payload must still expose a
        non-None ``client_id``.
        """
        data = b"\x10\x0a\x00\x04MQTT\x04\xce\x00\x00"
        message = self._decode(data)

        assert message.payload is not None
        assert message.payload.client_id is not None

    def test_decode_fail_miss_willtopic(self):
        """Decode data that ends after the client id; ``will_topic`` is None."""
        data = b"\x10\x16\x00\x04MQTT\x04\xce\x00\x00\x00\x0a0123456789"
        message = self._decode(data)

        assert message.payload is not None
        assert message.payload.will_topic is None

    def test_decode_fail_miss_username(self):
        """Decode data that ends after the will message; ``username`` is None."""
        data = (
            b"\x10\x2e\x00\x04MQTT\x04\xce\x00\x00\x00\x0a0123456789"
            b"\x00\x09WillTopic\x00\x0bWillMessage"
        )
        message = self._decode(data)

        assert message.payload is not None
        assert message.payload.username is None

    def test_decode_fail_miss_password(self):
        """Decode data that ends after the username; ``password`` is None."""
        data = (
            b"\x10\x34\x00\x04MQTT\x04\xce\x00\x00\x00\x0a0123456789"
            b"\x00\x09WillTopic\x00\x0bWillMessage\x00\x04user"
        )
        message = self._decode(data)

        assert message.payload is not None
        assert message.payload.password is None

    def test_encode(self):
        """Serialize a fully populated packet and compare the exact bytes."""
        header = MQTTFixedHeader(CONNECT, 0x00, 0)
        variable_header = ConnectVariableHeader(0xCE, 0, "MQTT", 4)
        payload = ConnectPayload(
            "0123456789",
            "WillTopic",
            b"WillMessage",
            "user",
            "password",
        )
        message = ConnectPacket(header, variable_header, payload)

        encoded = message.to_bytes()

        assert encoded == (
            b"\x10>\x00\x04MQTT\x04\xce\x00\x00\x00\n0123456789"
            b"\x00\tWillTopic\x00\x0bWillMessage\x00\x04user\x00\x08password"
        )

    def test_getattr_ok(self):
        """Check that packet-level attributes mirror the header and payload."""
        data = (
            b"\x10\x3e\x00\x04MQTT\x04\xce\x00\x00\x00\x0a0123456789"
            b"\x00\x09WillTopic\x00\x0bWillMessage\x00\x04user\x00\x08password"
        )
        message = self._decode(data)

        # Each variable-header field is checked directly and via the packet.
        assert message.variable_header is not None
        assert message.variable_header.proto_name == "MQTT"
        assert message.proto_name == "MQTT"
        assert message.variable_header.proto_level == 4
        assert message.proto_level == 4
        assert message.variable_header.username_flag
        assert message.username_flag
        assert message.variable_header.password_flag
        assert message.password_flag
        assert not message.variable_header.will_retain_flag
        assert not message.will_retain_flag
        assert message.variable_header.will_qos == 1
        assert message.will_qos == 1
        assert message.variable_header.will_flag
        assert message.will_flag
        assert message.variable_header.clean_session_flag
        assert message.clean_session_flag
        assert not message.variable_header.reserved_flag
        assert not message.reserved_flag

        # Same for every payload field.
        assert message.payload is not None
        assert message.payload.client_id == "0123456789"
        assert message.client_id == "0123456789"
        assert message.payload.will_topic == "WillTopic"
        assert message.will_topic == "WillTopic"
        assert message.payload.will_message == b"WillMessage"
        assert message.will_message == b"WillMessage"
        assert message.payload.username == "user"
        assert message.username == "user"
        assert message.payload.password == "password"
        assert message.password == "password"
def test_incorrect_fixed_header():
    """Building a ConnectPacket from a PUBLISH fixed header raises AMQTTError."""
    publish_header = MQTTFixedHeader(PUBLISH, 0x00)

    with pytest.raises(AMQTTError):
        ConnectPacket(fixed=publish_header)
@pytest.mark.parametrize("prop", [
    "proto_name",
    "proto_level",
    "username_flag",
    "password_flag",
    "clean_session_flag",
    "will_retain_flag",
    "will_qos",
    "will_flag",
    "reserved_flag",
    "client_id",
    "client_id_is_random",
    "will_topic",
    "will_message",
    "username",
    "password",
    "keep_alive",
])
def test_empty_variable_header(prop):
    """Reading or writing *prop* on an argument-less ConnectPacket raises ValueError."""
    packet = ConnectPacket()

    # The accessors are called bare: wrapping them in ``assert`` (setattr
    # always returns None) would turn a missing exception into an unrelated
    # AssertionError instead of pytest's "DID NOT RAISE" failure.
    with pytest.raises(ValueError):
        getattr(packet, prop)

    with pytest.raises(ValueError):
        setattr(packet, prop, "a value")