Add read_string + tests

pull/8/head
Nicolas Jouanin 2015-05-29 22:50:07 +02:00
rodzic 5cec05feee
commit eca1e47e9c
2 zmienionych plików z 36 dodań i 2 usunięć

Wyświetl plik

@ -7,7 +7,7 @@ from hbmqtt.streams.errors import NoDataException
def bytes_to_hex_str(data):
return '0x' + ''.join(format(b, '02x') for b in data)
def hex_to_int(data):
def bytes_to_int(data):
return int.from_bytes(data, byteorder='big')
@asyncio.coroutine
@ -19,4 +19,7 @@ def read_or_raise(reader, n=-1):
@asyncio.coroutine
def read_string(reader):
length = yield from read_or_raise(reader, 2)
length_bytes = yield from read_or_raise(reader, 2)
str_length = bytes_to_int(length_bytes)
byte_str = yield from read_or_raise(reader, str_length)
return byte_str.decode()

Wyświetl plik

@ -0,0 +1,31 @@
# Copyright (c) 2015 Nicolas JOUANIN
#
# See the file license.txt for copying permission.
import unittest
import asyncio
from hbmqtt.utils import (
bytes_to_hex_str,
bytes_to_int,
read_string,
)
from hbmqtt.message import MessageType
class TestUtils(unittest.TestCase):
def setUp(self):
self.loop = asyncio.new_event_loop()
def test_bytes_to_hex_str(self):
ret = bytes_to_hex_str(b'\x7f')
self.assertEqual(ret, '0x7f')
def test_bytes_to_int(self):
ret = bytes_to_int(b'\x7f')
self.assertEqual(ret, 127)
ret = bytes_to_int(b'\xff\xff')
self.assertEqual(ret, 65535)
def test_read_string(self):
stream = asyncio.StreamReader(loop=self.loop)
stream.feed_data(b'\x00\x02AA')
ret = self.loop.run_until_complete(read_string(stream))
self.assertEqual(ret, 'AA')