2015-05-29 20:50:07 +00:00
|
|
|
import asyncio
|
2024-12-21 10:52:26 +00:00
|
|
|
import unittest
|
2015-05-30 11:58:29 +00:00
|
|
|
|
2024-12-21 10:52:26 +00:00
|
|
|
from amqtt.adapters import StreamReaderAdapter
|
2025-01-16 19:01:16 +00:00
|
|
|
from amqtt.codecs_amqtt import (
|
2015-05-29 20:50:07 +00:00
|
|
|
bytes_to_hex_str,
|
|
|
|
bytes_to_int,
|
2015-05-31 12:23:45 +00:00
|
|
|
decode_string,
|
2015-05-31 12:30:32 +00:00
|
|
|
encode_string,
|
2015-05-29 20:50:07 +00:00
|
|
|
)
|
2015-05-30 11:58:29 +00:00
|
|
|
|
2015-05-29 20:50:07 +00:00
|
|
|
|
2015-06-13 12:38:59 +00:00
|
|
|
class TestCodecs(unittest.TestCase):
|
2015-05-29 20:50:07 +00:00
|
|
|
def setUp(self):
|
|
|
|
self.loop = asyncio.new_event_loop()
|
|
|
|
|
|
|
|
def test_bytes_to_hex_str(self):
|
2021-03-14 20:44:41 +00:00
|
|
|
ret = bytes_to_hex_str(b"\x7f")
|
2024-12-21 10:52:26 +00:00
|
|
|
assert ret == "0x7f"
|
2015-05-29 20:50:07 +00:00
|
|
|
|
|
|
|
def test_bytes_to_int(self):
|
2021-03-14 20:44:41 +00:00
|
|
|
ret = bytes_to_int(b"\x7f")
|
2024-12-21 10:52:26 +00:00
|
|
|
assert ret == 127
|
2021-03-14 20:44:41 +00:00
|
|
|
ret = bytes_to_int(b"\xff\xff")
|
2024-12-21 10:52:26 +00:00
|
|
|
assert ret == 65535
|
2015-05-29 20:50:07 +00:00
|
|
|
|
2015-05-31 12:30:32 +00:00
|
|
|
def test_decode_string(self):
|
2015-05-29 20:50:07 +00:00
|
|
|
stream = asyncio.StreamReader(loop=self.loop)
|
2021-03-14 20:44:41 +00:00
|
|
|
stream.feed_data(b"\x00\x02AA")
|
2024-12-21 10:52:26 +00:00
|
|
|
ret = self.loop.run_until_complete(decode_string(StreamReaderAdapter(stream)))
|
|
|
|
assert ret == "AA"
|
2015-05-31 12:30:32 +00:00
|
|
|
|
|
|
|
def test_encode_string(self):
|
2021-03-14 20:44:41 +00:00
|
|
|
encoded = encode_string("AA")
|
2024-12-21 10:52:26 +00:00
|
|
|
assert encoded == b"\x00\x02AA"
|