From 6a876be559a79e2f821ddc8aad3148465d4983c0 Mon Sep 17 00:00:00 2001 From: Nico Date: Thu, 15 Oct 2015 21:57:21 +0200 Subject: [PATCH] Improve perf --- hbmqtt/codecs.py | 21 ++++++++++++--------- hbmqtt/mqtt/protocol/handler.py | 7 +++---- hbmqtt/mqtt/publish.py | 6 +++--- samples/broker_start.py | 8 +++++++- tests/mqtt/test_suback.py | 4 ++-- 5 files changed, 27 insertions(+), 19 deletions(-) diff --git a/hbmqtt/codecs.py b/hbmqtt/codecs.py index 196fdcd..eaaffb5 100644 --- a/hbmqtt/codecs.py +++ b/hbmqtt/codecs.py @@ -3,7 +3,7 @@ # See the file license.txt for copying permission. import asyncio from math import ceil -from struct import unpack +from struct import pack, unpack from hbmqtt.errors import NoDataException @@ -29,18 +29,18 @@ def bytes_to_int(data): return data -def int_to_bytes(int_value: int, length=-1) -> bytes: +def int_to_bytes(int_value: int, length: int) -> bytes: """ convert an integer to a sequence of bytes using big endian byte ordering :param int_value: integer value to convert :param length: (optional) byte length :return: byte sequence """ - if length == -1: - length = ceil(int_value.bit_length()//8) - if length == 0: - length = 1 - return int_value.to_bytes(length, byteorder='big') + if length == 1: + fmt = "!B" + elif length == 2: + fmt = "!H" + return pack(fmt, int_value) @asyncio.coroutine @@ -66,8 +66,11 @@ def decode_string(reader) -> bytes: """ length_bytes = yield from read_or_raise(reader, 2) str_length = unpack("!H", length_bytes) - byte_str = yield from read_or_raise(reader, str_length[0]) - return byte_str.decode(encoding='utf-8') + if str_length[0]: + byte_str = yield from read_or_raise(reader, str_length[0]) + return byte_str.decode(encoding='utf-8') + else: + return '' @asyncio.coroutine diff --git a/hbmqtt/mqtt/protocol/handler.py b/hbmqtt/mqtt/protocol/handler.py index a1c4bd1..8afd6cd 100644 --- a/hbmqtt/mqtt/protocol/handler.py +++ b/hbmqtt/mqtt/protocol/handler.py @@ -355,14 +355,14 @@ class ProtocolHandler: def _reader_loop(self): self.logger.debug("%s Starting reader coro" % self.session.client_id) running_tasks = collections.deque() + keepalive_timeout = self.session.keep_alive + if keepalive_timeout <= 0: + keepalive_timeout = None while True: try: self._reader_ready.set() while running_tasks and running_tasks[0].done(): running_tasks.popleft() - keepalive_timeout = self.session.keep_alive - if keepalive_timeout <= 0: - keepalive_timeout = None fixed_header = yield from asyncio.wait_for( MQTTFixedHeader.from_stream(self.reader), keepalive_timeout, loop=self._loop) @@ -440,7 +440,6 @@ class ProtocolHandler: self._keepalive_task = self._loop.call_later(self.keepalive_timeout, self.handle_write_timeout) yield from self.plugins_manager.fire_event(EVENT_MQTT_PACKET_SENT, packet=packet, session=self.session) - self._loop.call_soon(self.on_packet_sent.send, packet) except ConnectionResetError as cre: yield from self.handle_connection_closed() raise diff --git a/hbmqtt/mqtt/publish.py b/hbmqtt/mqtt/publish.py index d1847ce..35ebac0 100644 --- a/hbmqtt/mqtt/publish.py +++ b/hbmqtt/mqtt/publish.py @@ -18,10 +18,10 @@ class PublishVariableHeader(MQTTVariableHeader): return type(self).__name__ + '(topic={0}, packet_id={1})'.format(self.topic_name, self.packet_id) def to_bytes(self): - out = b'' - out += encode_string(self.topic_name) + out = bytearray() + out.extend(encode_string(self.topic_name)) if self.packet_id is not None: - out += int_to_bytes(self.packet_id, 2) + out.extend(int_to_bytes(self.packet_id, 2)) return out @classmethod diff --git a/samples/broker_start.py b/samples/broker_start.py index e4773a9..ea76f84 100644 --- a/samples/broker_start.py +++ b/samples/broker_start.py @@ -40,6 +40,12 @@ def test_coro(): if __name__ == '__main__': formatter = "[%(asctime)s] :: %(levelname)s :: %(name)s :: %(message)s" #formatter = "%(asctime)s :: %(levelname)s :: %(message)s" - logging.basicConfig(level=logging.DEBUG, format=formatter) + logging.basicConfig(level=logging.INFO, format=formatter) + + #import selectors + + #selector = selectors.EpollSelector() + #loop = asyncio.SelectorEventLoop(selector) + #asyncio.set_event_loop(loop) asyncio.get_event_loop().run_until_complete(test_coro()) asyncio.get_event_loop().run_forever() \ No newline at end of file diff --git a/tests/mqtt/test_suback.py b/tests/mqtt/test_suback.py index 4244b9b..9e6613d 100644 --- a/tests/mqtt/test_suback.py +++ b/tests/mqtt/test_suback.py @@ -30,6 +30,6 @@ class SubackPacketTest(unittest.TestCase): SubackPayload.RETURN_CODE_02, SubackPayload.RETURN_CODE_80 ]) - publish = SubackPacket(variable_header=variable_header, payload=payload) - out = publish.to_bytes() + suback = SubackPacket(variable_header=variable_header, payload=payload) + out = suback.to_bytes() self.assertEqual(out, b'\x90\x06\x00\x0a\x00\x01\x02\x80')