kopia lustrzana https://github.com/Yakifo/amqtt
HBMQTT-13 #Init broker class
rodzic
9bedd5f892
commit
f9278211ef
|
@ -0,0 +1,82 @@
|
|||
# Copyright (c) 2015 Nicolas JOUANIN
|
||||
#
|
||||
# See the file license.txt for copying permission.
|
||||
import logging
|
||||
import asyncio
|
||||
|
||||
from transitions import Machine, MachineError
|
||||
|
||||
|
||||
_defaults = {
|
||||
'bind-address': 'localhost',
|
||||
'bind-port': 1883
|
||||
}
|
||||
|
||||
|
||||
class BrokerException(BaseException):
|
||||
pass
|
||||
|
||||
|
||||
class Broker:
|
||||
states = ['new', 'starting', 'started', 'not_started', 'stopping', 'stopped', 'not_stopped', 'stopped']
|
||||
|
||||
def __init__(self, config=None, loop=None):
|
||||
self.logger = logging.getLogger(__name__)
|
||||
self.config = _defaults
|
||||
if config is not None:
|
||||
self.config.update(config)
|
||||
|
||||
if loop is not None:
|
||||
self._loop = loop
|
||||
else:
|
||||
self._loop = asyncio.get_event_loop()
|
||||
|
||||
self._server = None
|
||||
|
||||
self._init_states()
|
||||
|
||||
def _init_states(self):
|
||||
self.machine = Machine(states=Broker.states, initial='new')
|
||||
self.machine.add_transition(trigger='start', source='new', dest='starting')
|
||||
self.machine.add_transition(trigger='starting_fail', source='starting', dest='not_started')
|
||||
self.machine.add_transition(trigger='starting_success', source='starting', dest='started')
|
||||
self.machine.add_transition(trigger='shutdown', source='started', dest='stopping')
|
||||
self.machine.add_transition(trigger='stopping_success', source='stopping', dest='stopped')
|
||||
self.machine.add_transition(trigger='stopping_failure', source='stopping', dest='not_stopped')
|
||||
self.machine.add_transition(trigger='start', source='stopped', dest='starting')
|
||||
|
||||
@asyncio.coroutine
|
||||
def start(self):
|
||||
try:
|
||||
self.machine.start()
|
||||
self.logger.debug("Broker starting")
|
||||
except MachineError as me:
|
||||
self.logger.debug("Invalid method call at this moment: %s" % me)
|
||||
raise BrokerException("Broker instance can't be started: %s" % me)
|
||||
|
||||
try:
|
||||
self._server = yield from asyncio.start_server(self.client_connected, self.config['bind-address'], self.config['bind-port'], loop=self._loop)
|
||||
self.logger.info("Broker listening on %s:%d" % (self.config['bind-address'], self.config['bind-port']))
|
||||
self.machine.starting_success()
|
||||
except Exception as e:
|
||||
self.logger.error("Broker startup failed: %s" % e)
|
||||
self.machine.starting_fail()
|
||||
raise BrokerException("Broker instance can't be started: %s" % e)
|
||||
|
||||
@asyncio.coroutine
|
||||
def shutdown(self):
|
||||
try:
|
||||
self.machine.shutdown()
|
||||
except MachineError as me:
|
||||
self.logger.debug("Invalid method call at this moment: %s" % me)
|
||||
raise BrokerException("Broker instance can't be stopped: %s" % me)
|
||||
self._server.close()
|
||||
self.logger.debug("Broker closing")
|
||||
yield from self._server.wait_closed()
|
||||
self.logger.info("Broker closed")
|
||||
self.machine.stopping_success()
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
def client_connected(self, reader, writer):
|
||||
pass
|
|
@ -0,0 +1,19 @@
|
|||
import logging
|
||||
import asyncio
|
||||
from hbmqtt.broker import Broker
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
broker = Broker()
|
||||
|
||||
@asyncio.coroutine
|
||||
def test_coro():
|
||||
yield from broker.start()
|
||||
yield from asyncio.sleep(5)
|
||||
yield from broker.shutdown()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
formatter = "[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s"
|
||||
logging.basicConfig(level=logging.DEBUG, format=formatter)
|
||||
asyncio.get_event_loop().run_until_complete(test_coro())
|
Ładowanie…
Reference in New Issue