from blinker import signal from changedetectionio.strtobool import strtobool from changedetectionio.safe_jinja import render as jinja_render from . import watch_base import os import re from pathlib import Path from loguru import logger from .. import safe_jinja from ..html_tools import TRANSLATE_WHITESPACE_TABLE # Allowable protocols, protects against javascript: etc # file:// is further checked by ALLOW_FILE_URI SAFE_PROTOCOL_REGEX='^(http|https|ftp|file):' minimum_seconds_recheck_time = int(os.getenv('MINIMUM_SECONDS_RECHECK_TIME', 3)) mtable = {'seconds': 1, 'minutes': 60, 'hours': 3600, 'days': 86400, 'weeks': 86400 * 7} def is_safe_url(test_url): # See https://github.com/dgtlmoon/changedetection.io/issues/1358 # Remove 'source:' prefix so we dont get 'source:javascript:' etc # 'source:' is a valid way to tell us to return the source r = re.compile(re.escape('source:'), re.IGNORECASE) test_url = r.sub('', test_url) pattern = re.compile(os.getenv('SAFE_PROTOCOL_REGEX', SAFE_PROTOCOL_REGEX), re.IGNORECASE) if not pattern.match(test_url.strip()): return False return True class model(watch_base): __newest_history_key = None __history_n = 0 jitter_seconds = 0 def __init__(self, *arg, **kw): self.__datastore_path = kw.get('datastore_path') if kw.get('datastore_path'): del kw['datastore_path'] super(model, self).__init__(*arg, **kw) if kw.get('default'): self.update(kw['default']) del kw['default'] if self.get('default'): del self['default'] # Be sure the cached timestamp is ready bump = self.history @property def viewed(self): # Don't return viewed when last_viewed is 0 and newest_key is 0 if int(self['last_viewed']) and int(self['last_viewed']) >= int(self.newest_history_key) : return True return False @property def has_unviewed(self): return int(self.newest_history_key) > int(self['last_viewed']) and self.__history_n >= 2 def ensure_data_dir_exists(self): if not os.path.isdir(self.watch_data_dir): logger.debug(f"> Creating data dir {self.watch_data_dir}") os.mkdir(self.watch_data_dir) @property def link(self): url = self.get('url', '') if not is_safe_url(url): return 'DISABLED' ready_url = url if '{%' in url or '{{' in url: # Jinja2 available in URLs along with https://pypi.org/project/jinja2-time/ try: ready_url = jinja_render(template_str=url) except Exception as e: logger.critical(f"Invalid URL template for: '{url}' - {str(e)}") from flask import ( flash, Markup, url_for ) message = Markup('The URL {} is invalid and cannot be used, click to edit'.format( url_for('ui.ui_edit.edit_page', uuid=self.get('uuid')), self.get('url', ''))) flash(message, 'error') return '' if ready_url.startswith('source:'): ready_url=ready_url.replace('source:', '') # Also double check it after any Jinja2 formatting just incase if not is_safe_url(ready_url): return 'DISABLED' return ready_url @property def domain_only_from_link(self): from urllib.parse import urlparse parsed = urlparse(self.link) domain = parsed.hostname return domain def clear_watch(self): import pathlib # JSON Data, Screenshots, Textfiles (history index and snapshots), HTML in the future etc for item in pathlib.Path(str(self.watch_data_dir)).rglob("*.*"): os.unlink(item) # Force the attr to recalculate bump = self.history # Do this last because it will trigger a recheck due to last_checked being zero self.update({ 'browser_steps_last_error_step': None, 'check_count': 0, 'fetch_time': 0.0, 'has_ldjson_price_data': None, 'last_checked': 0, 'last_error': False, 'last_notification_error': False, 'last_viewed': 0, 'previous_md5': False, 'previous_md5_before_filters': False, 'remote_server_reply': None, 'track_ldjson_price_data': None }) watch_check_update = signal('watch_check_update') if watch_check_update: watch_check_update.send(watch_uuid=self.get('uuid')) return @property def is_source_type_url(self): return self.get('url', '').startswith('source:') @property def get_fetch_backend(self): """ Like just using the `fetch_backend` key but there could be some logic :return: """ # Maybe also if is_image etc? # This is because chrome/playwright wont render the PDF in the browser and we will just fetch it and use pdf2html to see the text. if self.is_pdf: return 'html_requests' return self.get('fetch_backend') @property def is_pdf(self): # content_type field is set in the future # https://github.com/dgtlmoon/changedetection.io/issues/1392 # Not sure the best logic here return self.get('url', '').lower().endswith('.pdf') or 'pdf' in self.get('content_type', '').lower() @property def label(self): # Used for sorting return self.get('title') if self.get('title') else self.get('url') @property def last_changed(self): # last_changed will be the newest snapshot, but when we have just one snapshot, it should be 0 if self.__history_n <= 1: return 0 if self.__newest_history_key: return int(self.__newest_history_key) return 0 @property def history_n(self): return self.__history_n @property def history(self): """History index is just a text file as a list {watch-uuid}/history.txt contains a list like {epoch-time},{filename}\n We read in this list as the history information """ tmp_history = {} # In the case we are only using the watch for processing without history if not self.watch_data_dir: return [] # Read the history file as a dict fname = os.path.join(self.watch_data_dir, "history.txt") if os.path.isfile(fname): logger.debug(f"Reading watch history index for {self.get('uuid')}") with open(fname, "r") as f: for i in f.readlines(): if ',' in i: k, v = i.strip().split(',', 2) # The index history could contain a relative path, so we need to make the fullpath # so that python can read it if not '/' in v and not '\'' in v: v = os.path.join(self.watch_data_dir, v) else: # It's possible that they moved the datadir on older versions # So the snapshot exists but is in a different path snapshot_fname = v.split('/')[-1] proposed_new_path = os.path.join(self.watch_data_dir, snapshot_fname) if not os.path.exists(v) and os.path.exists(proposed_new_path): v = proposed_new_path tmp_history[k] = v if len(tmp_history): self.__newest_history_key = list(tmp_history.keys())[-1] else: self.__newest_history_key = None self.__history_n = len(tmp_history) return tmp_history @property def has_history(self): fname = os.path.join(self.watch_data_dir, "history.txt") return os.path.isfile(fname) @property def has_browser_steps(self): has_browser_steps = self.get('browser_steps') and list(filter( lambda s: (s['operation'] and len(s['operation']) and s['operation'] != 'Choose one' and s['operation'] != 'Goto site'), self.get('browser_steps'))) return has_browser_steps @property def has_restock_info(self): if self.get('restock') and self['restock'].get('in_stock') != None: return True return False # Returns the newest key, but if theres only 1 record, then it's counted as not being new, so return 0. @property def newest_history_key(self): if self.__newest_history_key is not None: return self.__newest_history_key if len(self.history) <= 1: return 0 bump = self.history return self.__newest_history_key # Given an arbitrary timestamp, find the best history key for the [diff] button so it can preset a smarter from_version @property def get_from_version_based_on_last_viewed(self): """Unfortunately for now timestamp is stored as string key""" keys = list(self.history.keys()) if not keys: return None if len(keys) == 1: return keys[0] last_viewed = int(self.get('last_viewed')) sorted_keys = sorted(keys, key=lambda x: int(x)) sorted_keys.reverse() # When the 'last viewed' timestamp is greater than or equal the newest snapshot, return second newest if last_viewed >= int(sorted_keys[0]): return sorted_keys[1] # When the 'last viewed' timestamp is between snapshots, return the older snapshot for newer, older in list(zip(sorted_keys[0:], sorted_keys[1:])): if last_viewed < int(newer) and last_viewed >= int(older): return older # When the 'last viewed' timestamp is less than the oldest snapshot, return oldest return sorted_keys[-1] def get_history_snapshot(self, timestamp): import brotli filepath = self.history[timestamp] # See if a brotli versions exists and switch to that if not filepath.endswith('.br') and os.path.isfile(f"{filepath}.br"): filepath = f"{filepath}.br" # OR in the backup case that the .br does not exist, but the plain one does if filepath.endswith('.br') and not os.path.isfile(filepath): if os.path.isfile(filepath.replace('.br', '')): filepath = filepath.replace('.br', '') if filepath.endswith('.br'): # Brotli doesnt have a fileheader to detect it, so we rely on filename # https://www.rfc-editor.org/rfc/rfc7932 with open(filepath, 'rb') as f: return(brotli.decompress(f.read()).decode('utf-8')) with open(filepath, 'r', encoding='utf-8', errors='ignore') as f: return f.read() # Save some text file to the appropriate path and bump the history # result_obj from fetch_site_status.run() def save_history_text(self, contents, timestamp, snapshot_id): import brotli import tempfile logger.trace(f"{self.get('uuid')} - Updating history.txt with timestamp {timestamp}") self.ensure_data_dir_exists() threshold = int(os.getenv('SNAPSHOT_BROTLI_COMPRESSION_THRESHOLD', 1024)) skip_brotli = strtobool(os.getenv('DISABLE_BROTLI_TEXT_SNAPSHOT', 'False')) # Decide on snapshot filename and destination path if not skip_brotli and len(contents) > threshold: snapshot_fname = f"{snapshot_id}.txt.br" encoded_data = brotli.compress(contents.encode('utf-8'), mode=brotli.MODE_TEXT) else: snapshot_fname = f"{snapshot_id}.txt" encoded_data = contents.encode('utf-8') dest = os.path.join(self.watch_data_dir, snapshot_fname) # Write snapshot file atomically if it doesn't exist if not os.path.exists(dest): with tempfile.NamedTemporaryFile('wb', delete=False, dir=self.watch_data_dir) as tmp: tmp.write(encoded_data) tmp.flush() os.fsync(tmp.fileno()) tmp_path = tmp.name os.rename(tmp_path, dest) # Append to history.txt atomically index_fname = os.path.join(self.watch_data_dir, "history.txt") index_line = f"{timestamp},{snapshot_fname}\n" # Lets try force flush here since it's usually a very small file # If this still fails in the future then try reading all to memory first, re-writing etc with open(index_fname, 'a', encoding='utf-8') as f: f.write(index_line) f.flush() os.fsync(f.fileno()) # Update internal state self.__newest_history_key = timestamp self.__history_n += 1 # @todo bump static cache of the last timestamp so we dont need to examine the file to set a proper ''viewed'' status return snapshot_fname @property def has_empty_checktime(self): # using all() + dictionary comprehension # Check if all values are 0 in dictionary res = all(x == None or x == False or x==0 for x in self.get('time_between_check', {}).values()) return res def threshold_seconds(self): seconds = 0 for m, n in mtable.items(): x = self.get('time_between_check', {}).get(m, None) if x: seconds += x * n return seconds # Iterate over all history texts and see if something new exists # Always applying .strip() to start/end but optionally replace any other whitespace def lines_contain_something_unique_compared_to_history(self, lines: list, ignore_whitespace=False): local_lines = set([]) if lines: if ignore_whitespace: if isinstance(lines[0], str): # Can be either str or bytes depending on what was on the disk local_lines = set([l.translate(TRANSLATE_WHITESPACE_TABLE).lower() for l in lines]) else: local_lines = set([l.decode('utf-8').translate(TRANSLATE_WHITESPACE_TABLE).lower() for l in lines]) else: if isinstance(lines[0], str): # Can be either str or bytes depending on what was on the disk local_lines = set([l.strip().lower() for l in lines]) else: local_lines = set([l.decode('utf-8').strip().lower() for l in lines]) # Compare each lines (set) against each history text file (set) looking for something new.. existing_history = set({}) for k, v in self.history.items(): content = self.get_history_snapshot(k) if ignore_whitespace: alist = set([line.translate(TRANSLATE_WHITESPACE_TABLE).lower() for line in content.splitlines()]) else: alist = set([line.strip().lower() for line in content.splitlines()]) existing_history = existing_history.union(alist) # Check that everything in local_lines(new stuff) already exists in existing_history - it should # if not, something new happened return not local_lines.issubset(existing_history) def get_screenshot(self): fname = os.path.join(self.watch_data_dir, "last-screenshot.png") if os.path.isfile(fname): return fname # False is not an option for AppRise, must be type None return None def bump_favicon(self, url, favicon_base_64: str) -> None: from urllib.parse import urlparse import base64 import binascii decoded = None if url: try: parsed = urlparse(url) filename = os.path.basename(parsed.path) (base, extension) = filename.lower().strip().rsplit('.', 1) except ValueError: logger.error(f"UUID: {self.get('uuid')} Cant work out file extension from '{url}'") return None else: # Assume favicon.ico base = "favicon" extension = "ico" fname = os.path.join(self.watch_data_dir, f"favicon.{extension}") try: # validate=True makes sure the string only contains valid base64 chars decoded = base64.b64decode(favicon_base_64, validate=True) except (binascii.Error, ValueError) as e: logger.warning(f"UUID: {self.get('uuid')} FavIcon save data (Base64) corrupt? {str(e)}") else: if decoded: try: with open(fname, 'wb') as f: f.write(decoded) # A signal that could trigger the socket server to update the browser also watch_check_update = signal('watch_favicon_bump') if watch_check_update: watch_check_update.send(watch_uuid=self.get('uuid')) except Exception as e: logger.warning(f"UUID: {self.get('uuid')} error saving FavIcon to {fname} - {str(e)}") # @todo - Store some checksum and only write when its different logger.debug(f"UUID: {self.get('uuid')} updated favicon to at {fname}") def get_favicon_filename(self) -> str | None: """ Find any favicon.* file in the current working directory and return the contents of the newest one. Returns: bytes: Contents of the newest favicon file, or None if not found. """ import glob # Search for all favicon.* files files = glob.glob(os.path.join(self.watch_data_dir, "favicon.*")) if not files: return None # Find the newest by modification time newest_file = max(files, key=os.path.getmtime) return os.path.basename(newest_file) def get_screenshot_as_thumbnail(self, max_age=3200): """Return path to a square thumbnail of the most recent screenshot. Creates a 150x150 pixel thumbnail from the top portion of the screenshot. Args: max_age: Maximum age in seconds before recreating thumbnail Returns: Path to thumbnail or None if no screenshot exists """ import os import time thumbnail_path = os.path.join(self.watch_data_dir, "thumbnail.jpeg") top_trim = 500 # Pixels from top of screenshot to use screenshot_path = self.get_screenshot() if not screenshot_path: return None # Reuse thumbnail if it's fresh and screenshot hasn't changed if os.path.isfile(thumbnail_path): thumbnail_mtime = os.path.getmtime(thumbnail_path) screenshot_mtime = os.path.getmtime(screenshot_path) if screenshot_mtime <= thumbnail_mtime and time.time() - thumbnail_mtime < max_age: return thumbnail_path try: from PIL import Image with Image.open(screenshot_path) as img: # Crop top portion first (full width, top_trim height) top_crop_height = min(top_trim, img.height) img = img.crop((0, 0, img.width, top_crop_height)) # Create a smaller intermediate image (to reduce memory usage) aspect = img.width / img.height interim_width = min(top_trim, img.width) interim_height = int(interim_width / aspect) if aspect > 0 else top_trim img = img.resize((interim_width, interim_height), Image.NEAREST) # Convert to RGB if needed if img.mode != 'RGB': img = img.convert('RGB') # Crop to square from top center square_size = min(img.width, img.height) left = (img.width - square_size) // 2 img = img.crop((left, 0, left + square_size, square_size)) # Final resize to exact thumbnail size with better filter img = img.resize((350, 350), Image.BILINEAR) # Save with optimized settings img.save(thumbnail_path, "JPEG", quality=75, optimize=True) return thumbnail_path except Exception as e: logger.error(f"Error creating thumbnail for {self.get('uuid')}: {str(e)}") return None def __get_file_ctime(self, filename): fname = os.path.join(self.watch_data_dir, filename) if os.path.isfile(fname): return int(os.path.getmtime(fname)) return False @property def error_text_ctime(self): return self.__get_file_ctime('last-error.txt') @property def snapshot_text_ctime(self): if self.history_n==0: return False timestamp = list(self.history.keys())[-1] return int(timestamp) @property def snapshot_screenshot_ctime(self): return self.__get_file_ctime('last-screenshot.png') @property def snapshot_error_screenshot_ctime(self): return self.__get_file_ctime('last-error-screenshot.png') @property def watch_data_dir(self): # The base dir of the watch data return os.path.join(self.__datastore_path, self['uuid']) if self.__datastore_path else None def get_error_text(self): """Return the text saved from a previous request that resulted in a non-200 error""" fname = os.path.join(self.watch_data_dir, "last-error.txt") if os.path.isfile(fname): with open(fname, 'r') as f: return f.read() return False def get_error_snapshot(self): """Return path to the screenshot that resulted in a non-200 error""" fname = os.path.join(self.watch_data_dir, "last-error-screenshot.png") if os.path.isfile(fname): return fname return False def pause(self): self['paused'] = True def unpause(self): self['paused'] = False def toggle_pause(self): self['paused'] ^= True def mute(self): self['notification_muted'] = True def unmute(self): self['notification_muted'] = False def toggle_mute(self): self['notification_muted'] ^= True def extra_notification_token_values(self): # Used for providing extra tokens # return {'widget': 555} return {} def extra_notification_token_placeholder_info(self): # Used for providing extra tokens # return [('widget', "Get widget amounts")] return [] def extract_regex_from_all_history(self, regex): import csv import re import datetime csv_output_filename = False csv_writer = False f = None # self.history will be keyed with the full path for k, fname in self.history.items(): if os.path.isfile(fname): if True: contents = self.get_history_snapshot(k) res = re.findall(regex, contents, re.MULTILINE) if res: if not csv_writer: # A file on the disk can be transferred much faster via flask than a string reply csv_output_filename = 'report.csv' f = open(os.path.join(self.watch_data_dir, csv_output_filename), 'w') # @todo some headers in the future #fieldnames = ['Epoch seconds', 'Date'] csv_writer = csv.writer(f, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL, #fieldnames=fieldnames ) csv_writer.writerow(['Epoch seconds', 'Date']) # csv_writer.writeheader() date_str = datetime.datetime.fromtimestamp(int(k)).strftime('%Y-%m-%d %H:%M:%S') for r in res: row = [k, date_str] if isinstance(r, str): row.append(r) else: row+=r csv_writer.writerow(row) if f: f.close() return csv_output_filename def has_special_diff_filter_options_set(self): # All False - nothing would be done, so act like it's not processable if not self.get('filter_text_added', True) and not self.get('filter_text_replaced', True) and not self.get('filter_text_removed', True): return False # Or one is set if not self.get('filter_text_added', True) or not self.get('filter_text_replaced', True) or not self.get('filter_text_removed', True): return True # None is set return False def save_error_text(self, contents): self.ensure_data_dir_exists() target_path = os.path.join(self.watch_data_dir, "last-error.txt") with open(target_path, 'w', encoding='utf-8') as f: f.write(contents) def save_xpath_data(self, data, as_error=False): import json import zlib if as_error: target_path = os.path.join(str(self.watch_data_dir), "elements-error.deflate") else: target_path = os.path.join(str(self.watch_data_dir), "elements.deflate") self.ensure_data_dir_exists() with open(target_path, 'wb') as f: if not isinstance(data, str): f.write(zlib.compress(json.dumps(data).encode())) else: f.write(zlib.compress(data.encode())) f.close() # Save as PNG, PNG is larger but better for doing visual diff in the future def save_screenshot(self, screenshot: bytes, as_error=False): if as_error: target_path = os.path.join(self.watch_data_dir, "last-error-screenshot.png") else: target_path = os.path.join(self.watch_data_dir, "last-screenshot.png") self.ensure_data_dir_exists() with open(target_path, 'wb') as f: f.write(screenshot) f.close() def get_last_fetched_text_before_filters(self): import brotli filepath = os.path.join(self.watch_data_dir, 'last-fetched.br') if not os.path.isfile(filepath) or os.path.getsize(filepath) == 0: # If a previous attempt doesnt yet exist, just snarf the previous snapshot instead dates = list(self.history.keys()) if len(dates): return self.get_history_snapshot(dates[-1]) else: return '' with open(filepath, 'rb') as f: return(brotli.decompress(f.read()).decode('utf-8')) def save_last_text_fetched_before_filters(self, contents): import brotli filepath = os.path.join(self.watch_data_dir, 'last-fetched.br') with open(filepath, 'wb') as f: f.write(brotli.compress(contents, mode=brotli.MODE_TEXT)) def save_last_fetched_html(self, timestamp, contents): import brotli self.ensure_data_dir_exists() snapshot_fname = f"{timestamp}.html.br" filepath = os.path.join(self.watch_data_dir, snapshot_fname) with open(filepath, 'wb') as f: contents = contents.encode('utf-8') if isinstance(contents, str) else contents try: f.write(brotli.compress(contents)) except Exception as e: logger.warning(f"{self.get('uuid')} - Unable to compress snapshot, saving as raw data to {filepath}") logger.warning(e) f.write(contents) self._prune_last_fetched_html_snapshots() def get_fetched_html(self, timestamp): import brotli snapshot_fname = f"{timestamp}.html.br" filepath = os.path.join(self.watch_data_dir, snapshot_fname) if os.path.isfile(filepath): with open(filepath, 'rb') as f: return (brotli.decompress(f.read()).decode('utf-8')) return False def _prune_last_fetched_html_snapshots(self): dates = list(self.history.keys()) dates.reverse() for index, timestamp in enumerate(dates): snapshot_fname = f"{timestamp}.html.br" filepath = os.path.join(self.watch_data_dir, snapshot_fname) # Keep only the first 2 if index > 1 and os.path.isfile(filepath): os.remove(filepath) @property def get_browsersteps_available_screenshots(self): "For knowing which screenshots are available to show the user in BrowserSteps UI" available = [] for f in Path(self.watch_data_dir).glob('step_before-*.jpeg'): step_n=re.search(r'step_before-(\d+)', f.name) if step_n: available.append(step_n.group(1)) return available def compile_error_texts(self, has_proxies=None): """Compile error texts for this watch. Accepts has_proxies parameter to ensure it works even outside app context""" from flask import url_for from markupsafe import Markup output = [] # Initialize as list since we're using append last_error = self.get('last_error','') try: url_for('settings.settings_page') except Exception as e: has_app_context = False else: has_app_context = True # has app+request context, we can use url_for() if has_app_context: if last_error: if '403' in last_error: if has_proxies: output.append(str(Markup(f"{last_error} - Try other proxies/location '"))) else: output.append(str(Markup(f"{last_error} - Try adding external proxies/locations '"))) else: output.append(str(Markup(last_error))) if self.get('last_notification_error'): output.append(str(Markup(f"
"))) else: # Lo_Fi version - no app context, cant rely on Jinja2 Markup if last_error: output.append(safe_jinja.render_fully_escaped(last_error)) if self.get('last_notification_error'): output.append(safe_jinja.render_fully_escaped(self.get('last_notification_error'))) res = "\n".join(output) return res