Merge branch 'master' into browser-notifications

browser-notifications
dgtlmoon 2025-09-17 18:38:58 +02:00
commit 8a0ca45b26
88 zmienionych plików z 1463 dodań i 245 usunięć

Wyświetl plik

@ -33,7 +33,6 @@ venv/
# Test and development files
test-datastore/
tests/
docs/
*.md
!README.md

Wyświetl plik

@ -41,7 +41,7 @@ jobs:
steps:
- uses: actions/checkout@v5
- name: Set up Python 3.11
uses: actions/setup-python@v5
uses: actions/setup-python@v6
with:
python-version: 3.11

Wyświetl plik

@ -9,7 +9,7 @@ jobs:
steps:
- uses: actions/checkout@v5
- name: Set up Python
uses: actions/setup-python@v5
uses: actions/setup-python@v6
with:
python-version: "3.11"
- name: Install pypa/build
@ -39,7 +39,7 @@ jobs:
name: python-package-distributions
path: dist/
- name: Set up Python 3.11
uses: actions/setup-python@v5
uses: actions/setup-python@v6
with:
python-version: '3.11'
- name: Test that the basic pip built package runs without error

Wyświetl plik

@ -48,7 +48,7 @@ jobs:
steps:
- uses: actions/checkout@v5
- name: Set up Python 3.11
uses: actions/setup-python@v5
uses: actions/setup-python@v6
with:
python-version: 3.11

Wyświetl plik

@ -24,7 +24,7 @@ jobs:
# Mainly just for link/flake8
- name: Set up Python ${{ env.PYTHON_VERSION }}
uses: actions/setup-python@v5
uses: actions/setup-python@v6
with:
python-version: ${{ env.PYTHON_VERSION }}

Wyświetl plik

@ -84,6 +84,11 @@ EXPOSE 5000
# The actual flask app module
COPY changedetectionio /app/changedetectionio
# Also for OpenAPI validation wrapper - needs the YML
RUN [ ! -d "/app/docs" ] && mkdir /app/docs
COPY docs/api-spec.yaml /app/docs/api-spec.yaml
# Starting wrapper
COPY changedetection.py /app/changedetection.py

Wyświetl plik

@ -1,7 +1,7 @@
recursive-include changedetectionio/api *
recursive-include changedetectionio/blueprint *
recursive-include changedetectionio/content_fetchers *
recursive-include changedetectionio/conditions *
recursive-include changedetectionio/content_fetchers *
recursive-include changedetectionio/model *
recursive-include changedetectionio/notification *
recursive-include changedetectionio/processors *
@ -9,6 +9,7 @@ recursive-include changedetectionio/realtime *
recursive-include changedetectionio/static *
recursive-include changedetectionio/templates *
recursive-include changedetectionio/tests *
recursive-include changedetectionio/widgets *
prune changedetectionio/static/package-lock.json
prune changedetectionio/static/styles/node_modules
prune changedetectionio/static/styles/package-lock.json

Wyświetl plik

@ -2,7 +2,7 @@
# Read more https://github.com/dgtlmoon/changedetection.io/wiki
__version__ = '0.50.11'
__version__ = '0.50.14'
from changedetectionio.strtobool import strtobool
from json.decoder import JSONDecodeError

Wyświetl plik

@ -14,6 +14,39 @@ import copy
from . import schema, schema_create_watch, schema_update_watch, validate_openapi_request
def validate_time_between_check_required(json_data):
"""
Validate that at least one time interval is specified when not using default settings.
Returns None if valid, or error message string if invalid.
Defaults to using global settings if time_between_check_use_default is not provided.
"""
# Default to using global settings if not specified
use_default = json_data.get('time_between_check_use_default', True)
# If using default settings, no validation needed
if use_default:
return None
# If not using defaults, check if time_between_check exists and has at least one non-zero value
time_check = json_data.get('time_between_check')
if not time_check:
# No time_between_check provided and not using defaults - this is an error
return "At least one time interval (weeks, days, hours, minutes, or seconds) must be specified when not using global settings."
# time_between_check exists, check if it has at least one non-zero value
if any([
(time_check.get('weeks') or 0) > 0,
(time_check.get('days') or 0) > 0,
(time_check.get('hours') or 0) > 0,
(time_check.get('minutes') or 0) > 0,
(time_check.get('seconds') or 0) > 0
]):
return None
# time_between_check exists but all values are 0 or empty - this is an error
return "At least one time interval (weeks, days, hours, minutes, or seconds) must be specified when not using global settings."
class Watch(Resource):
def __init__(self, **kwargs):
# datastore is a black box dependency
@ -55,6 +88,8 @@ class Watch(Resource):
# attr .last_changed will check for the last written text snapshot on change
watch['last_changed'] = watch.last_changed
watch['viewed'] = watch.viewed
watch['link'] = watch.link,
return watch
@auth.check_token
@ -81,6 +116,11 @@ class Watch(Resource):
if not request.json.get('proxy') in plist:
return "Invalid proxy choice, currently supported proxies are '{}'".format(', '.join(plist)), 400
# Validate time_between_check when not using defaults
validation_error = validate_time_between_check_required(request.json)
if validation_error:
return validation_error, 400
watch.update(request.json)
return "OK", 200
@ -196,6 +236,11 @@ class CreateWatch(Resource):
if not json_data.get('proxy') in plist:
return "Invalid proxy choice, currently supported proxies are '{}'".format(', '.join(plist)), 400
# Validate time_between_check when not using defaults
validation_error = validate_time_between_check_required(json_data)
if validation_error:
return validation_error, 400
extras = copy.deepcopy(json_data)
# Because we renamed 'tag' to 'tags' but don't want to change the API (can do this in v2 of the API)
@ -230,6 +275,8 @@ class CreateWatch(Resource):
'last_changed': watch.last_changed,
'last_checked': watch['last_checked'],
'last_error': watch['last_error'],
'link': watch.link,
'page_title': watch['page_title'],
'title': watch['title'],
'url': watch['url'],
'viewed': watch.viewed

Wyświetl plik

@ -2,6 +2,7 @@ import copy
import yaml
import functools
from flask import request, abort
from loguru import logger
from openapi_core import OpenAPI
from openapi_core.contrib.flask import FlaskOpenAPIRequest
from . import api_schema
@ -13,6 +14,7 @@ schema = api_schema.build_watch_json_schema(watch_base_config)
schema_create_watch = copy.deepcopy(schema)
schema_create_watch['required'] = ['url']
del schema_create_watch['properties']['last_viewed']
schema_update_watch = copy.deepcopy(schema)
schema_update_watch['additionalProperties'] = False
@ -30,17 +32,13 @@ schema_create_notification_urls['required'] = ['notification_urls']
schema_delete_notification_urls = copy.deepcopy(schema_notification_urls)
schema_delete_notification_urls['required'] = ['notification_urls']
# Load OpenAPI spec for validation
_openapi_spec = None
@functools.cache
def get_openapi_spec():
global _openapi_spec
if _openapi_spec is None:
import os
spec_path = os.path.join(os.path.dirname(__file__), '../../docs/api-spec.yaml')
with open(spec_path, 'r') as f:
spec_dict = yaml.safe_load(f)
_openapi_spec = OpenAPI.from_dict(spec_dict)
import os
spec_path = os.path.join(os.path.dirname(__file__), '../../docs/api-spec.yaml')
with open(spec_path, 'r') as f:
spec_dict = yaml.safe_load(f)
_openapi_spec = OpenAPI.from_dict(spec_dict)
return _openapi_spec
def validate_openapi_request(operation_id):
@ -49,16 +47,25 @@ def validate_openapi_request(operation_id):
@functools.wraps(f)
def wrapper(*args, **kwargs):
try:
spec = get_openapi_spec()
openapi_request = FlaskOpenAPIRequest(request)
result = spec.unmarshal_request(openapi_request, operation_id)
if result.errors:
abort(400, message=f"OpenAPI validation failed: {result.errors}")
return f(*args, **kwargs)
# Skip OpenAPI validation for GET requests since they don't have request bodies
if request.method.upper() != 'GET':
spec = get_openapi_spec()
openapi_request = FlaskOpenAPIRequest(request)
result = spec.unmarshal_request(openapi_request)
if result.errors:
from werkzeug.exceptions import BadRequest
error_details = []
for error in result.errors:
error_details.append(str(error))
raise BadRequest(f"OpenAPI validation failed: {error_details}")
except BadRequest:
# Re-raise BadRequest exceptions (validation failures)
raise
except Exception as e:
# If OpenAPI validation fails, log but don't break existing functionality
print(f"OpenAPI validation warning for {operation_id}: {e}")
return f(*args, **kwargs)
# If OpenAPI spec loading fails, log but don't break existing functionality
logger.critical(f"OpenAPI validation warning for {operation_id}: {e}")
abort(500)
return f(*args, **kwargs)
return wrapper
return decorator
@ -68,3 +75,4 @@ from .Tags import Tags, Tag
from .Import import Import
from .SystemInfo import SystemInfo
from .Notifications import Notifications

Wyświetl plik

@ -78,6 +78,13 @@ def build_watch_json_schema(d):
]:
schema['properties'][v]['anyOf'].append({'type': 'string', "maxLength": 5000})
for v in ['last_viewed']:
schema['properties'][v] = {
"type": "integer",
"description": "Unix timestamp in seconds of the last time the watch was viewed.",
"minimum": 0
}
# None or Boolean
schema['properties']['track_ldjson_price_data']['anyOf'].append({'type': 'boolean'})
@ -112,6 +119,12 @@ def build_watch_json_schema(d):
schema['properties']['time_between_check'] = build_time_between_check_json_schema()
schema['properties']['time_between_check_use_default'] = {
"type": "boolean",
"default": True,
"description": "Whether to use global settings for time between checks - defaults to true if not set"
}
schema['properties']['browser_steps'] = {
"anyOf": [
{

Wyświetl plik

@ -310,15 +310,6 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore):
continue
if process_changedetection_results:
# Extract title if needed
if datastore.data['settings']['application'].get('extract_title_as_title') or watch['extract_title_as_title']:
if not watch['title'] or not len(watch['title']):
try:
update_obj['title'] = html_tools.extract_element(find='title', html_content=update_handler.fetcher.content)
logger.info(f"UUID: {uuid} Extract <title> updated title to '{update_obj['title']}")
except Exception as e:
logger.warning(f"UUID: {uuid} Extract <title> as watch title was enabled, but couldn't find a <title>.")
try:
datastore.update_watch(uuid=uuid, update_obj=update_obj)
@ -357,6 +348,14 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore):
# Always record attempt count
count = watch.get('check_count', 0) + 1
# Always record page title (used in notifications, and can change even when the content is the same)
try:
page_title = html_tools.extract_title(data=update_handler.fetcher.content)
logger.debug(f"UUID: {uuid} Page <title> is '{page_title}'")
datastore.update_watch(uuid=uuid, update_obj={'page_title': page_title})
except Exception as e:
logger.warning(f"UUID: {uuid} Exception when extracting <title> - {str(e)}")
# Record server header
try:
server_header = update_handler.fetcher.headers.get('server', '').strip().lower()[:255]

Wyświetl plik

@ -108,10 +108,13 @@ def construct_blueprint(datastore: ChangeDetectionStore):
fe.link(link=diff_link)
# @todo watch should be a getter - watch.get('title') (internally if URL else..)
# Same logic as watch-overview.html
if datastore.data['settings']['application']['ui'].get('use_page_title_in_list') or watch.get('use_page_title_in_list'):
watch_label = watch.label
else:
watch_label = watch.get('url')
watch_title = watch.get('title') if watch.get('title') else watch.get('url')
fe.title(title=watch_title)
fe.title(title=watch_label)
try:
html_diff = diff.render_diff(previous_version_file_contents=watch.get_history_snapshot(dates[-2]),
@ -127,7 +130,7 @@ def construct_blueprint(datastore: ChangeDetectionStore):
# @todo User could decide if <link> goes to the diff page, or to the watch link
rss_template = "<html><body>\n<h4><a href=\"{{watch_url}}\">{{watch_title}}</a></h4>\n<p>{{html_diff}}</p>\n</body></html>\n"
content = jinja_render(template_str=rss_template, watch_title=watch_title, html_diff=html_diff, watch_url=watch.link)
content = jinja_render(template_str=rss_template, watch_title=watch_label, html_diff=html_diff, watch_url=watch.link)
# Out of range chars could also break feedgen
if scan_invalid_chars_in_rss(content):

Wyświetl plik

@ -1,7 +1,7 @@
{% extends 'base.html' %}
{% block content %}
{% from '_helpers.html' import render_field, render_checkbox_field, render_button, render_time_schedule_form %}
{% from '_helpers.html' import render_field, render_checkbox_field, render_button, render_time_schedule_form, render_ternary_field %}
{% from '_common_fields.html' import render_common_settings_form %}
<script>
const notification_base_url="{{url_for('ui.ui_notification.ajax_callback_send_notification_test', mode="global-settings")}}";
@ -75,18 +75,10 @@
<div class="pure-control-group">
{{ render_checkbox_field(form.application.form.rss_hide_muted_watches) }}
</div>
<div class="pure-control-group">
{{ render_field(form.application.form.pager_size) }}
<span class="pure-form-message-inline">Number of items per page in the watch overview list, 0 to disable.</span>
</div>
<div class="pure-control-group">
{{ render_field(form.application.form.rss_content_format) }}
<span class="pure-form-message-inline">Love RSS? Does your reader support HTML? Set it here</span>
</div>
<div class="pure-control-group">
{{ render_checkbox_field(form.application.form.extract_title_as_title) }}
<span class="pure-form-message-inline">Note: This will automatically apply to all existing watches.</span>
</div>
<div class="pure-control-group">
{{ render_checkbox_field(form.application.form.empty_pages_are_a_change) }}
<span class="pure-form-message-inline">When a request returns no content, or the HTML does not contain any text, is this considered a change?</span>
@ -260,6 +252,13 @@ nav
{{ render_checkbox_field(form.application.form.ui.form.favicons_enabled, class="") }}
<span class="pure-form-message-inline">Enable or Disable Favicons next to the watch list</span>
</div>
<div class="pure-control-group">
{{ render_checkbox_field(form.application.form.ui.use_page_title_in_list) }}
</div>
<div class="pure-control-group">
{{ render_field(form.application.form.pager_size) }}
<span class="pure-form-message-inline">Number of items per page in the watch overview list, 0 to disable.</span>
</div>
</div>
<div class="tab-pane-inner" id="proxies">
@ -324,8 +323,8 @@ nav
<div id="actions">
<div class="pure-control-group">
{{ render_button(form.save_button) }}
<a href="{{url_for('watchlist.index')}}" class="pure-button button-small button-cancel">Back</a>
<a href="{{url_for('ui.clear_all_history')}}" class="pure-button button-small button-error">Clear Snapshot History</a>
<a href="{{url_for('watchlist.index')}}" class="pure-button button-cancel">Back</a>
<a href="{{url_for('ui.clear_all_history')}}" class="pure-button button-error">Clear Snapshot History</a>
</div>
</div>
</form>

Wyświetl plik

@ -1,6 +1,6 @@
{% extends 'base.html' %}
{% block content %}
{% from '_helpers.html' import render_field, render_checkbox_field, render_button %}
{% from '_helpers.html' import render_field, render_checkbox_field, render_button, render_ternary_field %}
{% from '_common_fields.html' import render_common_settings_form %}
<script>
const notification_base_url="{{url_for('ui.ui_notification.ajax_callback_send_notification_test', mode="group-settings")}}";
@ -64,7 +64,7 @@
<div class="tab-pane-inner" id="notifications">
<fieldset>
<div class="pure-control-group inline-radio">
{{ render_checkbox_field(form.notification_muted) }}
{{ render_ternary_field(form.notification_muted, BooleanField=True) }}
</div>
{% if 1 %}
<div class="pure-control-group inline-radio">

Wyświetl plik

@ -242,6 +242,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
'available_timezones': sorted(available_timezones()),
'browser_steps_config': browser_step_ui_config,
'emailprefix': os.getenv('NOTIFICATION_MAIL_BUTTON_PREFIX', False),
'extra_classes': 'checking-now' if worker_handler.is_watch_running(uuid) else '',
'extra_notification_token_placeholder_info': datastore.get_unique_notification_token_placeholders_available(),
'extra_processor_config': form.extra_tab_content(),
'extra_title': f" - Edit - {watch.label}",

Wyświetl plik

@ -1,6 +1,6 @@
{% extends 'base.html' %}
{% block content %}
{% from '_helpers.html' import render_field, render_checkbox_field, render_button, render_time_schedule_form, playwright_warning, only_playwright_type_watches_warning, render_conditions_fieldlist_of_formfields_as_table %}
{% from '_helpers.html' import render_field, render_checkbox_field, render_button, render_time_schedule_form, playwright_warning, only_playwright_type_watches_warning, render_conditions_fieldlist_of_formfields_as_table, render_ternary_field %}
{% from '_common_fields.html' import render_common_settings_form %}
<script src="{{url_for('static_content', group='js', filename='tabs.js')}}" defer></script>
<script src="{{url_for('static_content', group='js', filename='vis.js')}}" defer></script>
@ -72,15 +72,16 @@
<div class="pure-form-message">Some sites use JavaScript to create the content, for this you should <a href="https://github.com/dgtlmoon/changedetection.io/wiki/Fetching-pages-with-WebDriver">use the Chrome/WebDriver Fetcher</a></div>
<div class="pure-form-message">Variables are supported in the URL (<a href="https://github.com/dgtlmoon/changedetection.io/wiki/Handling-variables-in-the-watched-URL">help and examples here</a>).</div>
</div>
<div class="pure-control-group">
{{ render_field(form.tags) }}
<span class="pure-form-message-inline">Organisational tag/group name used in the main listing page</span>
</div>
<div class="pure-control-group inline-radio">
{{ render_field(form.processor) }}
</div>
<div class="pure-control-group">
{{ render_field(form.title, class="m-d") }}
</div>
<div class="pure-control-group">
{{ render_field(form.tags) }}
<span class="pure-form-message-inline">Organisational tag/group name used in the main listing page</span>
{{ render_field(form.title, class="m-d", placeholder=watch.label) }}
<span class="pure-form-message-inline">Automatically uses the page title if found, you can also use your own title/description here</span>
</div>
<div class="pure-control-group time-between-check border-fieldset">
@ -101,15 +102,16 @@
</div>
<br>
</div>
<div class="pure-control-group">
{{ render_checkbox_field(form.extract_title_as_title) }}
</div>
<div class="pure-control-group">
{{ render_checkbox_field(form.filter_failure_notification_send) }}
<span class="pure-form-message-inline">
Sends a notification when the filter can no longer be seen on the page, good for knowing when the page changed and your filter will not work anymore.
</span>
</div>
<div class="pure-control-group">
{{ render_ternary_field(form.use_page_title_in_list) }}
</div>
</fieldset>
</div>
@ -262,7 +264,7 @@ Math: {{ 1 + 1 }}") }}
<div class="tab-pane-inner" id="notifications">
<fieldset>
<div class="pure-control-group inline-radio">
{{ render_checkbox_field(form.notification_muted) }}
{{ render_ternary_field(form.notification_muted, BooleanField=true) }}
</div>
{% if watch_needs_selenium_or_playwright %}
<div class="pure-control-group inline-radio">
@ -469,11 +471,11 @@ Math: {{ 1 + 1 }}") }}
<div class="pure-control-group">
{{ render_button(form.save_button) }}
<a href="{{url_for('ui.form_delete', uuid=uuid)}}"
class="pure-button button-small button-error ">Delete</a>
class="pure-button button-error ">Delete</a>
{% if watch.history_n %}<a href="{{url_for('ui.clear_watch_history', uuid=uuid)}}"
class="pure-button button-small button-error ">Clear History</a>{% endif %}
class="pure-button button-error">Clear History</a>{% endif %}
<a href="{{url_for('ui.form_clone', uuid=uuid)}}"
class="pure-button button-small ">Clone &amp; Edit</a>
class="pure-button">Clone &amp; Edit</a>
</div>
</div>
</form>

Wyświetl plik

@ -44,12 +44,16 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
# Sort by last_changed and add the uuid which is usually the key..
sorted_watches = []
with_errors = request.args.get('with_errors') == "1"
unread_only = request.args.get('unread') == "1"
errored_count = 0
search_q = request.args.get('q').strip().lower() if request.args.get('q') else False
for uuid, watch in datastore.data['watching'].items():
if with_errors and not watch.get('last_error'):
continue
if unread_only and (watch.viewed or watch.last_changed == 0) :
continue
if active_tag_uuid and not active_tag_uuid in watch['tags']:
continue
if watch.get('last_error'):

Wyświetl plik

@ -118,7 +118,8 @@ document.addEventListener('DOMContentLoaded', function() {
{%- set checking_now = is_checking_now(watch) -%}
{%- set history_n = watch.history_n -%}
{%- set favicon = watch.get_favicon_filename() -%}
{# Mirror in changedetectionio/static/js/realtime.js for the frontend #}
{%- set system_use_url_watchlist = datastore.data['settings']['application']['ui'].get('use_page_title_in_list') -%}
{# Class settings mirrored in changedetectionio/static/js/realtime.js for the frontend #}
{%- set row_classes = [
loop.cycle('pure-table-odd', 'pure-table-even'),
'processor-' ~ watch['processor'],
@ -133,7 +134,8 @@ document.addEventListener('DOMContentLoaded', function() {
'checking-now' if checking_now else '',
'notification_muted' if watch.notification_muted else '',
'single-history' if history_n == 1 else '',
'multiple-history' if history_n >= 2 else '',
'multiple-history' if history_n >= 2 else '',
'use-html-title' if system_use_url_watchlist else 'no-html-title',
] -%}
<tr id="{{ watch.uuid }}" data-watch-uuid="{{ watch.uuid }}" class="{{ row_classes | reject('equalto', '') | join(' ') }}">
<td class="inline checkbox-uuid" ><div><input name="uuids" type="checkbox" value="{{ watch.uuid}} " > <span class="counter-i">{{ loop.index+pagination.skip }}</span></div></td>
@ -155,7 +157,12 @@ document.addEventListener('DOMContentLoaded', function() {
{% endif %}
<div>
<span class="watch-title">
{{watch.title if watch.title is not none and watch.title|length > 0 else watch.url}}&nbsp;<a class="external" target="_blank" rel="noopener" href="{{ watch.link.replace('source:','') }}">&nbsp;</a>
{% if system_use_url_watchlist or watch.get('use_page_title_in_list') %}
{{ watch.label }}
{% else %}
{{ watch.get('title') or watch.link }}
{% endif %}
<a class="external" target="_blank" rel="noopener" href="{{ watch.link.replace('source:','') }}">&nbsp;</a>
</span>
<div class="error-text" style="display:none;">{{ watch.compile_error_texts(has_proxies=datastore.proxy_list) }}</div>
{%- if watch['processor'] == 'text_json_diff' -%}
@ -245,6 +252,9 @@ document.addEventListener('DOMContentLoaded', function() {
<a href="{{url_for('ui.mark_all_viewed', tag=active_tag_uuid) }}" class="pure-button button-tag " id="mark-all-viewed">Mark all viewed in '{{active_tag.title}}'</a>
</li>
{%- endif -%}
<li id="post-list-unread" class="{%- if has_unviewed -%}has-unviewed{%- endif -%}" style="display: none;" >
<a href="{{url_for('watchlist.index', unread=1, tag=request.args.get('tag')) }}" class="pure-button button-tag">Unread</a>
</li>
<li>
<a href="{{ url_for('ui.form_watch_checknow', tag=active_tag_uuid, with_errors=request.args.get('with_errors',0)) }}" class="pure-button button-tag" id="recheck-all">Recheck
all {% if active_tag_uuid %} in '{{active_tag.title}}'{%endif%}</a>

Wyświetl plik

@ -47,6 +47,7 @@ async () => {
'nicht lieferbar',
'nicht verfügbar',
'nicht vorrätig',
'nicht mehr lieferbar',
'nicht zur verfügung',
'nie znaleziono produktów',
'niet beschikbaar',

Wyświetl plik

@ -23,11 +23,14 @@ from wtforms import (
)
from flask_wtf.file import FileField, FileAllowed
from wtforms.fields import FieldList
from wtforms.utils import unset_value
from wtforms.validators import ValidationError
from validators.url import url as url_validator
from changedetectionio.widgets import TernaryNoneBooleanField
# default
# each select <option data-enabled="enabled-0-0"
@ -54,6 +57,8 @@ valid_method = {
default_method = 'GET'
allow_simplehost = not strtobool(os.getenv('BLOCK_SIMPLEHOSTS', 'False'))
REQUIRE_ATLEAST_ONE_TIME_PART_MESSAGE_DEFAULT='At least one time interval (weeks, days, hours, minutes, or seconds) must be specified.'
REQUIRE_ATLEAST_ONE_TIME_PART_WHEN_NOT_GLOBAL_DEFAULT='At least one time interval (weeks, days, hours, minutes, or seconds) must be specified when not using global settings.'
class StringListField(StringField):
widget = widgets.TextArea()
@ -210,6 +215,35 @@ class ScheduleLimitForm(Form):
self.sunday.form.enabled.label.text = "Sunday"
def validate_time_between_check_has_values(form):
"""
Custom validation function for TimeBetweenCheckForm.
Returns True if at least one time interval field has a value > 0.
"""
res = any([
form.weeks.data and int(form.weeks.data) > 0,
form.days.data and int(form.days.data) > 0,
form.hours.data and int(form.hours.data) > 0,
form.minutes.data and int(form.minutes.data) > 0,
form.seconds.data and int(form.seconds.data) > 0
])
return res
class RequiredTimeInterval(object):
"""
WTForms validator that ensures at least one time interval field has a value > 0.
Use this with FormField(TimeBetweenCheckForm, validators=[RequiredTimeInterval()]).
"""
def __init__(self, message=None):
self.message = message or 'At least one time interval (weeks, days, hours, minutes, or seconds) must be specified.'
def __call__(self, form, field):
if not validate_time_between_check_has_values(field.form):
raise ValidationError(self.message)
class TimeBetweenCheckForm(Form):
weeks = IntegerField('Weeks', validators=[validators.Optional(), validators.NumberRange(min=0, message="Should contain zero or more seconds")])
days = IntegerField('Days', validators=[validators.Optional(), validators.NumberRange(min=0, message="Should contain zero or more seconds")])
@ -218,6 +252,123 @@ class TimeBetweenCheckForm(Form):
seconds = IntegerField('Seconds', validators=[validators.Optional(), validators.NumberRange(min=0, message="Should contain zero or more seconds")])
# @todo add total seconds minimum validatior = minimum_seconds_recheck_time
def __init__(self, formdata=None, obj=None, prefix="", data=None, meta=None, **kwargs):
super().__init__(formdata, obj, prefix, data, meta, **kwargs)
self.require_at_least_one = kwargs.get('require_at_least_one', False)
self.require_at_least_one_message = kwargs.get('require_at_least_one_message', REQUIRE_ATLEAST_ONE_TIME_PART_MESSAGE_DEFAULT)
def validate(self, **kwargs):
"""Custom validation that can optionally require at least one time interval."""
# Run normal field validation first
if not super().validate(**kwargs):
return False
# Apply optional "at least one" validation
if self.require_at_least_one:
if not validate_time_between_check_has_values(self):
# Add error to the form's general errors (not field-specific)
if not hasattr(self, '_formdata_errors'):
self._formdata_errors = []
self._formdata_errors.append(self.require_at_least_one_message)
return False
return True
class EnhancedFormField(FormField):
"""
An enhanced FormField that supports conditional validation with top-level error messages.
Adds a 'top_errors' property for validation errors at the FormField level.
"""
def __init__(self, form_class, label=None, validators=None, separator="-",
conditional_field=None, conditional_message=None, conditional_test_function=None, **kwargs):
"""
Initialize EnhancedFormField with optional conditional validation.
:param conditional_field: Name of the field this FormField depends on (e.g. 'time_between_check_use_default')
:param conditional_message: Error message to show when validation fails
:param conditional_test_function: Custom function to test if FormField has valid values.
Should take self.form as parameter and return True if valid.
"""
super().__init__(form_class, label, validators, separator, **kwargs)
self.top_errors = []
self.conditional_field = conditional_field
self.conditional_message = conditional_message or "At least one field must have a value when not using defaults."
self.conditional_test_function = conditional_test_function
def validate(self, form, extra_validators=()):
"""
Custom validation that supports conditional logic and stores top-level errors.
"""
self.top_errors = []
# First run the normal FormField validation
base_valid = super().validate(form, extra_validators)
# Apply conditional validation if configured
if self.conditional_field and hasattr(form, self.conditional_field):
conditional_field_obj = getattr(form, self.conditional_field)
# If the conditional field is False/unchecked, check if this FormField has any values
if not conditional_field_obj.data:
# Use custom test function if provided, otherwise use generic fallback
if self.conditional_test_function:
has_any_value = self.conditional_test_function(self.form)
else:
# Generic fallback - check if any field has truthy data
has_any_value = any(field.data for field in self.form if hasattr(field, 'data') and field.data)
if not has_any_value:
self.top_errors.append(self.conditional_message)
base_valid = False
return base_valid
class RequiredFormField(FormField):
"""
A FormField that passes require_at_least_one=True to TimeBetweenCheckForm.
Use this when you want the sub-form to always require at least one value.
"""
def __init__(self, form_class, label=None, validators=None, separator="-", **kwargs):
super().__init__(form_class, label, validators, separator, **kwargs)
def process(self, formdata, data=unset_value, extra_filters=None):
if extra_filters:
raise TypeError(
"FormField cannot take filters, as the encapsulated"
"data is not mutable."
)
if data is unset_value:
try:
data = self.default()
except TypeError:
data = self.default
self._obj = data
self.object_data = data
prefix = self.name + self.separator
# Pass require_at_least_one=True to the sub-form
if isinstance(data, dict):
self.form = self.form_class(formdata=formdata, prefix=prefix, require_at_least_one=True, **data)
else:
self.form = self.form_class(formdata=formdata, obj=data, prefix=prefix, require_at_least_one=True)
@property
def errors(self):
"""Include sub-form validation errors"""
form_errors = self.form.errors
# Add any general form errors to a special 'form' key
if hasattr(self.form, '_formdata_errors') and self.form._formdata_errors:
form_errors = dict(form_errors) # Make a copy
form_errors['form'] = self.form._formdata_errors
return form_errors
# Separated by key:value
class StringDictKeyValue(StringField):
widget = widgets.TextArea()
@ -346,7 +497,7 @@ class ValidateJinja2Template(object):
joined_data = ' '.join(map(str, field.data)) if isinstance(field.data, list) else f"{field.data}"
try:
jinja2_env = ImmutableSandboxedEnvironment(loader=BaseLoader)
jinja2_env = ImmutableSandboxedEnvironment(loader=BaseLoader, extensions=['jinja2_time.TimeExtension'])
jinja2_env.globals.update(notification.valid_tokens)
# Extra validation tokens provided on the form_class(... extra_tokens={}) setup
if hasattr(field, 'extra_notification_tokens'):
@ -548,7 +699,6 @@ class commonSettingsForm(Form):
self.notification_title.extra_notification_tokens = kwargs.get('extra_notification_tokens', {})
self.notification_urls.extra_notification_tokens = kwargs.get('extra_notification_tokens', {})
extract_title_as_title = BooleanField('Extract <title> from document and use as watch title', default=False)
fetch_backend = RadioField(u'Fetch Method', choices=content_fetchers.available_fetchers(), validators=[ValidateContentFetcherIsReady()])
notification_body = TextAreaField('Notification Body', default='{{ watch_url }} had a change.', validators=[validators.Optional(), ValidateJinja2Template()])
notification_format = SelectField('Notification format', choices=valid_notification_formats.keys())
@ -583,11 +733,16 @@ class processor_text_json_diff_form(commonSettingsForm):
url = fields.URLField('URL', validators=[validateURL()])
tags = StringTagUUID('Group tag', [validators.Optional()], default='')
time_between_check = FormField(TimeBetweenCheckForm)
time_between_check = EnhancedFormField(
TimeBetweenCheckForm,
conditional_field='time_between_check_use_default',
conditional_message=REQUIRE_ATLEAST_ONE_TIME_PART_WHEN_NOT_GLOBAL_DEFAULT,
conditional_test_function=validate_time_between_check_has_values
)
time_schedule_limit = FormField(ScheduleLimitForm)
time_between_check_use_default = BooleanField('Use global settings for time between check', default=False)
time_between_check_use_default = BooleanField('Use global settings for time between check and scheduler.', default=False)
include_filters = StringListField('CSS/JSONPath/JQ/XPath Filters', [ValidateCSSJSONXPATHInput()], default='')
@ -617,18 +772,18 @@ class processor_text_json_diff_form(commonSettingsForm):
text_should_not_be_present = StringListField('Block change-detection while text matches', [validators.Optional(), ValidateListRegex()])
webdriver_js_execute_code = TextAreaField('Execute JavaScript before change detection', render_kw={"rows": "5"}, validators=[validators.Optional()])
save_button = SubmitField('Save', render_kw={"class": "pure-button button-small pure-button-primary"})
save_button = SubmitField('Save', render_kw={"class": "pure-button pure-button-primary"})
proxy = RadioField('Proxy')
# filter_failure_notification_send @todo make ternary
filter_failure_notification_send = BooleanField(
'Send a notification when the filter can no longer be found on the page', default=False)
notification_muted = BooleanField('Notifications Muted / Off', default=False)
notification_muted = TernaryNoneBooleanField('Notifications', default=None, yes_text="Muted", no_text="On")
notification_screenshot = BooleanField('Attach screenshot to notification (where possible)', default=False)
conditions_match_logic = RadioField(u'Match', choices=[('ALL', 'Match all of the following'),('ANY', 'Match any of the following')], default='ALL')
conditions = FieldList(FormField(ConditionFormRow), min_entries=1) # Add rule logic here
use_page_title_in_list = TernaryNoneBooleanField('Use page <title> in list', default=None)
def extra_tab_content(self):
return None
@ -728,7 +883,7 @@ class DefaultUAInputForm(Form):
# datastore.data['settings']['requests']..
class globalSettingsRequestForm(Form):
time_between_check = FormField(TimeBetweenCheckForm)
time_between_check = RequiredFormField(TimeBetweenCheckForm)
time_schedule_limit = FormField(ScheduleLimitForm)
proxy = RadioField('Proxy')
jitter_seconds = IntegerField('Random jitter seconds ± check',
@ -756,6 +911,7 @@ class globalSettingsApplicationUIForm(Form):
open_diff_in_new_tab = BooleanField("Open 'History' page in a new tab", default=True, validators=[validators.Optional()])
socket_io_enabled = BooleanField('Realtime UI Updates Enabled', default=True, validators=[validators.Optional()])
favicons_enabled = BooleanField('Favicons Enabled', default=True, validators=[validators.Optional()])
use_page_title_in_list = BooleanField('Use page <title> in watch overview list') #BooleanField=True
# datastore.data['settings']['application']..
class globalSettingsApplicationForm(commonSettingsForm):
@ -780,7 +936,7 @@ class globalSettingsApplicationForm(commonSettingsForm):
removepassword_button = SubmitField('Remove password', render_kw={"class": "pure-button pure-button-primary"})
render_anchor_tag_content = BooleanField('Render anchor tag content', default=False)
shared_diff_access = BooleanField('Allow access to view diff page when password is enabled', default=False, validators=[validators.Optional()])
shared_diff_access = BooleanField('Allow anonymous access to watch history page when password is enabled', default=False, validators=[validators.Optional()])
rss_hide_muted_watches = BooleanField('Hide muted watches from RSS feed', default=True,
validators=[validators.Optional()])
filter_failure_notification_threshold_attempts = IntegerField('Number of times the filter can be missing before sending a notification',
@ -802,7 +958,7 @@ class globalSettingsForm(Form):
requests = FormField(globalSettingsRequestForm)
application = FormField(globalSettingsApplicationForm)
save_button = SubmitField('Save', render_kw={"class": "pure-button button-small pure-button-primary"})
save_button = SubmitField('Save', render_kw={"class": "pure-button pure-button-primary"})
class extractDataForm(Form):

Wyświetl plik

@ -1,6 +1,7 @@
from loguru import logger
from lxml import etree
from typing import List
import html
import json
import re
@ -9,6 +10,11 @@ TEXT_FILTER_LIST_LINE_SUFFIX = "<br>"
TRANSLATE_WHITESPACE_TABLE = str.maketrans('', '', '\r\n\t ')
PERL_STYLE_REGEX = r'^/(.*?)/([a-z]*)?$'
TITLE_RE = re.compile(r"<title[^>]*>(.*?)</title>", re.I | re.S)
META_CS = re.compile(r'<meta[^>]+charset=["\']?\s*([a-z0-9_\-:+.]+)', re.I)
META_CT = re.compile(r'<meta[^>]+http-equiv=["\']?content-type["\']?[^>]*content=["\'][^>]*charset=([a-z0-9_\-:+.]+)', re.I)
# 'price' , 'lowPrice', 'highPrice' are usually under here
# All of those may or may not appear on different websites - I didnt find a way todo case-insensitive searching here
LD_JSON_PRODUCT_OFFER_SELECTORS = ["json:$..offers", "json:$..Offers"]
@ -510,3 +516,43 @@ def get_triggered_text(content, trigger_text):
i += 1
return triggered_text
def extract_title(data: bytes | str, sniff_bytes: int = 2048, scan_chars: int = 8192) -> str | None:
try:
# Only decode/process the prefix we need for title extraction
match data:
case bytes() if data.startswith((b"\xff\xfe", b"\xfe\xff")):
prefix = data[:scan_chars * 2].decode("utf-16", errors="replace")
case bytes() if data.startswith((b"\xff\xfe\x00\x00", b"\x00\x00\xfe\xff")):
prefix = data[:scan_chars * 4].decode("utf-32", errors="replace")
case bytes():
try:
prefix = data[:scan_chars].decode("utf-8")
except UnicodeDecodeError:
try:
head = data[:sniff_bytes].decode("ascii", errors="ignore")
if m := (META_CS.search(head) or META_CT.search(head)):
enc = m.group(1).lower()
else:
enc = "cp1252"
prefix = data[:scan_chars * 2].decode(enc, errors="replace")
except Exception as e:
logger.error(f"Title extraction encoding detection failed: {e}")
return None
case str():
prefix = data[:scan_chars] if len(data) > scan_chars else data
case _:
logger.error(f"Title extraction received unsupported data type: {type(data)}")
return None
# Search only in the prefix
if m := TITLE_RE.search(prefix):
title = html.unescape(" ".join(m.group(1).split())).strip()
# Some safe limit
return title[:2000]
return None
except Exception as e:
logger.error(f"Title extraction failed: {e}")
return None

Wyświetl plik

@ -39,12 +39,12 @@ class model(dict):
'api_access_token_enabled': True,
'base_url' : None,
'empty_pages_are_a_change': False,
'extract_title_as_title': False,
'fetch_backend': getenv("DEFAULT_FETCH_BACKEND", "html_requests"),
'filter_failure_notification_threshold_attempts': _FILTER_FAILURE_THRESHOLD_ATTEMPTS_DEFAULT,
'global_ignore_text': [], # List of text to ignore when calculating the comparison checksum
'global_subtractive_selectors': [],
'ignore_whitespace': True,
'ignore_status_codes': False, #@todo implement, as ternary.
'notification_body': default_notification_body,
'notification_format': default_notification_format,
'notification_title': default_notification_title,
@ -57,10 +57,11 @@ class model(dict):
'rss_hide_muted_watches': True,
'schema_version' : 0,
'shared_diff_access': False,
'webdriver_delay': None , # Extra delay in seconds before extracting text
'tags': {}, #@todo use Tag.model initialisers
'timezone': None, # Default IANA timezone name
'webdriver_delay': None , # Extra delay in seconds before extracting text
'ui': {
'use_page_title_in_list': True,
'open_diff_in_new_tab': True,
'socket_io_enabled': True,
'favicons_enabled': True

Wyświetl plik

@ -169,8 +169,8 @@ class model(watch_base):
@property
def label(self):
# Used for sorting
return self.get('title') if self.get('title') else self.get('url')
# Used for sorting, display, etc
return self.get('title') or self.get('page_title') or self.link
@property
def last_changed(self):

Wyświetl plik

@ -24,7 +24,6 @@ class watch_base(dict):
'content-type': None,
'date_created': None,
'extract_text': [], # Extract text by regex after filters
'extract_title_as_title': False,
'fetch_backend': 'system', # plaintext, playwright etc
'fetch_time': 0.0,
'filter_failure_notification_send': strtobool(os.getenv('FILTER_FAILURE_NOTIFICATION_SEND_DEFAULT', 'True')),
@ -35,6 +34,7 @@ class watch_base(dict):
'has_ldjson_price_data': None,
'headers': {}, # Extra headers to send
'ignore_text': [], # List of text to ignore when calculating the comparison checksum
'ignore_status_codes': None,
'in_stock_only': True, # Only trigger change on going to instock from out-of-stock
'include_filters': [],
'last_checked': 0,
@ -49,6 +49,7 @@ class watch_base(dict):
'notification_screenshot': False, # Include the latest screenshot if available and supported by the apprise URL
'notification_title': None,
'notification_urls': [], # List of URLs to add to the notification Queue (Usually AppRise)
'page_title': None, # <title> from the page
'paused': False,
'previous_md5': False,
'previous_md5_before_filters': False, # Used for skipping changedetection entirely
@ -122,12 +123,13 @@ class watch_base(dict):
}
},
},
'title': None,
'title': None, # An arbitrary field that overrides 'page_title'
'track_ldjson_price_data': None,
'trim_text_whitespace': False,
'remove_duplicate_lines': False,
'trigger_text': [], # List of text or regex to wait for until a change is detected
'url': '',
'use_page_title_in_list': None, # None = use system settings
'uuid': str(uuid.uuid4()),
'webdriver_delay': None,
'webdriver_js_execute_code': None, # Run before change-detection

Wyświetl plik

@ -149,7 +149,7 @@ def create_notification_parameters(n_object, datastore):
uuid = n_object['uuid'] if 'uuid' in n_object else ''
if uuid:
watch_title = datastore.data['watching'][uuid].get('title', '')
watch_title = datastore.data['watching'][uuid].label
tag_list = []
tags = datastore.get_all_tags_for_watch(uuid)
if tags:

Wyświetl plik

@ -251,8 +251,7 @@ class perform_site_check(difference_detection_processor):
update_obj["last_check_status"] = self.fetcher.get_last_status_code()
# 615 Extract text by regex
extract_text = watch.get('extract_text', [])
extract_text += self.datastore.get_tag_overrides_for_watch(uuid=watch.get('uuid'), attr='extract_text')
extract_text = list(dict.fromkeys(watch.get('extract_text', []) + self.datastore.get_tag_overrides_for_watch(uuid=watch.get('uuid'), attr='extract_text')))
if len(extract_text) > 0:
regex_matched_output = []
for s_re in extract_text:
@ -311,8 +310,7 @@ class perform_site_check(difference_detection_processor):
############ Blocking rules, after checksum #################
blocked = False
trigger_text = watch.get('trigger_text', [])
trigger_text += self.datastore.get_tag_overrides_for_watch(uuid=watch.get('uuid'), attr='trigger_text')
trigger_text = list(dict.fromkeys(watch.get('trigger_text', []) + self.datastore.get_tag_overrides_for_watch(uuid=watch.get('uuid'), attr='trigger_text')))
if len(trigger_text):
# Assume blocked
blocked = True
@ -326,8 +324,7 @@ class perform_site_check(difference_detection_processor):
if result:
blocked = False
text_should_not_be_present = watch.get('text_should_not_be_present', [])
text_should_not_be_present += self.datastore.get_tag_overrides_for_watch(uuid=watch.get('uuid'), attr='text_should_not_be_present')
text_should_not_be_present = list(dict.fromkeys(watch.get('text_should_not_be_present', []) + self.datastore.get_tag_overrides_for_watch(uuid=watch.get('uuid'), attr='text_should_not_be_present')))
if len(text_should_not_be_present):
# If anything matched, then we should block a change from happening
result = html_tools.strip_ignore_text(content=str(stripped_text_from_html),

Wyświetl plik

@ -153,6 +153,7 @@ $(document).ready(function () {
// Tabs at bottom of list
$('#post-list-mark-views').toggleClass("has-unviewed", general_stats.has_unviewed);
$('#post-list-unread').toggleClass("has-unviewed", general_stats.has_unviewed);
$('#post-list-with-errors').toggleClass("has-error", general_stats.count_errors !== 0)
$('#post-list-with-errors a').text(`With errors (${ general_stats.count_errors })`);

Wyświetl plik

@ -51,6 +51,7 @@ $(document).ready(function () {
$('#notification_body').val('');
$('#notification_format').val('System default');
$('#notification_urls').val('');
$('#notification_muted_none').prop('checked', true); // in the case of a ternary field
e.preventDefault();
});
$("#notification-token-toggle").click(function (e) {

Wyświetl plik

@ -24,6 +24,9 @@ body.checking-now {
#post-list-mark-views.has-unviewed {
display: inline-block !important;
}
#post-list-unread.has-unviewed {
display: inline-block !important;
}
}

Wyświetl plik

@ -0,0 +1,115 @@
// Ternary radio button group component
.ternary-radio-group {
display: flex;
gap: 0;
border: 1px solid var(--color-grey-750);
border-radius: 4px;
overflow: hidden;
width: fit-content;
background: var(--color-background);
.ternary-radio-option {
position: relative;
cursor: pointer;
margin: 0;
display: flex;
align-items: center;
input[type="radio"] {
position: absolute;
opacity: 0;
width: 0;
height: 0;
}
.ternary-radio-label {
padding: 8px 16px;
background: var(--color-grey-900);
border: none;
border-right: 1px solid var(--color-grey-750);
font-size: 13px;
font-weight: 500;
color: var(--color-text);
transition: all 0.2s ease;
cursor: pointer;
display: block;
min-width: 60px;
text-align: center;
}
&:last-child .ternary-radio-label {
border-right: none;
}
input:checked + .ternary-radio-label {
background: var(--color-link);
color: var(--color-text-button);
font-weight: 600;
&.ternary-default {
background: var(--color-grey-600);
color: var(--color-text-button);
}
&:hover {
background: #1a7bc4;
&.ternary-default {
background: var(--color-grey-500);
}
}
}
&:hover .ternary-radio-label {
background: var(--color-grey-800);
}
}
@media (max-width: 480px) {
width: 100%;
.ternary-radio-label {
flex: 1;
min-width: auto;
}
}
}
// Standard radio button styling
input[type="radio"].pure-radio:checked + label,
input[type="radio"].pure-radio:checked {
background: var(--color-link);
color: var(--color-text-button);
}
html[data-darkmode="true"] {
.ternary-radio-group {
.ternary-radio-option {
.ternary-radio-label {
background: var(--color-grey-350);
}
&:hover .ternary-radio-label {
background: var(--color-grey-400);
}
input:checked + .ternary-radio-label {
background: var(--color-link);
color: var(--color-text-button);
&.ternary-default {
background: var(--color-grey-600);
}
&:hover {
background: #1a7bc4;
&.ternary-default {
background: var(--color-grey-500);
}
}
}
}
}
}

Wyświetl plik

@ -20,7 +20,7 @@
@use "parts/lister_extra";
@use "parts/socket";
@use "parts/visualselector";
@use "parts/widgets";
body {
color: var(--color-text);

File diff suppressed because one or more lines are too long

Wyświetl plik

@ -284,11 +284,6 @@ class ChangeDetectionStore:
extras = deepcopy(self.data['watching'][uuid])
new_uuid = self.add_watch(url=url, extras=extras)
watch = self.data['watching'][new_uuid]
if self.data['settings']['application'].get('extract_title_as_title') or watch['extract_title_as_title']:
# Because it will be recalculated on the next fetch
self.data['watching'][new_uuid]['title'] = None
return new_uuid
def url_exists(self, url):
@ -330,7 +325,6 @@ class ChangeDetectionStore:
'browser_steps',
'css_filter',
'extract_text',
'extract_title_as_title',
'headers',
'ignore_text',
'include_filters',
@ -345,6 +339,7 @@ class ChangeDetectionStore:
'title',
'trigger_text',
'url',
'use_page_title_in_list',
'webdriver_js_execute_code',
]:
if res.get(k):
@ -995,6 +990,16 @@ class ChangeDetectionStore:
f_d.write(zlib.compress(f_j.read()))
os.unlink(json_path)
def update_20(self):
for uuid, watch in self.data['watching'].items():
if self.data['watching'][uuid].get('extract_title_as_title'):
self.data['watching'][uuid]['use_page_title_in_list'] = self.data['watching'][uuid].get('extract_title_as_title')
del self.data['watching'][uuid]['extract_title_as_title']
if self.data['settings']['application'].get('extract_title_as_title'):
self.data['settings']['application']['ui']['use_page_title_in_list'] = self.data['settings']['application'].get('extract_title_as_title')
def add_notification_url(self, notification_url):
logger.debug(f">>> Adding new notification_url - '{notification_url}'")

Wyświetl plik

@ -98,7 +98,7 @@
</tr>
<tr>
<td><code>{{ '{{watch_title}}' }}</code></td>
<td>The title of the watch.</td>
<td>The page title of the watch, uses &lt;title&gt; if not set, falls back to URL</td>
</tr>
<tr>
<td><code>{{ '{{watch_tag}}' }}</code></td>

Wyświetl plik

@ -1,14 +1,29 @@
{% macro render_field(field) %}
<div {% if field.errors %} class="error" {% endif %}>{{ field.label }}</div>
<div {% if field.errors %} class="error" {% endif %}>{{ field(**kwargs)|safe }}
{% if field.errors %}
<ul class=errors>
{% for error in field.errors %}
<li>{{ error }}</li>
{% endfor %}
</ul>
{% endif %}
</div>
<div {% if field.errors or field.top_errors %} class="error" {% endif %}>{{ field.label }}</div>
<div {% if field.errors or field.top_errors %} class="error" {% endif %}>{{ field(**kwargs)|safe }}
{% if field.top_errors %}
top
<ul class="errors top-errors">
{% for error in field.top_errors %}
<li>{{ error }}</li>
{% endfor %}
</ul>
{% endif %}
{% if field.errors %}
<ul class=errors>
{% if field.errors is mapping and 'form' in field.errors %}
{# and subfield form errors, such as used in RequiredFormField() for TimeBetweenCheckForm sub form #}
{% set errors = field.errors['form'] %}
{% else %}
{# regular list of errors with this field #}
{% set errors = field.errors %}
{% endif %}
{% for error in errors %}
<li>{{ error }}</li>
{% endfor %}
</ul>
{% endif %}
</div>
{% endmacro %}
{% macro render_checkbox_field(field) %}
@ -24,6 +39,23 @@
</div>
{% endmacro %}
{% macro render_ternary_field(field, BooleanField=false) %}
{% if BooleanField %}
{% set _ = field.__setattr__('boolean_mode', true) %}
{% endif %}
<div class="ternary-field {% if field.errors %} error {% endif %}">
<div class="ternary-field-label">{{ field.label }}</div>
<div class="ternary-field-widget">{{ field(**kwargs)|safe }}</div>
{% if field.errors %}
<ul class=errors>
{% for error in field.errors %}
<li>{{ error }}</li>
{% endfor %}
</ul>
{% endif %}
</div>
{% endmacro %}
{% macro render_simple_field(field) %}
<span class="label {% if field.errors %}error{% endif %}">{{ field.label }}</span>

Wyświetl plik

@ -5,6 +5,7 @@
<meta charset="utf-8" >
<meta name="viewport" content="width=device-width, initial-scale=1.0" >
<meta name="description" content="Self hosted website change detection." >
<meta name="robots" content="noindex">
<title>Change Detection{{extra_title}}</title>
{% if app_rss_token %}
<link rel="alternate" type="application/rss+xml" title="Changedetection.io » Feed{% if active_tag_uuid %}- {{active_tag.title}}{% endif %}" href="{{ url_for('rss.feed', tag=active_tag_uuid , token=app_rss_token)}}" >
@ -41,7 +42,7 @@
{% endif %}
</head>
<body class="">
<body class="{{extra_classes}}">
<div class="header">
<div class="pure-menu-fixed" style="width: 100%;">
<div class="home-menu pure-menu pure-menu-horizontal" id="nav-menu">

Wyświetl plik

@ -55,7 +55,8 @@ def do_test(client, live_server, make_test_use_extra_browser=False):
"tags": "",
"headers": "",
'fetch_backend': f"extra_browser_{custom_browser_name}",
'webdriver_js_execute_code': ''
'webdriver_js_execute_code': '',
"time_between_check_use_default": "y"
},
follow_redirects=True
)

Wyświetl plik

@ -28,6 +28,7 @@ def test_execute_custom_js(client, live_server, measure_memory_usage):
'fetch_backend': "html_webdriver",
'webdriver_js_execute_code': 'document.querySelector("button[name=test-button]").click();',
'headers': "testheader: yes\buser-agent: MyCustomAgent",
"time_between_check_use_default": "y",
},
follow_redirects=True
)

Wyświetl plik

@ -27,6 +27,7 @@ def test_preferred_proxy(client, live_server, measure_memory_usage):
"proxy": "proxy-two",
"tags": "",
"url": url,
"time_between_check_use_default": "y",
},
follow_redirects=True
)

Wyświetl plik

@ -62,6 +62,7 @@ def test_noproxy_option(client, live_server, measure_memory_usage):
"proxy": "no-proxy",
"tags": "",
"url": url,
"time_between_check_use_default": "y",
},
follow_redirects=True
)

Wyświetl plik

@ -44,6 +44,7 @@ def test_proxy_noconnect_custom(client, live_server, measure_memory_usage):
"url": test_url,
"fetch_backend": "html_webdriver" if os.getenv('PLAYWRIGHT_DRIVER_URL') or os.getenv("WEBDRIVER_URL") else "html_requests",
"proxy": "ui-0custom-test-proxy",
"time_between_check_use_default": "y",
}
res = client.post(

Wyświetl plik

@ -66,6 +66,7 @@ def test_socks5(client, live_server, measure_memory_usage):
"proxy": "ui-0socks5proxy",
"tags": "",
"url": test_url,
"time_between_check_use_default": "y",
},
follow_redirects=True
)

Wyświetl plik

@ -53,6 +53,7 @@ def test_socks5_from_proxiesjson_file(client, live_server, measure_memory_usage)
"proxy": "socks5proxy",
"tags": "",
"url": test_url,
"time_between_check_use_default": "y",
},
follow_redirects=True
)

Wyświetl plik

@ -157,7 +157,8 @@ def test_check_notification_email_formats_default_Text_override_HTML(client, liv
data={
"url": test_url,
"notification_format": 'HTML',
'fetch_backend': "html_requests"},
'fetch_backend': "html_requests",
"time_between_check_use_default": "y"},
follow_redirects=True
)
assert b"Updated watch." in res.data

Wyświetl plik

@ -61,7 +61,8 @@ def test_check_removed_line_contains_trigger(client, live_server, measure_memory
data={"trigger_text": 'The golden line',
"url": test_url,
'fetch_backend': "html_requests",
'filter_text_removed': 'y'},
'filter_text_removed': 'y',
"time_between_check_use_default": "y"},
follow_redirects=True
)
assert b"Updated watch." in res.data
@ -154,7 +155,8 @@ def test_check_add_line_contains_trigger(client, live_server, measure_memory_usa
'processor': 'text_json_diff',
'fetch_backend': "html_requests",
'filter_text_removed': '',
'filter_text_added': 'y'},
'filter_text_added': 'y',
"time_between_check_use_default": "y"},
follow_redirects=True
)
assert b"Updated watch." in res.data

Wyświetl plik

@ -311,7 +311,7 @@ def test_api_watch_PUT_update(client, live_server, measure_memory_usage):
"value": "." # contains anything
}
],
"conditions_match_logic": "ALL"
"conditions_match_logic": "ALL",
}
),
headers={'content-type': 'application/json', 'x-api-key': api_key},
@ -328,6 +328,7 @@ def test_api_watch_PUT_update(client, live_server, measure_memory_usage):
)
watch_uuid = list(res.json.keys())[0]
assert not res.json[watch_uuid].get('viewed'), 'A newly created watch can only be unviewed'
# Check in the edit page just to be sure
res = client.get(
@ -341,7 +342,12 @@ def test_api_watch_PUT_update(client, live_server, measure_memory_usage):
res = client.put(
url_for("watch", uuid=watch_uuid),
headers={'x-api-key': api_key, 'content-type': 'application/json'},
data=json.dumps({"title": "new title", 'time_between_check': {'minutes': 552}, 'headers': {'cookie': 'all eaten'}}),
data=json.dumps({
"title": "new title",
'time_between_check': {'minutes': 552},
'headers': {'cookie': 'all eaten'},
'last_viewed': int(time.time())
}),
)
assert res.status_code == 200, "HTTP PUT update was sent OK"
@ -351,6 +357,7 @@ def test_api_watch_PUT_update(client, live_server, measure_memory_usage):
headers={'x-api-key': api_key}
)
assert res.json.get('title') == 'new title'
assert res.json.get('viewed'), 'With the timestamp greater than "changed" a watch can be updated to viewed'
# Check in the edit page just to be sure
res = client.get(
@ -383,13 +390,13 @@ def test_api_watch_PUT_update(client, live_server, measure_memory_usage):
def test_api_import(client, live_server, measure_memory_usage):
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
res = client.post(
url_for("import") + "?tag=import-test",
data='https://website1.com\r\nhttps://website2.com',
headers={'x-api-key': api_key},
headers={'x-api-key': api_key, 'content-type': 'text/plain'},
follow_redirects=True
)

Wyświetl plik

@ -0,0 +1,199 @@
#!/usr/bin/env python3
"""
OpenAPI validation tests for ChangeDetection.io API
This test file specifically verifies that OpenAPI validation is working correctly
by testing various scenarios that should trigger validation errors.
"""
import time
import json
from flask import url_for
from .util import live_server_setup, wait_for_all_checks
def test_openapi_validation_invalid_content_type_on_create_watch(client, live_server, measure_memory_usage):
"""Test that creating a watch with invalid content-type triggers OpenAPI validation error."""
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
# Try to create a watch with JSON data but without proper content-type header
res = client.post(
url_for("createwatch"),
data=json.dumps({"url": "https://example.com", "title": "Test Watch"}),
headers={'x-api-key': api_key}, # Missing 'content-type': 'application/json'
follow_redirects=True
)
# Should get 400 error due to OpenAPI validation failure
assert res.status_code == 400, f"Expected 400 but got {res.status_code}"
assert b"OpenAPI validation failed" in res.data, "Should contain OpenAPI validation error message"
def test_openapi_validation_missing_required_field_create_watch(client, live_server, measure_memory_usage):
"""Test that creating a watch without required URL field triggers OpenAPI validation error."""
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
# Try to create a watch without the required 'url' field
res = client.post(
url_for("createwatch"),
data=json.dumps({"title": "Test Watch Without URL"}), # Missing required 'url' field
headers={'x-api-key': api_key, 'content-type': 'application/json'},
follow_redirects=True
)
# Should get 400 error due to missing required field
assert res.status_code == 400, f"Expected 400 but got {res.status_code}"
assert b"OpenAPI validation failed" in res.data, "Should contain OpenAPI validation error message"
def test_openapi_validation_invalid_field_in_request_body(client, live_server, measure_memory_usage):
"""Test that including invalid fields triggers OpenAPI validation error."""
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
# First create a valid watch
res = client.post(
url_for("createwatch"),
data=json.dumps({"url": "https://example.com", "title": "Test Watch"}),
headers={'x-api-key': api_key, 'content-type': 'application/json'},
follow_redirects=True
)
assert res.status_code == 201, "Watch creation should succeed"
# Get the watch list to find the UUID
res = client.get(
url_for("createwatch"),
headers={'x-api-key': api_key}
)
assert res.status_code == 200
watch_uuid = list(res.json.keys())[0]
# Now try to update the watch with an invalid field
res = client.put(
url_for("watch", uuid=watch_uuid),
headers={'x-api-key': api_key, 'content-type': 'application/json'},
data=json.dumps({
"title": "Updated title",
"invalid_field_that_doesnt_exist": "this should cause validation error"
}),
)
# Should get 400 error due to invalid field (this will be caught by internal validation)
# Note: This tests the flow where OpenAPI validation passes but internal validation catches it
assert res.status_code == 400, f"Expected 400 but got {res.status_code}"
assert b"Additional properties are not allowed" in res.data, "Should contain validation error about additional properties"
def test_openapi_validation_import_wrong_content_type(client, live_server, measure_memory_usage):
"""Test that import endpoint with wrong content-type triggers OpenAPI validation error."""
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
# Try to import URLs with JSON content-type instead of text/plain
res = client.post(
url_for("import") + "?tag=test-import",
data='https://website1.com\nhttps://website2.com',
headers={'x-api-key': api_key, 'content-type': 'application/json'}, # Wrong content-type
follow_redirects=True
)
# Should get 400 error due to content-type mismatch
assert res.status_code == 400, f"Expected 400 but got {res.status_code}"
assert b"OpenAPI validation failed" in res.data, "Should contain OpenAPI validation error message"
def test_openapi_validation_import_correct_content_type_succeeds(client, live_server, measure_memory_usage):
"""Test that import endpoint with correct content-type succeeds (positive test)."""
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
# Import URLs with correct text/plain content-type
res = client.post(
url_for("import") + "?tag=test-import",
data='https://website1.com\nhttps://website2.com',
headers={'x-api-key': api_key, 'content-type': 'text/plain'}, # Correct content-type
follow_redirects=True
)
# Should succeed
assert res.status_code == 200, f"Expected 200 but got {res.status_code}"
assert len(res.json) == 2, "Should import 2 URLs"
def test_openapi_validation_get_requests_bypass_validation(client, live_server, measure_memory_usage):
"""Test that GET requests bypass OpenAPI validation entirely."""
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
# Disable API token requirement first
res = client.post(
url_for("settings.settings_page"),
data={
"requests-time_between_check-minutes": 180,
"application-fetch_backend": "html_requests",
"application-api_access_token_enabled": ""
},
follow_redirects=True
)
assert b"Settings updated." in res.data
# Make GET request to list watches - should succeed even without API key or content-type
res = client.get(url_for("createwatch")) # No headers needed for GET
assert res.status_code == 200, f"GET requests should succeed without OpenAPI validation, got {res.status_code}"
# Should return JSON with watch list (empty in this case)
assert isinstance(res.json, dict), "Should return JSON dictionary for watch list"
def test_openapi_validation_create_tag_missing_required_title(client, live_server, measure_memory_usage):
"""Test that creating a tag without required title triggers OpenAPI validation error."""
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
# Try to create a tag without the required 'title' field
res = client.post(
url_for("tag"),
data=json.dumps({"notification_urls": ["mailto:test@example.com"]}), # Missing required 'title' field
headers={'x-api-key': api_key, 'content-type': 'application/json'},
follow_redirects=True
)
# Should get 400 error due to missing required field
assert res.status_code == 400, f"Expected 400 but got {res.status_code}"
assert b"OpenAPI validation failed" in res.data, "Should contain OpenAPI validation error message"
def test_openapi_validation_watch_update_allows_partial_updates(client, live_server, measure_memory_usage):
"""Test that watch updates allow partial updates without requiring all fields (positive test)."""
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
# First create a valid watch
res = client.post(
url_for("createwatch"),
data=json.dumps({"url": "https://example.com", "title": "Test Watch"}),
headers={'x-api-key': api_key, 'content-type': 'application/json'},
follow_redirects=True
)
assert res.status_code == 201, "Watch creation should succeed"
# Get the watch list to find the UUID
res = client.get(
url_for("createwatch"),
headers={'x-api-key': api_key}
)
assert res.status_code == 200
watch_uuid = list(res.json.keys())[0]
# Update only the title (partial update) - should succeed
res = client.put(
url_for("watch", uuid=watch_uuid),
headers={'x-api-key': api_key, 'content-type': 'application/json'},
data=json.dumps({"title": "Updated Title Only"}), # Only updating title, not URL
)
# Should succeed because UpdateWatch schema allows partial updates
assert res.status_code == 200, f"Partial updates should succeed, got {res.status_code}"
# Verify the update worked
res = client.get(
url_for("watch", uuid=watch_uuid),
headers={'x-api-key': api_key}
)
assert res.status_code == 200
assert res.json.get('title') == 'Updated Title Only', "Title should be updated"
assert res.json.get('url') == 'https://example.com', "URL should remain unchanged"

Wyświetl plik

@ -23,7 +23,7 @@ def test_basic_auth(client, live_server, measure_memory_usage):
# Check form validation
res = client.post(
url_for("ui.ui_edit.edit_page", uuid="first"),
data={"include_filters": "", "url": test_url, "tags": "", "headers": "", 'fetch_backend': "html_requests"},
data={"include_filters": "", "url": test_url, "tags": "", "headers": "", 'fetch_backend': "html_requests", "time_between_check_use_default": "y"},
follow_redirects=True
)
assert b"Updated watch." in res.data

Wyświetl plik

@ -89,7 +89,7 @@ def test_check_basic_change_detection_functionality(client, live_server, measure
assert b'CDATA' in res.data
assert expected_url.encode('utf-8') in res.data
#
# Following the 'diff' link, it should no longer display as 'unviewed' even after we recheck it a few times
res = client.get(url_for("ui.ui_views.diff_history_page", uuid=uuid))
assert b'selected=""' in res.data, "Confirm diff history page loaded"
@ -104,26 +104,34 @@ def test_check_basic_change_detection_functionality(client, live_server, measure
wait_for_all_checks(client)
# Do this a few times.. ensures we dont accidently set the status
for n in range(2):
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
# Do this a few times.. ensures we don't accidently set the status
for n in range(2):
res = client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
wait_for_all_checks(client)
# It should report nothing found (no new 'unviewed' class)
res = client.get(url_for("watchlist.index"))
assert b'unviewed' not in res.data
assert b'class="has-unviewed' not in res.data
assert b'head title' not in res.data # Should not be present because this is off by default
assert b'head title' in res.data # Should be ON by default
assert b'test-endpoint' in res.data
set_original_response()
# Recheck it but only with a title change, content wasnt changed
set_original_response(extra_title=" and more")
# Enable auto pickup of <title> in settings
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
res = client.get(url_for("watchlist.index"))
assert b'head title and more' in res.data
# disable <title> pickup
res = client.post(
url_for("settings.settings_page"),
data={"application-extract_title_as_title": "1", "requests-time_between_check-minutes": 180,
data={"application-ui-use_page_title_in_list": "", "requests-time_between_check-minutes": 180,
'application-fetch_backend': "html_requests"},
follow_redirects=True
)
@ -134,16 +142,14 @@ def test_check_basic_change_detection_functionality(client, live_server, measure
res = client.get(url_for("watchlist.index"))
assert b'unviewed' in res.data
assert b'class="has-unviewed' in res.data
assert b'head title' not in res.data # should now be off
# It should have picked up the <title>
assert b'head title' in res.data
# Be sure the last_viewed is going to be greater than the last snapshot
time.sleep(1)
# hit the mark all viewed link
res = client.get(url_for("ui.mark_all_viewed"), follow_redirects=True)
time.sleep(0.2)
assert b'class="has-unviewed' not in res.data
assert b'unviewed' not in res.data

Wyświetl plik

@ -86,7 +86,8 @@ def test_check_block_changedetection_text_NOT_present(client, live_server, measu
url_for("ui.ui_edit.edit_page", uuid="first"),
data={"text_should_not_be_present": ignore_text,
"url": test_url,
'fetch_backend': "html_requests"
'fetch_backend': "html_requests",
"time_between_check_use_default": "y"
},
follow_redirects=True
)

Wyświetl plik

@ -105,6 +105,7 @@ def test_conditions_with_text_and_number(client, live_server):
"conditions-5-operator": "contains_regex",
"conditions-5-field": "page_filtered_text",
"conditions-5-value": "\d",
"time_between_check_use_default": "y",
},
follow_redirects=True
)
@ -288,7 +289,8 @@ def test_lev_conditions_plugin(client, live_server, measure_memory_usage):
"conditions_match_logic": CONDITIONS_MATCH_LOGIC_DEFAULT, # ALL = AND logic
"conditions-0-field": "levenshtein_ratio",
"conditions-0-operator": "<",
"conditions-0-value": "0.8" # needs to be more of a diff to trigger a change
"conditions-0-value": "0.8", # needs to be more of a diff to trigger a change
"time_between_check_use_default": "y"
},
follow_redirects=True
)

Wyświetl plik

@ -95,7 +95,7 @@ def test_check_markup_include_filters_restriction(client, live_server, measure_m
# Add our URL to the import page
res = client.post(
url_for("ui.ui_edit.edit_page", uuid="first"),
data={"include_filters": include_filters, "url": test_url, "tags": "", "headers": "", 'fetch_backend': "html_requests"},
data={"include_filters": include_filters, "url": test_url, "tags": "", "headers": "", 'fetch_backend': "html_requests", "time_between_check_use_default": "y"},
follow_redirects=True
)
assert b"Updated watch." in res.data
@ -154,7 +154,8 @@ def test_check_multiple_filters(client, live_server, measure_memory_usage):
"url": test_url,
"tags": "",
"headers": "",
'fetch_backend': "html_requests"},
'fetch_backend': "html_requests",
"time_between_check_use_default": "y"},
follow_redirects=True
)
assert b"Updated watch." in res.data
@ -208,7 +209,8 @@ def test_filter_is_empty_help_suggestion(client, live_server, measure_memory_usa
"url": test_url,
"tags": "",
"headers": "",
'fetch_backend': "html_requests"},
'fetch_backend': "html_requests",
"time_between_check_use_default": "y"},
follow_redirects=True
)
assert b"Updated watch." in res.data

Wyświetl plik

@ -171,6 +171,7 @@ def test_element_removal_full(client, live_server, measure_memory_usage):
"tags": "",
"headers": "",
"fetch_backend": "html_requests",
"time_between_check_use_default": "y",
},
follow_redirects=True,
)
@ -245,6 +246,7 @@ body > table > tr:nth-child(3) > td:nth-child(3)""",
"url": test_url,
"tags": "",
"fetch_backend": "html_requests",
"time_between_check_use_default": "y",
},
follow_redirects=True,
)

Wyświetl plik

@ -127,7 +127,8 @@ def test_low_level_errors_clear_correctly(client, live_server, measure_memory_us
url_for("ui.ui_edit.edit_page", uuid="first"),
data={
"url": test_url,
"fetch_backend": "html_requests"},
"fetch_backend": "html_requests",
"time_between_check_use_default": "y"},
follow_redirects=True
)

Wyświetl plik

@ -95,7 +95,8 @@ def test_check_filter_multiline(client, live_server, measure_memory_usage):
"url": test_url,
"tags": "",
"headers": "",
'fetch_backend': "html_requests"
'fetch_backend': "html_requests",
"time_between_check_use_default": "y"
},
follow_redirects=True
)
@ -149,7 +150,8 @@ def test_check_filter_and_regex_extract(client, live_server, measure_memory_usag
"url": test_url,
"tags": "",
"headers": "",
'fetch_backend': "html_requests"
'fetch_backend': "html_requests",
"time_between_check_use_default": "y"
},
follow_redirects=True
)
@ -222,7 +224,8 @@ def test_regex_error_handling(client, live_server, measure_memory_usage):
url_for("ui.ui_edit.edit_page", uuid="first"),
data={"extract_text": '/something bad\d{3/XYZ',
"url": test_url,
"fetch_backend": "html_requests"},
"fetch_backend": "html_requests",
"time_between_check_use_default": "y"},
follow_redirects=True
)

Wyświetl plik

@ -94,7 +94,8 @@ def test_filter_doesnt_exist_then_exists_should_get_notification(client, live_se
"title": "my title",
"headers": "",
"include_filters": '.ticket-available',
"fetch_backend": "html_requests"})
"fetch_backend": "html_requests",
"time_between_check_use_default": "y"})
res = client.post(
url_for("ui.ui_edit.edit_page", uuid="first"),

Wyświetl plik

@ -72,6 +72,7 @@ def run_filter_test(client, live_server, content_filter):
"notification_format": "Text",
"fetch_backend": "html_requests",
"filter_failure_notification_send": 'y',
"time_between_check_use_default": "y",
"headers": "",
"tags": "my tag",
"title": "my title 123",

Wyświetl plik

@ -424,7 +424,8 @@ def test_order_of_filters_tag_filter_and_watch_filter(client, live_server, measu
"url": test_url,
"tags": "test-tag-keep-order",
"headers": "",
'fetch_backend': "html_requests"},
'fetch_backend': "html_requests",
"time_between_check_use_default": "y"},
follow_redirects=True
)
assert b"Updated watch." in res.data

Wyświetl plik

@ -111,7 +111,7 @@ def test_check_ignore_text_functionality(client, live_server, measure_memory_usa
# Add our URL to the import page
res = client.post(
url_for("ui.ui_edit.edit_page", uuid="first"),
data={"ignore_text": ignore_text, "url": test_url, 'fetch_backend': "html_requests"},
data={"ignore_text": ignore_text, "url": test_url, 'fetch_backend': "html_requests", "time_between_check_use_default": "y"},
follow_redirects=True
)
assert b"Updated watch." in res.data
@ -205,7 +205,7 @@ def _run_test_global_ignore(client, as_source=False, extra_ignore=""):
#Adding some ignore text should not trigger a change
res = client.post(
url_for("ui.ui_edit.edit_page", uuid="first"),
data={"ignore_text": "something irrelevent but just to check", "url": test_url, 'fetch_backend': "html_requests"},
data={"ignore_text": "something irrelevent but just to check", "url": test_url, 'fetch_backend': "html_requests", "time_between_check_use_default": "y"},
follow_redirects=True
)
assert b"Updated watch." in res.data

Wyświetl plik

@ -108,7 +108,7 @@ def test_403_page_check_works_with_ignore_status_code(client, live_server, measu
# Add our URL to the import page
res = client.post(
url_for("ui.ui_edit.edit_page", uuid="first"),
data={"ignore_status_codes": "y", "url": test_url, "tags": "", "headers": "", 'fetch_backend': "html_requests"},
data={"ignore_status_codes": "y", "url": test_url, "tags": "", "headers": "", 'fetch_backend': "html_requests", "time_between_check_use_default": "y"},
follow_redirects=True
)
assert b"Updated watch." in res.data

Wyświetl plik

@ -257,7 +257,8 @@ def check_json_filter(json_filter, client, live_server):
"url": test_url,
"tags": "",
"headers": "",
"fetch_backend": "html_requests"
"fetch_backend": "html_requests",
"time_between_check_use_default": "y"
},
follow_redirects=True
)
@ -328,7 +329,8 @@ def check_json_filter_bool_val(json_filter, client, live_server):
"url": test_url,
"tags": "",
"headers": "",
"fetch_backend": "html_requests"
"fetch_backend": "html_requests",
"time_between_check_use_default": "y"
},
follow_redirects=True
)
@ -393,7 +395,8 @@ def check_json_ext_filter(json_filter, client, live_server):
"url": test_url,
"tags": "",
"headers": "",
"fetch_backend": "html_requests"
"fetch_backend": "html_requests",
"time_between_check_use_default": "y"
},
follow_redirects=True
)

Wyświetl plik

@ -38,6 +38,7 @@ def test_content_filter_live_preview(client, live_server, measure_memory_usage):
"ignore_text": "something to ignore",
"trigger_text": "something to trigger",
"url": test_url,
"time_between_check_use_default": "y",
},
follow_redirects=True
)

Wyświetl plik

@ -108,7 +108,8 @@ def test_check_notification(client, live_server, measure_memory_usage):
"tags": "my tag, my second tag",
"title": "my title",
"headers": "",
"fetch_backend": "html_requests"})
"fetch_backend": "html_requests",
"time_between_check_use_default": "y"})
res = client.post(
url_for("ui.ui_edit.edit_page", uuid="first"),
@ -225,7 +226,8 @@ def test_check_notification(client, live_server, measure_memory_usage):
"notification_title": '',
"notification_body": '',
"notification_format": default_notification_format,
"fetch_backend": "html_requests"},
"fetch_backend": "html_requests",
"time_between_check_use_default": "y"},
follow_redirects=True
)
assert b"Updated watch." in res.data

Wyświetl plik

@ -36,7 +36,8 @@ def test_check_notification_error_handling(client, live_server, measure_memory_u
"title": "",
"headers": "",
"time_between_check-minutes": "180",
"fetch_backend": "html_requests"},
"fetch_backend": "html_requests",
"time_between_check_use_default": "y"},
follow_redirects=True
)
assert b"Updated watch." in res.data

Wyświetl plik

@ -44,7 +44,8 @@ def test_headers_in_request(client, live_server, measure_memory_usage):
"url": test_url,
"tags": "",
"fetch_backend": 'html_webdriver' if os.getenv('PLAYWRIGHT_DRIVER_URL') else 'html_requests',
"headers": "jinja2:{{ 1+1 }}\nxxx:ooo\ncool:yeah\r\ncookie:"+cookie_header},
"headers": "jinja2:{{ 1+1 }}\nxxx:ooo\ncool:yeah\r\ncookie:"+cookie_header,
"time_between_check_use_default": "y"},
follow_redirects=True
)
assert b"Updated watch." in res.data
@ -109,7 +110,8 @@ def test_body_in_request(client, live_server, measure_memory_usage):
"tags": "",
"method": "POST",
"fetch_backend": "html_requests",
"body": "something something"},
"body": "something something",
"time_between_check_use_default": "y"},
follow_redirects=True
)
assert b"Updated watch." in res.data
@ -126,7 +128,8 @@ def test_body_in_request(client, live_server, measure_memory_usage):
"tags": "",
"method": "POST",
"fetch_backend": "html_requests",
"body": body_value},
"body": body_value,
"time_between_check_use_default": "y"},
follow_redirects=True
)
assert b"Updated watch." in res.data
@ -172,7 +175,8 @@ def test_body_in_request(client, live_server, measure_memory_usage):
"tags": "",
"method": "GET",
"fetch_backend": "html_requests",
"body": "invalid"},
"body": "invalid",
"time_between_check_use_default": "y"},
follow_redirects=True
)
assert b"Body must be empty when Request Method is set to GET" in res.data
@ -211,7 +215,8 @@ def test_method_in_request(client, live_server, measure_memory_usage):
"url": test_url,
"tags": "",
"fetch_backend": "html_requests",
"method": "invalid"},
"method": "invalid",
"time_between_check_use_default": "y"},
follow_redirects=True
)
assert b"Not a valid choice" in res.data
@ -223,7 +228,8 @@ def test_method_in_request(client, live_server, measure_memory_usage):
"url": test_url,
"tags": "",
"fetch_backend": "html_requests",
"method": "PATCH"},
"method": "PATCH",
"time_between_check_use_default": "y"},
follow_redirects=True
)
assert b"Updated watch." in res.data
@ -297,7 +303,8 @@ def test_ua_global_override(client, live_server, measure_memory_usage):
"tags": "testtag",
"fetch_backend": 'html_requests',
# Important - also test case-insensitive
"headers": "User-AGent: agent-from-watch"},
"headers": "User-AGent: agent-from-watch",
"time_between_check_use_default": "y"},
follow_redirects=True
)
assert b"Updated watch." in res.data
@ -365,7 +372,8 @@ def test_headers_textfile_in_request(client, live_server, measure_memory_usage):
"url": test_url,
"tags": "testtag",
"fetch_backend": 'html_webdriver' if os.getenv('PLAYWRIGHT_DRIVER_URL') else 'html_requests',
"headers": "xxx:ooo\ncool:yeah\r\n"},
"headers": "xxx:ooo\ncool:yeah\r\n",
"time_between_check_use_default": "y"},
follow_redirects=True
)
assert b"Updated watch." in res.data
@ -440,7 +448,8 @@ def test_headers_validation(client, live_server):
data={
"url": test_url,
"fetch_backend": 'html_requests',
"headers": "User-AGent agent-from-watch\r\nsadfsadfsadfsdaf\r\n:foobar"},
"headers": "User-AGent agent-from-watch\r\nsadfsadfsadfsdaf\r\n:foobar",
"time_between_check_use_default": "y"},
follow_redirects=True
)

Wyświetl plik

@ -121,7 +121,7 @@ def test_itemprop_price_change(client, live_server):
set_original_response(props_markup=instock_props[0], price='120.45')
res = client.post(
url_for("ui.ui_edit.edit_page", uuid="first"),
data={"restock_settings-follow_price_changes": "", "url": test_url, "tags": "", "headers": "", 'fetch_backend': "html_requests"},
data={"restock_settings-follow_price_changes": "", "url": test_url, "tags": "", "headers": "", 'fetch_backend': "html_requests", "time_between_check_use_default": "y"},
follow_redirects=True
)
assert b"Updated watch." in res.data
@ -155,7 +155,8 @@ def _run_test_minmax_limit(client, extra_watch_edit_form):
"url": test_url,
"headers": "",
"time_between_check-hours": 5,
'fetch_backend': "html_requests"
'fetch_backend': "html_requests",
"time_between_check_use_default": "y"
}
data.update(extra_watch_edit_form)
res = client.post(
@ -278,7 +279,8 @@ def test_itemprop_percent_threshold(client, live_server):
"url": test_url,
"tags": "",
"headers": "",
'fetch_backend': "html_requests"
'fetch_backend': "html_requests",
"time_between_check_use_default": "y"
},
follow_redirects=True
)

Wyświetl plik

@ -158,6 +158,7 @@ def test_rss_xpath_filtering(client, live_server, measure_memory_usage):
"proxy": "no-proxy",
"tags": "",
"url": test_url,
"time_between_check_use_default": "y",
},
follow_redirects=True
)

Wyświetl plik

@ -1,10 +1,13 @@
#!/usr/bin/env python3
import time
from copy import copy
from datetime import datetime, timezone
from zoneinfo import ZoneInfo
from flask import url_for
from .util import live_server_setup, wait_for_all_checks, extract_UUID_from_client
from ..forms import REQUIRE_ATLEAST_ONE_TIME_PART_MESSAGE_DEFAULT, REQUIRE_ATLEAST_ONE_TIME_PART_WHEN_NOT_GLOBAL_DEFAULT
# def test_setup(client, live_server):
# live_server_setup(live_server) # Setup on conftest per function
@ -42,11 +45,12 @@ def test_check_basic_scheduler_functionality(client, live_server, measure_memory
uuid = next(iter(live_server.app.config['DATASTORE'].data['watching']))
# Setup all the days of the weeks using XXX as the placeholder for monday/tuesday/etc
last_check = copy(live_server.app.config['DATASTORE'].data['watching'][uuid]['last_checked'])
tpl = {
"time_schedule_limit-XXX-start_time": "00:00",
"time_schedule_limit-XXX-duration-hours": 24,
"time_schedule_limit-XXX-duration-minutes": 0,
"time_between_check-seconds": 1,
"time_schedule_limit-XXX-enabled": '', # All days are turned off
"time_schedule_limit-enabled": 'y', # Scheduler is enabled, all days however are off.
}
@ -58,13 +62,13 @@ def test_check_basic_scheduler_functionality(client, live_server, measure_memory
new_key = key.replace("XXX", day)
scheduler_data[new_key] = value
last_check = live_server.app.config['DATASTORE'].data['watching'][uuid]['last_checked']
data = {
"url": test_url,
"fetch_backend": "html_requests"
"fetch_backend": "html_requests",
"time_between_check_use_default": "" # no
}
data.update(scheduler_data)
time.sleep(1)
res = client.post(
url_for("ui.ui_edit.edit_page", uuid="first"),
data=data,
@ -77,6 +81,7 @@ def test_check_basic_scheduler_functionality(client, live_server, measure_memory
# "Edit" should not trigger a check because it's not enabled in the schedule.
time.sleep(2)
# "time_schedule_limit-XXX-enabled": '', # All days are turned off, therefor, nothing should happen here..
assert live_server.app.config['DATASTORE'].data['watching'][uuid]['last_checked'] == last_check
# Enabling today in Kiritimati should work flawless
@ -177,3 +182,44 @@ def test_check_basic_global_scheduler_functionality(client, live_server, measure
# Cleanup everything
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
def test_validation_time_interval_field(client, live_server, measure_memory_usage):
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
res = client.post(
url_for("ui.ui_edit.edit_page", uuid="first"),
data={"trigger_text": 'The golden line',
"url": test_url,
'fetch_backend': "html_requests",
'filter_text_removed': 'y',
"time_between_check_use_default": ""
},
follow_redirects=True
)
assert REQUIRE_ATLEAST_ONE_TIME_PART_WHEN_NOT_GLOBAL_DEFAULT.encode('utf-8') in res.data
# Now set atleast something
res = client.post(
url_for("ui.ui_edit.edit_page", uuid="first"),
data={"trigger_text": 'The golden line',
"url": test_url,
'fetch_backend': "html_requests",
"time_between_check-minutes": 1,
"time_between_check_use_default": ""
},
follow_redirects=True
)
assert REQUIRE_ATLEAST_ONE_TIME_PART_WHEN_NOT_GLOBAL_DEFAULT.encode('utf-8') not in res.data

Wyświetl plik

@ -27,7 +27,7 @@ def test_basic_search(client, live_server, measure_memory_usage):
res = client.post(
url_for("ui.ui_edit.edit_page", uuid="first"),
data={"title": "xxx-title", "url": urls[0], "tags": "", "headers": "", 'fetch_backend': "html_requests"},
data={"title": "xxx-title", "url": urls[0], "tags": "", "headers": "", 'fetch_backend': "html_requests", "time_between_check_use_default": "y"},
follow_redirects=True
)
assert b"Updated watch." in res.data
@ -62,7 +62,7 @@ def test_search_in_tag_limit(client, live_server, measure_memory_usage):
res = client.post(
url_for("ui.ui_edit.edit_page", uuid="first"),
data={"title": "xxx-title", "url": urls[0].split(' ')[0], "tags": urls[0].split(' ')[1], "headers": "",
'fetch_backend': "html_requests"},
'fetch_backend': "html_requests", "time_between_check_use_default": "y"},
follow_redirects=True
)
assert b"Updated watch." in res.data

Wyświetl plik

@ -41,7 +41,8 @@ def test_bad_access(client, live_server, measure_memory_usage):
"tags": "",
"method": "GET",
"fetch_backend": "html_requests",
"body": ""},
"body": "",
"time_between_check_use_default": "y"},
follow_redirects=True
)
@ -150,7 +151,8 @@ def test_xss_watch_last_error(client, live_server, measure_memory_usage):
data={
"include_filters": '<a href="https://foobar"></a><script>alert(123);</script>',
"url": url_for('test_endpoint', _external=True),
'fetch_backend': "html_requests"
'fetch_backend': "html_requests",
"time_between_check_use_default": "y"
},
follow_redirects=True
)

Wyświetl plik

@ -29,7 +29,7 @@ def test_share_watch(client, live_server, measure_memory_usage):
# Add our URL to the import page
res = client.post(
url_for("ui.ui_edit.edit_page", uuid="first"),
data={"include_filters": include_filters, "url": test_url, "tags": "", "headers": "", 'fetch_backend': "html_requests"},
data={"include_filters": include_filters, "url": test_url, "tags": "", "headers": "", 'fetch_backend': "html_requests", "time_between_check_use_default": "y"},
follow_redirects=True
)
assert b"Updated watch." in res.data

Wyświetl plik

@ -77,7 +77,7 @@ def test_check_ignore_elements(client, live_server, measure_memory_usage):
client.post(
url_for("ui.ui_edit.edit_page", uuid="first"),
data={"include_filters": 'span,p', "url": test_url, "tags": "", "subtractive_selectors": ".foobar-detection", 'fetch_backend': "html_requests"},
data={"include_filters": 'span,p', "url": test_url, "tags": "", "subtractive_selectors": ".foobar-detection", 'fetch_backend': "html_requests", "time_between_check_use_default": "y"},
follow_redirects=True
)

Wyświetl plik

@ -81,7 +81,8 @@ def test_trigger_functionality(client, live_server, measure_memory_usage):
url_for("ui.ui_edit.edit_page", uuid="first"),
data={"trigger_text": trigger_text,
"url": test_url,
"fetch_backend": "html_requests"},
"fetch_backend": "html_requests",
"time_between_check_use_default": "y"},
follow_redirects=True
)
assert b"Updated watch." in res.data

Wyświetl plik

@ -49,7 +49,8 @@ def test_trigger_regex_functionality(client, live_server, measure_memory_usage):
url_for("ui.ui_edit.edit_page", uuid="first"),
data={"trigger_text": '/something \d{3}/',
"url": test_url,
"fetch_backend": "html_requests"},
"fetch_backend": "html_requests",
"time_between_check_use_default": "y"},
follow_redirects=True
)
wait_for_all_checks(client)

Wyświetl plik

@ -50,7 +50,8 @@ def test_trigger_regex_functionality_with_filter(client, live_server, measure_me
data={"trigger_text": "/cool.stuff/",
"url": test_url,
"include_filters": '#in-here',
"fetch_backend": "html_requests"},
"fetch_backend": "html_requests",
"time_between_check_use_default": "y"},
follow_redirects=True
)

Wyświetl plik

@ -2,12 +2,107 @@
from flask import url_for
from .util import set_original_response, set_modified_response, live_server_setup, wait_for_all_checks
from ..forms import REQUIRE_ATLEAST_ONE_TIME_PART_WHEN_NOT_GLOBAL_DEFAULT, REQUIRE_ATLEAST_ONE_TIME_PART_MESSAGE_DEFAULT
def test_recheck_time_field_validation_global_settings(client, live_server):
"""
Tests that the global settings time field has atleast one value for week/day/hours/minute/seconds etc entered
class globalSettingsRequestForm(Form):
time_between_check = RequiredFormField(TimeBetweenCheckForm)
"""
res = client.post(
url_for("settings.settings_page"),
data={
"requests-time_between_check-weeks": '',
"requests-time_between_check-days": '',
"requests-time_between_check-hours": '',
"requests-time_between_check-minutes": '',
"requests-time_between_check-seconds": '',
},
follow_redirects=True
)
assert REQUIRE_ATLEAST_ONE_TIME_PART_MESSAGE_DEFAULT.encode('utf-8') in res.data
def test_recheck_time_field_validation_single_watch(client, live_server):
"""
Tests that the global settings time field has atleast one value for week/day/hours/minute/seconds etc entered
class globalSettingsRequestForm(Form):
time_between_check = RequiredFormField(TimeBetweenCheckForm)
"""
test_url = url_for('test_endpoint', _external=True)
# Add our URL to the import page
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
res = client.post(
url_for("ui.ui_edit.edit_page", uuid="first"),
data={
"url": test_url,
'fetch_backend': "html_requests",
"time_between_check_use_default": "", # OFF
"time_between_check-weeks": '',
"time_between_check-days": '',
"time_between_check-hours": '',
"time_between_check-minutes": '',
"time_between_check-seconds": '',
},
follow_redirects=True
)
assert REQUIRE_ATLEAST_ONE_TIME_PART_WHEN_NOT_GLOBAL_DEFAULT.encode('utf-8') in res.data
# Now set some time
res = client.post(
url_for("ui.ui_edit.edit_page", uuid="first"),
data={
"url": test_url,
'fetch_backend': "html_requests",
"time_between_check_use_default": "", # OFF
"time_between_check-weeks": '',
"time_between_check-days": '',
"time_between_check-hours": '',
"time_between_check-minutes": '5',
"time_between_check-seconds": '',
},
follow_redirects=True
)
assert b"Updated watch." in res.data
assert REQUIRE_ATLEAST_ONE_TIME_PART_WHEN_NOT_GLOBAL_DEFAULT.encode('utf-8') not in res.data
# Now set to use defaults
res = client.post(
url_for("ui.ui_edit.edit_page", uuid="first"),
data={
"url": test_url,
'fetch_backend': "html_requests",
"time_between_check_use_default": "y", # ON YES
"time_between_check-weeks": '',
"time_between_check-days": '',
"time_between_check-hours": '',
"time_between_check-minutes": '',
"time_between_check-seconds": '',
},
follow_redirects=True
)
assert b"Updated watch." in res.data
assert REQUIRE_ATLEAST_ONE_TIME_PART_WHEN_NOT_GLOBAL_DEFAULT.encode('utf-8') not in res.data
def test_checkbox_open_diff_in_new_tab(client, live_server):
set_original_response()
# live_server_setup(live_server) # Setup on conftest per function
# Add our URL to the import page
res = client.post(
url_for("imports.import_page"),
@ -78,3 +173,78 @@ def test_checkbox_open_diff_in_new_tab(client, live_server):
# Cleanup everything
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
def test_page_title_listing_behaviour(client, live_server):
set_original_response(extra_title="custom html")
# either the manually entered title/description or the page link should be visible
res = client.post(
url_for("settings.settings_page"),
data={"application-ui-use_page_title_in_list": "",
"requests-time_between_check-minutes": 180,
'application-fetch_backend': "html_requests"},
follow_redirects=True
)
assert b"Settings updated." in res.data
# Add our URL to the import page
res = client.post(
url_for("imports.import_page"),
data={"urls": url_for('test_endpoint', _external=True)},
follow_redirects=True
)
assert b"1 Imported" in res.data
wait_for_all_checks(client)
# We see the URL only, no title/description was manually entered
res = client.get(url_for("watchlist.index"))
assert url_for('test_endpoint', _external=True).encode('utf-8') in res.data
# Now 'my title' should override
res = client.post(
url_for("ui.ui_edit.edit_page", uuid="first"),
data={
"url": url_for('test_endpoint', _external=True),
"title": "my title",
"fetch_backend": "html_requests",
"time_between_check_use_default": "y"},
follow_redirects=True
)
assert b"Updated watch." in res.data
res = client.get(url_for("watchlist.index"))
assert b"my title" in res.data
# Now we enable page <title> and unset the override title/description
res = client.post(
url_for("settings.settings_page"),
data={"application-ui-use_page_title_in_list": "y",
"requests-time_between_check-minutes": 180,
'application-fetch_backend': "html_requests"},
follow_redirects=True
)
assert b"Settings updated." in res.data
# Page title description override should take precedence
res = client.get(url_for("watchlist.index"))
assert b"my title" in res.data
# Remove page title description override and it should fall back to title
res = client.post(
url_for("ui.ui_edit.edit_page", uuid="first"),
data={
"url": url_for('test_endpoint', _external=True),
"title": "",
"fetch_backend": "html_requests",
"time_between_check_use_default": "y"},
follow_redirects=True
)
assert b"Updated watch." in res.data
# No page title description, and 'use_page_title_in_list' is on, it should show the <title>
res = client.get(url_for("watchlist.index"))
assert b"head titlecustom html" in res.data

Wyświetl plik

@ -92,7 +92,8 @@ def test_unique_lines_functionality(client, live_server, measure_memory_usage):
url_for("ui.ui_edit.edit_page", uuid="first"),
data={"check_unique_lines": "y",
"url": test_url,
"fetch_backend": "html_requests"},
"fetch_backend": "html_requests",
"time_between_check_use_default": "y"},
follow_redirects=True
)
assert b"Updated watch." in res.data
@ -140,7 +141,8 @@ def test_sort_lines_functionality(client, live_server, measure_memory_usage):
url_for("ui.ui_edit.edit_page", uuid="first"),
data={"sort_text_alphabetically": "n",
"url": test_url,
"fetch_backend": "html_requests"},
"fetch_backend": "html_requests",
"time_between_check_use_default": "y"},
follow_redirects=True
)
assert b"Updated watch." in res.data
@ -192,7 +194,8 @@ def test_extra_filters(client, live_server, measure_memory_usage):
"trim_text_whitespace": "y",
"sort_text_alphabetically": "", # leave this OFF for testing
"url": test_url,
"fetch_backend": "html_requests"},
"fetch_backend": "html_requests",
"time_between_check_use_default": "y"},
follow_redirects=True
)
assert b"Updated watch." in res.data

Wyświetl plik

@ -28,7 +28,8 @@ def test_check_watch_field_storage(client, live_server, measure_memory_usage):
"url": test_url,
"tags": "woohoo",
"headers": "curl:foo",
'fetch_backend': "html_requests"
'fetch_backend': "html_requests",
"time_between_check_use_default": "y"
},
follow_redirects=True
)

Wyświetl plik

@ -92,7 +92,7 @@ def test_check_xpath_filter_utf8(client, live_server, measure_memory_usage):
wait_for_all_checks(client)
res = client.post(
url_for("ui.ui_edit.edit_page", uuid="first"),
data={"include_filters": filter, "url": test_url, "tags": "", "headers": "", 'fetch_backend': "html_requests"},
data={"include_filters": filter, "url": test_url, "tags": "", "headers": "", 'fetch_backend': "html_requests", "time_between_check_use_default": "y"},
follow_redirects=True
)
assert b"Updated watch." in res.data
@ -146,7 +146,7 @@ def test_check_xpath_text_function_utf8(client, live_server, measure_memory_usag
wait_for_all_checks(client)
res = client.post(
url_for("ui.ui_edit.edit_page", uuid="first"),
data={"include_filters": filter, "url": test_url, "tags": "", "headers": "", 'fetch_backend': "html_requests"},
data={"include_filters": filter, "url": test_url, "tags": "", "headers": "", 'fetch_backend': "html_requests", "time_between_check_use_default": "y"},
follow_redirects=True
)
assert b"Updated watch." in res.data
@ -188,7 +188,7 @@ def test_check_markup_xpath_filter_restriction(client, live_server, measure_memo
# Add our URL to the import page
res = client.post(
url_for("ui.ui_edit.edit_page", uuid="first"),
data={"include_filters": xpath_filter, "url": test_url, "tags": "", "headers": "", 'fetch_backend': "html_requests"},
data={"include_filters": xpath_filter, "url": test_url, "tags": "", "headers": "", 'fetch_backend': "html_requests", "time_between_check_use_default": "y"},
follow_redirects=True
)
assert b"Updated watch." in res.data
@ -226,7 +226,7 @@ def test_xpath_validation(client, live_server, measure_memory_usage):
res = client.post(
url_for("ui.ui_edit.edit_page", uuid="first"),
data={"include_filters": "/something horrible", "url": test_url, "tags": "", "headers": "", 'fetch_backend': "html_requests"},
data={"include_filters": "/something horrible", "url": test_url, "tags": "", "headers": "", 'fetch_backend': "html_requests", "time_between_check_use_default": "y"},
follow_redirects=True
)
assert b"is not a valid XPath expression" in res.data
@ -247,7 +247,7 @@ def test_xpath23_prefix_validation(client, live_server, measure_memory_usage):
res = client.post(
url_for("ui.ui_edit.edit_page", uuid="first"),
data={"include_filters": "xpath:/something horrible", "url": test_url, "tags": "", "headers": "", 'fetch_backend': "html_requests"},
data={"include_filters": "xpath:/something horrible", "url": test_url, "tags": "", "headers": "", 'fetch_backend': "html_requests", "time_between_check_use_default": "y"},
follow_redirects=True
)
assert b"is not a valid XPath expression" in res.data
@ -298,7 +298,7 @@ def test_xpath1_lxml(client, live_server, measure_memory_usage):
res = client.post(
url_for("ui.ui_edit.edit_page", uuid="first"),
data={"include_filters": "xpath1://title/text()", "url": test_url, "tags": "", "headers": "",
'fetch_backend': "html_requests"},
'fetch_backend': "html_requests", "time_between_check_use_default": "y"},
follow_redirects=True
)
@ -331,7 +331,7 @@ def test_xpath1_validation(client, live_server, measure_memory_usage):
res = client.post(
url_for("ui.ui_edit.edit_page", uuid="first"),
data={"include_filters": "xpath1:/something horrible", "url": test_url, "tags": "", "headers": "", 'fetch_backend': "html_requests"},
data={"include_filters": "xpath1:/something horrible", "url": test_url, "tags": "", "headers": "", 'fetch_backend': "html_requests", "time_between_check_use_default": "y"},
follow_redirects=True
)
assert b"is not a valid XPath expression" in res.data
@ -359,7 +359,7 @@ def test_check_with_prefix_include_filters(client, live_server, measure_memory_u
res = client.post(
url_for("ui.ui_edit.edit_page", uuid="first"),
data={"include_filters": "xpath://*[contains(@class, 'sametext')]", "url": test_url, "tags": "", "headers": "",
'fetch_backend': "html_requests"},
'fetch_backend': "html_requests", "time_between_check_use_default": "y"},
follow_redirects=True
)
@ -413,7 +413,8 @@ def test_various_rules(client, live_server, measure_memory_usage):
"url": test_url,
"tags": "",
"headers": "",
'fetch_backend': "html_requests"},
'fetch_backend': "html_requests",
"time_between_check_use_default": "y"},
follow_redirects=True
)
wait_for_all_checks(client)
@ -444,7 +445,8 @@ def test_xpath_20(client, live_server, measure_memory_usage):
"url": test_url,
"tags": "",
"headers": "",
'fetch_backend': "html_requests"},
'fetch_backend': "html_requests",
"time_between_check_use_default": "y"},
follow_redirects=True
)
@ -481,7 +483,8 @@ def test_xpath_20_function_count(client, live_server, measure_memory_usage):
"url": test_url,
"tags": "",
"headers": "",
'fetch_backend': "html_requests"},
'fetch_backend': "html_requests",
"time_between_check_use_default": "y"},
follow_redirects=True
)
@ -517,7 +520,8 @@ def test_xpath_20_function_count2(client, live_server, measure_memory_usage):
"url": test_url,
"tags": "",
"headers": "",
'fetch_backend': "html_requests"},
'fetch_backend': "html_requests",
"time_between_check_use_default": "y"},
follow_redirects=True
)
@ -554,7 +558,8 @@ def test_xpath_20_function_string_join_matches(client, live_server, measure_memo
"url": test_url,
"tags": "",
"headers": "",
'fetch_backend': "html_requests"},
'fetch_backend': "html_requests",
"time_between_check_use_default": "y"},
follow_redirects=True
)

Wyświetl plik

@ -6,9 +6,9 @@ from flask import url_for
import logging
import time
def set_original_response():
test_return_data = """<html>
<head><title>head title</title></head>
def set_original_response(extra_title=''):
test_return_data = f"""<html>
<head><title>head title{extra_title}</title></head>
<body>
Some initial text<br>
<p>Which is across multiple lines</p>

Wyświetl plik

@ -36,6 +36,7 @@ def test_visual_selector_content_ready(client, live_server, measure_memory_usage
# For now, cookies doesnt work in headers because it must be a full cookiejar object
'headers': "testheader: yes\buser-agent: MyCustomAgent",
'fetch_backend': "html_webdriver",
"time_between_check_use_default": "y",
},
follow_redirects=True
)
@ -116,6 +117,7 @@ def test_basic_browserstep(client, live_server, measure_memory_usage):
'browser_steps-1-optional_value': '',
# For now, cookies doesnt work in headers because it must be a full cookiejar object
'headers': "testheader: yes\buser-agent: MyCustomAgent",
"time_between_check_use_default": "y",
},
follow_redirects=True
)
@ -167,7 +169,8 @@ def test_non_200_errors_report_browsersteps(client, live_server):
'fetch_backend': "html_webdriver",
'browser_steps-0-operation': 'Click element',
'browser_steps-0-selector': 'button[name=test-button]',
'browser_steps-0-optional_value': ''
'browser_steps-0-optional_value': '',
"time_between_check_use_default": "y"
},
follow_redirects=True
)

Wyświetl plik

@ -0,0 +1,3 @@
from .ternary_boolean import TernaryNoneBooleanWidget, TernaryNoneBooleanField
__all__ = ['TernaryNoneBooleanWidget', 'TernaryNoneBooleanField']

Wyświetl plik

@ -0,0 +1,104 @@
from wtforms import Field
from wtforms import widgets
from markupsafe import Markup
class TernaryNoneBooleanWidget:
"""
A widget that renders a horizontal radio button group with either two options (Yes/No)
or three options (Yes/No/Default), depending on the field's boolean_mode setting.
"""
def __call__(self, field, **kwargs):
html = ['<div class="ternary-radio-group pure-form">']
field_id = kwargs.pop('id', field.id)
boolean_mode = getattr(field, 'boolean_mode', False)
# Get custom text or use defaults
yes_text = getattr(field, 'yes_text', 'Yes')
no_text = getattr(field, 'no_text', 'No')
none_text = getattr(field, 'none_text', 'Main settings')
# True option
checked_true = ' checked' if field.data is True else ''
html.append(f'''
<label class="ternary-radio-option">
<input type="radio" name="{field.name}" value="true" id="{field_id}_true"{checked_true} class="pure-radio">
<span class="ternary-radio-label pure-button-primary">{yes_text}</span>
</label>
''')
# False option
checked_false = ' checked' if field.data is False else ''
html.append(f'''
<label class="ternary-radio-option">
<input type="radio" name="{field.name}" value="false" id="{field_id}_false"{checked_false} class="pure-radio">
<span class="ternary-radio-label">{no_text}</span>
</label>
''')
# None option (only show if not in boolean mode)
if not boolean_mode:
checked_none = ' checked' if field.data is None else ''
html.append(f'''
<label class="ternary-radio-option">
<input type="radio" name="{field.name}" value="none" id="{field_id}_none"{checked_none} class="pure-radio">
<span class="ternary-radio-label ternary-default">{none_text}</span>
</label>
''')
html.append('</div>')
return Markup(''.join(html))
class TernaryNoneBooleanField(Field):
"""
A field that can handle True, False, or None values, represented as a horizontal radio group.
When boolean_mode=True, it acts like a BooleanField (only Yes/No options).
When boolean_mode=False (default), it shows Yes/No/Default options.
Custom text can be provided for each option:
- yes_text: Text for True option (default: "Yes")
- no_text: Text for False option (default: "No")
- none_text: Text for None option (default: "Default")
"""
widget = TernaryNoneBooleanWidget()
def __init__(self, label=None, validators=None, false_values=None, boolean_mode=False,
yes_text="Yes", no_text="No", none_text="Main settings", **kwargs):
super(TernaryNoneBooleanField, self).__init__(label, validators, **kwargs)
self.boolean_mode = boolean_mode
self.yes_text = yes_text
self.no_text = no_text
self.none_text = none_text
if false_values is None:
self.false_values = {'false', ''}
else:
self.false_values = false_values
def process_formdata(self, valuelist):
if not valuelist or not valuelist[0]:
# In boolean mode, default to False instead of None
self.data = False if self.boolean_mode else None
elif valuelist[0].lower() == 'true':
self.data = True
elif valuelist[0].lower() == 'false':
self.data = False
elif valuelist[0].lower() == 'none':
# In boolean mode, treat 'none' as False
self.data = False if self.boolean_mode else None
else:
self.data = False if self.boolean_mode else None
def _value(self):
if self.data is True:
return 'true'
elif self.data is False:
return 'false'
else:
# In boolean mode, None should be treated as False
if self.boolean_mode:
return 'false'
else:
return 'none'

Wyświetl plik

@ -0,0 +1,135 @@
#!/usr/bin/env python3
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../..'))
from changedetectionio.widgets import TernaryNoneBooleanField
from wtforms import Form
class TestForm(Form):
# Default text
default_field = TernaryNoneBooleanField('Default Field', default=None)
# Custom text with HTML icons
notification_field = TernaryNoneBooleanField(
'Notifications',
default=False,
yes_text='🔕 Muted',
no_text='🔔 Unmuted',
none_text='⚙️ System default'
)
# HTML with styling
styled_field = TernaryNoneBooleanField(
'Status',
default=None,
yes_text='<strong style="color: green;">✅ Active</strong>',
no_text='<strong style="color: red;">❌ Inactive</strong>',
none_text='<em style="color: gray;">🔧 Auto</em>'
)
# Boolean mode with custom text
boolean_field = TernaryNoneBooleanField(
'Boolean Field',
default=True,
boolean_mode=True,
yes_text="Enabled",
no_text="Disabled"
)
# FontAwesome example
fontawesome_field = TernaryNoneBooleanField(
'Notifications with FontAwesome',
default=None,
yes_text='<i class="fa fa-bell-slash"></i> Muted',
no_text='<i class="fa fa-bell"></i> Unmuted',
none_text='<i class="fa fa-cogs"></i> System default'
)
def test_custom_text():
"""Test custom text functionality"""
form = TestForm()
print("=== Testing TernaryNoneBooleanField Custom Text ===")
# Test default field
print("\n--- Default Field ---")
default_field = form.default_field
default_html = default_field.widget(default_field)
print(f"Contains 'Yes': {'Yes' in default_html}")
print(f"Contains 'No': {'No' in default_html}")
print(f"Contains 'Default': {'Default' in default_html}")
assert 'Yes' in default_html and 'No' in default_html and 'Default' in default_html
# Test custom text field
print("\n--- Custom Text Field with Emojis ---")
notification_field = form.notification_field
notification_html = notification_field.widget(notification_field)
print(f"Contains '🔕 Muted': {'🔕 Muted' in notification_html}")
print(f"Contains '🔔 Unmuted': {'🔔 Unmuted' in notification_html}")
print(f"Contains '⚙️ System default': {'⚙️ System default' in notification_html}")
print(f"Does NOT contain 'Yes': {'Yes' not in notification_html}")
print(f"Does NOT contain 'No': {'No' not in notification_html}")
assert '🔕 Muted' in notification_html and '🔔 Unmuted' in notification_html
assert 'Yes' not in notification_html and 'No' not in notification_html
# Test HTML styling
print("\n--- HTML Styled Field ---")
styled_field = form.styled_field
styled_html = styled_field.widget(styled_field)
print(f"Contains HTML tags: {'<strong' in styled_html}")
print(f"Contains color styling: {'color: green' in styled_html}")
print(f"Contains emojis: {'' in styled_html and '' in styled_html}")
assert '<strong' in styled_html and 'color: green' in styled_html
# Test boolean mode with custom text
print("\n--- Boolean Field with Custom Text ---")
boolean_field = form.boolean_field
boolean_html = boolean_field.widget(boolean_field)
print(f"Contains 'Enabled': {'Enabled' in boolean_html}")
print(f"Contains 'Disabled': {'Disabled' in boolean_html}")
print(f"Does NOT contain 'System default': {'System default' not in boolean_html}")
print(f"Does NOT contain 'Default': {'Default' not in boolean_html}")
assert 'Enabled' in boolean_html and 'Disabled' in boolean_html
assert 'System default' not in boolean_html and 'Default' not in boolean_html
# Test FontAwesome field
print("\n--- FontAwesome Icons Field ---")
fontawesome_field = form.fontawesome_field
fontawesome_html = fontawesome_field.widget(fontawesome_field)
print(f"Contains FontAwesome classes: {'fa fa-bell' in fontawesome_html}")
print(f"Contains multiple FA icons: {'fa fa-bell-slash' in fontawesome_html and 'fa fa-cogs' in fontawesome_html}")
assert 'fa fa-bell' in fontawesome_html
print("\n✅ All custom text tests passed!")
print("\n--- Example Usage ---")
print("TernaryNoneBooleanField('Status', yes_text='🟢 Online', no_text='🔴 Offline', none_text='🟡 Auto')")
print("TernaryNoneBooleanField('Notifications', yes_text='<i class=\"fa fa-bell-slash\"></i> Muted', ...)")
def test_data_processing():
"""Test that custom text doesn't affect data processing"""
print("\n=== Testing Data Processing ===")
form = TestForm()
field = form.notification_field
# Test form data processing
field.process_formdata(['true'])
assert field.data is True, "Custom text should not affect data processing"
print("✅ True processing works with custom text")
field.process_formdata(['false'])
assert field.data is False, "Custom text should not affect data processing"
print("✅ False processing works with custom text")
field.process_formdata(['none'])
assert field.data is None, "Custom text should not affect data processing"
print("✅ None processing works with custom text")
print("✅ All data processing tests passed!")
if __name__ == '__main__':
test_custom_text()
test_data_processing()

Wyświetl plik

@ -84,6 +84,7 @@ services:
# Comment out ports: when using behind a reverse proxy , enable networks: etc.
# Mac users! Use "127.0.0.1:5050:5000" (port 5050) so theres no conflict with Airplay etc. (https://github.com/dgtlmoon/changedetection.io/issues/3401)
ports:
- 127.0.0.1:5000:5000
restart: unless-stopped

Wyświetl plik

@ -1,4 +1,4 @@
openapi: 3.0.3
openapi: 3.1.0
info:
title: ChangeDetection.io API
description: |
@ -28,7 +28,7 @@ info:
For example: `x-api-key: YOUR_API_KEY`
version: 0.1.0
version: 0.1.1
contact:
name: ChangeDetection.io
url: https://github.com/dgtlmoon/changedetection.io
@ -119,14 +119,9 @@ components:
Enter your API key in the "Authorize" button above to automatically populate all code examples.
schemas:
Watch:
WatchBase:
type: object
properties:
uuid:
type: string
format: uuid
description: Unique identifier for the web page change monitor (watch)
readOnly: true
url:
type: string
format: uri
@ -134,7 +129,7 @@ components:
maxLength: 5000
title:
type: string
description: Custom title for the web page change monitor (watch)
description: Custom title for the web page change monitor (watch), not to be confused with page_title
maxLength: 5000
tag:
type: string
@ -193,6 +188,10 @@ components:
seconds:
type: integer
description: Time intervals between checks
time_between_check_use_default:
type: boolean
default: true
description: Whether to use global settings for time between checks - defaults to true if not set
notification_urls:
type: array
items:
@ -229,28 +228,56 @@ components:
maxLength: 5000
required: [operation, selector, optional_value]
description: Browser automation steps
last_checked:
type: integer
description: Unix timestamp of last check
readOnly: true
last_changed:
type: integer
description: Unix timestamp of last change
readOnly: true
last_error:
type: string
description: Last error message
readOnly: true
required:
- url
Watch:
allOf:
- $ref: '#/components/schemas/WatchBase'
- type: object
properties:
uuid:
type: string
format: uuid
description: Unique identifier for the web page change monitor (watch)
readOnly: true
last_checked:
type: integer
description: Unix timestamp of last check
readOnly: true
last_changed:
type: integer
description: Unix timestamp of last change
readOnly: true
last_error:
type: string
description: Last error message
readOnly: true
last_viewed:
type: integer
description: Unix timestamp in seconds of the last time the watch was viewed. Setting it to a value higher than `last_changed` in the "Update watch" endpoint marks the watch as viewed.
minimum: 0
link:
type: string
format: string
description: The watch URL rendered in case of any Jinja2 markup, always use this for listing.
readOnly: true
CreateWatch:
allOf:
- $ref: '#/components/schemas/Watch'
- type: object
- $ref: '#/components/schemas/WatchBase'
- type: 'object'
required:
- url
UpdateWatch:
allOf:
- $ref: '#/components/schemas/WatchBase'
- type: object
properties:
last_viewed:
type: integer
description: Unix timestamp in seconds of the last time the watch was viewed. Setting it to a value higher than `last_changed` in the "Update watch" endpoint marks the watch as viewed.
minimum: 0
Tag:
type: object
properties:
@ -271,8 +298,13 @@ components:
notification_muted:
type: boolean
description: Whether notifications are muted for this tag
required:
- title
CreateTag:
allOf:
- $ref: '#/components/schemas/Tag'
- type: object
required:
- title
NotificationUrls:
type: object
@ -368,9 +400,10 @@ paths:
example:
"095be615-a8ad-4c33-8e9c-c7612fbf6c9f":
uuid: "095be615-a8ad-4c33-8e9c-c7612fbf6c9f"
url: "http://example.com"
title: "Example Website Monitor"
tag: "550e8400-e29b-41d4-a716-446655440000"
url: "http://example.com?id={{1+1}} - the raw URL"
link: "http://example.com?id=2 - the rendered URL, always use this for listing."
title: "Example Website Monitor - manually entered title/description"
page_title: "The HTML <title> from the page"
tags: ["550e8400-e29b-41d4-a716-446655440000"]
paused: false
muted: false
@ -380,9 +413,10 @@ paths:
last_changed: 1640995200
"7c9e6b8d-f2a1-4e5c-9d3b-8a7f6e4c2d1a":
uuid: "7c9e6b8d-f2a1-4e5c-9d3b-8a7f6e4c2d1a"
url: "https://news.example.org"
title: "News Site Tracker"
tag: "330e8400-e29b-41d4-a716-446655440001"
url: "http://example.com?id={{1+1}} - the raw URL"
link: "http://example.com?id=2 - the rendered URL, always use this for listing."
title: "News Site Tracker - manually entered title/description"
page_title: "The HTML <title> from the page"
tags: ["330e8400-e29b-41d4-a716-446655440001"]
paused: false
muted: true
@ -562,7 +596,7 @@ paths:
content:
application/json:
schema:
$ref: '#/components/schemas/Watch'
$ref: '#/components/schemas/UpdateWatch'
responses:
'200':
description: Web page change monitor (watch) updated successfully
@ -576,7 +610,7 @@ paths:
delete:
operationId: deleteWatch
tags: [Watch Management]
tags: [Watch Management]
summary: Delete watch
description: Delete a web page change monitor (watch) and all related history
x-code-samples:
@ -805,7 +839,7 @@ paths:
'Content-Type': 'application/json'
}
data = {'title': 'Important Sites'}
response = requests.post('http://localhost:5000/api/v1/tag',
response = requests.post('http://localhost:5000/api/v1/tag',
headers=headers, json=data)
print(response.json())
requestBody:
@ -813,7 +847,7 @@ paths:
content:
application/json:
schema:
$ref: '#/components/schemas/Tag'
$ref: '#/components/schemas/CreateTag'
example:
title: "Important Sites"
responses:
@ -1188,7 +1222,6 @@ paths:
uuid: "095be615-a8ad-4c33-8e9c-c7612fbf6c9f"
url: "http://example.com"
title: "Example Website Monitor"
tag: "550e8400-e29b-41d4-a716-446655440000"
tags: ["550e8400-e29b-41d4-a716-446655440000"]
paused: false
muted: false

File diff suppressed because one or more lines are too long