kopia lustrzana https://github.com/Yakifo/amqtt
rodzic
13c4a8a09f
commit
0e11d545d8
|
@ -333,22 +333,26 @@ class ProtocolHandler:
|
||||||
# Wait PUBREL
|
# Wait PUBREL
|
||||||
if app_message.packet_id in self._pubrel_waiters and not self._pubrel_waiters[app_message.packet_id].done():
|
if app_message.packet_id in self._pubrel_waiters and not self._pubrel_waiters[app_message.packet_id].done():
|
||||||
# PUBREL waiter already exists for this packet ID
|
# PUBREL waiter already exists for this packet ID
|
||||||
message = "Can't add PUBREL waiter, a waiter already exists for message Id '%s'" \
|
message = "A waiter already exists for message Id '%s', canceling it" \
|
||||||
% app_message.packet_id
|
% app_message.packet_id
|
||||||
self.logger.warning(message)
|
self.logger.warning(message)
|
||||||
raise HBMQTTException(message)
|
self._pubrel_waiters[app_message.packet_id].cancel()
|
||||||
waiter = asyncio.Future(loop=self._loop)
|
try:
|
||||||
self._pubrel_waiters[app_message.packet_id] = waiter
|
waiter = asyncio.Future(loop=self._loop)
|
||||||
yield from waiter
|
self._pubrel_waiters[app_message.packet_id] = waiter
|
||||||
del self._pubrel_waiters[app_message.packet_id]
|
yield from waiter
|
||||||
app_message.pubrel_packet = waiter.result()
|
del self._pubrel_waiters[app_message.packet_id]
|
||||||
# Initiate delivery and discard message
|
app_message.pubrel_packet = waiter.result()
|
||||||
yield from self.session.delivered_message_queue.put(app_message)
|
# Initiate delivery and discard message
|
||||||
del self.session.inflight_in[app_message.packet_id]
|
yield from self.session.delivered_message_queue.put(app_message)
|
||||||
# Send pubcomp
|
del self.session.inflight_in[app_message.packet_id]
|
||||||
pubcomp_packet = PubcompPacket.build(app_message.packet_id)
|
# Send pubcomp
|
||||||
yield from self._send_packet(pubcomp_packet)
|
pubcomp_packet = PubcompPacket.build(app_message.packet_id)
|
||||||
app_message.pubcomp_packet = pubcomp_packet
|
yield from self._send_packet(pubcomp_packet)
|
||||||
|
app_message.pubcomp_packet = pubcomp_packet
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
self.logger.debug("Message flow cancelled")
|
||||||
|
|
||||||
|
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def _reader_loop(self):
|
def _reader_loop(self):
|
||||||
|
|
|
@ -6,6 +6,9 @@ from unittest.mock import patch, call, MagicMock
|
||||||
from hbmqtt.broker import *
|
from hbmqtt.broker import *
|
||||||
from hbmqtt.mqtt.constants import *
|
from hbmqtt.mqtt.constants import *
|
||||||
from hbmqtt.client import MQTTClient, ConnectException
|
from hbmqtt.client import MQTTClient, ConnectException
|
||||||
|
from hbmqtt.mqtt import ConnectPacket, ConnackPacket, PublishPacket, PubrecPacket, \
|
||||||
|
PubrelPacket, PubcompPacket, DisconnectPacket
|
||||||
|
from hbmqtt.mqtt.connect import ConnectVariableHeader, ConnectPayload
|
||||||
|
|
||||||
formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s"
|
formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s"
|
||||||
logging.basicConfig(level=logging.DEBUG, format=formatter)
|
logging.basicConfig(level=logging.DEBUG, format=formatter)
|
||||||
|
@ -287,6 +290,58 @@ class BrokerTest(unittest.TestCase):
|
||||||
if future.exception():
|
if future.exception():
|
||||||
raise future.exception()
|
raise future.exception()
|
||||||
|
|
||||||
|
#@patch('hbmqtt.broker.PluginManager')
|
||||||
|
def test_client_publish_dup(self):
|
||||||
|
@asyncio.coroutine
|
||||||
|
def test_coro():
|
||||||
|
try:
|
||||||
|
broker = Broker(test_config, plugin_namespace="hbmqtt.test.plugins")
|
||||||
|
yield from broker.start()
|
||||||
|
self.assertTrue(broker.transitions.is_started())
|
||||||
|
|
||||||
|
conn_reader, conn_writer = \
|
||||||
|
yield from asyncio.open_connection('localhost', 1883, loop=self.loop)
|
||||||
|
reader = StreamReaderAdapter(conn_reader)
|
||||||
|
writer = StreamWriterAdapter(conn_writer)
|
||||||
|
|
||||||
|
vh = ConnectVariableHeader()
|
||||||
|
payload = ConnectPayload()
|
||||||
|
|
||||||
|
vh.keep_alive = 10
|
||||||
|
vh.clean_session_flag = False
|
||||||
|
vh.will_retain_flag = False
|
||||||
|
payload.client_id = 'test_id'
|
||||||
|
connect = ConnectPacket(vh=vh, payload=payload)
|
||||||
|
yield from connect.to_stream(writer)
|
||||||
|
yield from ConnackPacket.from_stream(reader)
|
||||||
|
|
||||||
|
publish_1 = PublishPacket.build('/test', b'data', 1, False, QOS_2, False)
|
||||||
|
yield from publish_1.to_stream(writer)
|
||||||
|
ensure_future(PubrecPacket.from_stream(reader), loop=self.loop)
|
||||||
|
|
||||||
|
yield from asyncio.sleep(2)
|
||||||
|
|
||||||
|
publish_dup = PublishPacket.build('/test', b'data', 1, True, QOS_2, False)
|
||||||
|
yield from publish_dup.to_stream(writer)
|
||||||
|
pubrec2 = yield from PubrecPacket.from_stream(reader)
|
||||||
|
pubrel = PubrelPacket.build(1)
|
||||||
|
yield from pubrel.to_stream(writer)
|
||||||
|
pubcomp = yield from PubcompPacket.from_stream(reader)
|
||||||
|
|
||||||
|
disconnect = DisconnectPacket()
|
||||||
|
yield from disconnect.to_stream(writer)
|
||||||
|
|
||||||
|
yield from asyncio.sleep(0.1)
|
||||||
|
yield from broker.shutdown()
|
||||||
|
future.set_result(True)
|
||||||
|
except Exception as ae:
|
||||||
|
future.set_exception(ae)
|
||||||
|
|
||||||
|
future = asyncio.Future(loop=self.loop)
|
||||||
|
self.loop.run_until_complete(test_coro())
|
||||||
|
if future.exception():
|
||||||
|
raise future.exception()
|
||||||
|
|
||||||
@patch('hbmqtt.broker.PluginManager')
|
@patch('hbmqtt.broker.PluginManager')
|
||||||
def test_client_publish_invalid_topic(self, MockPluginManager):
|
def test_client_publish_invalid_topic(self, MockPluginManager):
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
|
|
Ładowanie…
Reference in New Issue