pull/8/head
Nico 2015-09-14 22:08:14 +02:00
rodzic d4549c4349
commit 27bb4982d7
1 zmienionych plików z 45 dodań i 58 usunięć

Wyświetl plik

@ -5,7 +5,7 @@ import unittest
import asyncio
import logging
from hbmqtt.plugins.manager import PluginManager
from hbmqtt.session import Session
from hbmqtt.session import Session, OutgoingApplicationMessage, IncomingApplicationMessage
from hbmqtt.mqtt.protocol.handler import ProtocolHandler
from hbmqtt.adapters import StreamWriterAdapter, StreamReaderAdapter
from hbmqtt.mqtt.constants import *
@ -42,13 +42,16 @@ class ProtocolHandlerTest(unittest.TestCase):
@asyncio.coroutine
def test_coro():
s = Session()
reader, writer = yield from asyncio.open_connection('127.0.0.1', 8888, loop=self.loop)
s.reader, s.writer = adapt(reader, writer)
handler = ProtocolHandler(s, self.plugin_manager, loop=self.loop)
yield from self.start_handler(handler, s, future)
yield from self.stop_handler(handler, s, future)
future.set_result(True)
try:
s = Session()
reader, writer = yield from asyncio.open_connection('127.0.0.1', 8888, loop=self.loop)
s.reader, s.writer = adapt(reader, writer)
handler = ProtocolHandler(s, self.plugin_manager, loop=self.loop)
yield from self.start_handler(handler, s)
yield from self.stop_handler(handler, s)
future.set_result(True)
except AssertionError as ae:
future.set_exception(ae)
future = asyncio.Future(loop=self.loop)
coro = asyncio.start_server(server_mock, '127.0.0.1', 8888, loop=self.loop)
@ -72,14 +75,23 @@ class ProtocolHandlerTest(unittest.TestCase):
@asyncio.coroutine
def test_coro():
s = Session()
reader, writer = yield from asyncio.open_connection('127.0.0.1', 8888, loop=self.loop)
s.reader, s.writer = adapt(reader, writer)
handler = ProtocolHandler(s, self.plugin_manager, loop=self.loop)
yield from self.start_handler(handler, s, future)
message = yield from handler.mqtt_publish('/topic', b'test_data', QOS_0, False)
yield from self.stop_handler(handler, s, future)
future.set_result(True)
try:
s = Session()
reader, writer = yield from asyncio.open_connection('127.0.0.1', 8888, loop=self.loop)
s.reader, s.writer = adapt(reader, writer)
handler = ProtocolHandler(s, self.plugin_manager, loop=self.loop)
yield from self.start_handler(handler, s)
message = yield from handler.mqtt_publish('/topic', b'test_data', QOS_0, False)
self.assertIsInstance(message, OutgoingApplicationMessage)
self.assertIsNotNone(message.publish_packet)
self.assertIsNone(message.puback_packet)
self.assertIsNone(message.pubrec_packet)
self.assertIsNone(message.pubrel_packet)
self.assertIsNone(message.pubcomp_packet)
yield from self.stop_handler(handler, s)
future.set_result(True)
except AssertionError as ae:
future.set_exception(ae)
future = asyncio.Future(loop=self.loop)
coro = asyncio.start_server(server_mock, '127.0.0.1', 8888, loop=self.loop)
@ -90,52 +102,27 @@ class ProtocolHandlerTest(unittest.TestCase):
if future.exception():
raise future.exception()
@asyncio.coroutine
def start_handler(self, handler, session, future):
def start_handler(self, handler, session):
yield from handler.start()
try:
self.assertTrue(handler._reader_ready)
self.check_empty_waiters(handler)
self.check_no_message(session, future)
except AssertionError as ae:
if future and not future.cancelled():
future.set_exception(ae)
else:
raise ae
self.assertTrue(handler._reader_ready)
self.check_empty_waiters(handler)
self.check_no_message(session)
@asyncio.coroutine
def stop_handler(self, handler, session, future):
def stop_handler(self, handler, session):
yield from handler.stop()
try:
self.assertTrue(handler._reader_stopped)
self.check_empty_waiters(handler)
self.check_no_message(session, future)
except AssertionError as ae:
if future and not future.cancelled():
future.set_exception(ae)
else:
raise ae
self.assertTrue(handler._reader_stopped)
self.check_empty_waiters(handler)
self.check_no_message(session)
def check_empty_waiters(self, handler, future=None):
try:
self.assertFalse(handler._puback_waiters)
self.assertFalse(handler._pubrec_waiters)
self.assertFalse(handler._pubrel_waiters)
self.assertFalse(handler._pubcomp_waiters)
except AssertionError as ae:
if future and not future.cancelled():
future.set_exception(ae)
else:
raise ae
def check_empty_waiters(self, handler):
self.assertFalse(handler._puback_waiters)
self.assertFalse(handler._pubrec_waiters)
self.assertFalse(handler._pubrel_waiters)
self.assertFalse(handler._pubcomp_waiters)
def check_no_message(self, session, future):
try:
self.assertFalse(session.inflight_out)
self.assertFalse(session.inflight_in)
self.assertEquals(session.delivered_message_queue.qsize(), 0)
except AssertionError as ae:
if future and not future.cancelled():
future.set_exception(ae)
else:
raise ae
def check_no_message(self, session):
self.assertFalse(session.inflight_out)
self.assertFalse(session.inflight_in)
self.assertEquals(session.delivered_message_queue.qsize(), 0)