From b681de96de8636bfd128d8e8413a0216f1630161 Mon Sep 17 00:00:00 2001 From: Nicolas Jouanin Date: Tue, 7 Jul 2015 21:55:17 +0200 Subject: [PATCH] Source tree refactoring --- hbmqtt/{client/_client.py => client.py} | 5 +- hbmqtt/client/__init__.py | 1 - hbmqtt/mqtt/protocol/__init__.py | 0 hbmqtt/mqtt/protocol/client_handler.py | 152 ++++++++++++++++++ .../mqtt/{protocol.py => protocol/handler.py} | 141 +--------------- samples/client_keepalive.py | 4 +- samples/client_publish.py | 4 +- samples/client_subscribe.py | 4 +- 8 files changed, 164 insertions(+), 147 deletions(-) rename hbmqtt/{client/_client.py => client.py} (98%) delete mode 100644 hbmqtt/client/__init__.py create mode 100644 hbmqtt/mqtt/protocol/__init__.py create mode 100644 hbmqtt/mqtt/protocol/client_handler.py rename hbmqtt/mqtt/{protocol.py => protocol/handler.py} (71%) diff --git a/hbmqtt/client/_client.py b/hbmqtt/client.py similarity index 98% rename from hbmqtt/client/_client.py rename to hbmqtt/client.py index 625ac84..4b4c05b 100644 --- a/hbmqtt/client/_client.py +++ b/hbmqtt/client.py @@ -9,10 +9,7 @@ from transitions import Machine, MachineError from hbmqtt.utils import not_in_dict_or_none from hbmqtt.session import Session, SessionState from hbmqtt.mqtt.connack import ReturnCode -from hbmqtt.mqtt.subscribe import SubscribePacket -from hbmqtt.mqtt.suback import SubackPacket -from hbmqtt.errors import MQTTException -from hbmqtt.mqtt.protocol import ClientProtocolHandler +from hbmqtt.mqtt.protocol.client_handler import ClientProtocolHandler _defaults = { 'keep_alive': 10, diff --git a/hbmqtt/client/__init__.py b/hbmqtt/client/__init__.py deleted file mode 100644 index e1bd617..0000000 --- a/hbmqtt/client/__init__.py +++ /dev/null @@ -1 +0,0 @@ -__author__ = 'nico' diff --git a/hbmqtt/mqtt/protocol/__init__.py b/hbmqtt/mqtt/protocol/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/hbmqtt/mqtt/protocol/client_handler.py b/hbmqtt/mqtt/protocol/client_handler.py new file mode 100644 index 0000000..52e86b9 --- /dev/null +++ b/hbmqtt/mqtt/protocol/client_handler.py @@ -0,0 +1,152 @@ +# Copyright (c) 2015 Nicolas JOUANIN +# +# See the file license.txt for copying permission. +import logging +import asyncio +from asyncio import futures +from hbmqtt.mqtt.protocol.handler import ProtocolHandler +from hbmqtt.mqtt.packet import MQTTFixedHeader +from hbmqtt.mqtt.packet import PacketType +from hbmqtt.mqtt.connect import ConnectVariableHeader, ConnectPacket, ConnectPayload +from hbmqtt.mqtt.connack import ConnackPacket +from hbmqtt.mqtt.disconnect import DisconnectPacket +from hbmqtt.mqtt.pingreq import PingReqPacket +from hbmqtt.mqtt.pingresp import PingRespPacket +from hbmqtt.mqtt.subscribe import SubscribePacket +from hbmqtt.mqtt.suback import SubackPacket +from hbmqtt.mqtt.unsubscribe import UnsubscribePacket +from hbmqtt.mqtt.unsuback import UnsubackPacket +from hbmqtt.session import Session + +class ClientProtocolHandler(ProtocolHandler): + def __init__(self, session: Session, config, loop=None): + super().__init__(session, config, loop) + self._ping_task = None + self._connack_waiter = None + self._pingresp_queue = asyncio.Queue() + self._subscriptions_waiter = dict() + self._unsubscriptions_waiter = dict() + + @asyncio.coroutine + def start(self): + yield from super().start() + + @asyncio.coroutine + def stop(self): + yield from super().stop() + if self._ping_task: + try: + self._ping_task.cancel() + except Exception: + pass + + def handle_keepalive(self): + self._ping_task = self._loop.call_soon(asyncio.async, self.mqtt_ping()) + + @asyncio.coroutine + def mqtt_subscribe(self, topics, packet_id): + """ + :param topics: array of topics [{'filter':'/a/b', 'qos': 0x00}, ...] + :return: + """ + subscribe = SubscribePacket.build(topics, packet_id) + yield from self.outgoing_queue.put(subscribe) + waiter = futures.Future(loop=self._loop) + self._subscriptions_waiter[subscribe.variable_header.packet_id] = waiter + return_codes = yield from waiter + del self._subscriptions_waiter[subscribe.variable_header.packet_id] + return return_codes + + @asyncio.coroutine + def handle_suback(self, suback: SubackPacket): + packet_id = suback.variable_header.packet_id + try: + waiter = self._subscriptions_waiter.get(packet_id) + waiter.set_result(suback.payload.return_codes) + except KeyError as ke: + self.logger.warn("Received SUBACK for unknown pending subscription with Id: %s" % packet_id) + + @asyncio.coroutine + def mqtt_unsubscribe(self, topics, packet_id): + """ + + :param topics: array of topics ['/a/b', ...] + :return: + """ + unsubscribe = UnsubscribePacket.build(topics, packet_id) + yield from self.outgoing_queue.put(unsubscribe) + waiter = futures.Future(loop=self._loop) + self._unsubscriptions_waiter[unsubscribe.variable_header.packet_id] = waiter + yield from waiter + del self._unsubscriptions_waiter[unsubscribe.variable_header.packet_id] + + @asyncio.coroutine + def handle_unsuback(self, unsuback: UnsubackPacket): + packet_id = unsuback.variable_header.packet_id + try: + waiter = self._unsubscriptions_waiter.get(packet_id) + waiter.set_result(None) + except KeyError as ke: + self.logger.warn("Received UNSUBACK for unknown pending subscription with Id: %s" % packet_id) + + @asyncio.coroutine + def mqtt_connect(self): + def build_connect_packet(session): + vh = ConnectVariableHeader() + payload = ConnectPayload() + + vh.keep_alive = session.keep_alive + vh.clean_session_flag = session.clean_session + vh.will_retain_flag = session.will_retain + payload.client_id = session.client_id + + if session.username: + vh.username_flag = True + payload.username = session.username + else: + vh.username_flag = False + + if session.password: + vh.password_flag = True + payload.password = session.password + else: + vh.password_flag = False + if session.will_flag: + vh.will_flag = True + vh.will_qos = session.will_qos + payload.will_message = session.will_message + payload.will_topic = session.will_topic + else: + vh.will_flag = False + + header = MQTTFixedHeader(PacketType.CONNECT, 0x00) + packet = ConnectPacket(header, vh, payload) + return packet + + packet = build_connect_packet(self.session) + yield from self.outgoing_queue.put(packet) + self._connack_waiter = futures.Future(loop=self._loop) + return (yield from self._connack_waiter) + + @asyncio.coroutine + def handle_connack(self, connack: ConnackPacket): + self._connack_waiter.set_result(connack.variable_header.return_code) + + @asyncio.coroutine + def mqtt_disconnect(self): + # yield from self.outgoing_queue.join() To be used in Python 3.5 + disconnect_packet = DisconnectPacket() + yield from self.outgoing_queue.put(disconnect_packet) + self._connack_waiter = None + + @asyncio.coroutine + def mqtt_ping(self): + ping_packet = PingReqPacket() + yield from self.outgoing_queue.put(ping_packet) + self._pingresp_waiter = futures.Future(loop=self._loop) + resp = yield from self._pingresp_queue.get() + return resp + + @asyncio.coroutine + def handle_pingresp(self, pingresp: PingRespPacket): + yield from self._pingresp_queue.put(pingresp) diff --git a/hbmqtt/mqtt/protocol.py b/hbmqtt/mqtt/protocol/handler.py similarity index 71% rename from hbmqtt/mqtt/protocol.py rename to hbmqtt/mqtt/protocol/handler.py index 9841e4c..5e544df 100644 --- a/hbmqtt/mqtt/protocol.py +++ b/hbmqtt/mqtt/protocol/handler.py @@ -8,22 +8,18 @@ from hbmqtt.mqtt.packet import MQTTFixedHeader from hbmqtt.mqtt import packet_class from hbmqtt.errors import NoDataException from hbmqtt.mqtt.packet import PacketType -from hbmqtt.mqtt.connect import ConnectVariableHeader, ConnectPacket, ConnectPayload from hbmqtt.mqtt.connack import ConnackPacket -from hbmqtt.mqtt.disconnect import DisconnectPacket -from hbmqtt.mqtt.pingreq import PingReqPacket from hbmqtt.mqtt.pingresp import PingRespPacket from hbmqtt.mqtt.publish import PublishPacket from hbmqtt.mqtt.pubrel import PubrelPacket from hbmqtt.mqtt.puback import PubackPacket from hbmqtt.mqtt.pubrec import PubrecPacket from hbmqtt.mqtt.pubcomp import PubcompPacket -from hbmqtt.mqtt.subscribe import SubscribePacket from hbmqtt.mqtt.suback import SubackPacket -from hbmqtt.mqtt.unsubscribe import UnsubscribePacket from hbmqtt.mqtt.unsuback import UnsubackPacket from hbmqtt.session import Session -from transitions import Machine, MachineError +from transitions import Machine + class InFlightMessage: states = ['new', 'published', 'acknowledged', 'received', 'released', 'completed'] @@ -308,136 +304,3 @@ class ProtocolHandler: inflight_message.complete() yield from self.delivered_message.put(inflight_message) del self.session.inflight_in[packet_id] - -class ClientProtocolHandler(ProtocolHandler): - def __init__(self, session: Session, config, loop=None): - super().__init__(session, config, loop) - self._ping_task = None - self._connack_waiter = None - self._pingresp_queue = asyncio.Queue() - self._subscriptions_waiter = dict() - self._unsubscriptions_waiter = dict() - - @asyncio.coroutine - def start(self): - yield from super().start() - - @asyncio.coroutine - def stop(self): - yield from super().stop() - if self._ping_task: - try: - self._ping_task.cancel() - except Exception: - pass - - def handle_keepalive(self): - self._ping_task = self._loop.call_soon(asyncio.async, self.mqtt_ping()) - - @asyncio.coroutine - def mqtt_subscribe(self, topics, packet_id): - """ - :param topics: array of topics [{'filter':'/a/b', 'qos': 0x00}, ...] - :return: - """ - subscribe = SubscribePacket.build(topics, packet_id) - yield from self.outgoing_queue.put(subscribe) - waiter = futures.Future(loop=self._loop) - self._subscriptions_waiter[subscribe.variable_header.packet_id] = waiter - return_codes = yield from waiter - del self._subscriptions_waiter[subscribe.variable_header.packet_id] - return return_codes - - @asyncio.coroutine - def handle_suback(self, suback: SubackPacket): - packet_id = suback.variable_header.packet_id - try: - waiter = self._subscriptions_waiter.get(packet_id) - waiter.set_result(suback.payload.return_codes) - except KeyError as ke: - self.logger.warn("Received SUBACK for unknown pending subscription with Id: %s" % packet_id) - - @asyncio.coroutine - def mqtt_unsubscribe(self, topics, packet_id): - """ - - :param topics: array of topics ['/a/b', ...] - :return: - """ - unsubscribe = UnsubscribePacket.build(topics, packet_id) - yield from self.outgoing_queue.put(unsubscribe) - waiter = futures.Future(loop=self._loop) - self._unsubscriptions_waiter[unsubscribe.variable_header.packet_id] = waiter - yield from waiter - del self._unsubscriptions_waiter[unsubscribe.variable_header.packet_id] - - @asyncio.coroutine - def handle_unsuback(self, unsuback: UnsubackPacket): - packet_id = unsuback.variable_header.packet_id - try: - waiter = self._unsubscriptions_waiter.get(packet_id) - waiter.set_result(None) - except KeyError as ke: - self.logger.warn("Received UNSUBACK for unknown pending subscription with Id: %s" % packet_id) - - @asyncio.coroutine - def mqtt_connect(self): - def build_connect_packet(session): - vh = ConnectVariableHeader() - payload = ConnectPayload() - - vh.keep_alive = session.keep_alive - vh.clean_session_flag = session.clean_session - vh.will_retain_flag = session.will_retain - payload.client_id = session.client_id - - if session.username: - vh.username_flag = True - payload.username = session.username - else: - vh.username_flag = False - - if session.password: - vh.password_flag = True - payload.password = session.password - else: - vh.password_flag = False - if session.will_flag: - vh.will_flag = True - vh.will_qos = session.will_qos - payload.will_message = session.will_message - payload.will_topic = session.will_topic - else: - vh.will_flag = False - - header = MQTTFixedHeader(PacketType.CONNECT, 0x00) - packet = ConnectPacket(header, vh, payload) - return packet - - packet = build_connect_packet(self.session) - yield from self.outgoing_queue.put(packet) - self._connack_waiter = futures.Future(loop=self._loop) - return (yield from self._connack_waiter) - - @asyncio.coroutine - def handle_connack(self, connack: ConnackPacket): - self._connack_waiter.set_result(connack.variable_header.return_code) - - @asyncio.coroutine - def mqtt_disconnect(self): - # yield from self.outgoing_queue.join() To be used in Python 3.5 - disconnect_packet = DisconnectPacket() - yield from self.outgoing_queue.put(disconnect_packet) - self._connack_waiter = None - - @asyncio.coroutine - def mqtt_ping(self): - ping_packet = PingReqPacket() - yield from self.outgoing_queue.put(ping_packet) - self._pingresp_waiter = futures.Future(loop=self._loop) - resp = yield from self._pingresp_queue.get() - return resp - - @asyncio.coroutine - def handle_pingresp(self, pingresp: PingRespPacket): - yield from self._pingresp_queue.put(pingresp) diff --git a/samples/client_keepalive.py b/samples/client_keepalive.py index 3e45f7f..2c7d8df 100644 --- a/samples/client_keepalive.py +++ b/samples/client_keepalive.py @@ -1,7 +1,9 @@ import logging -from hbmqtt.client._client import MQTTClient import asyncio +from hbmqtt.client import MQTTClient + + # # This sample shows a client running idle. # Meanwhile, keepalive is managed through PING messages sent every 5 seconds diff --git a/samples/client_publish.py b/samples/client_publish.py index 46be8f4..886eba8 100644 --- a/samples/client_publish.py +++ b/samples/client_publish.py @@ -1,7 +1,9 @@ import logging -from hbmqtt.client._client import MQTTClient import asyncio +from hbmqtt.client import MQTTClient + + # # This sample shows how to publish messages to broker using different QOS diff --git a/samples/client_subscribe.py b/samples/client_subscribe.py index da667b5..e3b1ecd 100644 --- a/samples/client_subscribe.py +++ b/samples/client_subscribe.py @@ -1,7 +1,9 @@ import logging -from hbmqtt.client._client import MQTTClient import asyncio +from hbmqtt.client import MQTTClient + + # # This sample shows how to subscbribe a topic and receive data from incoming messages