amqtt/samples/client_publish_acl.py

43 wiersze
1.2 KiB
Python

import asyncio
import logging
from amqtt.client import ConnectError, MQTTClient
from amqtt.mqtt.constants import QOS_1
"""
This sample shows how to publish messages to broker running either `samples/broker_acl.py`
or `samples/broker_taboo.py`.
"""
logger = logging.getLogger(__name__)
async def test_coro() -> None:
try:
client = MQTTClient()
await client.connect("mqtt://0.0.0.0:1883")
await client.publish("data/classified", b"TOP SECRET", qos=QOS_1)
await client.publish("data/memes", b"REAL FUN", qos=QOS_1)
await client.publish("repositories/amqtt/master", b"NEW STABLE RELEASE", qos=QOS_1)
await client.publish(
"repositories/amqtt/devel",
b"THIS NEEDS TO BE CHECKED",
qos=QOS_1,
)
await client.publish("calendar/amqtt/releases", b"NEW RELEASE", qos=QOS_1)
logger.info("messages published")
await client.disconnect()
except ConnectError as ce:
logger.exception("ERROR: Connection failed")
def __main__():
formatter = "[%(asctime)s] :: %(levelname)s :: %(name)s :: %(message)s"
logging.basicConfig(level=logging.INFO, format=formatter)
asyncio.run(test_coro())
if __name__ == "__main__":
__main__()