diff --git a/hbmqtt/broker.py b/hbmqtt/broker.py index d43a90b..bd1203a 100644 --- a/hbmqtt/broker.py +++ b/hbmqtt/broker.py @@ -6,6 +6,10 @@ import threading import logging from hbmqtt.errors import BrokerException from transitions import Machine, MachineError +from hbmqtt.codecs.header import MQTTHeaderCodec +from hbmqtt.codecs.errors import CodecException +from hbmqtt.codecs.connect import ConnectMessage +from hbmqtt.message import MessageType class BrokerProtocol(asyncio.Protocol): @@ -74,9 +78,35 @@ class Broker: def _run_server_loop(self, loop): asyncio.set_event_loop(loop) - coro = loop.create_server(BrokerProtocol, self.host, self.port) + coro = asyncio.start_server(client_connected, self.host, self.port) self._server = loop.run_until_complete(coro) self.logger.debug("Broker listening %s:%s" % (self.host, self.port)) self.machine.starting_success() self.logger.debug("Broker started, ready to serve") - loop.run_forever() \ No newline at end of file + loop.run_forever() + + +def init_message_codecs(): + """ + Init dict of MQTT message encoders/decoders + :return: + """ + codecs = { + MessageType.CONNECT, ConnectMessage + } + return codecs + +@asyncio.coroutine +def client_connected(reader, writer): + (remote_address, remote_port) = writer.get_extra_info('peername') + codecs = init_message_codecs() + while True: + try: + # Read fixed header + fixed_header = yield from MQTTHeaderCodec.decode(reader) + # Find message decoder and decode + codec = codecs[fixed_header.message_type] + message = yield from codec.decode(fixed_header, reader) + except CodecException: + #End connection + break diff --git a/tests/test_broker.py b/tests/test_broker.py index 53508cd..545abe6 100644 --- a/tests/test_broker.py +++ b/tests/test_broker.py @@ -1,3 +1,6 @@ +# Copyright (c) 2015 Nicolas JOUANIN +# +# See the file license.txt for copying permission. import unittest import logging import time @@ -9,6 +12,6 @@ class TestBroker(unittest.TestCase): def test_start_broker(self): b = Broker() b.start() - time.sleep(1) + time.sleep(100) self.assertEqual(b.machine.state, 'started') b.shutdown() \ No newline at end of file