amqtt/tests/mqtt/test_subscribe.py

31 wiersze
1.1 KiB
Python

import asyncio
2015-06-17 20:06:11 +00:00
import unittest
2021-03-27 12:16:42 +00:00
from amqtt.adapters import BufferReader
from amqtt.mqtt.constants import QOS_1, QOS_2
from amqtt.mqtt.packet import PacketIdVariableHeader
from amqtt.mqtt.subscribe import SubscribePacket, SubscribePayload
2015-06-17 20:06:11 +00:00
class SubscribePacketTest(unittest.TestCase):
def setUp(self):
self.loop = asyncio.new_event_loop()
def test_from_stream(self):
data = b"\x80\x0e\x00\x0a\x00\x03a/b\x01\x00\x03c/d\x02"
stream = BufferReader(data)
2015-06-17 20:06:11 +00:00
message = self.loop.run_until_complete(SubscribePacket.from_stream(stream))
2015-10-14 20:58:18 +00:00
(topic, qos) = message.payload.topics[0]
assert topic == "a/b"
assert qos == QOS_1
2015-10-14 20:58:18 +00:00
(topic, qos) = message.payload.topics[1]
assert topic == "c/d"
assert qos == QOS_2
2015-06-17 20:06:11 +00:00
def test_to_stream(self):
variable_header = PacketIdVariableHeader(10)
payload = SubscribePayload([("a/b", QOS_1), ("c/d", QOS_2)])
2015-06-17 20:06:11 +00:00
publish = SubscribePacket(variable_header=variable_header, payload=payload)
out = publish.to_bytes()
assert out == b"\x82\x0e\x00\n\x00\x03a/b\x01\x00\x03c/d\x02"