kopia lustrzana https://github.com/Yakifo/amqtt
Init message streaming for message encoding/decoding
rodzic
2b45fc6a3e
commit
d3f09dc4ed
|
@ -0,0 +1,38 @@
|
||||||
|
# Copyright (c) 2015 Nicolas JOUANIN
|
||||||
|
#
|
||||||
|
# See the file license.txt for copying permission.
|
||||||
|
from enum import Enum
|
||||||
|
|
||||||
|
class MessageType(Enum):
|
||||||
|
RESERVED_0 = 0
|
||||||
|
CONNECT = 1
|
||||||
|
CONNACK = 2
|
||||||
|
PUBLISH = 3
|
||||||
|
PUBACK = 4
|
||||||
|
PUBREC = 5
|
||||||
|
PUBREL = 6
|
||||||
|
PUBCOMP = 7
|
||||||
|
SUBSCRIBE = 8
|
||||||
|
SUBACK = 9
|
||||||
|
UNSUBSCRIBE = 10
|
||||||
|
UNSUBACK = 11
|
||||||
|
PINGREQ = 12
|
||||||
|
PINGRESP = 13
|
||||||
|
DISCONNECT = 14
|
||||||
|
RESERVED_15 = 15
|
||||||
|
|
||||||
|
|
||||||
|
def get_message_type(byte):
|
||||||
|
return MessageType(byte)
|
||||||
|
|
||||||
|
class Message:
|
||||||
|
def __init__(self, msg_type, length, dup_flag=False, qos=0, retain_flag=False):
|
||||||
|
if isinstance(msg_type, int):
|
||||||
|
enum_type = msg_type
|
||||||
|
else:
|
||||||
|
enum_type = get_message_type(msg_type)
|
||||||
|
self.message_type = enum_type
|
||||||
|
self.remainingLength = length
|
||||||
|
self.dupFlag = dup_flag
|
||||||
|
self.qos = qos
|
||||||
|
self.retain = retain_flag
|
|
@ -0,0 +1,3 @@
|
||||||
|
# Copyright (c) 2015 Nicolas JOUANIN
|
||||||
|
#
|
||||||
|
# See the file license.txt for copying permission.
|
|
@ -0,0 +1,51 @@
|
||||||
|
# Copyright (c) 2015 Nicolas JOUANIN
|
||||||
|
#
|
||||||
|
# See the file license.txt for copying permission.
|
||||||
|
import asyncio
|
||||||
|
from hbmqtt.utils import (
|
||||||
|
bytes_to_hex_str,
|
||||||
|
hex_to_int,
|
||||||
|
)
|
||||||
|
from hbmqtt.message import Message
|
||||||
|
from hbmqtt.streams.errors import FixedHeaderException
|
||||||
|
|
||||||
|
class BaseStream:
|
||||||
|
def __init__(self):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def decode(self, reader):
|
||||||
|
b1 = yield from reader.read(1)
|
||||||
|
msg_type = BaseStream.get_message_type(b1)
|
||||||
|
(dup_flag, qos, retain_flag) = BaseStream.get_flags(b1)
|
||||||
|
remain_length = yield from self.decode_remaining_length(b1, reader)
|
||||||
|
return Message(msg_type, remain_length, dup_flag, qos, retain_flag)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def get_message_type(byte):
|
||||||
|
return (hex_to_int(byte) & 0xf0) >> 4
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def get_flags(data):
|
||||||
|
byte = hex_to_int(data)
|
||||||
|
b3 = True if (byte & 0x08) >> 3 else False
|
||||||
|
b21 = (byte & 0x06) >> 1
|
||||||
|
b0 = True if (byte & 0x01) else False
|
||||||
|
return b3, b21, b0
|
||||||
|
|
||||||
|
@asyncio.coroutine
|
||||||
|
def decode_remaining_length(self, reader):
|
||||||
|
multiplier = 1
|
||||||
|
value = 0
|
||||||
|
length_bytes = b''
|
||||||
|
while True:
|
||||||
|
encoded_byte = yield from reader.read(1)
|
||||||
|
length_bytes += encoded_byte
|
||||||
|
int_byte = hex_to_int(encoded_byte)
|
||||||
|
value += (int_byte & 0x7f) * multiplier
|
||||||
|
if (int_byte & 0x80) == 0:
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
multiplier *= 128
|
||||||
|
if multiplier > 128*128*128:
|
||||||
|
raise FixedHeaderException("Invalid remaining length bytes:%s" % bytes_to_hex_str(length_bytes))
|
||||||
|
return value
|
|
@ -0,0 +1,10 @@
|
||||||
|
# Copyright (c) 2015 Nicolas JOUANIN
|
||||||
|
#
|
||||||
|
# See the file license.txt for copying permission.
|
||||||
|
|
||||||
|
class StreamException(BaseException):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class FixedHeaderException(StreamException):
|
||||||
|
pass
|
|
@ -0,0 +1,56 @@
|
||||||
|
# Copyright (c) 2015 Nicolas JOUANIN
|
||||||
|
#
|
||||||
|
# See the file license.txt for copying permission.
|
||||||
|
import unittest
|
||||||
|
import asyncio
|
||||||
|
from hbmqtt.streams.base import BaseStream
|
||||||
|
from hbmqtt.streams.errors import FixedHeaderException
|
||||||
|
|
||||||
|
class TestBaseStream(unittest.TestCase):
|
||||||
|
def setUp(self):
|
||||||
|
self.loop = asyncio.new_event_loop()
|
||||||
|
|
||||||
|
def test_get_message_type(self):
|
||||||
|
m_type = BaseStream.get_message_type(b'\x10')
|
||||||
|
self.assertEqual(m_type, 1)
|
||||||
|
|
||||||
|
def test_get_flags(self):
|
||||||
|
(dup_flag, qos, retain_flag) = BaseStream.get_flags(b'\x1f')
|
||||||
|
self.assertTrue(dup_flag)
|
||||||
|
self.assertEqual(qos, 3)
|
||||||
|
self.assertTrue(retain_flag)
|
||||||
|
|
||||||
|
def test_decode_remaining_length1(self):
|
||||||
|
stream = asyncio.StreamReader(loop=self.loop)
|
||||||
|
stream.feed_data(b'\x7f')
|
||||||
|
s = BaseStream()
|
||||||
|
length = self.loop.run_until_complete(s.decode_remaining_length(stream))
|
||||||
|
self.assertEqual(length, 127)
|
||||||
|
|
||||||
|
def test_decode_remaining_length2(self):
|
||||||
|
stream = asyncio.StreamReader(loop=self.loop)
|
||||||
|
stream.feed_data(b'\xff\x7f')
|
||||||
|
s = BaseStream()
|
||||||
|
length = self.loop.run_until_complete(s.decode_remaining_length(stream))
|
||||||
|
self.assertEqual(length, 16383)
|
||||||
|
|
||||||
|
def test_decode_remaining_length3(self):
|
||||||
|
stream = asyncio.StreamReader(loop=self.loop)
|
||||||
|
stream.feed_data(b'\xff\xff\x7f')
|
||||||
|
s = BaseStream()
|
||||||
|
length = self.loop.run_until_complete(s.decode_remaining_length(stream))
|
||||||
|
self.assertEqual(length, 2097151)
|
||||||
|
|
||||||
|
def test_decode_remaining_length4(self):
|
||||||
|
stream = asyncio.StreamReader(loop=self.loop)
|
||||||
|
stream.feed_data(b'\xff\xff\xff\x7f')
|
||||||
|
s = BaseStream()
|
||||||
|
length = self.loop.run_until_complete(s.decode_remaining_length(stream))
|
||||||
|
self.assertEqual(length, 268435455)
|
||||||
|
|
||||||
|
def test_decode_remaining_length5(self):
|
||||||
|
stream = asyncio.StreamReader(loop=self.loop)
|
||||||
|
stream.feed_data(b'\xff\xff\xff\xff\x7f')
|
||||||
|
s = BaseStream()
|
||||||
|
with self.assertRaises(FixedHeaderException):
|
||||||
|
self.loop.run_until_complete(s.decode_remaining_length(stream))
|
Ładowanie…
Reference in New Issue