kopia lustrzana https://github.com/Yakifo/amqtt
Make client.publish calls async
rodzic
0db853e0b5
commit
f3d136a773
|
@ -44,6 +44,10 @@ try:
|
|||
from .utils import read_yaml_config
|
||||
except:
|
||||
from utils import read_yaml_config
|
||||
if sys.version_info < (3, 5):
|
||||
from asyncio import async as ensure_future
|
||||
else:
|
||||
from asyncio import ensure_future
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
@ -90,6 +94,7 @@ def _get_message(arguments):
|
|||
|
||||
@asyncio.coroutine
|
||||
def do_pub(client, arguments):
|
||||
running_tasks = []
|
||||
|
||||
try:
|
||||
logger.info("%s Connecting to broker" % client.client_id)
|
||||
|
@ -103,7 +108,10 @@ def do_pub(client, arguments):
|
|||
topic = arguments['-t']
|
||||
for message in _get_message(arguments):
|
||||
logger.info("%s Publishing to '%s'" % (client.client_id, topic))
|
||||
yield from client.publish(topic, message, qos, arguments['-r'])
|
||||
task = ensure_future(client.publish(topic, message, qos, arguments['-r']))
|
||||
running_tasks.append(task)
|
||||
if running_tasks:
|
||||
yield from asyncio.wait(running_tasks)
|
||||
yield from client.disconnect()
|
||||
logger.info("%s Disconnected from broker" % client.client_id)
|
||||
except KeyboardInterrupt:
|
||||
|
|
Ładowanie…
Reference in New Issue