amqtt/tests/test_codecs.py

37 wiersze
985 B
Python

# Copyright (c) 2015 Nicolas JOUANIN
#
# See the file license.txt for copying permission.
import unittest
import asyncio
from hbmqtt.codecs import (
bytes_to_hex_str,
bytes_to_int,
decode_string,
encode_string,
)
class TestCodecs(unittest.TestCase):
def setUp(self):
self.loop = asyncio.new_event_loop()
def test_bytes_to_hex_str(self):
ret = bytes_to_hex_str(b'\x7f')
self.assertEqual(ret, '0x7f')
def test_bytes_to_int(self):
ret = bytes_to_int(b'\x7f')
self.assertEqual(ret, 127)
ret = bytes_to_int(b'\xff\xff')
self.assertEqual(ret, 65535)
def test_decode_string(self):
stream = asyncio.StreamReader(loop=self.loop)
stream.feed_data(b'\x00\x02AA')
ret = self.loop.run_until_complete(decode_string(stream))
self.assertEqual(ret, 'AA')
def test_encode_string(self):
encoded = encode_string('AA')
self.assertEqual(b'\x00\x02AA', encoded)