diff --git a/hbmqtt/broker.py b/hbmqtt/broker.py index e69de29..3ba007c 100644 --- a/hbmqtt/broker.py +++ b/hbmqtt/broker.py @@ -0,0 +1,82 @@ +# Copyright (c) 2015 Nicolas JOUANIN +# +# See the file license.txt for copying permission. +import logging +import asyncio + +from transitions import Machine, MachineError + + +_defaults = { + 'bind-address': 'localhost', + 'bind-port': 1883 +} + + +class BrokerException(BaseException): + pass + + +class Broker: + states = ['new', 'starting', 'started', 'not_started', 'stopping', 'stopped', 'not_stopped', 'stopped'] + + def __init__(self, config=None, loop=None): + self.logger = logging.getLogger(__name__) + self.config = _defaults + if config is not None: + self.config.update(config) + + if loop is not None: + self._loop = loop + else: + self._loop = asyncio.get_event_loop() + + self._server = None + + self._init_states() + + def _init_states(self): + self.machine = Machine(states=Broker.states, initial='new') + self.machine.add_transition(trigger='start', source='new', dest='starting') + self.machine.add_transition(trigger='starting_fail', source='starting', dest='not_started') + self.machine.add_transition(trigger='starting_success', source='starting', dest='started') + self.machine.add_transition(trigger='shutdown', source='started', dest='stopping') + self.machine.add_transition(trigger='stopping_success', source='stopping', dest='stopped') + self.machine.add_transition(trigger='stopping_failure', source='stopping', dest='not_stopped') + self.machine.add_transition(trigger='start', source='stopped', dest='starting') + + @asyncio.coroutine + def start(self): + try: + self.machine.start() + self.logger.debug("Broker starting") + except MachineError as me: + self.logger.debug("Invalid method call at this moment: %s" % me) + raise BrokerException("Broker instance can't be started: %s" % me) + + try: + self._server = yield from asyncio.start_server(self.client_connected, self.config['bind-address'], self.config['bind-port'], loop=self._loop) + self.logger.info("Broker listening on %s:%d" % (self.config['bind-address'], self.config['bind-port'])) + self.machine.starting_success() + except Exception as e: + self.logger.error("Broker startup failed: %s" % e) + self.machine.starting_fail() + raise BrokerException("Broker instance can't be started: %s" % e) + + @asyncio.coroutine + def shutdown(self): + try: + self.machine.shutdown() + except MachineError as me: + self.logger.debug("Invalid method call at this moment: %s" % me) + raise BrokerException("Broker instance can't be stopped: %s" % me) + self._server.close() + self.logger.debug("Broker closing") + yield from self._server.wait_closed() + self.logger.info("Broker closed") + self.machine.stopping_success() + + + @asyncio.coroutine + def client_connected(self, reader, writer): + pass diff --git a/samples/broker_start.py b/samples/broker_start.py index e69de29..d137c7a 100644 --- a/samples/broker_start.py +++ b/samples/broker_start.py @@ -0,0 +1,19 @@ +import logging +import asyncio +from hbmqtt.broker import Broker + +logger = logging.getLogger(__name__) + +broker = Broker() + +@asyncio.coroutine +def test_coro(): + yield from broker.start() + yield from asyncio.sleep(5) + yield from broker.shutdown() + + +if __name__ == '__main__': + formatter = "[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s" + logging.basicConfig(level=logging.DEBUG, format=formatter) + asyncio.get_event_loop().run_until_complete(test_coro())