2015-07-07 20:48:53 +00:00
|
|
|
# Copyright (c) 2015 Nicolas JOUANIN
|
|
|
|
#
|
|
|
|
# See the file license.txt for copying permission.
|
|
|
|
import logging
|
|
|
|
import asyncio
|
|
|
|
|
|
|
|
from transitions import Machine, MachineError
|
2015-07-08 19:54:10 +00:00
|
|
|
from hbmqtt.session import Session
|
|
|
|
from hbmqtt.mqtt.protocol.broker_handler import BrokerProtocolHandler
|
2015-07-07 20:48:53 +00:00
|
|
|
|
|
|
|
|
|
|
|
_defaults = {
|
|
|
|
'bind-address': 'localhost',
|
|
|
|
'bind-port': 1883
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
class BrokerException(BaseException):
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
|
|
class Broker:
|
|
|
|
states = ['new', 'starting', 'started', 'not_started', 'stopping', 'stopped', 'not_stopped', 'stopped']
|
|
|
|
|
|
|
|
def __init__(self, config=None, loop=None):
|
|
|
|
self.logger = logging.getLogger(__name__)
|
|
|
|
self.config = _defaults
|
|
|
|
if config is not None:
|
|
|
|
self.config.update(config)
|
|
|
|
|
|
|
|
if loop is not None:
|
|
|
|
self._loop = loop
|
|
|
|
else:
|
|
|
|
self._loop = asyncio.get_event_loop()
|
|
|
|
|
|
|
|
self._server = None
|
2015-07-08 19:54:10 +00:00
|
|
|
self._handlers = []
|
2015-07-07 20:48:53 +00:00
|
|
|
self._init_states()
|
|
|
|
|
|
|
|
def _init_states(self):
|
|
|
|
self.machine = Machine(states=Broker.states, initial='new')
|
|
|
|
self.machine.add_transition(trigger='start', source='new', dest='starting')
|
|
|
|
self.machine.add_transition(trigger='starting_fail', source='starting', dest='not_started')
|
|
|
|
self.machine.add_transition(trigger='starting_success', source='starting', dest='started')
|
|
|
|
self.machine.add_transition(trigger='shutdown', source='started', dest='stopping')
|
|
|
|
self.machine.add_transition(trigger='stopping_success', source='stopping', dest='stopped')
|
|
|
|
self.machine.add_transition(trigger='stopping_failure', source='stopping', dest='not_stopped')
|
|
|
|
self.machine.add_transition(trigger='start', source='stopped', dest='starting')
|
|
|
|
|
|
|
|
@asyncio.coroutine
|
|
|
|
def start(self):
|
|
|
|
try:
|
|
|
|
self.machine.start()
|
|
|
|
self.logger.debug("Broker starting")
|
|
|
|
except MachineError as me:
|
|
|
|
self.logger.debug("Invalid method call at this moment: %s" % me)
|
|
|
|
raise BrokerException("Broker instance can't be started: %s" % me)
|
|
|
|
|
|
|
|
try:
|
|
|
|
self._server = yield from asyncio.start_server(self.client_connected, self.config['bind-address'], self.config['bind-port'], loop=self._loop)
|
|
|
|
self.logger.info("Broker listening on %s:%d" % (self.config['bind-address'], self.config['bind-port']))
|
|
|
|
self.machine.starting_success()
|
|
|
|
except Exception as e:
|
|
|
|
self.logger.error("Broker startup failed: %s" % e)
|
|
|
|
self.machine.starting_fail()
|
|
|
|
raise BrokerException("Broker instance can't be started: %s" % e)
|
|
|
|
|
|
|
|
@asyncio.coroutine
|
|
|
|
def shutdown(self):
|
|
|
|
try:
|
|
|
|
self.machine.shutdown()
|
|
|
|
except MachineError as me:
|
|
|
|
self.logger.debug("Invalid method call at this moment: %s" % me)
|
|
|
|
raise BrokerException("Broker instance can't be stopped: %s" % me)
|
|
|
|
self._server.close()
|
|
|
|
self.logger.debug("Broker closing")
|
|
|
|
yield from self._server.wait_closed()
|
|
|
|
self.logger.info("Broker closed")
|
|
|
|
self.machine.stopping_success()
|
|
|
|
|
|
|
|
|
|
|
|
@asyncio.coroutine
|
|
|
|
def client_connected(self, reader, writer):
|
2015-07-08 19:54:10 +00:00
|
|
|
self.logger.info(repr(writer.get_extra_info('peername')))
|
|
|
|
extra_info = writer.get_extra_info('peername')
|
|
|
|
remote_address = extra_info[0]
|
|
|
|
remote_port = extra_info[1]
|
2015-07-07 20:58:49 +00:00
|
|
|
self.logger.debug("Connection from %s:%d" % (remote_address, remote_port))
|
2015-07-08 19:54:10 +00:00
|
|
|
new_session = Session()
|
|
|
|
new_session.remote_address = remote_address
|
|
|
|
new_session.remote_port = remote_port
|
|
|
|
new_session.reader = reader
|
|
|
|
new_session.writer = writer
|
|
|
|
handler = BrokerProtocolHandler(new_session, self._loop)
|
|
|
|
self._handlers.append(handler)
|
|
|
|
self.logger.debug("Start messages handling")
|
2015-07-08 20:47:33 +00:00
|
|
|
yield from handler.start()
|
2015-07-08 19:54:10 +00:00
|
|
|
self.logger.debug("Wait for disconnect")
|
2015-07-08 20:47:33 +00:00
|
|
|
yield from handler.wait_disconnect()
|
2015-07-08 19:54:10 +00:00
|
|
|
yield from handler.stop()
|