kopia lustrzana https://github.com/Yakifo/amqtt
Commit with test failure
rodzic
cf9e78e4bb
commit
2c60b3c1a0
|
@ -57,7 +57,7 @@ class ProtocolHandler:
|
||||||
self._loop = loop
|
self._loop = loop
|
||||||
self._reader_task = None
|
self._reader_task = None
|
||||||
self._keepalive_task = None
|
self._keepalive_task = None
|
||||||
self._reader_ready = asyncio.Event(loop=self._loop)
|
self._reader_ready = None
|
||||||
self._reader_stopped = asyncio.Event(loop=self._loop)
|
self._reader_stopped = asyncio.Event(loop=self._loop)
|
||||||
|
|
||||||
self._puback_waiters = dict()
|
self._puback_waiters = dict()
|
||||||
|
@ -67,19 +67,22 @@ class ProtocolHandler:
|
||||||
|
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def start(self):
|
def start(self):
|
||||||
|
self._reader_ready = asyncio.Event(loop=self._loop)
|
||||||
self._reader_task = asyncio.Task(self._reader_loop(), loop=self._loop)
|
self._reader_task = asyncio.Task(self._reader_loop(), loop=self._loop)
|
||||||
yield from asyncio.wait([self._reader_ready.wait()], loop=self._loop)
|
yield from asyncio.wait([self._reader_ready.wait()], loop=self._loop)
|
||||||
if self.keepalive_timeout:
|
if self.keepalive_timeout:
|
||||||
self._keepalive_task = self._loop.call_later(self.keepalive_timeout, self.handle_write_timeout)
|
self._keepalive_task = self._loop.call_later(self.keepalive_timeout, self.handle_write_timeout)
|
||||||
|
|
||||||
self.logger.debug("%s Handler tasks started" % self.session.client_id)
|
self.logger.debug("Handler tasks started")
|
||||||
yield from self.retry_deliveries()
|
yield from self.retry_deliveries()
|
||||||
|
self.logger.debug("Handler ready")
|
||||||
|
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def stop(self):
|
def stop(self):
|
||||||
# Stop incoming messages flow waiter
|
# Stop incoming messages flow waiter
|
||||||
for packet_id in self.session.inflight_in:
|
for packet_id in self.session.inflight_in:
|
||||||
self.session.inflight_in[packet_id].cancel()
|
self.session.inflight_in[packet_id].cancel()
|
||||||
|
# self._reader_stopped = asyncio.Event(loop=self._loop)
|
||||||
self._reader_task.cancel()
|
self._reader_task.cancel()
|
||||||
if self._keepalive_task:
|
if self._keepalive_task:
|
||||||
self._keepalive_task.cancel()
|
self._keepalive_task.cancel()
|
||||||
|
|
|
@ -3,10 +3,21 @@
|
||||||
# See the file license.txt for copying permission.
|
# See the file license.txt for copying permission.
|
||||||
import unittest
|
import unittest
|
||||||
import asyncio
|
import asyncio
|
||||||
|
import logging
|
||||||
from hbmqtt.plugins.manager import PluginManager
|
from hbmqtt.plugins.manager import PluginManager
|
||||||
from hbmqtt.session import Session
|
from hbmqtt.session import Session
|
||||||
from hbmqtt.mqtt.protocol.handler import ProtocolHandler
|
from hbmqtt.mqtt.protocol.handler import ProtocolHandler
|
||||||
from hbmqtt.adapters import BufferReader, BufferWriter
|
from hbmqtt.adapters import StreamWriterAdapter, StreamReaderAdapter
|
||||||
|
from hbmqtt.mqtt.constants import *
|
||||||
|
from hbmqtt.mqtt.publish import PublishPacket
|
||||||
|
|
||||||
|
formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s"
|
||||||
|
logging.basicConfig(level=logging.DEBUG, format=formatter)
|
||||||
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
def adapt(reader, writer):
|
||||||
|
return StreamReaderAdapter(reader), StreamWriterAdapter(writer)
|
||||||
|
|
||||||
|
|
||||||
class ProtocolHandlerTest(unittest.TestCase):
|
class ProtocolHandlerTest(unittest.TestCase):
|
||||||
|
@ -14,29 +25,82 @@ class ProtocolHandlerTest(unittest.TestCase):
|
||||||
self.loop = asyncio.new_event_loop()
|
self.loop = asyncio.new_event_loop()
|
||||||
self.plugin_manager = PluginManager("hbmqtt.test.plugins", context=None, loop=self.loop)
|
self.plugin_manager = PluginManager("hbmqtt.test.plugins", context=None, loop=self.loop)
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
self.loop.close()
|
||||||
|
|
||||||
def test_init_handler(self):
|
def test_init_handler(self):
|
||||||
s = Session()
|
s = Session()
|
||||||
handler = ProtocolHandler(s, self.plugin_manager, loop=self.loop)
|
handler = ProtocolHandler(s, self.plugin_manager, loop=self.loop)
|
||||||
self.assertIs(handler.session, s)
|
self.assertIs(handler.session, s)
|
||||||
self.assertIs(handler._loop, self.loop)
|
self.assertIs(handler._loop, self.loop)
|
||||||
self.assertFalse(handler._puback_waiters)
|
self.check_empty_waiters(handler)
|
||||||
self.assertFalse(handler._pubrec_waiters)
|
|
||||||
self.assertFalse(handler._pubrel_waiters)
|
|
||||||
self.assertFalse(handler._pubcomp_waiters)
|
|
||||||
|
|
||||||
def test_start_stop(self):
|
def test_start_stop(self):
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def server_coro(reader, writer):
|
def server_mock(reader, writer):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def test_coro():
|
def test_coro():
|
||||||
s = Session()
|
s = Session()
|
||||||
buffer = b''
|
reader, writer = yield from asyncio.open_connection('127.0.0.1', 8888, loop=self.loop)
|
||||||
s.reader = BufferReader(buffer)
|
s.reader, s.writer = adapt(reader, writer)
|
||||||
s.writer = BufferWriter(buffer)
|
|
||||||
handler = ProtocolHandler(s, self.plugin_manager, loop=self.loop)
|
handler = ProtocolHandler(s, self.plugin_manager, loop=self.loop)
|
||||||
yield from handler.start()
|
yield from self.start_handler(handler, s)
|
||||||
yield from handler.stop()
|
yield from self.stop_handler(handler, s)
|
||||||
return handler
|
|
||||||
handler = self.loop.run_until_complete(test_coro())
|
coro = asyncio.start_server(server_mock, '127.0.0.1', 8888, loop=self.loop)
|
||||||
|
server = self.loop.run_until_complete(coro)
|
||||||
|
self.loop.run_until_complete(test_coro())
|
||||||
|
server.close()
|
||||||
|
|
||||||
|
def test_publish_qos0(self):
|
||||||
|
@asyncio.coroutine
|
||||||
|
def server_mock(reader, writer):
|
||||||
|
packet = yield from PublishPacket.from_stream(reader)
|
||||||
|
self.assertEquals(packet.topic_name, '/topic')
|
||||||
|
self.assertEquals(packet.qos, QOS_0)
|
||||||
|
self.assertIsNotNone(packet.packet_id)
|
||||||
|
|
||||||
|
@asyncio.coroutine
|
||||||
|
def test_coro():
|
||||||
|
s = Session()
|
||||||
|
reader, writer = yield from asyncio.open_connection('127.0.0.1', 8888, loop=self.loop)
|
||||||
|
s.reader, s.writer = adapt(reader, writer)
|
||||||
|
handler = ProtocolHandler(s, self.plugin_manager, loop=self.loop)
|
||||||
|
yield from self.start_handler(handler, s)
|
||||||
|
yield from handler.mqtt_publish('/topic', b'test_data', QOS_0, False)
|
||||||
|
yield from self.stop_handler(handler, s)
|
||||||
|
|
||||||
|
coro = asyncio.start_server(server_mock, '127.0.0.1', 8888, loop=self.loop)
|
||||||
|
server = self.loop.run_until_complete(coro)
|
||||||
|
self.loop.run_until_complete(test_coro())
|
||||||
|
log.debug("TEST")
|
||||||
|
server.close()
|
||||||
|
self.loop.run_until_complete(server.wait_closed())
|
||||||
|
|
||||||
|
|
||||||
|
@asyncio.coroutine
|
||||||
|
def start_handler(self, handler, session):
|
||||||
|
yield from handler.start()
|
||||||
|
self.assertTrue(handler._reader_ready)
|
||||||
|
self.check_empty_waiters(handler)
|
||||||
|
self.check_no_message(session)
|
||||||
|
|
||||||
|
@asyncio.coroutine
|
||||||
|
def stop_handler(self, handler, session):
|
||||||
|
yield from handler.stop()
|
||||||
|
self.assertTrue(handler._reader_stopped)
|
||||||
|
self.check_empty_waiters(handler)
|
||||||
|
self.check_no_message(session)
|
||||||
|
|
||||||
|
def check_empty_waiters(self, handler):
|
||||||
|
self.assertFalse(handler._puback_waiters)
|
||||||
|
self.assertFalse(handler._pubrec_waiters)
|
||||||
|
self.assertFalse(handler._pubrel_waiters)
|
||||||
|
self.assertFalse(handler._pubcomp_waiters)
|
||||||
|
|
||||||
|
def check_no_message(self, session):
|
||||||
|
self.assertFalse(session.inflight_out)
|
||||||
|
self.assertFalse(session.inflight_in)
|
||||||
|
self.assertEquals(session.delivered_message_queue.qsize(), 0)
|
Ładowanie…
Reference in New Issue