kopia lustrzana https://github.com/Yakifo/amqtt
commit
1af9e7f99c
|
@ -10,6 +10,11 @@ python:
|
|||
matrix:
|
||||
allow_failures:
|
||||
- python: "nightly"
|
||||
include:
|
||||
- python: "3.7"
|
||||
env: TOXENV=py37
|
||||
dist: xenial
|
||||
sudo: true
|
||||
|
||||
install:
|
||||
- pip install coveralls tox-travis
|
||||
|
|
|
@ -25,10 +25,6 @@ from hbmqtt.adapters import (
|
|||
WebSocketsWriter)
|
||||
from .plugins.manager import PluginManager, BaseContext
|
||||
|
||||
if sys.version_info < (3, 5):
|
||||
from asyncio import async as ensure_future
|
||||
else:
|
||||
from asyncio import ensure_future
|
||||
|
||||
_defaults = {
|
||||
'timeout-disconnect-delay': 2,
|
||||
|
@ -292,7 +288,7 @@ class Broker:
|
|||
yield from self.plugins_manager.fire_event(EVENT_BROKER_POST_START)
|
||||
|
||||
#Start broadcast loop
|
||||
self._broadcast_task = ensure_future(self._broadcast_loop(), loop=self._loop)
|
||||
self._broadcast_task = asyncio.ensure_future(self._broadcast_loop(), loop=self._loop)
|
||||
|
||||
self.logger.debug("Broker started")
|
||||
except Exception as e:
|
||||
|
@ -420,10 +416,10 @@ class Broker:
|
|||
yield from self.publish_session_retained_messages(client_session)
|
||||
|
||||
# Init and start loop for handling client messages (publish, subscribe/unsubscribe, disconnect)
|
||||
disconnect_waiter = ensure_future(handler.wait_disconnect(), loop=self._loop)
|
||||
subscribe_waiter = ensure_future(handler.get_next_pending_subscription(), loop=self._loop)
|
||||
unsubscribe_waiter = ensure_future(handler.get_next_pending_unsubscription(), loop=self._loop)
|
||||
wait_deliver = ensure_future(handler.mqtt_deliver_next_message(), loop=self._loop)
|
||||
disconnect_waiter = asyncio.ensure_future(handler.wait_disconnect(), loop=self._loop)
|
||||
subscribe_waiter = asyncio.ensure_future(handler.get_next_pending_subscription(), loop=self._loop)
|
||||
unsubscribe_waiter = asyncio.ensure_future(handler.get_next_pending_unsubscription(), loop=self._loop)
|
||||
wait_deliver = asyncio.ensure_future(handler.mqtt_deliver_next_message(), loop=self._loop)
|
||||
connected = True
|
||||
while connected:
|
||||
try:
|
||||
|
@ -712,7 +708,7 @@ class Broker:
|
|||
(format_client_message(session=broadcast['session']),
|
||||
broadcast['topic'], format_client_message(session=target_session)))
|
||||
handler = self._get_handler(target_session)
|
||||
task = ensure_future(
|
||||
task = asyncio.ensure_future(
|
||||
handler.mqtt_publish(broadcast['topic'], broadcast['data'], qos, retain=False),
|
||||
loop=self._loop)
|
||||
running_tasks.append(task)
|
||||
|
@ -748,7 +744,7 @@ class Broker:
|
|||
handler = self._get_handler(session)
|
||||
while not session.retained_messages.empty():
|
||||
retained = yield from session.retained_messages.get()
|
||||
publish_tasks.append(ensure_future(
|
||||
publish_tasks.append(asyncio.ensure_future(
|
||||
handler.mqtt_publish(
|
||||
retained.topic, retained.data, retained.qos, True), loop=self._loop))
|
||||
if publish_tasks:
|
||||
|
|
|
@ -20,11 +20,7 @@ import websockets
|
|||
from websockets.uri import InvalidURI
|
||||
from websockets.exceptions import InvalidHandshake
|
||||
from collections import deque
|
||||
import sys
|
||||
if sys.version_info < (3, 5):
|
||||
from asyncio import async as ensure_future
|
||||
else:
|
||||
from asyncio import ensure_future
|
||||
|
||||
|
||||
_defaults = {
|
||||
'keep_alive': 10,
|
||||
|
@ -216,7 +212,7 @@ class MQTTClient:
|
|||
@asyncio.coroutine
|
||||
def _do_connect(self):
|
||||
return_code = yield from self._connect_coro()
|
||||
self._disconnect_task = ensure_future(self.handle_connection_close(), loop=self._loop)
|
||||
self._disconnect_task = asyncio.ensure_future(self.handle_connection_close(), loop=self._loop)
|
||||
return return_code
|
||||
|
||||
@mqtt_connected
|
||||
|
@ -329,7 +325,7 @@ class MQTTClient:
|
|||
:return: instance of :class:`hbmqtt.session.ApplicationMessage` containing received message information flow.
|
||||
:raises: :class:`asyncio.TimeoutError` if timeout occurs before a message is delivered
|
||||
"""
|
||||
deliver_task = ensure_future(self._handler.mqtt_deliver_next_message(), loop=self._loop)
|
||||
deliver_task = asyncio.ensure_future(self._handler.mqtt_deliver_next_message(), loop=self._loop)
|
||||
self.client_tasks.append(deliver_task)
|
||||
self.logger.debug("Waiting message delivery")
|
||||
done, pending = yield from asyncio.wait([deliver_task], loop=self._loop, return_when=asyncio.FIRST_EXCEPTION, timeout=timeout)
|
||||
|
|
|
@ -17,11 +17,6 @@ from hbmqtt.mqtt.connack import ConnackPacket
|
|||
from hbmqtt.session import Session
|
||||
from hbmqtt.plugins.manager import PluginManager
|
||||
|
||||
if sys.version_info < (3, 5):
|
||||
from asyncio import async as ensure_future
|
||||
else:
|
||||
from asyncio import ensure_future
|
||||
|
||||
|
||||
class ClientProtocolHandler(ProtocolHandler):
|
||||
def __init__(self, plugins_manager: PluginManager, session: Session=None, loop=None):
|
||||
|
@ -94,7 +89,7 @@ class ClientProtocolHandler(ProtocolHandler):
|
|||
try:
|
||||
if not self._ping_task:
|
||||
self.logger.debug("Scheduling Ping")
|
||||
self._ping_task = ensure_future(self.mqtt_ping())
|
||||
self._ping_task = asyncio.ensure_future(self.mqtt_ping())
|
||||
except BaseException as be:
|
||||
self.logger.debug("Exception ignored in ping task: %r" % be)
|
||||
|
||||
|
|
|
@ -33,11 +33,6 @@ from hbmqtt.mqtt.constants import QOS_0, QOS_1, QOS_2
|
|||
from hbmqtt.plugins.manager import PluginManager
|
||||
from hbmqtt.errors import HBMQTTException, MQTTException, NoDataException
|
||||
|
||||
import sys
|
||||
if sys.version_info < (3, 5):
|
||||
from asyncio import async as ensure_future
|
||||
else:
|
||||
from asyncio import ensure_future
|
||||
|
||||
EVENT_MQTT_PACKET_SENT = 'mqtt_packet_sent'
|
||||
EVENT_MQTT_PACKET_RECEIVED = 'mqtt_packet_received'
|
||||
|
@ -387,31 +382,31 @@ class ProtocolHandler:
|
|||
EVENT_MQTT_PACKET_RECEIVED, packet=packet, session=self.session)
|
||||
task = None
|
||||
if packet.fixed_header.packet_type == CONNACK:
|
||||
task = ensure_future(self.handle_connack(packet), loop=self._loop)
|
||||
task = asyncio.ensure_future(self.handle_connack(packet), loop=self._loop)
|
||||
elif packet.fixed_header.packet_type == SUBSCRIBE:
|
||||
task = ensure_future(self.handle_subscribe(packet), loop=self._loop)
|
||||
task = asyncio.ensure_future(self.handle_subscribe(packet), loop=self._loop)
|
||||
elif packet.fixed_header.packet_type == UNSUBSCRIBE:
|
||||
task = ensure_future(self.handle_unsubscribe(packet), loop=self._loop)
|
||||
task = asyncio.ensure_future(self.handle_unsubscribe(packet), loop=self._loop)
|
||||
elif packet.fixed_header.packet_type == SUBACK:
|
||||
task = ensure_future(self.handle_suback(packet), loop=self._loop)
|
||||
task = asyncio.ensure_future(self.handle_suback(packet), loop=self._loop)
|
||||
elif packet.fixed_header.packet_type == UNSUBACK:
|
||||
task = ensure_future(self.handle_unsuback(packet), loop=self._loop)
|
||||
task = asyncio.ensure_future(self.handle_unsuback(packet), loop=self._loop)
|
||||
elif packet.fixed_header.packet_type == PUBACK:
|
||||
task = ensure_future(self.handle_puback(packet), loop=self._loop)
|
||||
task = asyncio.ensure_future(self.handle_puback(packet), loop=self._loop)
|
||||
elif packet.fixed_header.packet_type == PUBREC:
|
||||
task = ensure_future(self.handle_pubrec(packet), loop=self._loop)
|
||||
task = asyncio.ensure_future(self.handle_pubrec(packet), loop=self._loop)
|
||||
elif packet.fixed_header.packet_type == PUBREL:
|
||||
task = ensure_future(self.handle_pubrel(packet), loop=self._loop)
|
||||
task = asyncio.ensure_future(self.handle_pubrel(packet), loop=self._loop)
|
||||
elif packet.fixed_header.packet_type == PUBCOMP:
|
||||
task = ensure_future(self.handle_pubcomp(packet), loop=self._loop)
|
||||
task = asyncio.ensure_future(self.handle_pubcomp(packet), loop=self._loop)
|
||||
elif packet.fixed_header.packet_type == PINGREQ:
|
||||
task = ensure_future(self.handle_pingreq(packet), loop=self._loop)
|
||||
task = asyncio.ensure_future(self.handle_pingreq(packet), loop=self._loop)
|
||||
elif packet.fixed_header.packet_type == PINGRESP:
|
||||
task = ensure_future(self.handle_pingresp(packet), loop=self._loop)
|
||||
task = asyncio.ensure_future(self.handle_pingresp(packet), loop=self._loop)
|
||||
elif packet.fixed_header.packet_type == PUBLISH:
|
||||
task = ensure_future(self.handle_publish(packet), loop=self._loop)
|
||||
task = asyncio.ensure_future(self.handle_publish(packet), loop=self._loop)
|
||||
elif packet.fixed_header.packet_type == DISCONNECT:
|
||||
task = ensure_future(self.handle_disconnect(packet), loop=self._loop)
|
||||
task = asyncio.ensure_future(self.handle_disconnect(packet), loop=self._loop)
|
||||
elif packet.fixed_header.packet_type == CONNECT:
|
||||
self.handle_connect(packet)
|
||||
else:
|
||||
|
|
|
@ -12,10 +12,6 @@ import sys
|
|||
|
||||
from collections import namedtuple
|
||||
|
||||
if sys.version_info < (3, 5):
|
||||
from asyncio import async as ensure_future
|
||||
else:
|
||||
from asyncio import ensure_future
|
||||
|
||||
Plugin = namedtuple('Plugin', ['name', 'ep', 'object'])
|
||||
|
||||
|
@ -113,7 +109,7 @@ class PluginManager:
|
|||
return self._plugins
|
||||
|
||||
def _schedule_coro(self, coro):
|
||||
return ensure_future(coro, loop=self._loop)
|
||||
return asyncio.ensure_future(coro, loop=self._loop)
|
||||
|
||||
@asyncio.coroutine
|
||||
def fire_event(self, event_name, wait=False, *args, **kwargs):
|
||||
|
|
|
@ -8,10 +8,6 @@ import asyncio
|
|||
import sys
|
||||
from collections import deque
|
||||
|
||||
if sys.version_info < (3, 5):
|
||||
from asyncio import async as ensure_future
|
||||
else:
|
||||
from asyncio import ensure_future
|
||||
|
||||
DOLLAR_SYS_ROOT = '$SYS/broker/'
|
||||
STAT_BYTES_SENT = 'bytes_sent'
|
||||
|
@ -53,8 +49,10 @@ class BrokerSysPlugin:
|
|||
return (yield from self.context.broadcast_message(topic_basename, data))
|
||||
|
||||
def schedule_broadcast_sys_topic(self, topic_basename, data):
|
||||
return ensure_future(self._broadcast_sys_topic(DOLLAR_SYS_ROOT + topic_basename, data),
|
||||
loop=self.context.loop)
|
||||
return asyncio.ensure_future(
|
||||
self._broadcast_sys_topic(DOLLAR_SYS_ROOT + topic_basename, data),
|
||||
loop=self.context.loop
|
||||
)
|
||||
|
||||
@asyncio.coroutine
|
||||
def on_broker_pre_start(self, *args, **kwargs):
|
||||
|
|
|
@ -42,10 +42,6 @@ from hbmqtt.version import get_version
|
|||
from docopt import docopt
|
||||
from hbmqtt.utils import read_yaml_config
|
||||
|
||||
if sys.version_info < (3, 5):
|
||||
from asyncio import async as ensure_future
|
||||
else:
|
||||
from asyncio import ensure_future
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
@ -107,7 +103,7 @@ def do_pub(client, arguments):
|
|||
retain = arguments['-r']
|
||||
for message in _get_message(arguments):
|
||||
logger.info("%s Publishing to '%s'" % (client.client_id, topic))
|
||||
task = ensure_future(client.publish(topic, message, qos, retain))
|
||||
task = asyncio.ensure_future(client.publish(topic, message, qos, retain))
|
||||
running_tasks.append(task)
|
||||
if running_tasks:
|
||||
yield from asyncio.wait(running_tasks)
|
||||
|
|
|
@ -25,11 +25,6 @@ from hbmqtt.mqtt import (
|
|||
from hbmqtt.mqtt.connect import ConnectVariableHeader, ConnectPayload
|
||||
from hbmqtt.mqtt.constants import QOS_0, QOS_1, QOS_2
|
||||
|
||||
import sys
|
||||
if sys.version_info < (3, 5):
|
||||
from asyncio import async as ensure_future
|
||||
else:
|
||||
from asyncio import ensure_future
|
||||
|
||||
formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s"
|
||||
logging.basicConfig(level=logging.DEBUG, format=formatter)
|
||||
|
@ -387,7 +382,7 @@ class BrokerTest(unittest.TestCase):
|
|||
|
||||
publish_1 = PublishPacket.build('/test', b'data', 1, False, QOS_2, False)
|
||||
yield from publish_1.to_stream(writer)
|
||||
ensure_future(PubrecPacket.from_stream(reader), loop=self.loop)
|
||||
asyncio.ensure_future(PubrecPacket.from_stream(reader), loop=self.loop)
|
||||
|
||||
yield from asyncio.sleep(2)
|
||||
|
||||
|
|
1
tox.ini
1
tox.ini
|
@ -8,6 +8,7 @@ envlist =
|
|||
py34,
|
||||
py35,
|
||||
py36,
|
||||
py37,
|
||||
coverage,
|
||||
flake8,
|
||||
check-manifest
|
||||
|
|
Ładowanie…
Reference in New Issue