import time from copy import deepcopy import os import importlib.resources from flask import Blueprint, request, redirect, url_for, flash, render_template, make_response, send_from_directory, abort from loguru import logger from jinja2 import Environment, FileSystemLoader from changedetectionio.store import ChangeDetectionStore from changedetectionio.auth_decorator import login_optionally_required from changedetectionio.time_handler import is_within_schedule from changedetectionio import worker_handler def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMetaData): edit_blueprint = Blueprint('ui_edit', __name__, template_folder="../ui/templates") def _watch_has_tag_options_set(watch): """This should be fixed better so that Tag is some proper Model, a tag is just a Watch also""" for tag_uuid, tag in datastore.data['settings']['application'].get('tags', {}).items(): if tag_uuid in watch.get('tags', []) and (tag.get('include_filters') or tag.get('subtractive_selectors')): return True @edit_blueprint.route("/edit/", methods=['GET', 'POST']) @login_optionally_required # https://stackoverflow.com/questions/42984453/wtforms-populate-form-with-data-if-data-exists # https://wtforms.readthedocs.io/en/3.0.x/forms/#wtforms.form.Form.populate_obj ? def edit_page(uuid): from changedetectionio import forms from changedetectionio.blueprint.browser_steps.browser_steps import browser_step_ui_config from changedetectionio import processors import importlib # More for testing, possible to return the first/only if not datastore.data['watching'].keys(): flash("No watches to edit", "error") return redirect(url_for('watchlist.index')) if uuid == 'first': uuid = list(datastore.data['watching'].keys()).pop() if not uuid in datastore.data['watching']: flash("No watch with the UUID %s found." % (uuid), "error") return redirect(url_for('watchlist.index')) switch_processor = request.args.get('switch_processor') if switch_processor: for p in processors.available_processors(): if p[0] == switch_processor: datastore.data['watching'][uuid]['processor'] = switch_processor flash(f"Switched to mode - {p[1]}.") datastore.clear_watch_history(uuid) redirect(url_for('ui_edit.edit_page', uuid=uuid)) # be sure we update with a copy instead of accidently editing the live object by reference default = deepcopy(datastore.data['watching'][uuid]) # Defaults for proxy choice if datastore.proxy_list is not None: # When enabled # @todo # Radio needs '' not None, or incase that the chosen one no longer exists if default['proxy'] is None or not any(default['proxy'] in tup for tup in datastore.proxy_list): default['proxy'] = '' # proxy_override set to the json/text list of the items # Does it use some custom form? does one exist? processor_name = datastore.data['watching'][uuid].get('processor', '') processor_classes = next((tpl for tpl in processors.find_processors() if tpl[1] == processor_name), None) if not processor_classes: flash(f"Cannot load the edit form for processor/plugin '{processor_classes[1]}', plugin missing?", 'error') return redirect(url_for('watchlist.index')) parent_module = processors.get_parent_module(processor_classes[0]) try: # Get the parent of the "processor.py" go up one, get the form (kinda spaghetti but its reusing existing code) forms_module = importlib.import_module(f"{parent_module.__name__}.forms") # Access the 'processor_settings_form' class from the 'forms' module form_class = getattr(forms_module, 'processor_settings_form') except ModuleNotFoundError as e: # .forms didnt exist form_class = forms.processor_text_json_diff_form except AttributeError as e: # .forms exists but no useful form form_class = forms.processor_text_json_diff_form form = form_class(formdata=request.form if request.method == 'POST' else None, data=default, extra_notification_tokens=default.extra_notification_token_values(), default_system_settings=datastore.data['settings'] ) # For the form widget tag UUID back to "string name" for the field form.tags.datastore = datastore # Used by some forms that need to dig deeper form.datastore = datastore form.watch = default for p in datastore.extra_browsers: form.fetch_backend.choices.append(p) form.fetch_backend.choices.append(("system", 'System settings default')) # form.browser_steps[0] can be assumed that we 'goto url' first if datastore.proxy_list is None: # @todo - Couldn't get setattr() etc dynamic addition working, so remove it instead del form.proxy else: form.proxy.choices = [('', 'Default')] for p in datastore.proxy_list: form.proxy.choices.append(tuple((p, datastore.proxy_list[p]['label']))) if request.method == 'POST' and form.validate(): # If they changed processor, it makes sense to reset it. if datastore.data['watching'][uuid].get('processor') != form.data.get('processor'): datastore.data['watching'][uuid].clear_watch() flash("Reset watch history due to change of processor") extra_update_obj = { 'consecutive_filter_failures': 0, 'last_error' : False } if request.args.get('unpause_on_save'): extra_update_obj['paused'] = False extra_update_obj['time_between_check'] = form.time_between_check.data # Ignore text form_ignore_text = form.ignore_text.data datastore.data['watching'][uuid]['ignore_text'] = form_ignore_text # Be sure proxy value is None if datastore.proxy_list is not None and form.data['proxy'] == '': extra_update_obj['proxy'] = None # Unsetting all filter_text methods should make it go back to default # This particularly affects tests running if 'filter_text_added' in form.data and not form.data.get('filter_text_added') \ and 'filter_text_replaced' in form.data and not form.data.get('filter_text_replaced') \ and 'filter_text_removed' in form.data and not form.data.get('filter_text_removed'): extra_update_obj['filter_text_added'] = True extra_update_obj['filter_text_replaced'] = True extra_update_obj['filter_text_removed'] = True # Because wtforms doesn't support accessing other data in process_ , but we convert the CSV list of tags back to a list of UUIDs tag_uuids = [] if form.data.get('tags'): # Sometimes in testing this can be list, dont know why if type(form.data.get('tags')) == list: extra_update_obj['tags'] = form.data.get('tags') else: for t in form.data.get('tags').split(','): tag_uuids.append(datastore.add_tag(title=t)) extra_update_obj['tags'] = tag_uuids datastore.data['watching'][uuid].update(form.data) datastore.data['watching'][uuid].update(extra_update_obj) if not datastore.data['watching'][uuid].get('tags'): # Force it to be a list, because form.data['tags'] will be string if nothing found # And del(form.data['tags'] ) wont work either for some reason datastore.data['watching'][uuid]['tags'] = [] # Recast it if need be to right data Watch handler watch_class = processors.get_custom_watch_obj_for_processor(form.data.get('processor')) datastore.data['watching'][uuid] = watch_class(datastore_path=datastore.datastore_path, default=datastore.data['watching'][uuid]) flash("Updated watch - unpaused!" if request.args.get('unpause_on_save') else "Updated watch.") # Re #286 - We wait for syncing new data to disk in another thread every 60 seconds # But in the case something is added we should save straight away datastore.needs_write_urgent = True # Do not queue on edit if its not within the time range # @todo maybe it should never queue anyway on edit... is_in_schedule = True watch = datastore.data['watching'].get(uuid) if watch.get('time_between_check_use_default'): time_schedule_limit = datastore.data['settings']['requests'].get('time_schedule_limit', {}) else: time_schedule_limit = watch.get('time_schedule_limit') tz_name = time_schedule_limit.get('timezone') if not tz_name: tz_name = datastore.data['settings']['application'].get('timezone', 'UTC') if time_schedule_limit and time_schedule_limit.get('enabled'): try: is_in_schedule = is_within_schedule(time_schedule_limit=time_schedule_limit, default_tz=tz_name ) except Exception as e: logger.error( f"{uuid} - Recheck scheduler, error handling timezone, check skipped - TZ name '{tz_name}' - {str(e)}") return False ############################# if not datastore.data['watching'][uuid].get('paused') and is_in_schedule: # Queue the watch for immediate recheck, with a higher priority worker_handler.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid})) # Diff page [edit] link should go back to diff page if request.args.get("next") and request.args.get("next") == 'diff': return redirect(url_for('ui.ui_views.diff_history_page', uuid=uuid)) return redirect(url_for('watchlist.index', tag=request.args.get("tag",''))) else: if request.method == 'POST' and not form.validate(): flash("An error occurred, please see below.", "error") # JQ is difficult to install on windows and must be manually added (outside requirements.txt) jq_support = True try: import jq except ModuleNotFoundError: jq_support = False watch = datastore.data['watching'].get(uuid) # if system or watch is configured to need a chrome type browser system_uses_webdriver = datastore.data['settings']['application']['fetch_backend'] == 'html_webdriver' watch_needs_selenium_or_playwright = False if (watch.get('fetch_backend') == 'system' and system_uses_webdriver) or watch.get('fetch_backend') == 'html_webdriver' or watch.get('fetch_backend', '').startswith('extra_browser_'): watch_needs_selenium_or_playwright = True from zoneinfo import available_timezones # Only works reliably with Playwright # Import the global plugin system from changedetectionio.pluggy_interface import collect_ui_edit_stats_extras template_args = { 'available_processors': processors.available_processors(), 'available_timezones': sorted(available_timezones()), 'browser_steps_config': browser_step_ui_config, 'emailprefix': os.getenv('NOTIFICATION_MAIL_BUTTON_PREFIX', False), 'extra_notification_token_placeholder_info': datastore.get_unique_notification_token_placeholders_available(), 'extra_processor_config': form.extra_tab_content(), 'extra_title': f" - Edit - {watch.label}", 'form': form, 'has_default_notification_urls': True if len(datastore.data['settings']['application']['notification_urls']) else False, 'has_extra_headers_file': len(datastore.get_all_headers_in_textfile_for_watch(uuid=uuid)) > 0, 'has_special_tag_options': _watch_has_tag_options_set(watch=watch), 'jq_support': jq_support, 'playwright_enabled': os.getenv('PLAYWRIGHT_DRIVER_URL', False), 'settings_application': datastore.data['settings']['application'], 'system_has_playwright_configured': os.getenv('PLAYWRIGHT_DRIVER_URL'), 'system_has_webdriver_configured': os.getenv('WEBDRIVER_URL'), 'ui_edit_stats_extras': collect_ui_edit_stats_extras(watch), 'visual_selector_data_ready': datastore.visualselector_data_is_ready(watch_uuid=uuid), 'timezone_default_config': datastore.data['settings']['application'].get('timezone'), 'using_global_webdriver_wait': not default['webdriver_delay'], 'uuid': uuid, 'watch': watch, 'watch_needs_selenium_or_playwright': watch_needs_selenium_or_playwright, } included_content = None if form.extra_form_content(): # So that the extra panels can access _helpers.html etc, we set the environment to load from templates/ # And then render the code from the module templates_dir = str(importlib.resources.files("changedetectionio").joinpath('templates')) env = Environment(loader=FileSystemLoader(templates_dir)) template = env.from_string(form.extra_form_content()) included_content = template.render(**template_args) output = render_template("edit.html", extra_tab_content=form.extra_tab_content() if form.extra_tab_content() else None, extra_form_content=included_content, **template_args ) return output @edit_blueprint.route("/edit//get-html", methods=['GET']) @login_optionally_required def watch_get_latest_html(uuid): from io import BytesIO from flask import send_file import brotli watch = datastore.data['watching'].get(uuid) if watch and watch.history.keys() and os.path.isdir(watch.watch_data_dir): latest_filename = list(watch.history.keys())[-1] html_fname = os.path.join(watch.watch_data_dir, f"{latest_filename}.html.br") with open(html_fname, 'rb') as f: if html_fname.endswith('.br'): # Read and decompress the Brotli file decompressed_data = brotli.decompress(f.read()) else: decompressed_data = f.read() buffer = BytesIO(decompressed_data) return send_file(buffer, as_attachment=True, download_name=f"{latest_filename}.html", mimetype='text/html') # Return a 500 error abort(500) # Ajax callback @edit_blueprint.route("/edit//preview-rendered", methods=['POST']) @login_optionally_required def watch_get_preview_rendered(uuid): '''For when viewing the "preview" of the rendered text from inside of Edit''' from flask import jsonify from changedetectionio.processors.text_json_diff import prepare_filter_prevew result = prepare_filter_prevew(watch_uuid=uuid, form_data=request.form, datastore=datastore) return jsonify(result) @edit_blueprint.route("/highlight_submit_ignore_url", methods=['POST']) @login_optionally_required def highlight_submit_ignore_url(): import re mode = request.form.get('mode') selection = request.form.get('selection') uuid = request.args.get('uuid','') if datastore.data["watching"].get(uuid): if mode == 'exact': for l in selection.splitlines(): datastore.data["watching"][uuid]['ignore_text'].append(l.strip()) elif mode == 'digit-regex': for l in selection.splitlines(): # Replace any series of numbers with a regex s = re.escape(l.strip()) s = re.sub(r'[0-9]+', r'\\d+', s) datastore.data["watching"][uuid]['ignore_text'].append('/' + s + '/') return f"Click to preview" return edit_blueprint