Merge branch 'release/v0.1'

pull/8/head
nico 2015-06-30 13:24:34 +02:00
commit e520b62020
20 zmienionych plików z 768 dodań i 265 usunięć

Wyświetl plik

@ -1,3 +1,5 @@
# Copyright (c) 2015 Nicolas JOUANIN
#
# See the file license.txt for copying permission.
VERSION = (0, 1, 0, 'final', 0)

Wyświetl plik

@ -8,25 +8,19 @@ from transitions import Machine, MachineError
from hbmqtt.utils import not_in_dict_or_none
from hbmqtt.session import Session, SessionState
from hbmqtt.mqtt.connect import ConnectPacket
from hbmqtt.mqtt.connack import ConnackPacket, ReturnCode
from hbmqtt.mqtt.disconnect import DisconnectPacket
from hbmqtt.mqtt.publish import PublishPacket
from hbmqtt.mqtt.puback import PubackPacket
from hbmqtt.mqtt.pubrec import PubrecPacket
from hbmqtt.mqtt.pubrel import PubrelPacket
from hbmqtt.mqtt.pubcomp import PubcompPacket
from hbmqtt.mqtt.pingreq import PingReqPacket
from hbmqtt.mqtt.pingresp import PingRespPacket
from hbmqtt.mqtt.connack import ReturnCode
from hbmqtt.mqtt.subscribe import SubscribePacket
from hbmqtt.mqtt.suback import SubackPacket
from hbmqtt.errors import MQTTException
from hbmqtt.mqtt.protocol import ClientProtocolHandler
_defaults = {
'keep_alive': 60,
'keep_alive': 10,
'ping_delay': 1,
'default_qos': 0,
'default_retain': False
'default_retain': False,
'inflight-polling-interval': 1,
'subscriptions-polling-interval': 1,
}
@ -40,7 +34,7 @@ def gen_client_id():
class MQTTClient:
states = ['new', 'connecting', 'connected', 'idle', 'disconnected']
states = ['new', 'connecting', 'connected', 'disconnected']
def __init__(self, client_id=None, config={}, loop=None):
"""
@ -56,7 +50,7 @@ class MQTTClient:
uri: mqtt:xxx@yyy//localhost:1883/
# OR a mix ot both
keep_alive: 60
clean_session: true
cleansession: true
will:
retain: false
topic: some/topic
@ -85,8 +79,8 @@ class MQTTClient:
self._loop = loop
else:
self._loop = asyncio.get_event_loop()
self._ping_handle = None
self._session = None
self.session = None
self._handler = None
def _init_states(self):
self.machine = Machine(states=MQTTClient.states, initial='new')
@ -94,20 +88,18 @@ class MQTTClient:
self.machine.add_transition(trigger='connect', source='disconnected', dest='connecting')
self.machine.add_transition(trigger='connect_fail', source='connecting', dest='disconnected')
self.machine.add_transition(trigger='connect_success', source='connecting', dest='connected')
self.machine.add_transition(trigger='idle', source='connected', dest='idle')
self.machine.add_transition(trigger='disconnect', source='idle', dest='disconnected')
self.machine.add_transition(trigger='disconnect', source='connected', dest='disconnected')
@asyncio.coroutine
def connect(self, host=None, port=None, username=None, password=None, uri=None, clean_session=None):
def connect(self, host=None, port=None, username=None, password=None, uri=None, cleansession=None):
try:
self.machine.connect()
self._session = self._init_session(host, port, username, password, uri, clean_session)
self.logger.debug("Connect with session parameters: %s" % self._session)
self.session = self._initsession(host, port, username, password, uri, cleansession)
self.logger.debug("Connect with session parameters: %s" % self.session)
yield from self._connect_coro()
self.machine.connect_success()
self._keep_alive()
except MachineError:
msg = "Connect call incompatible with client current state '%s'" % self.machine.current_state
self.logger.warn(msg)
@ -121,37 +113,23 @@ class MQTTClient:
@asyncio.coroutine
def disconnect(self):
try:
self.machine.disconnect()
disconnect_packet = DisconnectPacket()
self.logger.debug(" -out-> " + repr(disconnect_packet))
yield from disconnect_packet.to_stream(self._session.writer)
self._session.writer.close()
yield from self._handler.mqtt_disconnect()
yield from self._handler.stop()
except Exception as e:
self.logger.warn("Unhandled exception: %s" % e)
raise ClientException("Unhandled exception: %s" % e)
except MachineError as me:
self.logger.debug("Invalid method call at this moment: %s" % me)
raise ClientException("Client instance can't be disconnected: %s" % me)
self._loop.stop()
self._session = None
self.session = None
@asyncio.coroutine
def ping(self):
ping_packet = PingReqPacket()
self.logger.debug(" -out-> " + repr(ping_packet))
yield from ping_packet.to_stream(self._session.writer)
response = yield from PingRespPacket.from_stream(self._session.reader)
self.logger.debug(" <-in-- " + repr(response))
self._keep_alive()
def _keep_alive(self):
if self._ping_handle:
try:
self._ping_handle.cancel()
self.logger.debug('Cancel pending ping')
except Exception:
pass
next_ping = self._session.keep_alive-self.config['ping_delay']
if next_ping > 0:
self.logger.debug('Next ping in %d seconds' % next_ping)
self._ping_handle = self._loop.call_later(next_ping, asyncio.async, self.ping())
"""
Send a MQTT ping request and wait for response
:return: None
"""
self._handler.mqtt_ping()
@asyncio.coroutine
def publish(self, topic, message, dup=False, qos=None, retain=None):
@ -183,81 +161,44 @@ class MQTTClient:
@asyncio.coroutine
def _publish_qos_0(self, topic, message, dup, retain):
packet = PublishPacket.build(topic, message, self._session.next_packet_id, dup, 0x00, retain)
self.logger.debug(" -out-> " + repr(packet))
yield from packet.to_stream(self._session.writer)
self._keep_alive()
yield from self._handler.mqtt_publish(topic, message, self.session.next_packet_id, dup, 0x00, retain)
@asyncio.coroutine
def _publish_qos_1(self, topic, message, dup, retain):
packet = PublishPacket.build(topic, message, self._session.next_packet_id, dup, 0x01, retain)
self.logger.debug(" -out-> " + repr(packet))
yield from packet.to_stream(self._session.writer)
puback = yield from PubackPacket.from_stream(self._session.reader)
self.logger.debug(" <-in-- " + repr(puback))
self._keep_alive()
if packet.variable_header.packet_id != puback.variable_header.packet_id:
raise MQTTException("[MQTT-4.3.2-2] Puback packet packet_id doesn't match publish packet")
yield from self._handler.mqtt_publish(topic, message, self.session.next_packet_id, dup, 0x01, retain)
@asyncio.coroutine
def _publish_qos_2(self, topic, message, dup, retain):
publish = PublishPacket.build(topic, message, self._session.next_packet_id, dup, 0x02, retain)
self.logger.debug(" -out-> " + repr(publish))
yield from publish.to_stream(self._session.writer)
pubrec = yield from PubrecPacket.from_stream(self._session.reader)
if publish.variable_header.packet_id != pubrec.variable_header.packet_id:
raise MQTTException("[MQTT-4.3.2-2] Puback packet packet_id doesn't match publish packet")
self.logger.debug(" <-in-- " + repr(pubrec))
pubrel = PubrelPacket.build(pubrec.variable_header.packet_id)
yield from pubrel.to_stream(self._session.writer)
self.logger.debug(" -out-> " + repr(pubrel))
pubcomp = yield from PubcompPacket.from_stream(self._session.reader)
self.logger.debug(" <-in-- " + repr(pubcomp))
if pubrel.variable_header.packet_id != pubcomp.variable_header.packet_id:
raise MQTTException("[MQTT-4.3.2-2] Pubcomp packet packet_id doesn't match pubrel packet")
self._keep_alive()
yield from self._handler.mqtt_publish(topic, message, self.session.next_packet_id, dup, 0x02, retain)
@asyncio.coroutine
def subscribe(self, topics):
subscribe = SubscribePacket.build(topics, self._session.next_packet_id)
yield from subscribe.to_stream(self._session.writer)
self.logger.debug(" -out-> " + repr(subscribe))
yield from self._handler.mqtt_subscribe(topics, self.session.next_packet_id)
suback = yield from SubackPacket.from_stream(self._session.reader)
self.logger.debug(" <-in-- " + repr(suback))
if suback.variable_header.packet_id != subscribe.variable_header.packet_id:
raise MQTTException("[MQTT-4.3.2-2] Suback packet packet_id doesn't match subscribe packet")
self._keep_alive()
@asyncio.coroutine
def unsubscribe(self, topics):
yield from self._handler.mqtt_unsubscribe(topics, self.session.next_packet_id)
@asyncio.coroutine
def _connect_coro(self):
try:
self._session.reader, self._session.writer = \
yield from asyncio.open_connection(self._session.remote_address, self._session.remote_port)
self._session.local_address, self._session.local_port = self._session.writer.get_extra_info('sockname')
self.session.reader, self.session.writer = \
yield from asyncio.open_connection(self.session.remote_address, self.session.remote_port)
self._handler = ClientProtocolHandler(self.session, self.config)
yield from self._handler.start()
# Send CONNECT packet and wait for CONNACK
packet = ConnectPacket.build_request_from_session(self._session)
yield from packet.to_stream(self._session.writer)
self.logger.debug(" -out-> " + repr(packet))
return_code = yield from self._handler.mqtt_connect()
connack = yield from ConnackPacket.from_stream(self._session.reader)
self.logger.debug(" <-in-- " + repr(connack))
if connack.variable_header.return_code is not ReturnCode.CONNECTION_ACCEPTED:
raise ClientException("Connection rejected with code '%s'" % hex(connack.variable_header.return_code))
if return_code is not ReturnCode.CONNECTION_ACCEPTED:
raise ClientException("Connection rejected with code '%s'" % hex(return_code))
self._session.state = SessionState.CONNECTED
self.logger.debug("connected to %s:%s" % (self._session.remote_address, self._session.remote_port))
self.session.state = SessionState.CONNECTED
self.logger.debug("connected to %s:%s" % (self.session.remote_address, self.session.remote_port))
except Exception as e:
self._session.state = SessionState.DISCONNECTED
self.session.state = SessionState.DISCONNECTED
raise e
def _init_session(self, host=None, port=None, username=None, password=None, uri=None, clean_session=None) -> dict:
def _initsession(self, host=None, port=None, username=None, password=None, uri=None, cleansession=None) -> dict:
# Load config
broker_conf = self.config.get('broker', dict()).copy()
if 'mqtt' not in broker_conf:
@ -287,8 +228,8 @@ class MQTTClient:
broker_conf['username'] = username
if password:
broker_conf['password'] = password
if clean_session is not None:
broker_conf['clean_session'] = clean_session
if cleansession is not None:
broker_conf['cleansession'] = cleansession
for key in ['scheme', 'host', 'port']:
if not_in_dict_or_none(broker_conf, key):
@ -301,10 +242,10 @@ class MQTTClient:
s.username = broker_conf['username']
s.password = broker_conf['password']
s.scheme = broker_conf['scheme']
if clean_session is not None:
s.clean_session = clean_session
if cleansession is not None:
s.cleansession = cleansession
else:
s.clean_session = self.config.get('clean_session', True)
s.cleansession = self.config.get('cleansession', True)
s.keep_alive = self.config['keep_alive']
if 'will' in self.config:
s.will_flag = True

Wyświetl plik

@ -1,12 +1,43 @@
# Copyright (c) 2015 Nicolas JOUANIN
#
# See the file license.txt for copying permission.
from hbmqtt.errors import HBMQTTException
from hbmqtt.mqtt.packet import MQTTFixedHeader, MQTTPacket, PacketType
from hbmqtt.mqtt.connect import ConnectPacket
from hbmqtt.mqtt.connack import ConnackPacket
from hbmqtt.mqtt.disconnect import DisconnectPacket
from hbmqtt.mqtt.pingreq import PingReqPacket
from hbmqtt.mqtt.pingresp import PingRespPacket
from hbmqtt.mqtt.publish import PublishPacket
from hbmqtt.mqtt.puback import PubackPacket
from hbmqtt.mqtt.pubrec import PubrecPacket
from hbmqtt.mqtt.pubrel import PubrelPacket
from hbmqtt.mqtt.pubcomp import PubcompPacket
from hbmqtt.mqtt.subscribe import SubscribePacket
from hbmqtt.mqtt.suback import SubackPacket
from hbmqtt.mqtt.unsubscribe import UnsubscribePacket
from hbmqtt.mqtt.unsuback import UnsubackPacket
packet_dict = {
PacketType.CONNECT: ConnectPacket,
PacketType.CONNACK: ConnackPacket,
PacketType.PUBLISH: PublishPacket,
PacketType.PUBACK: PubackPacket,
PacketType.PUBREC: PubrecPacket,
PacketType.PUBREL: PubrelPacket,
PacketType.PUBCOMP: PubcompPacket,
PacketType.SUBSCRIBE: SubscribePacket,
PacketType.SUBACK: SubackPacket,
PacketType.UNSUBSCRIBE: UnsubscribePacket,
PacketType.UNSUBACK: UnsubackPacket,
PacketType.PINGREQ: PingReqPacket,
PacketType.PINGRESP: PingRespPacket,
PacketType.DISCONNECT: DisconnectPacket
}
def packet_class(fixed_header: MQTTFixedHeader):
if fixed_header.packet_type == PacketType.CONNECT:
return ConnectPacket
if fixed_header.packet_type == PacketType.CONNACK:
return ConnackPacket
try:
cls = packet_dict[fixed_header.packet_type]
return cls
except KeyError:
raise HBMQTTException("Unexpected packet Type '%s'" % fixed_header.packet_type)

Wyświetl plik

@ -4,7 +4,6 @@
from hbmqtt.mqtt.packet import MQTTPacket, MQTTFixedHeader, PacketType, MQTTVariableHeader, MQTTPayload
from hbmqtt.codecs import *
from hbmqtt.errors import MQTTException, CodecException, HBMQTTException, NoDataException
from hbmqtt.session import Session
class ConnectVariableHeader(MQTTVariableHeader):
@ -199,7 +198,7 @@ class ConnectPacket(MQTTPacket):
VARIABLE_HEADER = ConnectVariableHeader
PAYLOAD = ConnectPayload
def __init__(self, fixed: MQTTFixedHeader, vh: ConnectVariableHeader, payload: ConnectPayload):
def __init__(self, fixed: MQTTFixedHeader=None, vh: ConnectVariableHeader=None, payload: ConnectPayload=None):
if fixed is None:
header = MQTTFixedHeader(PacketType.CONNECT, 0x00)
else:
@ -209,36 +208,3 @@ class ConnectPacket(MQTTPacket):
super().__init__(header)
self.variable_header = vh
self.payload = payload
@classmethod
def build_request_from_session(cls, session: Session):
vh = ConnectVariableHeader()
payload = ConnectPayload()
vh.keep_alive = session.keep_alive
vh.clean_session_flag = session.clean_session
vh.will_retain_flag = session.will_retain
payload.client_id = session.client_id
if session.username:
vh.username_flag = True
payload.username = session.username
else:
vh.username_flag = False
if session.password:
vh.password_flag = True
payload.password = session.password
else:
vh.password_flag = False
if session.will_flag:
vh.will_flag = True
vh.will_qos = session.will_qos
payload.will_message = session.will_message
payload.will_topic = session.will_topic
else:
vh.will_flag = False
header = MQTTFixedHeader(PacketType.CONNECT, 0x00)
packet = cls(header, vh, payload)
return packet

Wyświetl plik

@ -110,14 +110,17 @@ class MQTTFixedHeader:
raise MQTTException("Invalid remaining length bytes:%s" % bytes_to_hex_str(length_bytes))
return value
b1 = yield from read_or_raise(reader, 1)
msg_type = decode_message_type(b1)
if msg_type is PacketType.RESERVED_0 or msg_type is PacketType.RESERVED_15:
raise MQTTException("Usage of control packet type %s is forbidden" % msg_type)
flags = decode_flags(b1)
try:
b1 = yield from read_or_raise(reader, 1)
msg_type = decode_message_type(b1)
if msg_type is PacketType.RESERVED_0 or msg_type is PacketType.RESERVED_15:
raise MQTTException("Usage of control packet type %s is forbidden" % msg_type)
flags = decode_flags(b1)
remain_length = yield from decode_remaining_length()
return cls(msg_type, flags, remain_length)
remain_length = yield from decode_remaining_length()
return cls(msg_type, flags, remain_length)
except NoDataException:
return None
def __repr__(self):
return type(self).__name__ + '(type={0}, length={1}, flags={2})'.format(self.packet_type, self.remaining_length, hex(self.flags))

Wyświetl plik

@ -0,0 +1,434 @@
# Copyright (c) 2015 Nicolas JOUANIN
#
# See the file license.txt for copying permission.
import logging
import asyncio
from hbmqtt.mqtt.packet import MQTTFixedHeader
from hbmqtt.mqtt import packet_class
from hbmqtt.errors import NoDataException
from hbmqtt.mqtt.packet import PacketType
from hbmqtt.mqtt.connect import ConnectVariableHeader, ConnectPacket, ConnectPayload
from hbmqtt.mqtt.disconnect import DisconnectPacket
from hbmqtt.mqtt.pingreq import PingReqPacket
from hbmqtt.mqtt.publish import PublishPacket
from hbmqtt.mqtt.pubrel import PubrelPacket
from hbmqtt.mqtt.subscribe import SubscribePacket
from hbmqtt.mqtt.unsubscribe import UnsubscribePacket
from hbmqtt.session import Session
from blinker import Signal
from transitions import Machine, MachineError
class InFlightMessage:
states = ['new', 'published', 'acknowledged', 'received', 'released', 'completed']
def __init__(self, packet_id, qos):
self.packet_id = packet_id
self.qos = qos
self._init_states()
def _init_states(self):
self.machine = Machine(model=self, states=InFlightMessage.states, initial='new')
self.machine.add_transition(trigger='publish', source='new', dest='published')
if self.qos == 0x01:
self.machine.add_transition(trigger='acknowledge', source='published', dest='acknowledged')
if self.qos == 0x02:
self.machine.add_transition(trigger='receive', source='published', dest='received')
self.machine.add_transition(trigger='release', source='received', dest='released')
self.machine.add_transition(trigger='complete', source='released', dest='completed')
class ProtocolHandler:
"""
Class implementing the MQTT communication protocol using asyncio features
"""
packet_sent = Signal()
packet_received = Signal()
def __init__(self, session: Session, config, loop=None):
self.logger = logging.getLogger(__name__)
self.session = session
self.config = config
if loop is None:
self._loop = asyncio.get_event_loop()
else:
self._loop = loop
self._reader_task = None
self._writer_task = None
self._inflight_task = None
self._reader_ready = asyncio.Event(loop=self._loop)
self._writer_ready = asyncio.Event(loop=self._loop)
self._inflight_ready = asyncio.Event(loop=self._loop)
self._inflight_changed = asyncio.Condition(loop=self._loop)
self._running = False
self.session.local_address, self.session.local_port = self.session.writer.get_extra_info('sockname')
self.incoming_queues = dict()
for p in PacketType:
self.incoming_queues[p] = asyncio.Queue()
self.outgoing_queue = asyncio.Queue()
self.inflight_messages = dict()
@asyncio.coroutine
def start(self):
self._running = True
self._reader_task = asyncio.async(self._reader_coro(), loop=self._loop)
self._writer_task = asyncio.async(self._writer_coro(), loop=self._loop)
self._inflight_task = asyncio.async(self._inflight_coro(), loop=self._loop)
yield from asyncio.wait(
[self._reader_ready.wait(), self._writer_ready.wait(), self._inflight_ready.wait()], loop=self._loop)
self.logger.debug("Handler tasks started")
@asyncio.coroutine
def mqtt_publish(self, topic, message, packet_id, dup, qos, retain):
def qos_0_predicate():
ret = False
try:
if self.inflight_messages.get(packet_id).state == 'published':
ret = True
#self.logger.debug("qos_0 predicate return %s" % ret)
return ret
except KeyError:
return False
def qos_1_predicate():
ret = False
try:
if self.inflight_messages.get(packet_id).state == 'acknowledged':
ret = True
#self.logger.debug("qos_1 predicate return %s" % ret)
return ret
except KeyError:
return False
def qos_2_predicate():
ret = False
try:
if self.inflight_messages.get(packet_id).state == 'completed':
ret = True
#self.logger.debug("qos_1 predicate return %s" % ret)
return ret
except KeyError:
return False
if packet_id in self.inflight_messages:
self.logger.warn("A message with the same packet ID is already in flight")
packet = PublishPacket.build(topic, message, packet_id, dup, qos, retain)
yield from self.outgoing_queue.put(packet)
inflight_message = InFlightMessage(packet.variable_header.packet_id, qos)
inflight_message.publish()
self.inflight_messages[packet.variable_header.packet_id] = inflight_message
yield from self._inflight_changed.acquire()
if qos == 0x00:
yield from self._inflight_changed.wait_for(qos_0_predicate)
if qos == 0x01:
yield from self._inflight_changed.wait_for(qos_1_predicate)
if qos == 0x02:
yield from self._inflight_changed.wait_for(qos_2_predicate)
self.inflight_messages.pop(packet.variable_header.packet_id)
self._inflight_changed.release()
return packet
@asyncio.coroutine
def stop(self):
self._running = False
self.session.reader.feed_eof()
yield from asyncio.wait([self._inflight_task, self._writer_task, self._reader_task], loop=self._loop)
@asyncio.coroutine
def _reader_coro(self):
self.logger.debug("Starting reader coro")
while self._running:
try:
self._reader_ready.set()
fixed_header = yield from asyncio.wait_for(MQTTFixedHeader.from_stream(self.session.reader), 5)
if fixed_header:
cls = packet_class(fixed_header)
packet = yield from cls.from_stream(self.session.reader, fixed_header=fixed_header)
self.logger.debug(" <-in-- " + repr(packet))
yield from self.incoming_queues[packet.fixed_header.packet_type].put(packet)
self.packet_received.send(packet)
else:
self.logger.debug("No more data, stopping reader coro")
break
except asyncio.TimeoutError:
self.logger.debug("Input stream read timeout")
except NoDataException as nde:
self.logger.debug("No data available")
except Exception as e:
self.logger.warn("Unhandled exception in reader coro: %s" % e)
break
self.logger.debug("Reader coro stopped")
@asyncio.coroutine
def _writer_coro(self):
self.logger.debug("Starting writer coro")
while self._running:
try:
self._writer_ready.set()
packet = yield from asyncio.wait_for(self.outgoing_queue.get(), 5)
yield from packet.to_stream(self.session.writer)
self.logger.debug(" -out-> " + repr(packet))
yield from self.session.writer.drain()
self.packet_sent.send(packet)
except asyncio.TimeoutError as ce:
self.logger.debug("Output queue get timeout")
except Exception as e:
self.logger.warn("Unhandled exception in writer coro: %s" % e)
break
self.logger.debug("Writer coro stopping")
# Flush queue before stopping
if not self.outgoing_queue.empty():
while True:
try:
packet = self.outgoing_queue.get_nowait()
yield from packet.to_stream(self.session.writer)
self.logger.debug(" -out-> " + repr(packet))
except asyncio.QueueEmpty:
break
except Exception as e:
self.logger.warn("Unhandled exception in writer coro: %s" % e)
self.logger.debug("Writer coro stopped")
@asyncio.coroutine
def _inflight_coro(self):
self.logger.debug("Starting in-flight messages polling coro")
while self._running:
self._inflight_ready.set()
yield from asyncio.sleep(self.config['inflight-polling-interval'])
self.logger.debug("in-flight polling coro wake-up")
try:
while not self.incoming_queues[PacketType.PUBACK].empty():
packet = self.incoming_queues[PacketType.PUBACK].get_nowait()
packet_id = packet.variable_header.packet_id
inflight_message = self.inflight_messages.get(packet_id)
inflight_message.acknowledge()
self.logger.debug("Message with packet Id=%s acknowledged" % packet_id)
while not self.incoming_queues[PacketType.PUBREC].empty():
packet = self.incoming_queues[PacketType.PUBREC].get_nowait()
packet_id = packet.variable_header.packet_id
inflight_message = self.inflight_messages.get(packet_id)
inflight_message.receive()
self.logger.debug("Message with packet Id=%s received" % packet_id)
rel_packet = PubrelPacket.build(packet_id)
yield from self.outgoing_queue.put(rel_packet)
inflight_message.release()
self.logger.debug("Message with packet Id=%s released" % packet_id)
while not self.incoming_queues[PacketType.PUBCOMP].empty():
packet = self.incoming_queues[PacketType.PUBCOMP].get_nowait()
packet_id = packet.variable_header.packet_id
inflight_message = self.inflight_messages.get(packet_id)
inflight_message.complete()
self.logger.debug("Message with packet Id=%s completed" % packet_id)
yield from self._inflight_changed.acquire()
self._inflight_changed.notify_all()
self._inflight_changed.release()
except KeyError:
self.logger.warn("Received %s for unknown inflight message Id %d" % (packet.fixed_header.packet_type, packet_id))
except MachineError as me:
self.logger.warn("Packet type incompatible with message QOS: %s" % me)
self.logger.debug("In-flight messages polling coro stopped")
class Subscription:
states = ['new', 'subscribed', 'acknowledged']
def __init__(self, packet_id, topics):
self.topics = topics
self.packet_id = packet_id
self._init_states()
def _init_states(self):
self.machine = Machine(model=self, states=Subscription.states, initial='new')
self.machine.add_transition(trigger='subscribe', source='new', dest='subscribed')
self.machine.add_transition(trigger='acknowledge', source='subscribed', dest='acknowledged')
class UnSubscription:
states = ['new', 'unsubscribed', 'acknowledged']
def __init__(self, packet_id, topics):
self.topics = topics
self.packet_id = packet_id
self._init_states()
def _init_states(self):
self.machine = Machine(model=self, states=UnSubscription.states, initial='new')
self.machine.add_transition(trigger='unsubscribe', source='new', dest='unsubscribed')
self.machine.add_transition(trigger='acknowledge', source='unsubscribed', dest='acknowledged')
class ClientProtocolHandler(ProtocolHandler):
def __init__(self, session: Session, config, loop=None):
super().__init__(session, config, loop)
self._ping_task = None
self.subscriptions = dict()
self._subscription_task = None
self._subscriptions_changed = asyncio.Condition(loop=self._loop)
self._subscriptions_ready = asyncio.Event(loop=self._loop)
@asyncio.coroutine
def start(self):
yield from super().start()
self.packet_sent.connect(self._do_keepalive)
self._subscription_task = asyncio.async(self._subscriptions_coro(), loop=self._loop)
yield from asyncio.wait([self._subscriptions_ready.wait()], loop=self._loop)
@asyncio.coroutine
def stop(self):
yield from super().stop()
yield from asyncio.wait([self._subscription_task], loop=self._loop)
if self._ping_task:
try:
self._ping_task.cancel()
except Exception:
pass
def _do_keepalive(self, message):
if self._ping_task:
try:
self._ping_task.cancel()
except Exception:
pass
next_ping = self.session.keep_alive - self.config['ping_delay']
if next_ping > 0:
self.logger.debug('Next ping in %d seconds if no new messages between' % next_ping)
self._ping_task = self._loop.call_later(next_ping, asyncio.async, self.mqtt_ping())
def _subscriptions_coro(self):
self.logger.debug("Starting subscriptions polling coro")
while self._running:
self._subscriptions_ready.set()
yield from asyncio.sleep(self.config['subscriptions-polling-interval'])
self.logger.debug("Subscriptions polling coro wake-up")
try:
while not self.incoming_queues[PacketType.SUBACK].empty():
packet = self.incoming_queues[PacketType.SUBACK].get_nowait()
packet_id = packet.variable_header.packet_id
subscription = self.subscriptions.get(packet_id)
for i in range(len(subscription.topics)):
subscription.topics[i]['return_code'] = packet.payload.return_codes[i]
subscription.acknowledge()
self.logger.debug("Subscription with packet Id=%s acknowledged" % packet_id)
while not self.incoming_queues[PacketType.UNSUBACK].empty():
packet = self.incoming_queues[PacketType.UNSUBACK].get_nowait()
packet_id = packet.variable_header.packet_id
subscription = self.subscriptions.get(packet_id)
subscription.acknowledge()
self.logger.debug("Unsubscription with packet Id=%s acknowledged" % packet_id)
yield from self._subscriptions_changed.acquire()
self._subscriptions_changed.notify_all()
self._subscriptions_changed.release()
except KeyError:
self.logger.warn("Received %s for unknown subscription message Id %d" % (packet.fixed_header.packet_type, packet_id))
except MachineError as me:
self.logger.warn("Packet type incompatible with message QOS: %s" % me)
self.logger.debug("Subscriptions polling coro stopped")
@asyncio.coroutine
def mqtt_subscribe(self, topics, packet_id):
"""
:param topics: array of topics [{'filter':'/a/b', 'qos': 0x00}, ...]
:return:
"""
def acknowledged_predicate():
if self.subscriptions[subscribe.variable_header.packet_id].state == 'acknowledged':
return True
else:
return False
subscribe = SubscribePacket.build(topics, packet_id)
yield from self.outgoing_queue.put(subscribe)
subscription = Subscription(subscribe.variable_header.packet_id, topics)
subscription.subscribe()
self.subscriptions[subscribe.variable_header.packet_id] = subscription
yield from self._subscriptions_changed.acquire()
yield from self._subscriptions_changed.wait_for(acknowledged_predicate)
subscription = self.subscriptions.pop(subscribe.variable_header.packet_id)
self._subscriptions_changed.release()
return subscription
@asyncio.coroutine
def mqtt_unsubscribe(self, topics, packet_id):
"""
:param topics: array of topics ['/a/b', ...]
:return:
"""
def acknowledged_predicate():
if self.subscriptions[unsubscribe.variable_header.packet_id].state == 'acknowledged':
return True
else:
return False
unsubscribe = UnsubscribePacket.build(topics, packet_id)
yield from self.outgoing_queue.put(unsubscribe)
subscription = UnSubscription(unsubscribe.variable_header.packet_id, topics)
subscription.unsubscribe()
self.subscriptions[unsubscribe.variable_header.packet_id] = subscription
self.subscriptions[unsubscribe.variable_header.packet_id] = subscription
yield from self._subscriptions_changed.acquire()
yield from self._subscriptions_changed.wait_for(acknowledged_predicate)
subscription = self.subscriptions.pop(unsubscribe.variable_header.packet_id)
self._subscriptions_changed.release()
return subscription
@asyncio.coroutine
def mqtt_connect(self):
def build_connect_packet(session):
vh = ConnectVariableHeader()
payload = ConnectPayload()
vh.keep_alive = session.keep_alive
vh.clean_session_flag = session.clean_session
vh.will_retain_flag = session.will_retain
payload.client_id = session.client_id
if session.username:
vh.username_flag = True
payload.username = session.username
else:
vh.username_flag = False
if session.password:
vh.password_flag = True
payload.password = session.password
else:
vh.password_flag = False
if session.will_flag:
vh.will_flag = True
vh.will_qos = session.will_qos
payload.will_message = session.will_message
payload.will_topic = session.will_topic
else:
vh.will_flag = False
header = MQTTFixedHeader(PacketType.CONNECT, 0x00)
packet = ConnectPacket(header, vh, payload)
return packet
packet = build_connect_packet(self.session)
yield from self.outgoing_queue.put(packet)
connack = yield from self.incoming_queues[PacketType.CONNACK].get()
return connack.variable_header.return_code
@asyncio.coroutine
def mqtt_disconnect(self):
disconnect_packet = DisconnectPacket()
yield from self.outgoing_queue.put(disconnect_packet)
self._ping_task.cancel()
@asyncio.coroutine
def mqtt_ping(self):
self.logger.debug("Pinging ...")
ping_packet = PingReqPacket()
yield from self.outgoing_queue.put(ping_packet)
yield from self.incoming_queues[PacketType.PINGRESP].get()

Wyświetl plik

@ -37,7 +37,7 @@ class UnsubscribePacket(MQTTPacket):
def __init__(self, fixed: MQTTFixedHeader=None, variable_header: PacketIdVariableHeader=None, payload=None):
if fixed is None:
header = MQTTFixedHeader(PacketType.UNSUBSCRIBE, 0x00)
header = MQTTFixedHeader(PacketType.UNSUBSCRIBE, 0x02) # [MQTT-3.10.1-1]
else:
if fixed.packet_type is not PacketType.UNSUBSCRIBE:
raise HBMQTTException("Invalid fixed packet type %s for UnsubscribePacket init" % fixed.packet_type)
@ -46,3 +46,9 @@ class UnsubscribePacket(MQTTPacket):
super().__init__(header)
self.variable_header = variable_header
self.payload = payload
@classmethod
def build(cls, topics, packet_id):
v_header = PacketIdVariableHeader(packet_id)
payload = UnubscribePayload(topics)
return UnsubscribePacket(variable_header=v_header, payload=payload)

Wyświetl plik

@ -1,74 +0,0 @@
# Copyright (c) 2015 Nicolas JOUANIN
#
# See the file license.txt for copying permission.
import logging
import asyncio
import threading
from hbmqtt.session import Session
from hbmqtt.mqtt.packet import MQTTFixedHeader
from hbmqtt.mqtt import packet_class
class ProtoThread(threading.Thread):
def __init__(self, session: Session, loop: asyncio.BaseEventLoop):
super().__init__(name="MQTT Protocol communication thread")
self.logger = logging.getLogger(__name__)
self._loop = loop
self._session = session
def run(self):
asyncio.set_event_loop(self._loop)
self._loop.call_soon(asyncio.async, self._read_protocol())
if not self._loop.is_running():
self._loop.run_forever()
@asyncio.coroutine
def _read_protocol(self):
while true:
class ProtocolHandler:
"""
Class implementing the MQTT communication protocol using asyncio features
"""
def __init__(self, session: Session, loop: asyncio.BaseEventLoop):
self.logger = logging.getLogger(__name__)
self.session = session
self._loop = loop
self._reader_task = None
self._writer_task = None
def start(self):
self._reader_task = asyncio.async(self._writer_coro, loop=self._loop)
self._writer_task = asyncio.async(self._reader_coro, loop=self._loop)
def stop(self):
self._reader_task.cancel()
self._writer_task.cancel()
@asyncio.coroutine
def _reader_coro(self):
self.logger.debug("Starting reader coro")
while True:
try:
fixed_header = yield from MQTTFixedHeader.from_stream(self.session.reader)
packet = packet_class(fixed_header).from_stream(self.session.reader, fixed_header=fixed_header)
except asyncio.CancelledError:
self.logger.warn("Reader coro stopping")
break
except Exception as e:
self.logger.warn("Exception in reader coro: %s" % e)
break
@asyncio.coroutine
def _writer_coro(self):
self.logger.debug("Starting writer coro")
out_queue = self.session._out_queue
while True:
try:
packet = yield from out_queue.get()
yield from packet.to_stream(self.session.writer)
except asyncio.CancelledError:
self.logger.warn("Reader coro stopping")
break
except Exception as e:
self.logger.warn("Exception in writer coro: %s" % e)
break

Wyświetl plik

@ -1,7 +1,3 @@
# Copyright (c) 2015 Nicolas JOUANIN
#
# See the file license.txt for copying permission.
import asyncio
from enum import Enum
class SessionState(Enum):
@ -31,8 +27,6 @@ class Session:
self.scheme = None
self._packet_id = 0
self._out_queue = asyncio.Queue()
@property
def next_packet_id(self):
self._packet_id += 1

54
hbmqtt/version.py 100644
Wyświetl plik

@ -0,0 +1,54 @@
# Copyright (c) 2015 Nicolas JOUANIN
#
# See the file license.txt for copying permission.
import datetime
import os
import subprocess
# Version Management from https://gist.github.com/gilsondev/2790884
def get_version(version=None):
"Returns a PEP 386-compliant version number from VERSION."
if version is None:
from hbmqtt import VERSION as version
else:
assert len(version) == 5
assert version[3] in ('alpha', 'beta', 'rc', 'final')
# Now build the two parts of the version number:
# main = X.Y[.Z]
# sub = .devN - for pre-alpha releases
# | {a|b|c}N - for alpha, beta and rc releases
parts = 2 if version[2] == 0 else 3
main = '.'.join(str(x) for x in version[:parts])
sub = ''
if version[3] == 'alpha' and version[4] == 0:
git_changeset = get_git_changeset()
if git_changeset:
sub = '.dev%s' % git_changeset
elif version[3] != 'final':
mapping = {'alpha': 'a', 'beta': 'b', 'rc': 'c'}
sub = mapping[version[3]] + str(version[4])
return str(main + sub)
def get_git_changeset():
"""Returns a numeric identifier of the latest git changeset.
The result is the UTC timestamp of the changeset in YYYYMMDDHHMMSS format.
This value isn't guaranteed to be unique, but collisions are very unlikely,
so it's sufficient for generating the development version numbers.
"""
repo_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
git_log = subprocess.Popen('git log --pretty=format:%ct --quiet -1 HEAD',
stdout=subprocess.PIPE,
stderr=subprocess.PIPE, shell=True, cwd=repo_dir, universal_newlines=True)
timestamp = git_log.communicate()[0]
try:
timestamp = datetime.datetime.utcfromtimestamp(int(timestamp))
except ValueError:
return None
return timestamp.strftime('%Y%m%d%H%M%S')

Wyświetl plik

@ -1 +1,2 @@
transitions
transitions
blinker

Wyświetl plik

@ -0,0 +1,51 @@
import logging
from hbmqtt.client._client import MQTTClient
import asyncio
logger = logging.getLogger(__name__)
C = MQTTClient()
@asyncio.coroutine
def test_coro():
yield from C.connect(uri='mqtt://iot.eclipse.org:1883/', username='testuser', password="passwd")
tasks = [
asyncio.async(C.publish('a/b', b'0123456789')),
asyncio.async(C.publish('a/b', b'0', qos=0x01)),
asyncio.async(C.publish('a/b', b'1', qos=0x01)),
asyncio.async(C.publish('a/b', b'2', qos=0x01)),
asyncio.async(C.publish('a/b', b'3', qos=0x01)),
asyncio.async(C.publish('a/b', b'4', qos=0x01)),
asyncio.async(C.publish('a/b', b'5', qos=0x01)),
asyncio.async(C.publish('a/b', b'6', qos=0x01)),
asyncio.async(C.publish('a/b', b'7', qos=0x01)),
asyncio.async(C.publish('a/b', b'8', qos=0x01)),
asyncio.async(C.publish('a/b', b'9', qos=0x01)),
asyncio.async(C.publish('a/b', b'0', qos=0x02)),
asyncio.async(C.publish('a/b', b'1', qos=0x02)),
asyncio.async(C.publish('a/b', b'2', qos=0x02)),
asyncio.async(C.publish('a/b', b'3', qos=0x02)),
asyncio.async(C.publish('a/b', b'4', qos=0x02)),
asyncio.async(C.publish('a/b', b'5', qos=0x02)),
asyncio.async(C.publish('a/b', b'6', qos=0x02)),
asyncio.async(C.publish('a/b', b'7', qos=0x02)),
asyncio.async(C.publish('a/b', b'8', qos=0x02)),
asyncio.async(C.publish('a/b', b'9', qos=0x02)),
]
yield from asyncio.wait(tasks)
logger.info("messages published")
yield from C.subscribe([
{'filter': 'a/b', 'qos': 0x01},
{'filter': 'c/d', 'qos': 0x02}
])
logger.info("Subscribed")
yield from C.unsubscribe(['a/b', 'c/d'])
logger.info("Unsubscribed")
yield from C.disconnect()
if __name__ == '__main__':
formatter = "[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s"
logging.basicConfig(level=logging.DEBUG, format=formatter)
asyncio.get_event_loop().run_until_complete(test_coro())

Wyświetl plik

@ -1,3 +1,29 @@
# Copyright (c) 2015 Nicolas JOUANIN
#
# See the file license.txt for copying permission.
# See the file license.txt for copying permission.
from setuptools import setup, find_packages
from hbmqtt.version import get_version
setup(
name="hbmqtt",
version=get_version(),
description="HBMQTT - HomeBrew MQTT\nclient/brocker using Python 3.4 asyncio library",
author="Nicolas Jouanin",
author_email='nico@beerfactory.org',
url="https://github.com/beerfactory/hbmqtt",
license='MIT',
packages=find_packages(exclude=['tests']),
install_requires=['transitions', 'blinker'],
classifiers=[
'Development Status :: 3 - Alpha',
'Intended Audience :: Developers',
'License :: OSI Approved :: MIT License',
'Operating System :: POSIX',
'Operating System :: MacOS',
'Operating System :: Microsoft :: Windows',
'Programming Language :: Python :: 3.4',
'Topic :: Communications',
'Topic :: Internet'
]
)

Wyświetl plik

@ -1,23 +0,0 @@
import logging
from hbmqtt.client._client import MQTTClient
import asyncio
C=MQTTClient()
@asyncio.coroutine
def test_coro():
yield from C.connect(uri='mqtt://iot.eclipse.org:1883/', username='testuser', password="passwd")
yield from asyncio.sleep(1)
yield from C.publish('a/b', b'0123456789')
yield from C.publish('a/b', b'0123456789', qos=0x01)
yield from C.publish('a/b', b'0123456789', qos=0x02)
yield from C.subscribe([
{'filter': 'a/b', 'qos': 0x01},
{'filter': 'c/d', 'qos': 0x02}
])
#yield from asyncio.sleep(10)
yield from C.disconnect()
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)
asyncio.get_event_loop().run_until_complete(test_coro())

Wyświetl plik

@ -0,0 +1,91 @@
# Copyright (c) 2015 Nicolas JOUANIN
#
# See the file license.txt for copying permission.
import unittest
import asyncio
from hbmqtt.mqtt.connect import ConnectPacket, ConnectVariableHeader, ConnectPayload
from hbmqtt.mqtt.protocol import ProtocolHandler
from hbmqtt.session import Session
from hbmqtt.mqtt.packet import PacketType
import logging
logging.basicConfig(level=logging.DEBUG)
ret_packet = None
config = {
'keep_alive': 10,
'ping_delay': 1,
'default_qos': 0,
'default_retain': False,
'inflight-polling-interval': 1,
'subscriptions-polling-interval': 1,
}
class ConnectPacketTest(unittest.TestCase):
def setUp(self):
self.loop = asyncio.new_event_loop()
self.logger = logging.getLogger(__name__)
def test_read_loop(self):
data = b'\x10\x3e\x00\x04MQTT\x04\xce\x00\x00\x00\x0a0123456789\x00\x09WillTopic\x00\x0bWillMessage\x00\x04user\x00\x08password'
@asyncio.coroutine
def serve_test(reader, writer):
writer.write(data)
yield from writer.drain()
writer.close()
loop = asyncio.get_event_loop()
coro = asyncio.start_server(serve_test, '127.0.0.1', 8888, loop=loop)
server = loop.run_until_complete(coro)
@asyncio.coroutine
def client():
S = Session()
S.reader, S.writer = yield from asyncio.open_connection('127.0.0.1', 8888,
loop=loop)
h = ProtocolHandler(S, config)
yield from h.start()
incoming_packet = yield from h.incoming_queues[PacketType.CONNECT].get()
yield from h.stop()
return incoming_packet
packet = loop.run_until_complete(client())
server.close()
self.assertEquals(packet.fixed_header.packet_type, PacketType.CONNECT)
def test_write_loop(self):
test_packet = ConnectPacket(vh=ConnectVariableHeader(), payload=ConnectPayload('Id', 'WillTopic', 'WillMessage', 'user', 'password'))
event=asyncio.Event()
@asyncio.coroutine
def serve_test(reader, writer):
global ret_packet
packet = yield from ConnectPacket.from_stream(reader)
ret_packet = packet
writer.close()
event.set()
@asyncio.coroutine
def client():
S = Session()
S.reader, S.writer = yield from asyncio.open_connection('127.0.0.1', 8888, loop=loop)
h = ProtocolHandler(S, config)
yield from h.start()
yield from h.outgoing_queue.put(test_packet)
yield from h.stop()
# Start server
loop = asyncio.get_event_loop()
coro = asyncio.start_server(serve_test, '127.0.0.1', 8888, loop=loop)
server = loop.run_until_complete(coro)
# Schedule client
loop.call_soon(asyncio.async, client())
# Wait for server to complete client request
loop.run_until_complete(asyncio.wait([event.wait()]))
server.close()
self.logger.info(ret_packet)
self.assertEquals(ret_packet.fixed_header.packet_type, PacketType.CONNECT)

Wyświetl plik

@ -21,4 +21,4 @@ class PubrelPacketTest(unittest.TestCase):
variable_header = PacketIdVariableHeader(10)
publish = PubrelPacket(variable_header=variable_header)
out = publish.to_bytes()
self.assertEqual(out, b'\x60\x02\x00\x0a')
self.assertEqual(out, b'\x62\x02\x00\x0a')

Wyświetl plik

@ -7,7 +7,7 @@ from hbmqtt.mqtt.suback import SubackPacket, SubackPayload
from hbmqtt.mqtt.packet import PacketIdVariableHeader
from hbmqtt.codecs import *
class SubscribePacketTest(unittest.TestCase):
class SubackPacketTest(unittest.TestCase):
def setUp(self):
self.loop = asyncio.new_event_loop()

Wyświetl plik

@ -31,4 +31,4 @@ class SubscribePacketTest(unittest.TestCase):
])
publish = SubscribePacket(variable_header=variable_header, payload=payload)
out = publish.to_bytes()
self.assertEqual(out, b'\x80\x0e\x00\x0a\x00\x03a/b\x01\x00\x03c/d\x02')
self.assertEqual(out, b'\x82\x0e\x00\x0a\x00\x03a/b\x01\x00\x03c/d\x02')

Wyświetl plik

@ -7,7 +7,7 @@ from hbmqtt.mqtt.unsuback import UnsubackPacket
from hbmqtt.mqtt.packet import PacketIdVariableHeader
from hbmqtt.codecs import *
class SubscribePacketTest(unittest.TestCase):
class UnsubackPacketTest(unittest.TestCase):
def setUp(self):
self.loop = asyncio.new_event_loop()

Wyświetl plik

@ -7,12 +7,12 @@ from hbmqtt.mqtt.unsubscribe import UnsubscribePacket, UnubscribePayload
from hbmqtt.mqtt.packet import PacketIdVariableHeader
from hbmqtt.codecs import *
class SubscribePacketTest(unittest.TestCase):
class UnsubscribePacketTest(unittest.TestCase):
def setUp(self):
self.loop = asyncio.new_event_loop()
def test_from_stream(self):
data = b'\xa0\x0c\x00\n\x00\x03a/b\x00\x03c/d'
data = b'\xa2\x0c\x00\n\x00\x03a/b\x00\x03c/d'
stream = asyncio.StreamReader(loop=self.loop)
stream.feed_data(data)
stream.feed_eof()
@ -25,4 +25,4 @@ class SubscribePacketTest(unittest.TestCase):
payload = UnubscribePayload(['a/b', 'c/d'])
publish = UnsubscribePacket(variable_header=variable_header, payload=payload)
out = publish.to_bytes()
self.assertEqual(out, b'\xa0\x0c\x00\n\x00\x03a/b\x00\x03c/d')
self.assertEqual(out, b'\xa2\x0c\x00\n\x00\x03a/b\x00\x03c/d')