Merge branch 'release/0.4'

pull/8/head
Nicolas Jouanin 2015-07-27 21:55:42 +02:00
commit d747d1adb2
12 zmienionych plików z 483 dodań i 201 usunięć

Wyświetl plik

@ -2,4 +2,4 @@
#
# See the file license.txt for copying permission.
VERSION = (0, 3, 0, 'final', 0)
VERSION = (0, 4, 0, 'final', 0)

Wyświetl plik

@ -16,7 +16,8 @@ from hbmqtt.utils import format_client_message, gen_client_id
_defaults = {
'bind-address': 'localhost',
'bind-port': 1883,
'timeout-disconnect-delay': 10
'timeout-disconnect-delay': 2,
'publish-retry-delay': 5,
}
@ -29,6 +30,9 @@ class Subscription:
self.session = session
self.qos = qos
def __repr__(self):
return type(self).__name__ + '(client_id={0}, qos={1!r})'.format(self.session.client_id, self.qos)
class RetainedApplicationMessage:
def __init__(self, source_session, topic, data, qos=None):
@ -158,9 +162,8 @@ class Broker:
self.logger.debug("known sessions={0}".format(self._sessions))
if connect.variable_header.clean_session_flag:
client_id = connect.payload.client_id
if client_id is not None and client_id in self._sessions:
# Delete existing session
del self._sessions[client_id]
if client_id is not None:
self.delete_session(client_id)
client_session = Session()
client_session.parent = 0
self._sessions[client_id] = client_session
@ -193,6 +196,7 @@ class Broker:
client_session.keep_alive = connect.variable_header.keep_alive + self.config['timeout-disconnect-delay']
else:
client_session.keep_alive = 0
client_session.publish_retry_delay = self.config['publish-retry-delay']
client_session.reader = reader
client_session.writer = writer
@ -213,10 +217,11 @@ class Broker:
client_session.machine.connect()
handler = BrokerProtocolHandler(self._loop)
handler.attach_to_session(client_session)
self.logger.debug("Start messages handling")
self.logger.debug("%s Start messages handling" % client_session.client_id)
yield from handler.start()
self.logger.debug("Retained messages queue size: %d" % client_session.retained_messages.qsize())
yield from self.publish_session_retained_messages(client_session)
self.logger.debug("Wait for disconnect")
self.logger.debug("%s Wait for disconnect" % client_session.client_id)
connected = True
wait_disconnect = asyncio.Task(handler.wait_disconnect())
@ -229,7 +234,7 @@ class Broker:
return_when=asyncio.FIRST_COMPLETED)
if wait_disconnect in done:
result = wait_disconnect.result()
self.logger.debug("Result from wait_diconnect: %s" % result)
self.logger.debug("%s Result from wait_diconnect: %s" % (client_session.client_id, result))
if result is None:
self.logger.debug("Will flag: %s" % client_session.will_flag)
#Connection closed anormally, send will message
@ -247,12 +252,14 @@ class Broker:
client_session.will_qos)
connected = False
if wait_unsubscription in done:
self.logger.debug("%s handling unsubscription" % client_session.client_id)
unsubscription = wait_unsubscription.result()
for topic in unsubscription['topics']:
self.del_subscription(topic, client_session)
yield from handler.mqtt_acknowledge_unsubscription(unsubscription['packet_id'])
wait_unsubscription = asyncio.Task(handler.get_next_pending_unsubscription())
if wait_subscription in done:
self.logger.debug("%s handling subscription" % client_session.client_id)
subscriptions = wait_subscription.result()
return_codes = []
for subscription in subscriptions['topics']:
@ -264,18 +271,22 @@ class Broker:
wait_subscription = asyncio.Task(handler.get_next_pending_subscription())
self.logger.debug(repr(self._subscriptions))
if wait_deliver in done:
publish_packet = wait_deliver.result().packet
self.logger.debug("%s handling message delivery" % client_session.client_id)
publish_packet = wait_deliver.result()
packet_id = publish_packet.variable_header.packet_id
topic_name = publish_packet.variable_header.topic_name
data = publish_packet.payload.data
yield from self.broadcast_application_message(client_session, topic_name, data)
if publish_packet.retain_flag:
self.retain_message(client_session, topic_name, data)
# Acknowledge message delivery
yield from handler.mqtt_acknowledge_delivery(packet_id)
wait_deliver = asyncio.Task(handler.mqtt_deliver_next_message())
wait_subscription.cancel()
wait_unsubscription.cancel()
wait_deliver.cancel()
self.logger.debug("Client disconnecting")
self.logger.debug("%s Client disconnecting" % client_session.client_id)
try:
yield from handler.stop()
except Exception as e:
@ -285,7 +296,7 @@ class Broker:
handler = None
client_session.machine.disconnect()
writer.close()
self.logger.debug("Session disconnected")
self.logger.debug("%s Session disconnected" % client_session.client_id)
@asyncio.coroutine
def check_connect(self, connect: ConnectPacket):
@ -306,12 +317,12 @@ class Broker:
def retain_message(self, source_session, topic_name, data, qos=None):
if data is not None and data != b'':
# If retained flag set, store the message for further subscriptions
self.logger.debug("Retaining message on topic %s" % topic_name)
self.logger.debug("%s Retaining message on topic %s" % (source_session.client_id, topic_name))
retained_message = RetainedApplicationMessage(source_session, topic_name, data, qos)
self._global_retained_messages[topic_name] = retained_message
else:
# [MQTT-3.3.1-10]
self.logger.debug("Clear retained messages for topic '%s'" % topic_name)
self.logger.debug("%s Clear retained messages for topic '%s'" % (source_session.client_id, topic_name))
del self._global_retained_messages[topic_name]
def add_subscription(self, subscription, session):
@ -383,9 +394,8 @@ class Broker:
(format_client_message(session=source_session),
topic, format_client_message(session=target_session)))
handler = subscription.session.handler
packet_id = handler.session.next_packet_id
publish_tasks.append(
asyncio.Task(handler.mqtt_publish(topic, data, packet_id, False, qos, retain=False))
asyncio.Task(handler.mqtt_publish(topic, data, qos, retain=False))
)
else:
self.logger.debug("retaining application message from %s on topic '%s' to client '%s'" %
@ -395,10 +405,17 @@ class Broker:
publish_tasks.append(
asyncio.Task(target_session.retained_messages.put(retained_message))
)
if len(publish_tasks) > 0:
asyncio.wait(publish_tasks)
except Exception as e:
self.logger.warn("Message broadcasting failed: %s", e)
self.logger.debug("End Broadcasting message from %s on topic %s" %
(format_client_message(session=source_session), topic)
)
for client_id in self._sessions:
self.logger.debug("%s Retained messages queue size: %d" %
(client_id, self._sessions[client_id].retained_messages.qsize()))
@asyncio.coroutine
def publish_session_retained_messages(self, session):
@ -408,10 +425,9 @@ class Broker:
publish_tasks = []
while not session.retained_messages.empty():
retained = yield from session.retained_messages.get()
packet_id = session.next_packet_id
publish_tasks.append(asyncio.Task(
session.handler.mqtt_publish(
retained.topic, retained.data, packet_id, False, retained.qos, True)))
retained.topic, retained.data, retained.qos, True)))
if len(publish_tasks) > 0:
asyncio.wait(publish_tasks)
@ -425,11 +441,35 @@ class Broker:
if self.matches(d_topic, subscription['filter']):
self.logger.debug("%s and %s match" % (d_topic, subscription['filter']))
retained = self._global_retained_messages[d_topic]
packet_id = session.next_packet_id
publish_tasks.append(asyncio.Task(
session.handler.mqtt_publish(
retained.topic, retained.data, packet_id, False, subscription['qos'], True)))
retained.topic, retained.data, subscription['qos'], True)))
if len(publish_tasks) > 0:
asyncio.wait(publish_tasks)
self.logger.debug("End broadcasting messages retained due to subscription on '%s' from %s" %
(subscription['filter'], format_client_message(session=session)))
def delete_session(self, client_id):
"""
Delete an existing session data, for example due to clean session set in CONNECT
:param client_id:
:return:
"""
try:
session = self._sessions[client_id]
except KeyError:
session = None
if session is None:
self.logger.warn("Delete session : session %s doesn't exist" % client_id)
return
# Delete subscriptions
self.logger.debug("deleting session %s subscriptions" % repr(session))
nb_sub = 0
for filter in self._subscriptions:
self.del_subscription(filter, session)
nb_sub += 1
self.logger.debug("%d subscriptions deleted" % nb_sub)
self.logger.debug("deleting existing session %s" % repr(self._sessions[client_id]))
del self._sessions[client_id]

Wyświetl plik

@ -143,7 +143,7 @@ class MQTTClient:
self._handler.mqtt_ping()
@asyncio.coroutine
def publish(self, topic, message, dup=False, qos=None, retain=None):
def publish(self, topic, message, qos=None, retain=None):
def get_retain_and_qos():
if qos:
_qos = qos
@ -164,11 +164,11 @@ class MQTTClient:
return _qos, _retain
(app_qos, app_retain) = get_retain_and_qos()
if app_qos == 0:
yield from self._handler.mqtt_publish(topic, message, self.session.next_packet_id, dup, 0x00, app_retain)
yield from self._handler.mqtt_publish(topic, message, 0x00, app_retain)
if app_qos == 1:
yield from self._handler.mqtt_publish(topic, message, self.session.next_packet_id, dup, 0x01, app_retain)
yield from self._handler.mqtt_publish(topic, message, 0x01, app_retain)
if app_qos == 2:
yield from self._handler.mqtt_publish(topic, message, self.session.next_packet_id, dup, 0x02, app_retain)
yield from self._handler.mqtt_publish(topic, message, 0x02, app_retain)
@asyncio.coroutine
def subscribe(self, topics):
@ -182,6 +182,10 @@ class MQTTClient:
def deliver_message(self):
return (yield from self._handler.mqtt_deliver_next_message())
@asyncio.coroutine
def acknowledge_delivery(self, packet_id):
yield from self._handler.mqtt_acknowledge_delivery(packet_id)
@asyncio.coroutine
def _connect_coro(self):
try:

Wyświetl plik

@ -37,7 +37,7 @@ class BrokerProtocolHandler(ProtocolHandler):
@asyncio.coroutine
def wait_disconnect(self):
yield from self._disconnect_waiter
return (yield from self._disconnect_waiter)
def handle_write_timeout(self):
pass
@ -48,7 +48,7 @@ class BrokerProtocolHandler(ProtocolHandler):
@asyncio.coroutine
def handle_disconnect(self, disconnect):
if self._disconnect_waiter is not None:
if self._disconnect_waiter and not self._disconnect_waiter.done():
self._disconnect_waiter.set_result(disconnect)
@asyncio.coroutine
@ -59,8 +59,8 @@ class BrokerProtocolHandler(ProtocolHandler):
def handle_connect(self, connect: ConnectPacket):
# Broker handler shouldn't received CONNECT message during messages handling
# as CONNECT messages are managed by the broker on client connection
self.logger.error('[MQTT-3.1.0-2] %s : CONNECT message received during messages handling' %
(format_client_message(self.session)))
self.logger.error('%s [MQTT-3.1.0-2] %s : CONNECT message received during messages handling' %
(self.session.client_id, format_client_message(self.session)))
if self._disconnect_waiter is not None and not self._disconnect_waiter.done():
self._disconnect_waiter.set_result(None)

Wyświetl plik

@ -3,10 +3,10 @@
# See the file license.txt for copying permission.
import logging
import asyncio
from asyncio import futures
from datetime import datetime
from hbmqtt.mqtt.packet import MQTTFixedHeader, MQTTPacket
from hbmqtt.mqtt import packet_class
from hbmqtt.errors import NoDataException
from hbmqtt.errors import NoDataException, HBMQTTException
from hbmqtt.mqtt.packet import PacketType
from hbmqtt.mqtt.connack import ConnackPacket
from hbmqtt.mqtt.connect import ConnectPacket
@ -23,30 +23,8 @@ from hbmqtt.mqtt.unsubscribe import UnsubscribePacket
from hbmqtt.mqtt.unsuback import UnsubackPacket
from hbmqtt.mqtt.disconnect import DisconnectPacket
from hbmqtt.session import Session
from transitions import Machine
class InFlightMessage:
states = ['new', 'published', 'acknowledged', 'received', 'released', 'completed']
def __init__(self, packet, qos):
self.packet = packet
self.qos = qos
self.puback = None
self.pubrec = None
self.pubcomp = None
self.pubrel = None
self._init_states()
def _init_states(self):
self.machine = Machine(model=self, states=InFlightMessage.states, initial='new')
self.machine.add_transition(trigger='publish', source='new', dest='published')
if self.qos == 0x01:
self.machine.add_transition(trigger='acknowledge', source='published', dest='acknowledged')
if self.qos == 0x02:
self.machine.add_transition(trigger='receive', source='published', dest='received')
self.machine.add_transition(trigger='release', source='received', dest='released')
self.machine.add_transition(trigger='complete', source='released', dest='completed')
from hbmqtt.specs import *
from hbmqtt.mqtt.protocol.inflight import *
class ProtocolHandler:
@ -68,16 +46,8 @@ class ProtocolHandler:
self._running = False
self.incoming_queues = dict()
self.application_messages = asyncio.Queue()
for p in PacketType:
self.incoming_queues[p] = asyncio.Queue()
self.outgoing_queue = asyncio.Queue()
self._puback_waiters = dict()
self._pubrec_waiters = dict()
self._pubrel_waiters = dict()
self._pubcomp_waiters = dict()
self.delivered_message = asyncio.Queue()
def attach_to_session(self, session: Session):
self.session = session
@ -97,50 +67,59 @@ class ProtocolHandler:
self._writer_task = asyncio.Task(self._writer_coro(), loop=self._loop)
yield from asyncio.wait(
[self._reader_ready.wait(), self._writer_ready.wait()], loop=self._loop)
self.logger.debug("Handler tasks started")
self.logger.debug("%s Handler tasks started" % self.session.client_id)
yield from self.retry_deliveries()
@asyncio.coroutine
def mqtt_publish(self, topic, message, packet_id, dup, qos, retain):
if packet_id in self.session.inflight_out:
self.logger.warn("A message with the same packet ID is already in flight")
packet = PublishPacket.build(topic, message, packet_id, dup, qos, retain)
def retry_deliveries(self):
"""
Handle [MQTT-4.4.0-1] by resending PUBLISH and PUBREL messages for pending out messages
:return:
"""
self.logger.debug("Begin messages delivery retries")
ack_packets = []
for packet_id in self.session.outgoing_msg:
message = self.session.outgoing_msg[packet_id]
self.logger.debug(message.state)
if message.is_new() or message.is_published():
self.logger.debug("Retrying publish message Id=%d", packet_id)
message.publish_packet.dup_flag = True
ack = False
while not ack:
yield from self.outgoing_queue.put(message.publish_packet)
message.retry_publish()
ack = yield from message.wait_acknowledge()
ack_packets.append(packet_id)
if message.is_received():
self.logger.debug("Retrying pubrel message Id=%d", packet_id)
yield from self.outgoing_queue.put(PubrelPacket.build(packet_id))
message.sent_pubrel()
for packet_id in ack_packets:
del self.session.outgoing_msg[packet_id]
self.logger.debug("%d messages redelivered" % len(ack_packets))
self.logger.debug("End messages delivery retries")
@asyncio.coroutine
def mqtt_publish(self, topic, message, qos, retain):
packet_id = self.session.next_packet_id
if packet_id in self.session.outgoing_msg:
self.logger.warn("%s A message with the same packet ID is already in flight" % self.session.client_id)
packet = PublishPacket.build(topic, message, packet_id, False, qos, retain)
yield from self.outgoing_queue.put(packet)
inflight_message = InFlightMessage(packet, qos)
self.session.inflight_out[packet.variable_header.packet_id] = inflight_message
inflight_message.publish()
if qos == 0x01:
waiter = futures.Future(loop=self._loop)
self._puback_waiters[packet_id] = waiter
yield from waiter
inflight_message.puback = waiter.result()
inflight_message.acknowledge()
del self._puback_waiters[packet_id]
if qos == 0x02:
# Wait for PUBREC
waiter = futures.Future(loop=self._loop)
self._pubrec_waiters[packet_id] = waiter
yield from waiter
inflight_message.pubrec = waiter.result()
del self._pubrec_waiters[packet_id]
inflight_message.receive()
# Send pubrel
pubrel = PubrelPacket.build(packet_id)
yield from self.outgoing_queue.put(pubrel)
inflight_message.pubrel = pubrel
inflight_message.release()
# Wait for pubcomp
waiter = futures.Future(loop=self._loop)
self._pubcomp_waiters[packet_id] = waiter
yield from waiter
inflight_message.pubcomp = waiter.result()
del self._pubcomp_waiters[packet_id]
inflight_message.complete()
del self.session.inflight_out[packet_id]
return inflight_message
if qos != QOS_0:
inflight_message = OutgoingInFlightMessage(packet, qos, loop=self._loop)
inflight_message.sent_publish()
self.session.outgoing_msg[packet_id] = inflight_message
ack = yield from inflight_message.wait_acknowledge()
while not ack:
#Retry publish
packet = PublishPacket.build(topic, message, packet_id, True, qos, retain)
self.logger.debug("Retry delivery of packet %s" % repr(packet))
inflight_message.publish_packet = packet
yield from self.outgoing_queue.put(packet)
inflight_message.retry_publish()
ack = yield from inflight_message.wait_acknowledge()
del self.session.outgoing_msg[packet_id]
@asyncio.coroutine
def stop(self):
@ -148,10 +127,15 @@ class ProtocolHandler:
self.session.reader.feed_eof()
yield from self.outgoing_queue.put("STOP")
yield from asyncio.wait([self._writer_task, self._reader_task], loop=self._loop)
# Stop incoming messages flow waiter
for packet_id in self.session.incoming_msg:
self.session.incoming_msg[packet_id].cancel()
for packet_id in self.session.outgoing_msg:
self.session.outgoing_msg[packet_id].cancel()
@asyncio.coroutine
def _reader_coro(self):
self.logger.debug("Starting reader coro")
self.logger.debug("%s Starting reader coro" % self.session.client_id)
while self._running:
try:
self._reader_ready.set()
@ -162,55 +146,60 @@ class ProtocolHandler:
if fixed_header:
cls = packet_class(fixed_header)
packet = yield from cls.from_stream(self.session.reader, fixed_header=fixed_header)
self.logger.debug(" <-in-- " + repr(packet))
self.logger.debug("%s <-in-- %s" % (self.session.client_id, repr(packet)))
task = None
if packet.fixed_header.packet_type == PacketType.CONNACK:
asyncio.Task(self.handle_connack(packet))
task = asyncio.Task(self.handle_connack(packet))
elif packet.fixed_header.packet_type == PacketType.SUBSCRIBE:
asyncio.Task(self.handle_subscribe(packet))
task = asyncio.Task(self.handle_subscribe(packet))
elif packet.fixed_header.packet_type == PacketType.UNSUBSCRIBE:
asyncio.Task(self.handle_unsubscribe(packet))
task = asyncio.Task(self.handle_unsubscribe(packet))
elif packet.fixed_header.packet_type == PacketType.SUBACK:
asyncio.Task(self.handle_suback(packet))
task = asyncio.Task(self.handle_suback(packet))
elif packet.fixed_header.packet_type == PacketType.UNSUBACK:
asyncio.Task(self.handle_unsuback(packet))
task = asyncio.Task(self.handle_unsuback(packet))
elif packet.fixed_header.packet_type == PacketType.PUBACK:
asyncio.Task(self.handle_puback(packet))
task = asyncio.Task(self.handle_puback(packet))
elif packet.fixed_header.packet_type == PacketType.PUBREC:
asyncio.Task(self.handle_pubrec(packet))
task = asyncio.Task(self.handle_pubrec(packet))
elif packet.fixed_header.packet_type == PacketType.PUBREL:
asyncio.Task(self.handle_pubrel(packet))
task = asyncio.Task(self.handle_pubrel(packet))
elif packet.fixed_header.packet_type == PacketType.PUBCOMP:
asyncio.Task(self.handle_pubcomp(packet))
task = asyncio.Task(self.handle_pubcomp(packet))
elif packet.fixed_header.packet_type == PacketType.PINGREQ:
asyncio.Task(self.handle_pingreq(packet))
task = asyncio.Task(self.handle_pingreq(packet))
elif packet.fixed_header.packet_type == PacketType.PINGRESP:
asyncio.Task(self.handle_pingresp(packet))
task = asyncio.Task(self.handle_pingresp(packet))
elif packet.fixed_header.packet_type == PacketType.PUBLISH:
asyncio.Task(self.handle_publish(packet))
task = asyncio.Task(self.handle_publish(packet))
elif packet.fixed_header.packet_type == PacketType.DISCONNECT:
asyncio.Task(self.handle_disconnect(packet))
task = asyncio.Task(self.handle_disconnect(packet))
elif packet.fixed_header.packet_type == PacketType.CONNECT:
asyncio.Task(self.handle_connect(packet))
task = asyncio.Task(self.handle_connect(packet))
else:
self.logger.warn("Unhandled packet type: %s" % packet.fixed_header.packet_type)
self.logger.warn("%s Unhandled packet type: %s" %
(self.session.client_id, packet.fixed_header.packet_type))
if task:
# Wait for message handling ends
asyncio.wait([task])
else:
self.logger.debug("No more data, stopping reader coro")
self.logger.debug("%s No more data, stopping reader coro" % self.session.client_id)
yield from self.handle_connection_closed()
break
except asyncio.TimeoutError:
self.logger.debug("Input stream read timeout")
self.logger.debug("%s Input stream read timeout" % self.session.client_id)
self.handle_read_timeout()
except NoDataException as nde:
self.logger.debug("No data available")
self.logger.debug("%s No data available" % self.session.client_id)
except Exception as e:
self.logger.warn("Unhandled exception in reader coro: %s" % e)
self.logger.warn("%s Unhandled exception in reader coro: %s" % (self.session.client_id, e))
break
self.logger.debug("Reader coro stopped")
self.logger.debug("%s Reader coro stopped" % self.session.client_id)
@asyncio.coroutine
def _writer_coro(self):
self.logger.debug("Starting writer coro")
self.logger.debug("%s Starting writer coro" % self.session.client_id)
while self._running:
try:
self._writer_ready.set()
@ -219,23 +208,22 @@ class ProtocolHandler:
keepalive_timeout = None
packet = yield from asyncio.wait_for(self.outgoing_queue.get(), keepalive_timeout)
if not isinstance(packet, MQTTPacket):
self.logger.debug("Writer interruption")
self.logger.debug("%s Writer interruption" % self.session.client_id)
break
yield from packet.to_stream(self.session.writer)
self.logger.debug(" -out-> " + repr(packet))
self.logger.debug("%s -out-> %s" % (self.session.client_id, repr(packet)))
yield from self.session.writer.drain()
#self.outgoing_queue.task_done() # to be used with Python 3.5
except asyncio.TimeoutError as ce:
self.logger.debug("Output queue get timeout")
self.logger.debug("%s Output queue get timeout" % self.session.client_id)
if self._running:
self.handle_write_timeout()
except ConnectionResetError as cre:
yield from self.handle_connection_closed()
break
except Exception as e:
self.logger.warn("Unhandled exception in writer coro: %s" % e)
self.logger.warn("%sUnhandled exception in writer coro: %s" % (self.session.client_id, e))
break
self.logger.debug("Writer coro stopping")
self.logger.debug("%s Writer coro stopping" % self.session.client_id)
# Flush queue before stopping
if not self.outgoing_queue.empty():
while True:
@ -244,79 +232,95 @@ class ProtocolHandler:
if not isinstance(packet, MQTTPacket):
break
yield from packet.to_stream(self.session.writer)
self.logger.debug(" -out-> " + repr(packet))
self.logger.debug("%s -out-> %s" % (self.session.client_id, repr(packet)))
except asyncio.QueueEmpty:
break
except Exception as e:
self.logger.warn("Unhandled exception in writer coro: %s" % e)
self.logger.debug("Writer coro stopped")
self.logger.warn("%s Unhandled exception in writer coro: %s" % (self.session.client_id, e))
self.logger.debug("%s Writer coro stopped" % self.session.client_id)
@asyncio.coroutine
def mqtt_deliver_next_message(self):
inflight_message = yield from self.delivered_message.get()
return inflight_message
packet_id = yield from self.session.delivered_message_queue.get()
message = self.session.incoming_msg[packet_id]
if message.qos == QOS_0:
del self.session.incoming_msg[packet_id]
self.logger.debug("Discarded incoming message %s" % packet_id)
return message.publish_packet
@asyncio.coroutine
def mqtt_acknowledge_delivery(self, packet_id):
try:
message = self.session.incoming_msg[packet_id]
message.acknowledge_delivery()
self.logger.debug('Message delivery acknowledged, packed_id=%d' % packet_id)
except KeyError:
pass
def handle_write_timeout(self):
self.logger.warn('write timeout unhandled')
self.logger.warn('%s write timeout unhandled' % self.session.client_id)
def handle_read_timeout(self):
self.logger.warn('read timeout unhandled')
self.logger.warn('%s read timeout unhandled' % self.session.client_id)
@asyncio.coroutine
def handle_connack(self, connack: ConnackPacket):
self.logger.warn('CONNACK unhandled')
self.logger.warn('%s CONNACK unhandled' % self.session.client_id)
@asyncio.coroutine
def handle_connect(self, connect: ConnectPacket):
self.logger.warn('CONNECT unhandled')
self.logger.warn('%s CONNECT unhandled' % self.session.client_id)
@asyncio.coroutine
def handle_subscribe(self, subscribe: SubscribePacket):
self.logger.warn('SUBSCRIBE unhandled')
self.logger.warn('%s SUBSCRIBE unhandled' % self.session.client_id)
@asyncio.coroutine
def handle_unsubscribe(self, subscribe: UnsubscribePacket):
self.logger.warn('UNSUBSCRIBE unhandled')
self.logger.warn('%s UNSUBSCRIBE unhandled' % self.session.client_id)
@asyncio.coroutine
def handle_suback(self, suback: SubackPacket):
self.logger.warn('SUBACK unhandled')
self.logger.warn('%s SUBACK unhandled' % self.session.client_id)
@asyncio.coroutine
def handle_unsuback(self, unsuback: UnsubackPacket):
self.logger.warn('UNSUBACK unhandled')
self.logger.warn('%s UNSUBACK unhandled' % self.session.client_id)
@asyncio.coroutine
def handle_pingresp(self, pingresp: PingRespPacket):
self.logger.warn('PINGRESP unhandled')
self.logger.warn('%s PINGRESP unhandled' % self.session.client_id)
@asyncio.coroutine
def handle_pingreq(self, pingreq: PingReqPacket):
self.logger.warn('PINGREQ unhandled')
self.logger.warn('%s PINGREQ unhandled' % self.session.client_id)
@asyncio.coroutine
def handle_disconnect(self, disconnect: DisconnectPacket):
self.logger.warn('DISCONNECT unhandled')
self.logger.warn('%s DISCONNECT unhandled' % self.session.client_id)
@asyncio.coroutine
def handle_connection_closed(self):
self.logger.warn('Connection closed unhandled')
self.logger.warn('%s Connection closed unhandled' % self.session.client_id)
@asyncio.coroutine
def handle_puback(self, puback: PubackPacket):
packet_id = puback.variable_header.packet_id
try:
waiter = self._puback_waiters[packet_id]
waiter.set_result(puback)
inflight_message = self.session.outgoing_msg[packet_id]
inflight_message.received_puback()
except KeyError as ke:
self.logger.warn("Received PUBACK for unknown pending subscription with Id: %s" % packet_id)
self.logger.warn("%s Received PUBACK for unknown pending subscription with Id: %s" %
(self.session.client_id, packet_id))
@asyncio.coroutine
def handle_pubrec(self, pubrec: PubrecPacket):
packet_id = pubrec.variable_header.packet_id
try:
waiter = self._pubrec_waiters[packet_id]
waiter.set_result(pubrec)
inflight_message = self.session.outgoing_msg[packet_id]
inflight_message.received_pubrec()
yield from self.outgoing_queue.put(PubrelPacket.build(packet_id))
inflight_message.sent_pubrel()
except KeyError as ke:
self.logger.warn("Received PUBREC for unknown pending subscription with Id: %s" % packet_id)
@ -324,8 +328,8 @@ class ProtocolHandler:
def handle_pubcomp(self, pubcomp: PubcompPacket):
packet_id = pubcomp.variable_header.packet_id
try:
waiter = self._pubcomp_waiters[packet_id]
waiter.set_result(pubcomp)
inflight_message = self.session.outgoing_msg[packet_id]
inflight_message.received_pubcomp()
except KeyError as ke:
self.logger.warn("Received PUBCOMP for unknown pending subscription with Id: %s" % packet_id)
@ -333,44 +337,77 @@ class ProtocolHandler:
def handle_pubrel(self, pubrel: PubrecPacket):
packet_id = pubrel.variable_header.packet_id
try:
waiter = self._pubrel_waiters[packet_id]
waiter.set_result(pubrel)
inflight_message = self.session.incoming_msg[packet_id]
inflight_message.received_pubrel()
except KeyError as ke:
self.logger.warn("Received PUBREL for unknown pending subscription with Id: %s" % packet_id)
@asyncio.coroutine
def handle_publish(self, publish : PublishPacket):
inflight_message = None
packet_id = publish.variable_header.packet_id
qos = (publish.fixed_header.flags >> 1) & 0x03
def handle_publish(self, publish_packet: PublishPacket):
incoming_message = None
packet_id = publish_packet.variable_header.packet_id
qos = publish_packet.qos
if qos == 0:
inflight_message = InFlightMessage(publish, qos)
yield from self.delivered_message.put(inflight_message)
else:
if packet_id in self.session.inflight_in:
inflight_message = self.session.inflight_in[packet_id]
if publish_packet.dup_flag:
self.logger.warn("[MQTT-3.3.1-2] DUP flag must set to 0 for QOS 0 message. Message ignored: %s" %
repr(publish_packet))
else:
inflight_message = InFlightMessage(publish, qos)
self.session.inflight_in[packet_id] = inflight_message
inflight_message.publish()
# Assign packet_id as it's needed internally
packet_id = self.session.next_packet_id
publish_packet.variable_header.packet_id = packet_id
incoming_message = IncomingInFlightMessage(publish_packet,
qos,
self.session.publish_retry_delay,
self._loop)
incoming_message.received_publish()
self.session.incoming_msg[packet_id] = incoming_message
yield from self.session.delivered_message_queue.put(packet_id)
else:
# Check if publish is a retry
if packet_id in self.session.incoming_msg:
incoming_message = self.session.incoming_msg[packet_id]
else:
incoming_message = IncomingInFlightMessage(publish_packet,
qos,
self.session.publish_retry_delay,
self._loop)
self.session.incoming_msg[packet_id] = incoming_message
incoming_message.publish()
if qos == 1:
puback = PubackPacket.build(packet_id)
yield from self.outgoing_queue.put(puback)
inflight_message.acknowledge()
# Initiate delivery
yield from self.session.delivered_message_queue.put(packet_id)
ack = yield from incoming_message.wait_acknowledge()
if ack:
# Send PUBACK
puback = PubackPacket.build(packet_id)
yield from self.outgoing_queue.put(puback)
#Discard message
del self.session.incoming_msg[packet_id]
self.logger.debug("Discarded incoming message %d" % packet_id)
else:
raise HBMQTTException("Something wrong, ack is False")
if qos == 2:
# Send PUBREC
pubrec = PubrecPacket.build(packet_id)
yield from self.outgoing_queue.put(pubrec)
inflight_message.receive()
waiter = futures.Future(loop=self._loop)
self._pubrel_waiters[packet_id] = waiter
yield from waiter
inflight_message.pubrel = waiter.result()
del self._pubrel_waiters[packet_id]
inflight_message.release()
pubcomp = PubcompPacket.build(packet_id)
yield from self.outgoing_queue.put(pubcomp)
inflight_message.complete()
yield from self.delivered_message.put(inflight_message)
del self.session.inflight_in[packet_id]
incoming_message.sent_pubrec()
# Wait for pubrel
ack = yield from incoming_message.wait_pubrel()
if ack:
# Initiate delivery
yield from self.session.delivered_message_queue.put(packet_id)
else:
raise HBMQTTException("Something wrong, ack is False")
ack = yield from incoming_message.wait_acknowledge()
if ack:
# Send PUBCOMP
pubcomp = PubcompPacket.build(packet_id)
yield from self.outgoing_queue.put(pubcomp)
incoming_message.sent_pubcomp()
#Discard message
del self.session.incoming_msg[packet_id]
self.logger.debug("Discarded incoming message %d" % packet_id)
else:
raise HBMQTTException("Something wrong, ack is False")

Wyświetl plik

@ -0,0 +1,193 @@
# Copyright (c) 2015 Nicolas JOUANIN
#
# See the file license.txt for copying permission.
import asyncio
from transitions import Machine, MachineError
from datetime import datetime
from hbmqtt.errors import HBMQTTException
class InFlightMessage:
states = ['new', 'published', 'acknowledged', 'received', 'released', 'completed']
def __init__(self, packet, qos, ack_timeout=0, loop=None):
if loop is None:
self._loop = asyncio.get_event_loop()
else:
self._loop = loop
self.publish_packet = packet
self.qos = qos
self.publish_ts = None
self.puback_ts = None
self.pubrec_ts = None
self.pubrel_ts = None
self.pubcomp_ts = None
self.nb_retries = 0
self._ack_waiter = asyncio.Future(loop=self._loop)
self._ack_timeout = ack_timeout
self._ack_timeout_handle = None
self._init_states()
def _init_states(self):
self.machine = Machine(model=self, states=InFlightMessage.states, initial='new')
self.machine.add_transition(trigger='publish', source='new', dest='published')
self.machine.add_transition(trigger='publish', source='published', dest='published')
self.machine.add_transition(trigger='publish', source='received', dest='published')
self.machine.add_transition(trigger='publish', source='released', dest='published')
if self.qos == 0x01:
self.machine.add_transition(trigger='acknowledge', source='published', dest='acknowledged')
if self.qos == 0x02:
self.machine.add_transition(trigger='receive', source='published', dest='received')
self.machine.add_transition(trigger='release', source='received', dest='released')
self.machine.add_transition(trigger='complete', source='released', dest='completed')
self.machine.add_transition(trigger='acknowledge', source='completed', dest='acknowledged')
@asyncio.coroutine
def wait_acknowledge(self):
return (yield from self._ack_waiter)
def start_ack_timeout(self):
def cb_timeout():
self._ack_waiter.set_result(False)
if self._ack_timeout:
self._ack_timeout_handle = self._loop.call_later(self._ack_timeout, cb_timeout)
def cancel_ack_timeout(self):
if self._ack_timeout_handle:
self._ack_timeout_handle.cancel()
def reset_ack_timeout(self):
self.cancel_ack_timeout()
self.start_ack_timeout()
def cancel(self):
if self._ack_waiter and not self._ack_waiter.done():
self._ack_waiter.cancel()
self.cancel_ack_timeout()
class OutgoingInFlightMessage(InFlightMessage):
def received_puback(self):
try:
self.acknowledge()
self.puback_ts = datetime.now()
self.cancel_ack_timeout()
self._ack_waiter.set_result(True)
except MachineError:
raise HBMQTTException(
'Invalid call to method received_puback on in-flight messages with QOS=%d, state=%s' %
(self.qos, self.state))
def received_pubrec(self):
try:
self.receive()
self.pubrec_ts = datetime.now()
self.publish_packet = None # Discard message
self.reset_ack_timeout()
except MachineError:
raise HBMQTTException(
'Invalid call to method received_pubrec on in-flight messages with QOS=%d, state=%s' %
(self.qos, self.state))
def received_pubcomp(self):
try:
self.complete()
self.pubcomp_ts = datetime.now()
self.cancel_ack_timeout()
self._ack_waiter.set_result(True)
self.acknowledge()
except MachineError:
raise HBMQTTException(
'Invalid call to method received_pubcomp on in-flight messages with QOS=%d, state=%s' %
(self.qos, self.state))
def sent_pubrel(self):
try:
self.release()
self.pubrel_ts = datetime.now()
except MachineError:
raise HBMQTTException(
'Invalid call to method sent_pubrel on in-flight messages with QOS=%d, state=%s' %
(self.qos, self.state))
def retry_publish(self):
try:
self.publish()
self._ack_waiter = asyncio.Future(loop=self._loop)
self.nb_retries += 1
self.publish_ts = datetime.now()
self.start_ack_timeout()
except MachineError:
raise HBMQTTException(
'Invalid call to method retry_publish on in-flight messages with QOS=%d, state=%s' %
(self.qos, self.state))
def sent_publish(self):
try:
self.publish()
self.publish_ts = datetime.now()
self.start_ack_timeout()
except MachineError:
raise HBMQTTException(
'Invalid call to method sent_publish on in-flight messages with QOS=%d, state=%s' %
(self.qos, self.state))
class IncomingInFlightMessage(InFlightMessage):
def __init__(self, packet, qos, ack_timeout=0, loop=None):
super().__init__(packet, qos, ack_timeout, loop)
self._pubrel_waiter = asyncio.Future(loop=self._loop)
def received_publish(self):
try:
self.publish()
self.publish_ts = datetime.now()
except MachineError:
raise HBMQTTException(
'Invalid call to method received_publish on in-flight messages with QOS=%d, state=%s' %
(self.qos, self.state))
def sent_pubrec(self):
try:
self.receive()
self.pubrec_ts = datetime.now()
except MachineError:
raise HBMQTTException(
'Invalid call to method sent_pubrec on in-flight messages with QOS=%d, state=%s' %
(self.qos, self.state))
def sent_pubcomp(self):
try:
self.complete()
self.pubcomp_ts = datetime.now()
except MachineError:
raise HBMQTTException(
'Invalid call to method sent_pubrec on in-flight messages with QOS=%d, state=%s' %
(self.qos, self.state))
@asyncio.coroutine
def wait_pubrel(self):
return (yield from self._pubrel_waiter)
def received_pubrel(self):
try:
self.release()
self.pubrel_ts = datetime.now()
self._pubrel_waiter.set_result(True)
except MachineError:
raise HBMQTTException(
'Invalid call to method received_pubcomp on in-flight messages with QOS=%d, state=%s' %
(self.qos, self.state))
def acknowledge_delivery(self):
try:
self._ack_waiter.set_result(True)
except MachineError:
raise HBMQTTException(
'Invalid call to method acknowledge_delivery on in-flight messages with QOS=%d, state=%s' %
(self.qos, self.state))
def cancel(self):
super().cancel()
if self._pubrel_waiter and not self._pubrel_waiter.done():
self._pubrel_waiter.cancel()

Wyświetl plik

@ -61,7 +61,6 @@ class PublishPacket(MQTTPacket):
RETAIN_FLAG = 0x01
QOS_FLAG = 0x06
def __init__(self, fixed: MQTTFixedHeader=None, variable_header: PublishVariableHeader=None, payload=None):
if fixed is None:
header = MQTTFixedHeader(PacketType.PUBLISH, 0x00)

Wyświetl plik

@ -1,10 +1,10 @@
# Copyright (c) 2015 Nicolas JOUANIN
#
# See the file license.txt for copying permission.
from enum import Enum
from transitions import Machine, MachineError
from asyncio import Queue
class Session:
states = ['new', 'connected', 'disconnected']
@ -24,6 +24,7 @@ class Session:
self.will_retain = None
self.will_topic = None
self.keep_alive = 0
self.publish_retry_delay = 0
self.username = None
self.password = None
self.scheme = None
@ -31,10 +32,18 @@ class Session:
self.parent = 0
self.handler = None
self.inflight_out = dict()
self.inflight_in = dict()
# Used to store outgoing InflightMessage while publish protocol flows
self.outgoing_msg = dict()
# Used to store incoming InflightMessage while publish protocol flows
self.incoming_msg = dict()
# Stores messages retained for this session
self.retained_messages = Queue()
# Stores PUBLISH messages ID received in order and ready for application process
self.delivered_message_queue = Queue()
def _init_states(self):
self.machine = Machine(states=Session.states, initial='new')
self.machine.add_transition(trigger='connect', source='new', dest='connected')

Wyświetl plik

@ -1,8 +1,7 @@
__author__ = 'nico'
# Copyright (c) 2015 Nicolas JOUANIN
#
# See the file license.txt for copying permission.
from enum import Enum
class Qos(Enum):
Qos_0 = 0x00
Qos_1 = 0x01
Qos_2 = 0x02
QOS_0 = 0x00
QOS_1 = 0x01
QOS_2 = 0x02

Wyświetl plik

@ -1,2 +1,2 @@
transitions
transitions==0.2.5
blinker

Wyświetl plik

@ -38,5 +38,5 @@ def test_coro():
if __name__ == '__main__':
formatter = "[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s"
logging.basicConfig(level=logging.INFO, format=formatter)
logging.basicConfig(level=logging.DEBUG, format=formatter)
asyncio.get_event_loop().run_until_complete(test_coro())

Wyświetl plik

@ -21,12 +21,13 @@ def uptime_coro():
# Subscribe to '$SYS/broker/uptime' with QOS=1
yield from C.subscribe([
{'filter': '$SYS/broker/uptime', 'qos': 0x01},
{'filter': '$SYS/broker/load/#', 'qos': 0x00},
{'filter': '$SYS/broker/load/#', 'qos': 0x02},
])
logger.info("Subscribed")
for i in range(1, 10):
inflight = yield from C.deliver_message()
print(inflight.packet.payload.data)
for i in range(1, 100):
packet = yield from C.deliver_message()
print("%d %s : %s" % (i, packet.variable_header.topic_name, str(packet.payload.data)))
yield from C.acknowledge_delivery(packet.variable_header.packet_id)
yield from C.unsubscribe(['$SYS/broker/uptime'])
logger.info("UnSubscribed")
yield from C.disconnect()