kopia lustrzana https://github.com/Yakifo/amqtt
rodzic
7ebe1cdd6b
commit
a8e240a2b2
|
@ -211,23 +211,31 @@ class Broker:
|
|||
connected = True
|
||||
wait_disconnect = asyncio.Task(handler.wait_disconnect())
|
||||
wait_subscription = asyncio.Task(handler.get_next_pending_subscription())
|
||||
wait_unsubscription = asyncio.Task(handler.get_next_pending_unsubscription())
|
||||
wait_deliver = asyncio.Task(handler.mqtt_deliver_next_message())
|
||||
while connected:
|
||||
done, pending = yield from asyncio.wait([wait_disconnect, wait_subscription, wait_deliver],
|
||||
done, pending = yield from asyncio.wait(
|
||||
[wait_disconnect, wait_subscription, wait_unsubscription, wait_deliver],
|
||||
return_when=asyncio.FIRST_COMPLETED)
|
||||
if wait_disconnect in done:
|
||||
connected = False
|
||||
wait_subscription.cancel()
|
||||
wait_unsubscription.cancel()
|
||||
wait_deliver.cancel()
|
||||
elif wait_unsubscription in done:
|
||||
unsubscription = wait_unsubscription.result()
|
||||
for topic in unsubscription.topics:
|
||||
self.del_subscription(topic, client_session)
|
||||
yield from handler.mqtt_acknowledge_unsubscription(unsubscription.packet_id)
|
||||
wait_unsubscription = asyncio.Task(handler.get_next_pending_unsubscription())
|
||||
elif wait_subscription in done:
|
||||
subscription = wait_subscription.result()
|
||||
return_codes = []
|
||||
for topic in subscription.topics:
|
||||
return_codes.append(self.add_subscription(topic, client_session))
|
||||
yield from handler.mqtt_acknowledge_subscription(subscription.packet_id, return_codes)
|
||||
i=0
|
||||
for topic in subscription.topics:
|
||||
if return_codes[i] != 0x80:
|
||||
for index, topic in enumerate(subscription.topics):
|
||||
if return_codes[index] != 0x80:
|
||||
yield from self.publish_retained_messages_for_subscription(topic, client_session)
|
||||
wait_subscription = asyncio.Task(handler.get_next_pending_subscription())
|
||||
elif wait_deliver in done:
|
||||
|
@ -297,6 +305,18 @@ class Broker:
|
|||
except KeyError:
|
||||
return 0x80
|
||||
|
||||
def del_subscription(self, a_filter, session):
|
||||
try:
|
||||
sessions = self._topics[a_filter]
|
||||
for index, s in enumerate(sessions):
|
||||
if s['session'].client_id == session.client_id:
|
||||
self.logger.debug("Removing subscription on topic '%s' for client %s" %
|
||||
(a_filter, format_client_message(session=session)))
|
||||
sessions.pop(index)
|
||||
except KeyError:
|
||||
# Unsubscribe topic not found in current subscribed topics
|
||||
pass
|
||||
|
||||
def matches(self, topic, filter):
|
||||
import re
|
||||
match_pattern = re.compile(filter.replace('#', '.*').replace('+', '[\s\w\d]+'))
|
||||
|
|
|
@ -12,6 +12,7 @@ from hbmqtt.mqtt.pingresp import PingRespPacket
|
|||
from hbmqtt.mqtt.subscribe import SubscribePacket
|
||||
from hbmqtt.mqtt.suback import SubackPacket
|
||||
from hbmqtt.mqtt.unsubscribe import UnsubscribePacket
|
||||
from hbmqtt.mqtt.unsuback import UnsubackPacket
|
||||
from hbmqtt.utils import format_client_message
|
||||
|
||||
|
||||
|
@ -82,7 +83,7 @@ class BrokerProtocolHandler(ProtocolHandler):
|
|||
@asyncio.coroutine
|
||||
def handle_unsubscribe(self, unsubscribe: UnsubscribePacket):
|
||||
unsubscription = UnSubscription(unsubscribe.variable_header.packet_id, unsubscribe.payload.topics)
|
||||
yield from self._pending_subscriptions.put(unsubscription)
|
||||
yield from self._pending_unsubscriptions.put(unsubscription)
|
||||
|
||||
@asyncio.coroutine
|
||||
def get_next_pending_subscription(self):
|
||||
|
@ -98,3 +99,8 @@ class BrokerProtocolHandler(ProtocolHandler):
|
|||
def mqtt_acknowledge_subscription(self, packet_id, return_codes):
|
||||
suback = SubackPacket.build(packet_id, return_codes)
|
||||
yield from self.outgoing_queue.put(suback)
|
||||
|
||||
@asyncio.coroutine
|
||||
def mqtt_acknowledge_unsubscription(self, packet_id):
|
||||
unsuback = UnsubackPacket.build(packet_id)
|
||||
yield from self.outgoing_queue.put(unsuback)
|
|
@ -79,7 +79,7 @@ class ProtocolHandler:
|
|||
self._pubcomp_waiters = dict()
|
||||
self.delivered_message = asyncio.Queue()
|
||||
|
||||
def attach_to_session(self, session:Session):
|
||||
def attach_to_session(self, session: Session):
|
||||
self.session = session
|
||||
self.session.handler = self
|
||||
extra_info = self.session.writer.get_extra_info('sockname')
|
||||
|
@ -93,8 +93,8 @@ class ProtocolHandler:
|
|||
@asyncio.coroutine
|
||||
def start(self):
|
||||
self._running = True
|
||||
self._reader_task = asyncio.async(self._reader_coro(), loop=self._loop)
|
||||
self._writer_task = asyncio.async(self._writer_coro(), loop=self._loop)
|
||||
self._reader_task = asyncio.Task(self._reader_coro(), loop=self._loop)
|
||||
self._writer_task = asyncio.Task(self._writer_coro(), loop=self._loop)
|
||||
yield from asyncio.wait(
|
||||
[self._reader_ready.wait(), self._writer_ready.wait()], loop=self._loop)
|
||||
self.logger.debug("Handler tasks started")
|
||||
|
|
Ładowanie…
Reference in New Issue