kopia lustrzana https://github.com/Yakifo/amqtt
Add start/stop test
rodzic
a5c34ed315
commit
52c3b83704
|
@ -6,6 +6,7 @@ import asyncio
|
|||
from hbmqtt.plugins.manager import PluginManager
|
||||
from hbmqtt.session import Session
|
||||
from hbmqtt.mqtt.protocol.handler import ProtocolHandler
|
||||
from hbmqtt.adapters import BufferReader, BufferWriter
|
||||
|
||||
|
||||
class ProtocolHandlerTest(unittest.TestCase):
|
||||
|
@ -22,3 +23,20 @@ class ProtocolHandlerTest(unittest.TestCase):
|
|||
self.assertFalse(handler._pubrec_waiters)
|
||||
self.assertFalse(handler._pubrel_waiters)
|
||||
self.assertFalse(handler._pubcomp_waiters)
|
||||
|
||||
def test_start_stop(self):
|
||||
@asyncio.coroutine
|
||||
def server_coro(reader, writer):
|
||||
pass
|
||||
|
||||
@asyncio.coroutine
|
||||
def test_coro():
|
||||
s = Session()
|
||||
buffer = b''
|
||||
s.reader = BufferReader(buffer)
|
||||
s.writer = BufferWriter(buffer)
|
||||
handler = ProtocolHandler(s, self.plugin_manager, loop=self.loop)
|
||||
yield from handler.start()
|
||||
yield from handler.stop()
|
||||
return handler
|
||||
handler = self.loop.run_until_complete(test_coro())
|
Ładowanie…
Reference in New Issue