kopia lustrzana https://github.com/Aircoookie/WLED
105 wiersze
3.4 KiB
Python
105 wiersze
3.4 KiB
Python
#!/usr/bin/env python
|
|
import argparse
|
|
import json
|
|
import os
|
|
from pathlib import Path
|
|
import time
|
|
|
|
# Dependency installed with `pip install paho-mqtt`.
|
|
# https://pypi.org/project/paho-mqtt/
|
|
import paho.mqtt.client as mqtt
|
|
|
|
state = {"label": "None"}
|
|
|
|
|
|
# Define MQTT callbacks
|
|
def on_connect(client, userdata, connect_flags, reason_code, properties):
|
|
print("Connected with result code " + str(reason_code))
|
|
state["start_time"] = None
|
|
client.subscribe(f"{state['root_topic']}#")
|
|
|
|
|
|
def on_message(client, userdata, msg):
|
|
if msg.topic.endswith("roll_label"):
|
|
state["label"] = msg.payload.decode("ascii")
|
|
print(f"Label set to {state['label']}")
|
|
elif msg.topic.endswith("roll"):
|
|
json_str = msg.payload.decode("ascii")
|
|
msg_data = json.loads(json_str)
|
|
# Convert the relative timestamps reported to the dice to an approximate absolute time.
|
|
# The "last_time" check is to detect if the ESP32 was restarted or the counter rolled over.
|
|
if state["start_time"] is None or msg_data["time"] < state["last_time"]:
|
|
state["start_time"] = time.time() - (msg_data["time"] / 1000.0)
|
|
state["last_time"] = msg_data["time"]
|
|
timestamp = state["start_time"] + (msg_data["time"] / 1000.0)
|
|
state["csv_fd"].write(
|
|
f"{timestamp:.3f}, {msg_data['name']}, {state['label']}, {msg_data['state']}, {msg_data['val']}\n"
|
|
)
|
|
state["csv_fd"].flush()
|
|
if msg_data["state"] == 1:
|
|
print(
|
|
f"{timestamp:.3f}: {msg_data['name']} rolled {msg_data['val']}")
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(
|
|
description="Log die rolls from WLED MQTT events to CSV.")
|
|
|
|
# IP address (with a default value)
|
|
parser.add_argument(
|
|
"--host",
|
|
type=str,
|
|
default="127.0.0.1",
|
|
help="Host address of broker (default: 127.0.0.1)",
|
|
)
|
|
parser.add_argument(
|
|
"--port", type=int, default=1883, help="Broker TCP port (default: 1883)"
|
|
)
|
|
parser.add_argument("--user", type=str, help="Optional MQTT username")
|
|
parser.add_argument("--password", type=str, help="Optional MQTT password")
|
|
parser.add_argument(
|
|
"--topic",
|
|
type=str,
|
|
help="Optional MQTT topic to listen to. For example if topic is 'wled/e5a658/dice/', subscript to to 'wled/e5a658/dice/#'. By default, listen to all topics looking for ones that end in 'roll_label' and 'roll'.",
|
|
)
|
|
parser.add_argument(
|
|
"-o",
|
|
"--output-dir",
|
|
type=Path,
|
|
default=Path(__file__).absolute().parent / "logs",
|
|
help="Directory to log to",
|
|
)
|
|
args = parser.parse_args()
|
|
|
|
timestr = time.strftime("%Y-%m-%d")
|
|
os.makedirs(args.output_dir, exist_ok=True)
|
|
state["csv_fd"] = open(args.output_dir / f"roll_log_{timestr}.csv", "a")
|
|
|
|
# Create `an MQTT client
|
|
client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
|
|
|
|
# Set MQTT callbacks
|
|
client.on_connect = on_connect
|
|
client.on_message = on_message
|
|
|
|
if args.user and args.password:
|
|
client.username_pw_set(args.user, args.password)
|
|
|
|
state["root_topic"] = ""
|
|
|
|
# Connect to the MQTT broker
|
|
client.connect(args.host, args.port, 60)
|
|
|
|
try:
|
|
while client.loop(timeout=1.0) == mqtt.MQTT_ERR_SUCCESS:
|
|
time.sleep(0.1)
|
|
except KeyboardInterrupt:
|
|
exit(0)
|
|
|
|
print("Connection Failure")
|
|
exit(1)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|