From 2c60b3c1a0fdb78becfd38a1a64024a66ed06e51 Mon Sep 17 00:00:00 2001 From: Nico Date: Sun, 13 Sep 2015 22:36:22 +0200 Subject: [PATCH] Commit with test failure --- hbmqtt/mqtt/protocol/handler.py | 7 ++- tests/mqtt/protocol/test_handler.py | 90 ++++++++++++++++++++++++----- 2 files changed, 82 insertions(+), 15 deletions(-) diff --git a/hbmqtt/mqtt/protocol/handler.py b/hbmqtt/mqtt/protocol/handler.py index fe28592..c155b9b 100644 --- a/hbmqtt/mqtt/protocol/handler.py +++ b/hbmqtt/mqtt/protocol/handler.py @@ -57,7 +57,7 @@ class ProtocolHandler: self._loop = loop self._reader_task = None self._keepalive_task = None - self._reader_ready = asyncio.Event(loop=self._loop) + self._reader_ready = None self._reader_stopped = asyncio.Event(loop=self._loop) self._puback_waiters = dict() @@ -67,19 +67,22 @@ class ProtocolHandler: @asyncio.coroutine def start(self): + self._reader_ready = asyncio.Event(loop=self._loop) self._reader_task = asyncio.Task(self._reader_loop(), loop=self._loop) yield from asyncio.wait([self._reader_ready.wait()], loop=self._loop) if self.keepalive_timeout: self._keepalive_task = self._loop.call_later(self.keepalive_timeout, self.handle_write_timeout) - self.logger.debug("%s Handler tasks started" % self.session.client_id) + self.logger.debug("Handler tasks started") yield from self.retry_deliveries() + self.logger.debug("Handler ready") @asyncio.coroutine def stop(self): # Stop incoming messages flow waiter for packet_id in self.session.inflight_in: self.session.inflight_in[packet_id].cancel() +# self._reader_stopped = asyncio.Event(loop=self._loop) self._reader_task.cancel() if self._keepalive_task: self._keepalive_task.cancel() diff --git a/tests/mqtt/protocol/test_handler.py b/tests/mqtt/protocol/test_handler.py index e81cbe8..11ffd80 100644 --- a/tests/mqtt/protocol/test_handler.py +++ b/tests/mqtt/protocol/test_handler.py @@ -3,10 +3,21 @@ # See the file license.txt for copying permission. import unittest import asyncio +import logging from hbmqtt.plugins.manager import PluginManager from hbmqtt.session import Session from hbmqtt.mqtt.protocol.handler import ProtocolHandler -from hbmqtt.adapters import BufferReader, BufferWriter +from hbmqtt.adapters import StreamWriterAdapter, StreamReaderAdapter +from hbmqtt.mqtt.constants import * +from hbmqtt.mqtt.publish import PublishPacket + +formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s" +logging.basicConfig(level=logging.DEBUG, format=formatter) +log = logging.getLogger(__name__) + + +def adapt(reader, writer): + return StreamReaderAdapter(reader), StreamWriterAdapter(writer) class ProtocolHandlerTest(unittest.TestCase): @@ -14,29 +25,82 @@ class ProtocolHandlerTest(unittest.TestCase): self.loop = asyncio.new_event_loop() self.plugin_manager = PluginManager("hbmqtt.test.plugins", context=None, loop=self.loop) + def tearDown(self): + self.loop.close() + def test_init_handler(self): s = Session() handler = ProtocolHandler(s, self.plugin_manager, loop=self.loop) self.assertIs(handler.session, s) self.assertIs(handler._loop, self.loop) - self.assertFalse(handler._puback_waiters) - self.assertFalse(handler._pubrec_waiters) - self.assertFalse(handler._pubrel_waiters) - self.assertFalse(handler._pubcomp_waiters) + self.check_empty_waiters(handler) def test_start_stop(self): @asyncio.coroutine - def server_coro(reader, writer): + def server_mock(reader, writer): pass @asyncio.coroutine def test_coro(): s = Session() - buffer = b'' - s.reader = BufferReader(buffer) - s.writer = BufferWriter(buffer) + reader, writer = yield from asyncio.open_connection('127.0.0.1', 8888, loop=self.loop) + s.reader, s.writer = adapt(reader, writer) handler = ProtocolHandler(s, self.plugin_manager, loop=self.loop) - yield from handler.start() - yield from handler.stop() - return handler - handler = self.loop.run_until_complete(test_coro()) \ No newline at end of file + yield from self.start_handler(handler, s) + yield from self.stop_handler(handler, s) + + coro = asyncio.start_server(server_mock, '127.0.0.1', 8888, loop=self.loop) + server = self.loop.run_until_complete(coro) + self.loop.run_until_complete(test_coro()) + server.close() + + def test_publish_qos0(self): + @asyncio.coroutine + def server_mock(reader, writer): + packet = yield from PublishPacket.from_stream(reader) + self.assertEquals(packet.topic_name, '/topic') + self.assertEquals(packet.qos, QOS_0) + self.assertIsNotNone(packet.packet_id) + + @asyncio.coroutine + def test_coro(): + s = Session() + reader, writer = yield from asyncio.open_connection('127.0.0.1', 8888, loop=self.loop) + s.reader, s.writer = adapt(reader, writer) + handler = ProtocolHandler(s, self.plugin_manager, loop=self.loop) + yield from self.start_handler(handler, s) + yield from handler.mqtt_publish('/topic', b'test_data', QOS_0, False) + yield from self.stop_handler(handler, s) + + coro = asyncio.start_server(server_mock, '127.0.0.1', 8888, loop=self.loop) + server = self.loop.run_until_complete(coro) + self.loop.run_until_complete(test_coro()) + log.debug("TEST") + server.close() + self.loop.run_until_complete(server.wait_closed()) + + + @asyncio.coroutine + def start_handler(self, handler, session): + yield from handler.start() + self.assertTrue(handler._reader_ready) + self.check_empty_waiters(handler) + self.check_no_message(session) + + @asyncio.coroutine + def stop_handler(self, handler, session): + yield from handler.stop() + self.assertTrue(handler._reader_stopped) + self.check_empty_waiters(handler) + self.check_no_message(session) + + def check_empty_waiters(self, handler): + self.assertFalse(handler._puback_waiters) + self.assertFalse(handler._pubrec_waiters) + self.assertFalse(handler._pubrel_waiters) + self.assertFalse(handler._pubcomp_waiters) + + def check_no_message(self, session): + self.assertFalse(session.inflight_out) + self.assertFalse(session.inflight_in) + self.assertEquals(session.delivered_message_queue.qsize(), 0) \ No newline at end of file