kopia lustrzana https://github.com/Yakifo/amqtt
Add support for both Python 3.4 and 3.5
rodzic
4dcf8eb477
commit
8dab60a444
|
@ -5,7 +5,12 @@ import logging
|
|||
import ssl
|
||||
import websockets
|
||||
import asyncio
|
||||
import sys
|
||||
from asyncio import Queue, CancelledError
|
||||
if sys.version_info < (3, 5):
|
||||
from asyncio import async as ensure_future
|
||||
else:
|
||||
from asyncio import ensure_future
|
||||
from collections import deque
|
||||
|
||||
from functools import partial
|
||||
|
@ -282,7 +287,7 @@ class Broker:
|
|||
yield from self.plugins_manager.fire_event(EVENT_BROKER_POST_START)
|
||||
|
||||
#Start broadcast loop
|
||||
self._broadcast_task = asyncio.ensure_future(self._broadcast_loop(), loop=self._loop)
|
||||
self._broadcast_task = ensure_future(self._broadcast_loop(), loop=self._loop)
|
||||
|
||||
self.logger.debug("Broker started")
|
||||
except Exception as e:
|
||||
|
@ -403,10 +408,10 @@ class Broker:
|
|||
yield from self.publish_session_retained_messages(client_session)
|
||||
|
||||
# Init and start loop for handling client messages (publish, subscribe/unsubscribe, disconnect)
|
||||
disconnect_waiter = asyncio.ensure_future(handler.wait_disconnect(), loop=self._loop)
|
||||
subscribe_waiter = asyncio.ensure_future(handler.get_next_pending_subscription(), loop=self._loop)
|
||||
unsubscribe_waiter = asyncio.ensure_future(handler.get_next_pending_unsubscription(), loop=self._loop)
|
||||
wait_deliver = asyncio.ensure_future(handler.mqtt_deliver_next_message(), loop=self._loop)
|
||||
disconnect_waiter = ensure_future(handler.wait_disconnect(), loop=self._loop)
|
||||
subscribe_waiter = ensure_future(handler.get_next_pending_subscription(), loop=self._loop)
|
||||
unsubscribe_waiter = ensure_future(handler.get_next_pending_unsubscription(), loop=self._loop)
|
||||
wait_deliver = ensure_future(handler.mqtt_deliver_next_message(), loop=self._loop)
|
||||
connected = True
|
||||
while connected:
|
||||
try:
|
||||
|
@ -647,7 +652,7 @@ class Broker:
|
|||
(format_client_message(session=broadcast['session']),
|
||||
broadcast['topic'], format_client_message(session=target_session)))
|
||||
handler = self._get_handler(target_session)
|
||||
task = asyncio.ensure_future(
|
||||
task = ensure_future(
|
||||
handler.mqtt_publish(broadcast['topic'], broadcast['data'], qos, retain=False),
|
||||
loop=self._loop)
|
||||
running_tasks.append(task)
|
||||
|
@ -683,7 +688,7 @@ class Broker:
|
|||
handler = self._get_handler(session)
|
||||
while not session.retained_messages.empty():
|
||||
retained = yield from session.retained_messages.get()
|
||||
publish_tasks.append(asyncio.ensure_future(
|
||||
publish_tasks.append(ensure_future(
|
||||
handler.mqtt_publish(
|
||||
retained.topic, retained.data, retained.qos, True), loop=self._loop))
|
||||
if publish_tasks:
|
||||
|
|
|
@ -19,6 +19,11 @@ import websockets
|
|||
from websockets.uri import InvalidURI
|
||||
from websockets.handshake import InvalidHandshake
|
||||
from collections import deque
|
||||
import sys
|
||||
if sys.version_info < (3, 5):
|
||||
from asyncio import async as ensure_future
|
||||
else:
|
||||
from asyncio import ensure_future
|
||||
|
||||
_defaults = {
|
||||
'keep_alive': 10,
|
||||
|
@ -191,7 +196,7 @@ class MQTTClient:
|
|||
@asyncio.coroutine
|
||||
def _do_connect(self):
|
||||
return_code = yield from self._connect_coro()
|
||||
self._disconnect_task = asyncio.ensure_future(self.handle_connection_close(), loop=self._loop)
|
||||
self._disconnect_task = ensure_future(self.handle_connection_close(), loop=self._loop)
|
||||
return return_code
|
||||
|
||||
@mqtt_connected
|
||||
|
@ -244,7 +249,7 @@ class MQTTClient:
|
|||
|
||||
@asyncio.coroutine
|
||||
def deliver_message(self, timeout=None):
|
||||
deliver_task = asyncio.ensure_future(self._handler.mqtt_deliver_next_message(), loop=self._loop)
|
||||
deliver_task = ensure_future(self._handler.mqtt_deliver_next_message(), loop=self._loop)
|
||||
self.client_tasks.append(deliver_task)
|
||||
self.logger.debug("Waiting message delivery")
|
||||
yield from asyncio.wait([deliver_task], loop=self._loop, return_when=asyncio.FIRST_EXCEPTION, timeout=timeout)
|
||||
|
|
|
@ -3,6 +3,11 @@
|
|||
# See the file license.txt for copying permission.
|
||||
import asyncio
|
||||
from asyncio import futures
|
||||
import sys
|
||||
if sys.version_info < (3, 5):
|
||||
from asyncio import async as ensure_future
|
||||
else:
|
||||
from asyncio import ensure_future
|
||||
from hbmqtt.mqtt.protocol.handler import ProtocolHandler, EVENT_MQTT_PACKET_RECEIVED
|
||||
from hbmqtt.mqtt.packet import *
|
||||
from hbmqtt.mqtt.disconnect import DisconnectPacket
|
||||
|
@ -92,7 +97,7 @@ class ClientProtocolHandler(ProtocolHandler):
|
|||
try:
|
||||
self.logger.debug("Scheduling Ping")
|
||||
if not self._ping_task:
|
||||
self._ping_task = asyncio.ensure_future(self.mqtt_ping())
|
||||
self._ping_task = ensure_future(self.mqtt_ping())
|
||||
except BaseException as be:
|
||||
self.logger.debug("Exception ignored in ping task: %r" % be)
|
||||
|
||||
|
|
|
@ -32,6 +32,8 @@ from hbmqtt.errors import HBMQTTException
|
|||
import sys
|
||||
if sys.version_info < (3, 5):
|
||||
from asyncio import async as ensure_future
|
||||
else:
|
||||
from asyncio import ensure_future
|
||||
|
||||
EVENT_MQTT_PACKET_SENT = 'mqtt_packet_sent'
|
||||
EVENT_MQTT_PACKET_RECEIVED = 'mqtt_packet_received'
|
||||
|
@ -377,31 +379,31 @@ class ProtocolHandler:
|
|||
EVENT_MQTT_PACKET_RECEIVED, packet=packet, session=self.session)
|
||||
task = None
|
||||
if packet.fixed_header.packet_type == CONNACK:
|
||||
task = asyncio.ensure_future(self.handle_connack(packet), loop=self._loop)
|
||||
task = ensure_future(self.handle_connack(packet), loop=self._loop)
|
||||
elif packet.fixed_header.packet_type == SUBSCRIBE:
|
||||
task = asyncio.ensure_future(self.handle_subscribe(packet), loop=self._loop)
|
||||
task = ensure_future(self.handle_subscribe(packet), loop=self._loop)
|
||||
elif packet.fixed_header.packet_type == UNSUBSCRIBE:
|
||||
task = asyncio.ensure_future(self.handle_unsubscribe(packet), loop=self._loop)
|
||||
task = ensure_future(self.handle_unsubscribe(packet), loop=self._loop)
|
||||
elif packet.fixed_header.packet_type == SUBACK:
|
||||
task = asyncio.ensure_future(self.handle_suback(packet), loop=self._loop)
|
||||
task = ensure_future(self.handle_suback(packet), loop=self._loop)
|
||||
elif packet.fixed_header.packet_type == UNSUBACK:
|
||||
task = asyncio.ensure_future(self.handle_unsuback(packet), loop=self._loop)
|
||||
task = ensure_future(self.handle_unsuback(packet), loop=self._loop)
|
||||
elif packet.fixed_header.packet_type == PUBACK:
|
||||
task = asyncio.ensure_future(self.handle_puback(packet), loop=self._loop)
|
||||
task = ensure_future(self.handle_puback(packet), loop=self._loop)
|
||||
elif packet.fixed_header.packet_type == PUBREC:
|
||||
task = asyncio.ensure_future(self.handle_pubrec(packet), loop=self._loop)
|
||||
task = ensure_future(self.handle_pubrec(packet), loop=self._loop)
|
||||
elif packet.fixed_header.packet_type == PUBREL:
|
||||
task = asyncio.ensure_future(self.handle_pubrel(packet), loop=self._loop)
|
||||
task = ensure_future(self.handle_pubrel(packet), loop=self._loop)
|
||||
elif packet.fixed_header.packet_type == PUBCOMP:
|
||||
task = asyncio.ensure_future(self.handle_pubcomp(packet), loop=self._loop)
|
||||
task = ensure_future(self.handle_pubcomp(packet), loop=self._loop)
|
||||
elif packet.fixed_header.packet_type == PINGREQ:
|
||||
task = asyncio.ensure_future(self.handle_pingreq(packet), loop=self._loop)
|
||||
task = ensure_future(self.handle_pingreq(packet), loop=self._loop)
|
||||
elif packet.fixed_header.packet_type == PINGRESP:
|
||||
task = asyncio.ensure_future(self.handle_pingresp(packet), loop=self._loop)
|
||||
task = ensure_future(self.handle_pingresp(packet), loop=self._loop)
|
||||
elif packet.fixed_header.packet_type == PUBLISH:
|
||||
task = asyncio.ensure_future(self.handle_publish(packet), loop=self._loop)
|
||||
task = ensure_future(self.handle_publish(packet), loop=self._loop)
|
||||
elif packet.fixed_header.packet_type == DISCONNECT:
|
||||
task = asyncio.ensure_future(self.handle_disconnect(packet), loop=self._loop)
|
||||
task = ensure_future(self.handle_disconnect(packet), loop=self._loop)
|
||||
elif packet.fixed_header.packet_type == CONNECT:
|
||||
self.handle_connect(packet)
|
||||
else:
|
||||
|
|
|
@ -8,6 +8,11 @@ import pkg_resources
|
|||
import logging
|
||||
import asyncio
|
||||
import copy
|
||||
import sys
|
||||
if sys.version_info < (3, 5):
|
||||
from asyncio import async as ensure_future
|
||||
else:
|
||||
from asyncio import ensure_future
|
||||
|
||||
from collections import namedtuple
|
||||
|
||||
|
@ -107,7 +112,7 @@ class PluginManager:
|
|||
return self._plugins
|
||||
|
||||
def _schedule_coro(self, coro):
|
||||
return asyncio.ensure_future(coro, loop=self._loop)
|
||||
return ensure_future(coro, loop=self._loop)
|
||||
|
||||
@asyncio.coroutine
|
||||
def fire_event(self, event_name, wait=False, *args, **kwargs):
|
||||
|
|
|
@ -5,6 +5,10 @@ from datetime import datetime
|
|||
from hbmqtt.mqtt.packet import PUBLISH
|
||||
from hbmqtt.codecs import int_to_bytes_str
|
||||
import asyncio
|
||||
if sys.version_info < (3, 5):
|
||||
from asyncio import async as ensure_future
|
||||
else:
|
||||
from asyncio import ensure_future
|
||||
from collections import deque
|
||||
|
||||
DOLLAR_SYS_ROOT = '$SYS/broker/'
|
||||
|
@ -47,7 +51,7 @@ class BrokerSysPlugin:
|
|||
return (yield from self.context.broadcast_message(DOLLAR_SYS_ROOT + topic_basename, data))
|
||||
|
||||
def schedule_broadcast_sys_topic(self, topic_basename, data):
|
||||
return asyncio.ensure_future(self._broadcast_sys_topic(DOLLAR_SYS_ROOT + topic_basename, data),
|
||||
return ensure_future(self._broadcast_sys_topic(DOLLAR_SYS_ROOT + topic_basename, data),
|
||||
loop=self.context.loop)
|
||||
|
||||
@asyncio.coroutine
|
||||
|
|
2
tox.ini
2
tox.ini
|
@ -1,6 +1,6 @@
|
|||
[tox]
|
||||
envlist =
|
||||
#py34,
|
||||
py34,
|
||||
py35,
|
||||
coverage,
|
||||
#flake8
|
||||
|
|
Ładowanie…
Reference in New Issue