changedetection.io/changedetectionio/conditions/__init__.py

168 wiersze
6.4 KiB
Python

from json_logic.builtins import BUILTINS
from .exceptions import EmptyConditionRuleRowNotUsable
from .pluggy_interface import plugin_manager # Import the pluggy plugin manager
from . import default_plugin
from loguru import logger
# List of all supported JSON Logic operators
operator_choices = [
(None, "Choose one - Operator"),
(">", "Greater Than"),
("<", "Less Than"),
(">=", "Greater Than or Equal To"),
("<=", "Less Than or Equal To"),
("==", "Equals"),
("!=", "Not Equals"),
("in", "Contains"),
]
# Fields available in the rules
field_choices = [
(None, "Choose one - Field"),
]
# The data we will feed the JSON Rules to see if it passes the test/conditions or not
EXECUTE_DATA = {}
# Define the extended operations dictionary
CUSTOM_OPERATIONS = {
**BUILTINS, # Include all standard operators
}
def filter_complete_rules(ruleset):
rules = [
rule for rule in ruleset
if all(value not in ("", False, "None", None) for value in [rule["operator"], rule["field"], rule["value"]])
]
return rules
def convert_to_jsonlogic(logic_operator: str, rule_dict: list):
"""
Convert a structured rule dict into a JSON Logic rule.
:param rule_dict: Dictionary containing conditions.
:return: JSON Logic rule as a dictionary.
"""
json_logic_conditions = []
for condition in rule_dict:
operator = condition["operator"]
field = condition["field"]
value = condition["value"]
if not operator or operator == 'None' or not value or not field:
raise EmptyConditionRuleRowNotUsable()
# Convert value to int/float if possible
try:
if isinstance(value, str) and "." in value and str != "None":
value = float(value)
else:
value = int(value)
except (ValueError, TypeError):
pass # Keep as a string if conversion fails
# Handle different JSON Logic operators properly
if operator == "in":
json_logic_conditions.append({"in": [value, {"var": field}]}) # value first
elif operator in ("!", "!!", "-"):
json_logic_conditions.append({operator: [{"var": field}]}) # Unary operators
elif operator in ("min", "max", "cat"):
json_logic_conditions.append({operator: value}) # Multi-argument operators
else:
json_logic_conditions.append({operator: [{"var": field}, value]}) # Standard binary operators
return {logic_operator: json_logic_conditions} if len(json_logic_conditions) > 1 else json_logic_conditions[0]
def execute_ruleset_against_all_plugins(current_watch_uuid: str, application_datastruct, ephemeral_data={} ):
"""
Build our data and options by calling our plugins then pass it to jsonlogic and see if the conditions pass
:param ruleset: JSON Logic rule dictionary.
:param extracted_data: Dictionary containing the facts. <-- maybe the app struct+uuid
:return: Dictionary of plugin results.
"""
from json_logic import jsonLogic
EXECUTE_DATA = {}
result = True
watch = application_datastruct['watching'].get(current_watch_uuid)
if watch and watch.get("conditions"):
logic_operator = "and" if watch.get("conditions_match_logic", "ALL") == "ALL" else "or"
complete_rules = filter_complete_rules(watch['conditions'])
if complete_rules:
# Give all plugins a chance to update the data dict again (that we will test the conditions against)
for plugin in plugin_manager.get_plugins():
try:
import concurrent.futures
import time
with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(
plugin.add_data,
current_watch_uuid=current_watch_uuid,
application_datastruct=application_datastruct,
ephemeral_data=ephemeral_data
)
logger.debug(f"Trying plugin {plugin}....")
# Set a timeout of 10 seconds
try:
new_execute_data = future.result(timeout=10)
if new_execute_data and isinstance(new_execute_data, dict):
EXECUTE_DATA.update(new_execute_data)
except concurrent.futures.TimeoutError:
# The plugin took too long, abort processing for this watch
raise Exception(f"Plugin {plugin.__class__.__name__} took more than 10 seconds to run.")
except Exception as e:
# Log the error but continue with the next plugin
import logging
logging.error(f"Error executing plugin {plugin.__class__.__name__}: {str(e)}")
continue
# Create the ruleset
ruleset = convert_to_jsonlogic(logic_operator=logic_operator, rule_dict=complete_rules)
# Pass the custom operations dictionary to jsonLogic
if not jsonLogic(logic=ruleset, data=EXECUTE_DATA, operations=CUSTOM_OPERATIONS):
result = False
return {'executed_data': EXECUTE_DATA, 'result': result}
# Load plugins dynamically
for plugin in plugin_manager.get_plugins():
new_ops = plugin.register_operators()
if isinstance(new_ops, dict):
CUSTOM_OPERATIONS.update(new_ops)
new_operator_choices = plugin.register_operator_choices()
if isinstance(new_operator_choices, list):
operator_choices.extend(new_operator_choices)
new_field_choices = plugin.register_field_choices()
if isinstance(new_field_choices, list):
field_choices.extend(new_field_choices)
def collect_ui_edit_stats_extras(watch):
"""Collect and combine HTML content from all plugins that implement ui_edit_stats_extras"""
extras_content = []
for plugin in plugin_manager.get_plugins():
try:
content = plugin.ui_edit_stats_extras(watch=watch)
if content:
extras_content.append(content)
except Exception as e:
# Skip plugins that don't implement the hook or have errors
pass
return "\n".join(extras_content) if extras_content else ""