From 6aef6dce935cbeef8ae637e43ca677db640fa0b8 Mon Sep 17 00:00:00 2001 From: Nico Date: Sat, 31 Oct 2015 14:28:30 +0100 Subject: [PATCH] Rename script + complete hbmqtt_pub --- scripts/{hbmqtt.py => broker_script.py} | 4 +- scripts/default_client.yaml | 7 ++ scripts/hbmqtt_sub.py | 0 scripts/pub_script.py | 152 +++++++++++++++++++++++ scripts/{hbmqtt_pub.py => sub_script.py} | 0 setup.py | 6 +- 6 files changed, 164 insertions(+), 5 deletions(-) rename scripts/{hbmqtt.py => broker_script.py} (93%) create mode 100644 scripts/default_client.yaml delete mode 100644 scripts/hbmqtt_sub.py create mode 100644 scripts/pub_script.py rename scripts/{hbmqtt_pub.py => sub_script.py} (100%) diff --git a/scripts/hbmqtt.py b/scripts/broker_script.py similarity index 93% rename from scripts/hbmqtt.py rename to scripts/broker_script.py index 1f2ec0b..3556ce7 100644 --- a/scripts/hbmqtt.py +++ b/scripts/broker_script.py @@ -65,14 +65,14 @@ def main(*args, **kwargs): config = read_yaml_config(arguments['-c']) else: config = read_yaml_config(os.path.join(os.path.dirname(os.path.realpath(__file__)), 'default_broker.yaml')) - logger.warning("Using default configuration") + logger.debug("Using default configuration") loop = asyncio.get_event_loop() broker = Broker(config) try: loop.run_until_complete(broker.start()) loop.run_forever() except KeyboardInterrupt: - asyncio.get_event_loop().run_until_complete(broker.shutdown()) + loop.run_until_complete(broker.shutdown()) finally: loop.close() diff --git a/scripts/default_client.yaml b/scripts/default_client.yaml new file mode 100644 index 0000000..8f4007b --- /dev/null +++ b/scripts/default_client.yaml @@ -0,0 +1,7 @@ +keep_alive: 10 +ping_delay: 1 +default_qos: 0 +default_retain: false +auto_reconnect: true +reconnect_max_interval: 10 +reconnect_retries: 2 diff --git a/scripts/hbmqtt_sub.py b/scripts/hbmqtt_sub.py deleted file mode 100644 index e69de29..0000000 diff --git a/scripts/pub_script.py b/scripts/pub_script.py new file mode 100644 index 0000000..7d14c64 --- /dev/null +++ b/scripts/pub_script.py @@ -0,0 +1,152 @@ +# Copyright (c) 2015 Nicolas JOUANIN +# +# See the file license.txt for copying permission. +""" +hbmqtt_pub - MQTT 3.1.1 publisher + +Usage: + hbmqtt_pub --version + hbmqtt_pub (-h | --help) + hbmqtt_pub --url BROKER_URL -t TOPIC (-f FILE | -l | -m MESSAGE | -n | -s) [-c CONFIG_FILE] [-i CLIENT_ID] [-q | --qos QOS] [-d] [-k KEEP_ALIVE] [--clean-session] [--ca-file CAFILE] [--ca-path CAPATH] [--ca-data CADATA] [ --will-topic WILL_TOPIC [--will-message WILL_MESSAGE] [--will-qos WILL_QOS] [--will-retain] ] + +Options: + -h --help Show this screen. + --version Show version. + --url BROKER_URL Broker connection URL (musr conform to MQTT URI scheme (see https://github.com/mqtt/mqtt.github.io/wiki/URI-Scheme>) + -c CONFIG_FILE Broker configuration file (YAML format) + -i CLIENT_ID Id to use as client ID. + -q | --qos QOS Quality of service to use for the message, from 0, 1 and 2. Defaults to 0. + -t TOPIC Message topic + -m MESSAGE Message data to send + -f FILE Read file by line and publish message for each line + -s Read from stdin and publish message for each line + -k KEEP_ALIVE Keep alive timeout in second + --clean-session Clean session on connect (defaults to False) + --ca-file CAFILE] CA file + --ca-path CAPATH] CA Path + --ca-data CADATA CA data + --will-topic WILL_TOPIC + --will-message WILL_MESSAGE + --will-qos WILL_QOS + --will-retain + -d Enable debug messages +""" + +import sys +import logging +import asyncio +import os +from hbmqtt.client import MQTTClient, ConnectException +from hbmqtt.version import get_version +from docopt import docopt +try: + from .utils import read_yaml_config +except: + from utils import read_yaml_config + +logger = logging.getLogger(__name__) + + +def _gen_client_id(): + import os + import socket + pid = os.getpid() + hostname = socket.gethostname() + return "hbmqtt_pub/%d-%s" % (pid, hostname) + + +def _get_qos(arguments): + try: + return int(arguments['--qos'][0]) + except: + return None + + +def _get_message(arguments): + if arguments['-n']: + yield b'' + if arguments['-m']: + yield arguments['-m'].encode(encoding='utf-8') + if arguments['-f']: + try: + with open(arguments['-f'], 'r') as f: + for line in f: + yield line.encode(encoding='utf-8') + except: + logger.error("%s Failed to read file '%s'" % (client.client_id, arguments['-f'])) + if arguments['-s']: + import sys + for line in sys.stdin: + yield line.encode(encoding='utf-8') + + + +@asyncio.coroutine +def do_pub(client, arguments): + + try: + logger.info("%s Connecting to broker" % client.client_id) + + yield from client.connect(uri=arguments['--url'], + cleansession=arguments['--clean-session'], + cafile=arguments['--ca-file'], + capath=arguments['--ca-path'], + cadata=arguments['--ca-data']) + qos = _get_qos(arguments) + topic = arguments['-t'] + for message in _get_message(arguments): + logger.info("%s Publishing to '%s'" % (client.client_id, topic)) + yield from client.publish(topic, message, qos) + yield from client.disconnect() + logger.info("%s Disconnected from broker" % client.client_id) + except KeyboardInterrupt: + yield from client.disconnect() + logger.info("%s Disconnected from broker" % client.client_id) + except ConnectException as ce: + logger.fatal("connection to '%s' failed: %r" % (arguments['--url'], ce)) + + +def main(*args, **kwargs): + if sys.version_info[:2] < (3, 5): + logger.fatal("Error: Python 3.5 is required") + sys.exit(-1) + + arguments = docopt(__doc__, version=get_version()) + #print(arguments) + formatter = "[%(asctime)s] :: %(levelname)s - %(message)s" + + if arguments['-d']: + level = logging.DEBUG + else: + level = logging.INFO + logging.basicConfig(level=level, format=formatter) + + config = None + if arguments['-c']: + config = read_yaml_config(arguments['-c']) + else: + config = read_yaml_config(os.path.join(os.path.dirname(os.path.realpath(__file__)), 'default_client.yaml')) + logger.debug("Using default configuration") + loop = asyncio.get_event_loop() + + client_id = arguments.get("-i", None) + if not client_id: + client_id = _gen_client_id() + + if arguments['-k']: + config['keep_alive'] = int(arguments['-k']) + + if arguments['--will-topic'] and arguments['--will-message'] and arguments['--will-qos']: + config['will'] = dict() + config['will']['topic'] = arguments['--will-topic'] + config['will']['message'] = arguments['--will-message'].encode('utf-8') + config['will']['qos'] = int(arguments['--will-qos']) + config['will']['retain'] = arguments['--will-retain'] + + client = MQTTClient(client_id=client_id, config=config, loop=loop) + loop.run_until_complete(do_pub(client, arguments)) + loop.close() + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/scripts/hbmqtt_pub.py b/scripts/sub_script.py similarity index 100% rename from scripts/hbmqtt_pub.py rename to scripts/sub_script.py diff --git a/setup.py b/setup.py index 88891a0..cfc707a 100644 --- a/setup.py +++ b/setup.py @@ -50,9 +50,9 @@ setup( 'packet_logger_plugin = hbmqtt.plugins.logging:PacketLoggerPlugin', ], 'console_scripts': [ - 'hbmqtt = scripts.hbmqtt:main', - 'hbmqtt_pub = scripts.hbmqtt_pub:main', - 'hbmqtt_sub = scripts.hbmqtt_sub:main', + 'hbmqtt = scripts.broker_script:main', + 'hbmqtt_pub = scripts.pub_script:main', + 'hbmqtt_sub = scripts.sub_script:main', ] } ) \ No newline at end of file