kopia lustrzana https://github.com/Yakifo/amqtt
Rename streams -> codecs
rodzic
16c3a0cdce
commit
5a70775bd5
|
@ -0,0 +1,35 @@
|
||||||
|
# Copyright (c) 2015 Nicolas JOUANIN
|
||||||
|
#
|
||||||
|
# See the file license.txt for copying permission.
|
||||||
|
import asyncio
|
||||||
|
|
||||||
|
from hbmqtt.codecs.utils import (
|
||||||
|
bytes_to_hex_str,
|
||||||
|
bytes_to_int,
|
||||||
|
read_string,
|
||||||
|
read_or_raise,
|
||||||
|
)
|
||||||
|
from hbmqtt.message import FixedHeader, ConnectMessage
|
||||||
|
from hbmqtt.codecs.errors import CodecException
|
||||||
|
|
||||||
|
|
||||||
|
class ConnectException(CodecException):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class ConnectStream:
|
||||||
|
def __init__(self):
|
||||||
|
pass
|
||||||
|
|
||||||
|
@asyncio.coroutine
|
||||||
|
def decode(self, fixed_header: FixedHeader, reader):
|
||||||
|
if fixed_header.flags:
|
||||||
|
raise ConnectException("[MQTT-2.2.2-1] Header flags reserved for future use")
|
||||||
|
message = ConnectMessage(fixed_header)
|
||||||
|
protocol_name = yield from read_string(reader)
|
||||||
|
if protocol_name is not 'MQTT':
|
||||||
|
raise ConnectException('[MQTT-3.1.2-1] Incorrect protocol name')
|
||||||
|
protocol_level_byte = read_or_raise(reader, 1)
|
||||||
|
protocol_level = bytes_to_int(protocol_level_byte)
|
||||||
|
if protocol_level != 4:
|
||||||
|
raise ConnectException('Unsupported protocol level %s' % bytes_to_hex_str(protocol_level_byte))
|
|
@ -2,9 +2,8 @@
|
||||||
#
|
#
|
||||||
# See the file license.txt for copying permission.
|
# See the file license.txt for copying permission.
|
||||||
|
|
||||||
class StreamException(BaseException):
|
class CodecException(BaseException):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
class NoDataException(CodecException):
|
||||||
class FixedHeaderException(StreamException):
|
|
||||||
pass
|
pass
|
|
@ -2,18 +2,20 @@
|
||||||
#
|
#
|
||||||
# See the file license.txt for copying permission.
|
# See the file license.txt for copying permission.
|
||||||
import asyncio
|
import asyncio
|
||||||
from hbmqtt.utils import (
|
|
||||||
|
from hbmqtt.codecs.utils import (
|
||||||
bytes_to_hex_str,
|
bytes_to_hex_str,
|
||||||
bytes_to_int,
|
bytes_to_int,
|
||||||
read_or_raise,
|
read_or_raise,
|
||||||
)
|
)
|
||||||
from hbmqtt.message import FixedHeader, MessageType
|
from hbmqtt.message import FixedHeader, MessageType
|
||||||
from hbmqtt.streams.errors import StreamException, NoDataException
|
from hbmqtt.codecs.errors import CodecException
|
||||||
|
|
||||||
class FixedHeaderException(StreamException):
|
|
||||||
|
class FixedHeaderException(CodecException):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
class FixedHeaderStream:
|
class FixedHeaderCodec:
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
@ -25,10 +27,10 @@ class FixedHeaderStream:
|
||||||
:return: FixedHeader instance
|
:return: FixedHeader instance
|
||||||
"""
|
"""
|
||||||
b1 = yield from read_or_raise(reader, 1)
|
b1 = yield from read_or_raise(reader, 1)
|
||||||
msg_type = FixedHeaderStream.get_message_type(b1)
|
msg_type = FixedHeaderCodec.get_message_type(b1)
|
||||||
if msg_type is MessageType.RESERVED_0 or msg_type is MessageType.RESERVED_15:
|
if msg_type is MessageType.RESERVED_0 or msg_type is MessageType.RESERVED_15:
|
||||||
raise FixedHeaderException("Usage of control packet type %s is forbidden" % msg_type)
|
raise FixedHeaderException("Usage of control packet type %s is forbidden" % msg_type)
|
||||||
flags = FixedHeaderStream.get_flags(b1)
|
flags = FixedHeaderCodec.get_flags(b1)
|
||||||
|
|
||||||
remain_length = yield from self.decode_remaining_length(reader)
|
remain_length = yield from self.decode_remaining_length(reader)
|
||||||
return FixedHeader(msg_type, flags, remain_length)
|
return FixedHeader(msg_type, flags, remain_length)
|
|
@ -3,7 +3,7 @@
|
||||||
# See the file license.txt for copying permission.
|
# See the file license.txt for copying permission.
|
||||||
import unittest
|
import unittest
|
||||||
import asyncio
|
import asyncio
|
||||||
from hbmqtt.streams.fixed_header import FixedHeaderStream, FixedHeaderException
|
from hbmqtt.codecs.header import FixedHeaderCodec, FixedHeaderException
|
||||||
from hbmqtt.message import MessageType
|
from hbmqtt.message import MessageType
|
||||||
|
|
||||||
class TestFixedHeader(unittest.TestCase):
|
class TestFixedHeader(unittest.TestCase):
|
||||||
|
@ -11,11 +11,11 @@ class TestFixedHeader(unittest.TestCase):
|
||||||
self.loop = asyncio.new_event_loop()
|
self.loop = asyncio.new_event_loop()
|
||||||
|
|
||||||
def test_get_message_type(self):
|
def test_get_message_type(self):
|
||||||
m_type = FixedHeaderStream.get_message_type(b'\x10')
|
m_type = FixedHeaderCodec.get_message_type(b'\x10')
|
||||||
self.assertEqual(m_type, MessageType.CONNECT)
|
self.assertEqual(m_type, MessageType.CONNECT)
|
||||||
|
|
||||||
def test_get_flags(self):
|
def test_get_flags(self):
|
||||||
flags = FixedHeaderStream.get_flags(b'\x1f')
|
flags = FixedHeaderCodec.get_flags(b'\x1f')
|
||||||
self.assertTrue(flags & 0x08)
|
self.assertTrue(flags & 0x08)
|
||||||
self.assertTrue(flags & 0x04)
|
self.assertTrue(flags & 0x04)
|
||||||
self.assertTrue(flags & 0x02)
|
self.assertTrue(flags & 0x02)
|
||||||
|
@ -25,42 +25,42 @@ class TestFixedHeader(unittest.TestCase):
|
||||||
def test_decode_remaining_length1(self):
|
def test_decode_remaining_length1(self):
|
||||||
stream = asyncio.StreamReader(loop=self.loop)
|
stream = asyncio.StreamReader(loop=self.loop)
|
||||||
stream.feed_data(b'\x7f')
|
stream.feed_data(b'\x7f')
|
||||||
s = FixedHeaderStream()
|
s = FixedHeaderCodec()
|
||||||
length = self.loop.run_until_complete(s.decode_remaining_length(stream))
|
length = self.loop.run_until_complete(s.decode_remaining_length(stream))
|
||||||
self.assertEqual(length, 127)
|
self.assertEqual(length, 127)
|
||||||
|
|
||||||
def test_decode_remaining_length2(self):
|
def test_decode_remaining_length2(self):
|
||||||
stream = asyncio.StreamReader(loop=self.loop)
|
stream = asyncio.StreamReader(loop=self.loop)
|
||||||
stream.feed_data(b'\xff\x7f')
|
stream.feed_data(b'\xff\x7f')
|
||||||
s = FixedHeaderStream()
|
s = FixedHeaderCodec()
|
||||||
length = self.loop.run_until_complete(s.decode_remaining_length(stream))
|
length = self.loop.run_until_complete(s.decode_remaining_length(stream))
|
||||||
self.assertEqual(length, 16383)
|
self.assertEqual(length, 16383)
|
||||||
|
|
||||||
def test_decode_remaining_length3(self):
|
def test_decode_remaining_length3(self):
|
||||||
stream = asyncio.StreamReader(loop=self.loop)
|
stream = asyncio.StreamReader(loop=self.loop)
|
||||||
stream.feed_data(b'\xff\xff\x7f')
|
stream.feed_data(b'\xff\xff\x7f')
|
||||||
s = FixedHeaderStream()
|
s = FixedHeaderCodec()
|
||||||
length = self.loop.run_until_complete(s.decode_remaining_length(stream))
|
length = self.loop.run_until_complete(s.decode_remaining_length(stream))
|
||||||
self.assertEqual(length, 2097151)
|
self.assertEqual(length, 2097151)
|
||||||
|
|
||||||
def test_decode_remaining_length4(self):
|
def test_decode_remaining_length4(self):
|
||||||
stream = asyncio.StreamReader(loop=self.loop)
|
stream = asyncio.StreamReader(loop=self.loop)
|
||||||
stream.feed_data(b'\xff\xff\xff\x7f')
|
stream.feed_data(b'\xff\xff\xff\x7f')
|
||||||
s = FixedHeaderStream()
|
s = FixedHeaderCodec()
|
||||||
length = self.loop.run_until_complete(s.decode_remaining_length(stream))
|
length = self.loop.run_until_complete(s.decode_remaining_length(stream))
|
||||||
self.assertEqual(length, 268435455)
|
self.assertEqual(length, 268435455)
|
||||||
|
|
||||||
def test_decode_remaining_length5(self):
|
def test_decode_remaining_length5(self):
|
||||||
stream = asyncio.StreamReader(loop=self.loop)
|
stream = asyncio.StreamReader(loop=self.loop)
|
||||||
stream.feed_data(b'\xff\xff\xff\xff\x7f')
|
stream.feed_data(b'\xff\xff\xff\xff\x7f')
|
||||||
s = FixedHeaderStream()
|
s = FixedHeaderCodec()
|
||||||
with self.assertRaises(FixedHeaderException):
|
with self.assertRaises(FixedHeaderException):
|
||||||
self.loop.run_until_complete(s.decode_remaining_length(stream))
|
self.loop.run_until_complete(s.decode_remaining_length(stream))
|
||||||
|
|
||||||
def test_decode_ok(self):
|
def test_decode_ok(self):
|
||||||
stream = asyncio.StreamReader(loop=self.loop)
|
stream = asyncio.StreamReader(loop=self.loop)
|
||||||
stream.feed_data(b'\x10\x7f')
|
stream.feed_data(b'\x10\x7f')
|
||||||
s = FixedHeaderStream()
|
s = FixedHeaderCodec()
|
||||||
header = self.loop.run_until_complete(s.decode(stream))
|
header = self.loop.run_until_complete(s.decode(stream))
|
||||||
self.assertEqual(header.message_type, MessageType.CONNECT)
|
self.assertEqual(header.message_type, MessageType.CONNECT)
|
||||||
self.assertFalse(header.flags & 0x08)
|
self.assertFalse(header.flags & 0x08)
|
||||||
|
@ -70,6 +70,6 @@ class TestFixedHeader(unittest.TestCase):
|
||||||
def test_decode_ko(self):
|
def test_decode_ko(self):
|
||||||
stream = asyncio.StreamReader(loop=self.loop)
|
stream = asyncio.StreamReader(loop=self.loop)
|
||||||
stream.feed_data(b'\x0f\x7f')
|
stream.feed_data(b'\x0f\x7f')
|
||||||
s = FixedHeaderStream()
|
s = FixedHeaderCodec()
|
||||||
with self.assertRaises(FixedHeaderException):
|
with self.assertRaises(FixedHeaderException):
|
||||||
self.loop.run_until_complete(s.decode(stream))
|
self.loop.run_until_complete(s.decode(stream))
|
Ładowanie…
Reference in New Issue