New major functionality CONDITIONS - Compare values, check numbers within range, etc

pull/3029/head^2
dgtlmoon 2025-03-17 19:20:24 +01:00 zatwierdzone przez GitHub
rodzic 1c2cfc37aa
commit 7e7d5dc383
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: B5690EEEBB952194
22 zmienionych plików z 884 dodań i 14 usunięć

Wyświetl plik

@ -2,6 +2,7 @@ recursive-include changedetectionio/api *
recursive-include changedetectionio/apprise_plugin *
recursive-include changedetectionio/blueprint *
recursive-include changedetectionio/content_fetchers *
recursive-include changedetectionio/conditions *
recursive-include changedetectionio/model *
recursive-include changedetectionio/processors *
recursive-include changedetectionio/static *

Wyświetl plik

@ -0,0 +1,135 @@
from flask import Blueprint
from json_logic.builtins import BUILTINS
from .exceptions import EmptyConditionRuleRowNotUsable
from .pluggy_interface import plugin_manager # Import the pluggy plugin manager
from . import default_plugin
# List of all supported JSON Logic operators
operator_choices = [
(None, "Choose one"),
(">", "Greater Than"),
("<", "Less Than"),
(">=", "Greater Than or Equal To"),
("<=", "Less Than or Equal To"),
("==", "Equals"),
("!=", "Not Equals"),
("in", "Contains"),
("!in", "Does Not Contain"),
]
# Fields available in the rules
field_choices = [
(None, "Choose one"),
]
# The data we will feed the JSON Rules to see if it passes the test/conditions or not
EXECUTE_DATA = {}
# Define the extended operations dictionary
CUSTOM_OPERATIONS = {
**BUILTINS, # Include all standard operators
}
def filter_complete_rules(ruleset):
rules = [
rule for rule in ruleset
if all(value not in ("", False, "None", None) for value in [rule["operator"], rule["field"], rule["value"]])
]
return rules
def convert_to_jsonlogic(logic_operator: str, rule_dict: list):
"""
Convert a structured rule dict into a JSON Logic rule.
:param rule_dict: Dictionary containing conditions.
:return: JSON Logic rule as a dictionary.
"""
json_logic_conditions = []
for condition in rule_dict:
operator = condition["operator"]
field = condition["field"]
value = condition["value"]
if not operator or operator == 'None' or not value or not field:
raise EmptyConditionRuleRowNotUsable()
# Convert value to int/float if possible
try:
if isinstance(value, str) and "." in value and str != "None":
value = float(value)
else:
value = int(value)
except (ValueError, TypeError):
pass # Keep as a string if conversion fails
# Handle different JSON Logic operators properly
if operator == "in":
json_logic_conditions.append({"in": [value, {"var": field}]}) # value first
elif operator in ("!", "!!", "-"):
json_logic_conditions.append({operator: [{"var": field}]}) # Unary operators
elif operator in ("min", "max", "cat"):
json_logic_conditions.append({operator: value}) # Multi-argument operators
else:
json_logic_conditions.append({operator: [{"var": field}, value]}) # Standard binary operators
return {logic_operator: json_logic_conditions} if len(json_logic_conditions) > 1 else json_logic_conditions[0]
def execute_ruleset_against_all_plugins(current_watch_uuid: str, application_datastruct, ephemeral_data={} ):
"""
Build our data and options by calling our plugins then pass it to jsonlogic and see if the conditions pass
:param ruleset: JSON Logic rule dictionary.
:param extracted_data: Dictionary containing the facts. <-- maybe the app struct+uuid
:return: Dictionary of plugin results.
"""
from json_logic import jsonLogic
EXECUTE_DATA = {}
result = True
ruleset_settings = application_datastruct['watching'].get(current_watch_uuid)
if ruleset_settings.get("conditions"):
logic_operator = "and" if ruleset_settings.get("conditions_match_logic", "ALL") == "ALL" else "or"
complete_rules = filter_complete_rules(ruleset_settings['conditions'])
if complete_rules:
# Give all plugins a chance to update the data dict again (that we will test the conditions against)
for plugin in plugin_manager.get_plugins():
new_execute_data = plugin.add_data(current_watch_uuid=current_watch_uuid,
application_datastruct=application_datastruct,
ephemeral_data=ephemeral_data)
if new_execute_data and isinstance(new_execute_data, dict):
EXECUTE_DATA.update(new_execute_data)
# Create the ruleset
ruleset = convert_to_jsonlogic(logic_operator=logic_operator, rule_dict=complete_rules)
# Pass the custom operations dictionary to jsonLogic
if not jsonLogic(logic=ruleset, data=EXECUTE_DATA, operations=CUSTOM_OPERATIONS):
result = False
return result
# Load plugins dynamically
for plugin in plugin_manager.get_plugins():
new_ops = plugin.register_operators()
if isinstance(new_ops, dict):
CUSTOM_OPERATIONS.update(new_ops)
new_operator_choices = plugin.register_operator_choices()
if isinstance(new_operator_choices, list):
operator_choices.extend(new_operator_choices)
new_field_choices = plugin.register_field_choices()
if isinstance(new_field_choices, list):
field_choices.extend(new_field_choices)

Wyświetl plik

@ -0,0 +1,78 @@
# Flask Blueprint Definition
import json
from flask import Blueprint
from changedetectionio.conditions import execute_ruleset_against_all_plugins
def construct_blueprint(datastore):
from changedetectionio.flask_app import login_optionally_required
conditions_blueprint = Blueprint('conditions', __name__, template_folder="templates")
@conditions_blueprint.route("/<string:watch_uuid>/verify-condition-single-rule", methods=['POST'])
@login_optionally_required
def verify_condition_single_rule(watch_uuid):
"""Verify a single condition rule against the current snapshot"""
from changedetectionio.processors.text_json_diff import prepare_filter_prevew
from flask import request, jsonify
from copy import deepcopy
ephemeral_data = {}
# Get the watch data
watch = datastore.data['watching'].get(watch_uuid)
if not watch:
return jsonify({'status': 'error', 'message': 'Watch not found'}), 404
# First use prepare_filter_prevew to process the form data
# This will return text_after_filter which is after all current form settings are applied
# Create ephemeral data with the text from the current snapshot
try:
# Call prepare_filter_prevew to get a processed version of the content with current form settings
# We'll ignore the returned response and just use the datastore which is modified by the function
# this should apply all filters etc so then we can run the CONDITIONS against the final output text
result = prepare_filter_prevew(datastore=datastore,
form_data=request.form,
watch_uuid=watch_uuid)
ephemeral_data['text'] = result.get('after_filter', '')
# Create a temporary watch data structure with this single rule
tmp_watch_data = deepcopy(datastore.data['watching'].get(watch_uuid))
# Override the conditions in the temporary watch
rule_json = request.args.get("rule")
rule = json.loads(rule_json) if rule_json else None
tmp_watch_data['conditions'] = [rule]
tmp_watch_data['conditions_match_logic'] = "ALL" # Single rule, so use ALL
# Create a temporary application data structure for the rule check
temp_app_data = {
'watching': {
watch_uuid: tmp_watch_data
}
}
# Execute the rule against the current snapshot with form data
result = execute_ruleset_against_all_plugins(
current_watch_uuid=watch_uuid,
application_datastruct=temp_app_data,
ephemeral_data=ephemeral_data
)
return jsonify({
'status': 'success',
'result': result,
'message': 'Condition passes' if result else 'Condition does not pass'
})
except Exception as e:
return jsonify({
'status': 'error',
'message': f'Error verifying condition: {str(e)}'
}), 500
return conditions_blueprint

Wyświetl plik

@ -0,0 +1,78 @@
import re
import pluggy
from price_parser import Price
from loguru import logger
hookimpl = pluggy.HookimplMarker("changedetectionio_conditions")
@hookimpl
def register_operators():
def starts_with(_, text, prefix):
return text.lower().strip().startswith(str(prefix).strip().lower())
def ends_with(_, text, suffix):
return text.lower().strip().endswith(str(suffix).strip().lower())
def length_min(_, text, strlen):
return len(text) >= int(strlen)
def length_max(_, text, strlen):
return len(text) <= int(strlen)
# ✅ Custom function for case-insensitive regex matching
def contains_regex(_, text, pattern):
"""Returns True if `text` contains `pattern` (case-insensitive regex match)."""
return bool(re.search(pattern, str(text), re.IGNORECASE))
# ✅ Custom function for NOT matching case-insensitive regex
def not_contains_regex(_, text, pattern):
"""Returns True if `text` does NOT contain `pattern` (case-insensitive regex match)."""
return not bool(re.search(pattern, str(text), re.IGNORECASE))
return {
"!contains_regex": not_contains_regex,
"contains_regex": contains_regex,
"ends_with": ends_with,
"length_max": length_max,
"length_min": length_min,
"starts_with": starts_with,
}
@hookimpl
def register_operator_choices():
return [
("starts_with", "Text Starts With"),
("ends_with", "Text Ends With"),
("length_min", "Length minimum"),
("length_max", "Length maximum"),
("contains_regex", "Text Matches Regex"),
("!contains_regex", "Text Does NOT Match Regex"),
]
@hookimpl
def register_field_choices():
return [
("extracted_number", "Extracted number after 'Filters & Triggers'"),
# ("meta_description", "Meta Description"),
# ("meta_keywords", "Meta Keywords"),
("page_filtered_text", "Page text after 'Filters & Triggers'"),
#("page_title", "Page <title>"), # actual page title <title>
]
@hookimpl
def add_data(current_watch_uuid, application_datastruct, ephemeral_data):
res = {}
if 'text' in ephemeral_data:
res['page_filtered_text'] = ephemeral_data['text']
# Better to not wrap this in try/except so that the UI can see any errors
price = Price.fromstring(ephemeral_data.get('text'))
if price and price.amount != None:
# This is slightly misleading, it's extracting a PRICE not a Number..
res['extracted_number'] = float(price.amount)
logger.debug(f"Extracted number result: '{price}' - returning float({res['extracted_number']})")
return res

Wyświetl plik

@ -0,0 +1,6 @@
class EmptyConditionRuleRowNotUsable(Exception):
def __init__(self):
super().__init__("One of the 'conditions' rulesets is incomplete, cannot run.")
def __str__(self):
return self.args[0]

Wyświetl plik

@ -0,0 +1,44 @@
# Condition Rule Form (for each rule row)
from wtforms import Form, SelectField, StringField, validators
from wtforms import validators
class ConditionFormRow(Form):
# ✅ Ensure Plugins Are Loaded BEFORE Importing Choices
from changedetectionio.conditions import plugin_manager
from changedetectionio.conditions import operator_choices, field_choices
field = SelectField(
"Field",
choices=field_choices,
validators=[validators.Optional()]
)
operator = SelectField(
"Operator",
choices=operator_choices,
validators=[validators.Optional()]
)
value = StringField("Value", validators=[validators.Optional()])
def validate(self, extra_validators=None):
# First, run the default validators
if not super().validate(extra_validators):
return False
# Custom validation logic
# If any of the operator/field/value is set, then they must be all set
if any(value not in ("", False, "None", None) for value in [self.operator.data, self.field.data, self.value.data]):
if not self.operator.data or self.operator.data == 'None':
self.operator.errors.append("Operator is required.")
return False
if not self.field.data or self.field.data == 'None':
self.field.errors.append("Field is required.")
return False
if not self.value.data:
self.value.errors.append("Value is required.")
return False
return True # Only return True if all conditions pass

Wyświetl plik

@ -0,0 +1,44 @@
import pluggy
from . import default_plugin # Import the default plugin
# ✅ Ensure that the namespace in HookspecMarker matches PluginManager
PLUGIN_NAMESPACE = "changedetectionio_conditions"
hookspec = pluggy.HookspecMarker(PLUGIN_NAMESPACE)
hookimpl = pluggy.HookimplMarker(PLUGIN_NAMESPACE)
class ConditionsSpec:
"""Hook specifications for extending JSON Logic conditions."""
@hookspec
def register_operators():
"""Return a dictionary of new JSON Logic operators."""
pass
@hookspec
def register_operator_choices():
"""Return a list of new operator choices."""
pass
@hookspec
def register_field_choices():
"""Return a list of new field choices."""
pass
@hookspec
def add_data(current_watch_uuid, application_datastruct, ephemeral_data):
"""Add to the datadict"""
pass
# ✅ Set up Pluggy Plugin Manager
plugin_manager = pluggy.PluginManager(PLUGIN_NAMESPACE)
# ✅ Register hookspecs (Ensures they are detected)
plugin_manager.add_hookspecs(ConditionsSpec)
# ✅ Register built-in plugins manually
plugin_manager.register(default_plugin, "default_plugin")
# ✅ Discover installed plugins from external packages (if any)
plugin_manager.load_setuptools_entrypoints(PLUGIN_NAMESPACE)

Wyświetl plik

@ -1382,8 +1382,10 @@ def changedetection_app(config=None, datastore_o=None):
@login_optionally_required
def watch_get_preview_rendered(uuid):
'''For when viewing the "preview" of the rendered text from inside of Edit'''
from flask import jsonify
from .processors.text_json_diff import prepare_filter_prevew
return prepare_filter_prevew(watch_uuid=uuid, datastore=datastore)
result = prepare_filter_prevew(watch_uuid=uuid, form_data=request.form, datastore=datastore)
return jsonify(result)
@app.route("/form/add/quickwatch", methods=['POST'])
@ -1684,6 +1686,9 @@ def changedetection_app(config=None, datastore_o=None):
import changedetectionio.blueprint.backups as backups
app.register_blueprint(backups.construct_blueprint(datastore), url_prefix='/backups')
import changedetectionio.conditions.blueprint as conditions
app.register_blueprint(conditions.construct_blueprint(datastore), url_prefix='/conditions')
# @todo handle ctrl break
ticker_thread = threading.Thread(target=ticker_thread_check_time_launch_checks).start()
threading.Thread(target=notification_runner).start()

Wyświetl plik

@ -3,6 +3,7 @@ import re
from loguru import logger
from wtforms.widgets.core import TimeInput
from changedetectionio.conditions.form import ConditionFormRow
from changedetectionio.strtobool import strtobool
from wtforms import (
@ -305,8 +306,10 @@ class ValidateAppRiseServers(object):
def __call__(self, form, field):
import apprise
apobj = apprise.Apprise()
# so that the custom endpoints are registered
from changedetectionio.apprise_plugin import apprise_custom_api_call_wrapper
from .apprise_asset import asset
for server_url in field.data:
url = server_url.strip()
if url.startswith("#"):
@ -509,6 +512,7 @@ class quickWatchForm(Form):
edit_and_watch_submit_button = SubmitField('Edit > Watch', render_kw={"class": "pure-button pure-button-primary"})
# Common to a single watch and the global settings
class commonSettingsForm(Form):
from . import processors
@ -596,6 +600,10 @@ class processor_text_json_diff_form(commonSettingsForm):
notification_muted = BooleanField('Notifications Muted / Off', default=False)
notification_screenshot = BooleanField('Attach screenshot to notification (where possible)', default=False)
conditions_match_logic = RadioField(u'Match', choices=[('ALL', 'Match all of the following'),('ANY', 'Match any of the following')], default='ALL')
conditions = FieldList(FormField(ConditionFormRow), min_entries=1) # Add rule logic here
def extra_tab_content(self):
return None

Wyświetl plik

@ -28,13 +28,13 @@ def _task(watch, update_handler):
return text_after_filter
def prepare_filter_prevew(datastore, watch_uuid):
def prepare_filter_prevew(datastore, watch_uuid, form_data):
'''Used by @app.route("/edit/<string:uuid>/preview-rendered", methods=['POST'])'''
from changedetectionio import forms, html_tools
from changedetectionio.model.Watch import model as watch_model
from concurrent.futures import ProcessPoolExecutor
from copy import deepcopy
from flask import request, jsonify
from flask import request
import brotli
import importlib
import os
@ -50,12 +50,12 @@ def prepare_filter_prevew(datastore, watch_uuid):
if tmp_watch and tmp_watch.history and os.path.isdir(tmp_watch.watch_data_dir):
# Splice in the temporary stuff from the form
form = forms.processor_text_json_diff_form(formdata=request.form if request.method == 'POST' else None,
data=request.form
form = forms.processor_text_json_diff_form(formdata=form_data if request.method == 'POST' else None,
data=form_data
)
# Only update vars that came in via the AJAX post
p = {k: v for k, v in form.data.items() if k in request.form.keys()}
p = {k: v for k, v in form.data.items() if k in form_data.keys()}
tmp_watch.update(p)
blank_watch_no_filters = watch_model()
blank_watch_no_filters['url'] = tmp_watch.get('url')
@ -103,13 +103,12 @@ def prepare_filter_prevew(datastore, watch_uuid):
logger.trace(f"Parsed in {time.time() - now:.3f}s")
return jsonify(
{
return ({
'after_filter': text_after_filter,
'before_filter': text_before_filter.decode('utf-8') if isinstance(text_before_filter, bytes) else text_before_filter,
'duration': time.time() - now,
'trigger_line_numbers': trigger_line_numbers,
'ignore_line_numbers': ignore_line_numbers,
}
)
})

Wyświetl plik

@ -6,6 +6,7 @@ import os
import re
import urllib3
from changedetectionio.conditions import execute_ruleset_against_all_plugins
from changedetectionio.processors import difference_detection_processor
from changedetectionio.html_tools import PERL_STYLE_REGEX, cdata_in_document_to_text, TRANSLATE_WHITESPACE_TABLE
from changedetectionio import html_tools, content_fetchers
@ -331,6 +332,16 @@ class perform_site_check(difference_detection_processor):
if result:
blocked = True
# And check if 'conditions' will let this pass through
if watch.get('conditions') and watch.get('conditions_match_logic'):
if not execute_ruleset_against_all_plugins(current_watch_uuid=watch.get('uuid'),
application_datastruct=self.datastore.data,
ephemeral_data={
'text': stripped_text_from_html
}
):
# Conditions say "Condition not met" so we block it.
blocked = True
# Looks like something changed, but did it match all the rules?
if blocked:

Wyświetl plik

@ -0,0 +1,150 @@
$(document).ready(function () {
// Function to set up button event handlers
function setupButtonHandlers() {
// Unbind existing handlers first to prevent duplicates
$(".addRuleRow, .removeRuleRow, .verifyRuleRow").off("click");
// Add row button handler
$(".addRuleRow").on("click", function(e) {
e.preventDefault();
let currentRow = $(this).closest("tr");
// Clone without events
let newRow = currentRow.clone(false);
// Reset input values in the cloned row
newRow.find("input").val("");
newRow.find("select").prop("selectedIndex", 0);
// Insert the new row after the current one
currentRow.after(newRow);
// Reindex all rows
reindexRules();
});
// Remove row button handler
$(".removeRuleRow").on("click", function(e) {
e.preventDefault();
// Only remove if there's more than one row
if ($("#rulesTable tbody tr").length > 1) {
$(this).closest("tr").remove();
reindexRules();
}
});
// Verify rule button handler
$(".verifyRuleRow").on("click", function(e) {
e.preventDefault();
let row = $(this).closest("tr");
let field = row.find("select[name$='field']").val();
let operator = row.find("select[name$='operator']").val();
let value = row.find("input[name$='value']").val();
// Validate that all fields are filled
if (!field || field === "None" || !operator || operator === "None" || !value) {
alert("Please fill in all fields (Field, Operator, and Value) before verifying.");
return;
}
// Create a rule object
const rule = {
field: field,
operator: operator,
value: value
};
// Show a spinner or some indication that verification is in progress
const $button = $(this);
const originalHTML = $button.html();
$button.html("⌛").prop("disabled", true);
// Collect form data - similar to request_textpreview_update() in watch-settings.js
let formData = new FormData();
$('#edit-text-filter textarea, #edit-text-filter input').each(function() {
const $element = $(this);
const name = $element.attr('name');
if (name) {
if ($element.is(':checkbox')) {
formData.append(name, $element.is(':checked') ? $element.val() : false);
} else {
formData.append(name, $element.val());
}
}
});
// Also collect select values
$('#edit-text-filter select').each(function() {
const $element = $(this);
const name = $element.attr('name');
if (name) {
formData.append(name, $element.val());
}
});
// Send the request to verify the rule
$.ajax({
url: verify_condition_rule_url+"?"+ new URLSearchParams({ rule: JSON.stringify(rule) }).toString(),
type: "POST",
data: formData,
processData: false, // Prevent jQuery from converting FormData to a string
contentType: false, // Let the browser set the correct content type
success: function (response) {
if (response.status === "success") {
if (response.result) {
alert("✅ Condition PASSES verification against current snapshot!");
} else {
alert("❌ Condition FAILS verification against current snapshot.");
}
} else {
alert("Error: " + response.message);
}
$button.html(originalHTML).prop("disabled", false);
},
error: function (xhr) {
let errorMsg = "Error verifying condition.";
if (xhr.responseJSON && xhr.responseJSON.message) {
errorMsg = xhr.responseJSON.message;
}
alert(errorMsg);
$button.html(originalHTML).prop("disabled", false);
}
});
});
}
// Function to reindex form elements and re-setup event handlers
function reindexRules() {
// Unbind all button handlers first
$(".addRuleRow, .removeRuleRow, .verifyRuleRow").off("click");
// Reindex all form elements
$("#rulesTable tbody tr").each(function(index) {
$(this).find("select, input").each(function() {
let oldName = $(this).attr("name");
let oldId = $(this).attr("id");
if (oldName) {
let newName = oldName.replace(/\d+/, index);
$(this).attr("name", newName);
}
if (oldId) {
let newId = oldId.replace(/\d+/, index);
$(this).attr("id", newId);
}
});
});
// Reattach event handlers after reindexing
setupButtonHandlers();
}
// Initial setup of button handlers
setupButtonHandlers();
});

Wyświetl plik

@ -26,7 +26,6 @@ function set_active_tab() {
if (tab.length) {
tab[0].parentElement.className = "active";
}
}
function focus_error_tab() {

Wyświetl plik

@ -0,0 +1,9 @@
ul#conditions_match_logic {
list-style: none;
input, label, li {
display: inline-block;
}
li {
padding-right: 1em;
}
}

Wyświetl plik

@ -13,6 +13,7 @@
@import "parts/_menu";
@import "parts/_love";
@import "parts/preview_text_filter";
@import "parts/_edit";
body {
color: var(--color-text);

Wyświetl plik

@ -523,6 +523,13 @@ body.preview-text-enabled {
z-index: 3;
box-shadow: 1px 1px 4px var(--color-shadow-jump); }
ul#conditions_match_logic {
list-style: none; }
ul#conditions_match_logic input, ul#conditions_match_logic label, ul#conditions_match_logic li {
display: inline-block; }
ul#conditions_match_logic li {
padding-right: 1em; }
body {
color: var(--color-text);
background: var(--color-background-page);

Wyświetl plik

@ -61,6 +61,43 @@
{{ field(**kwargs)|safe }}
{% endmacro %}
{% macro render_fieldlist_of_formfields_as_table(fieldlist, table_id="rulesTable") %}
<table class="fieldlist_formfields pure-table" id="{{ table_id }}">
<thead>
<tr>
{% for subfield in fieldlist[0] %}
<th>{{ subfield.label }}</th>
{% endfor %}
<th>Actions</th>
</tr>
</thead>
<tbody>
{% for form_row in fieldlist %}
<tr {% if form_row.errors %} class="error-row" {% endif %}>
{% for subfield in form_row %}
<td>
{{ subfield()|safe }}
{% if subfield.errors %}
<ul class="errors">
{% for error in subfield.errors %}
<li class="error">{{ error }}</li>
{% endfor %}
</ul>
{% endif %}
</td>
{% endfor %}
<td>
<button type="button" class="addRuleRow">+</button>
<button type="button" class="removeRuleRow">-</button>
<button type="button" class="verifyRuleRow" title="Verify this rule against current snapshot"></button>
</td>
</tr>
{% endfor %}
</tbody>
</table>
{% endmacro %}
{% macro playwright_warning() %}
<p><strong>Error - Playwright support for Chrome based fetching is not enabled.</strong> Alternatively try our <a href="https://changedetection.io">very affordable subscription based service which has all this setup for you</a>.</p>
<p>You may need to <a href="https://github.com/dgtlmoon/changedetection.io/blob/09ebc6ec6338545bdd694dc6eee57f2e9d2b8075/docker-compose.yml#L31">Enable playwright environment variable</a> and uncomment the <strong>sockpuppetbrowser</strong> in the <a href="https://github.com/dgtlmoon/changedetection.io/blob/master/docker-compose.yml">docker-compose.yml</a> file.</p>

Wyświetl plik

@ -1,11 +1,14 @@
{% extends 'base.html' %}
{% block content %}
{% from '_helpers.html' import render_field, render_checkbox_field, render_button, render_time_schedule_form, playwright_warning, only_webdriver_type_watches_warning %}
{% from '_helpers.html' import render_field, render_checkbox_field, render_button, render_time_schedule_form, playwright_warning, only_webdriver_type_watches_warning, render_fieldlist_of_formfields_as_table %}
{% from '_common_fields.html' import render_common_settings_form %}
<script src="{{url_for('static_content', group='js', filename='tabs.js')}}" defer></script>
<script src="{{url_for('static_content', group='js', filename='vis.js')}}" defer></script>
<script src="{{url_for('static_content', group='js', filename='global-settings.js')}}" defer></script>
<script src="{{url_for('static_content', group='js', filename='scheduler.js')}}" defer></script>
<script src="{{url_for('static_content', group='js', filename='conditions.js')}}" defer></script>
<script>
const browser_steps_available_screenshots=JSON.parse('{{ watch.get_browsersteps_available_screenshots|tojson }}');
const browser_steps_config=JSON.parse('{{ browser_steps_config|tojson }}');
@ -50,6 +53,7 @@
{% if watch['processor'] == 'text_json_diff' %}
<li class="tab"><a id="visualselector-tab" href="#visualselector">Visual Filter Selector</a></li>
<li class="tab" id="filters-and-triggers-tab"><a href="#filters-and-triggers">Filters &amp; Triggers</a></li>
<li class="tab" id="conditions-tab"><a href="#conditions">Conditions</a></li>
{% endif %}
<li class="tab"><a href="#notifications">Notifications</a></li>
<li class="tab"><a href="#stats">Stats</a></li>
@ -274,13 +278,39 @@ Math: {{ 1 + 1 }}") }}
</div>
{% endif %}
<a href="#notifications" id="notification-setting-reset-to-default" class="pure-button button-xsmall" style="right: 20px; top: 20px; position: absolute; background-color: #5f42dd; border-radius: 4px; font-size: 70%; color: #fff">Use system defaults</a>
{{ render_common_settings_form(form, emailprefix, settings_application, extra_notification_token_placeholder_info) }}
</div>
</fieldset>
</div>
{% if watch['processor'] == 'text_json_diff' %}
<div class="tab-pane-inner" id="conditions">
<script>
const verify_condition_rule_url="{{url_for('conditions.verify_condition_single_rule', watch_uuid=uuid)}}";
</script>
<style>
.verifyRuleRow {
background-color: #4caf50;
color: white;
border: none;
cursor: pointer;
font-weight: bold;
}
.verifyRuleRow:hover {
background-color: #45a049;
}
</style>
<div class="pure-control-group">
{{ render_field(form.conditions_match_logic) }}
{{ render_fieldlist_of_formfields_as_table(form.conditions) }}
<div class="pure-form-message-inline">
<br>
Use the verify (✓) button to test if a condition passes against the current snapshot.<br><br>
Did you know that <strong>conditions</strong> can be extended with your own custom plugin? tutorials coming soon!<br>
</div>
</div>
</div>
<div class="tab-pane-inner" id="filters-and-triggers">
<span id="activate-text-preview" class="pure-button pure-button-primary button-xsmall">Activate preview</span>
<div>

Wyświetl plik

@ -165,6 +165,7 @@ def test_check_add_line_contains_trigger(client, live_server, measure_memory_usa
client.get(url_for("form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
res = client.get(url_for("index"))
assert b'unviewed' in res.data
# Takes a moment for apprise to fire

Wyświetl plik

@ -0,0 +1,133 @@
#!/usr/bin/env python3
from flask import url_for
from .util import live_server_setup, wait_for_all_checks
def set_original_response(number="50"):
test_return_data = f"""<html>
<body>
<h1>Test Page for Conditions</h1>
<p>This page contains a number that will be tested with conditions.</p>
<div class="number-container">Current value: {number}</div>
</body>
</html>
"""
with open("test-datastore/endpoint-content.txt", "w") as f:
f.write(test_return_data)
def set_number_in_range_response(number="75"):
test_return_data = f"""<html>
<body>
<h1>Test Page for Conditions</h1>
<p>This page contains a number that will be tested with conditions.</p>
<div class="number-container">Current value: {number}</div>
</body>
</html>
"""
with open("test-datastore/endpoint-content.txt", "w") as f:
f.write(test_return_data)
def set_number_out_of_range_response(number="150"):
test_return_data = f"""<html>
<body>
<h1>Test Page for Conditions</h1>
<p>This page contains a number that will be tested with conditions.</p>
<div class="number-container">Current value: {number}</div>
</body>
</html>
"""
with open("test-datastore/endpoint-content.txt", "w") as f:
f.write(test_return_data)
def test_conditions_with_text_and_number(client, live_server):
"""Test that both text and number conditions work together with AND logic."""
set_original_response("50")
live_server_setup(live_server)
test_url = url_for('test_endpoint', _external=True)
# Add our URL to the import page
res = client.post(
url_for("import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
wait_for_all_checks(client)
# Configure the watch with two conditions connected with AND:
# 1. The page filtered text must contain "5" (first digit of value)
# 2. The extracted number should be >= 20 and <= 100
res = client.post(
url_for("edit_page", uuid="first"),
data={
"url": test_url,
"fetch_backend": "html_requests",
"include_filters": ".number-container",
"title": "Number AND Text Condition Test",
"conditions_match_logic": "ALL", # ALL = AND logic
"conditions-0-operator": "in",
"conditions-0-field": "page_filtered_text",
"conditions-0-value": "5",
"conditions-1-operator": ">=",
"conditions-1-field": "extracted_number",
"conditions-1-value": "20",
"conditions-2-operator": "<=",
"conditions-2-field": "extracted_number",
"conditions-2-value": "100",
# So that 'operations' from pluggy discovery are tested
"conditions-3-operator": "length_min",
"conditions-3-field": "page_filtered_text",
"conditions-3-value": "1",
# So that 'operations' from pluggy discovery are tested
"conditions-4-operator": "length_max",
"conditions-4-field": "page_filtered_text",
"conditions-4-value": "100",
# So that 'operations' from pluggy discovery are tested
"conditions-5-operator": "contains_regex",
"conditions-5-field": "page_filtered_text",
"conditions-5-value": "\d",
},
follow_redirects=True
)
assert b"Updated watch." in res.data
wait_for_all_checks(client)
client.get(url_for("mark_all_viewed"), follow_redirects=True)
wait_for_all_checks(client)
# Case 1
set_number_in_range_response("70.5")
client.get(url_for("form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
# 75 is > 20 and < 100 and contains "5"
res = client.get(url_for("index"))
assert b'unviewed' in res.data
# Case 2: Change with one condition violated
# Number out of range (150) but contains '5'
client.get(url_for("mark_all_viewed"), follow_redirects=True)
set_number_out_of_range_response("150.5")
client.get(url_for("form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
# Should NOT be marked as having changes since not all conditions are met
res = client.get(url_for("index"))
assert b'unviewed' not in res.data
res = client.get(url_for("form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data

Wyświetl plik

@ -0,0 +1,82 @@
from changedetectionio.conditions import execute_ruleset_against_all_plugins
from changedetectionio.store import ChangeDetectionStore
import shutil
import tempfile
import time
import unittest
import uuid
class TestTriggerConditions(unittest.TestCase):
def setUp(self):
# Create a temporary directory for the test datastore
self.test_datastore_path = tempfile.mkdtemp()
# Initialize ChangeDetectionStore with our test path and no default watches
self.store = ChangeDetectionStore(
datastore_path=self.test_datastore_path,
include_default_watches=False
)
# Add a test watch
watch_url = "https://example.com"
self.watch_uuid = self.store.add_watch(url=watch_url)
def tearDown(self):
# Clean up the test datastore
self.store.stop_thread = True
time.sleep(0.5) # Give thread time to stop
shutil.rmtree(self.test_datastore_path)
def test_conditions_execution_pass(self):
# Get the watch object
watch = self.store.data['watching'][self.watch_uuid]
# Create and save a snapshot
first_content = "I saw 100 people at a rock show"
timestamp1 = int(time.time())
snapshot_id1 = str(uuid.uuid4())
watch.save_history_text(contents=first_content,
timestamp=timestamp1,
snapshot_id=snapshot_id1)
# Add another snapshot
second_content = "I saw 200 people at a rock show"
timestamp2 = int(time.time()) + 60
snapshot_id2 = str(uuid.uuid4())
watch.save_history_text(contents=second_content,
timestamp=timestamp2,
snapshot_id=snapshot_id2)
# Verify both snapshots are stored
history = watch.history
self.assertEqual(len(history), 2)
# Retrieve and check snapshots
#snapshot1 = watch.get_history_snapshot(str(timestamp1))
#snapshot2 = watch.get_history_snapshot(str(timestamp2))
self.store.data['watching'][self.watch_uuid].update(
{
"conditions_match_logic": "ALL",
"conditions": [
{"operator": ">=", "field": "extracted_number", "value": "10"},
{"operator": "<=", "field": "extracted_number", "value": "5000"},
{"operator": "in", "field": "page_text", "value": "rock"},
#{"operator": "starts_with", "field": "page_text", "value": "I saw"},
]
}
)
# ephemeral_data - some data that could exist before the watch saved a new version
result = execute_ruleset_against_all_plugins(current_watch_uuid=self.watch_uuid,
application_datastruct=self.store.data,
ephemeral_data={'text': "I saw 500 people at a rock show"})
# @todo - now we can test that 'Extract number' increased more than X since last time
self.assertTrue(result)
if __name__ == '__main__':
unittest.main()

Wyświetl plik

@ -98,5 +98,17 @@ greenlet >= 3.0.3
# Pinned or it causes problems with flask_expects_json which seems unmaintained
referencing==0.35.1
# For conditions
panzi-json-logic
# For conditions - extracted number from a body of text
price-parser
# Scheduler - Windows seemed to miss a lot of default timezone info (even "UTC" !)
tzdata
#typing_extensions ==4.8.0
pluggy ~= 1.5