Mirror of https://github.com/dgtlmoon/changedetection.io

Merge 6a5e6c7c8b into 58e2a41c95
commit 0c6b145e94
@@ -71,6 +71,7 @@ jobs:
           docker run test-changedetectionio bash -c 'python3 -m unittest changedetectionio.tests.unit.test_watch_model'
           docker run test-changedetectionio bash -c 'python3 -m unittest changedetectionio.tests.unit.test_jinja2_security'
           docker run test-changedetectionio bash -c 'python3 -m unittest changedetectionio.tests.unit.test_semver'
+          docker run test-changedetectionio bash -c 'python3 -m unittest changedetectionio.tests.unit.test_update_watch_deep_merge'

       - name: Test built container with Pytest (generally as requests/plaintext fetching)
         run: |
@@ -303,7 +303,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, worker_handle
         watch['ignore_text'] += datastore.data['settings']['application']['global_ignore_text']
         watch['subtractive_selectors'] += datastore.data['settings']['application']['global_subtractive_selectors']

-        watch_json = json.dumps(watch)
+        watch_json = json.dumps(dict(watch))

         try:
             r = requests.request(method="POST",
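The `dict(watch)` cast is needed here because the watch model no longer subclasses `dict` (see the `watch_base` change below), so `json.dumps(watch)` would raise `TypeError: Object of type model is not JSON serializable`. The cast works because the class still exposes `keys()` and `__getitem__`. A minimal sketch of the failure mode, using a hypothetical stand-in class rather than the real model:

    import json

    # Hypothetical stand-in for the new watch_base: dict-like, but not a dict subclass
    class MappingFacade:
        def __init__(self, data):
            self._data = dict(data)
        def keys(self):
            return self._data.keys()
        def __getitem__(self, key):
            return self._data[key]

    w = MappingFacade({'url': 'http://example.com', 'paused': False})
    try:
        json.dumps(w)                 # TypeError: not JSON serializable
    except TypeError as e:
        print(e)
    print(json.dumps(dict(w)))        # dict() consumes keys()/__getitem__, so this works
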
@@ -44,14 +44,14 @@ class model(watch_base):
         self.__datastore_path = kw.get('datastore_path')
         if kw.get('datastore_path'):
             del kw['datastore_path']

+        # Save default before passing to parent, since parent will delete it
+        default_values = kw.get('default')
+
         super(model, self).__init__(*arg, **kw)
-        if kw.get('default'):
-            self.update(kw['default'])
-            del kw['default']

-        if self.get('default'):
-            del self['default']
+        if default_values:
+            self.update(default_values)

         # Be sure the cached timestamp is ready
         bump = self.history
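The reordering matters because `watch_base.__init__()` now strips the 'default' key out of its internal data, so the subclass has to stash the defaults before calling the parent and apply them afterwards. A compressed sketch of the ordering hazard, with hypothetical `Base`/`Child` names rather than the real classes:

    class Base:
        def __init__(self, **kw):
            self.data = dict(kw)
            self.data.pop('default', None)    # parent discards 'default'

    class Child(Base):
        def __init__(self, **kw):
            default_values = kw.get('default')    # stash before the parent deletes it
            super().__init__(**kw)
            if default_values:
                self.data.update(default_values)

    c = Child(url='http://example.com', default={'paused': True})
    print(c.data)   # {'url': 'http://example.com', 'paused': True}
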
@@ -228,8 +228,8 @@ class model(watch_base):

     @property
     def has_history(self):
-        fname = os.path.join(self.watch_data_dir, "history.txt")
-        return os.path.isfile(fname)
+        fname = self._get_data_file_path("history.txt")
+        return fname and os.path.isfile(fname)

     @property
     def has_browser_steps(self):
@@ -406,16 +406,16 @@ class model(watch_base):
         return not local_lines.issubset(existing_history)

     def get_screenshot(self):
-        fname = os.path.join(self.watch_data_dir, "last-screenshot.png")
-        if os.path.isfile(fname):
+        fname = self._get_data_file_path("last-screenshot.png")
+        if fname and os.path.isfile(fname):
             return fname

         # False is not an option for AppRise, must be type None
         return None

     def __get_file_ctime(self, filename):
-        fname = os.path.join(self.watch_data_dir, filename)
-        if os.path.isfile(fname):
+        fname = self._get_data_file_path(filename)
+        if fname and os.path.isfile(fname):
             return int(os.path.getmtime(fname))
         return False
@@ -442,20 +442,28 @@ class model(watch_base):

     @property
     def watch_data_dir(self):
         # The base dir of the watch data
-        return os.path.join(self.__datastore_path, self['uuid']) if self.__datastore_path else None
+        if self.__datastore_path and self.get('uuid'):
+            return os.path.join(self.__datastore_path, self['uuid'])
+        return None
+
+    def _get_data_file_path(self, filename):
+        """Safely get the full path to a data file, returns None if watch_data_dir is None"""
+        if self.watch_data_dir:
+            return os.path.join(self.watch_data_dir, filename)
+        return None

     def get_error_text(self):
         """Return the text saved from a previous request that resulted in a non-200 error"""
-        fname = os.path.join(self.watch_data_dir, "last-error.txt")
-        if os.path.isfile(fname):
+        fname = self._get_data_file_path("last-error.txt")
+        if fname and os.path.isfile(fname):
             with open(fname, 'r') as f:
                 return f.read()
         return False

     def get_error_snapshot(self):
         """Return path to the screenshot that resulted in a non-200 error"""
-        fname = os.path.join(self.watch_data_dir, "last-error-screenshot.png")
-        if os.path.isfile(fname):
+        fname = self._get_data_file_path("last-error-screenshot.png")
+        if fname and os.path.isfile(fname):
             return fname
         return False
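The repeated `fname and os.path.isfile(fname)` guard exists because `_get_data_file_path()` can return None when no datastore path has been set yet, and `os.path.join(None, ...)` raises TypeError. A minimal sketch of the pattern in isolation, with hypothetical names:

    import os

    def get_data_file_path(base_dir, filename):
        """Return the full path, or None when no base dir is configured yet."""
        if base_dir:
            return os.path.join(base_dir, filename)
        return None

    # Call sites guard on both: a None path and a missing file behave the same
    fname = get_data_file_path(None, "history.txt")
    has_history = bool(fname and os.path.isfile(fname))
    print(has_history)   # False, with no TypeError from os.path.join(None, ...)
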
@@ -1,13 +1,14 @@
 import os
 import uuid
+import json

 from changedetectionio import strtobool
 default_notification_format_for_watch = 'System default'

-class watch_base(dict):
+class watch_base:

     def __init__(self, *arg, **kw):
-        self.update({
+        self.__data = {
             # Custom notification content
             # Re #110, so then if this is set to None, we know to use the default value instead
             # Requires setting to None on submit if it's the same as the default
@@ -128,9 +129,78 @@ class watch_base(dict):
             'uuid': str(uuid.uuid4()),
             'webdriver_delay': None,
             'webdriver_js_execute_code': None, # Run before change-detection
-        })
+        }
+
+        if len(arg) == 1 and (isinstance(arg[0], dict) or hasattr(arg[0], 'keys')):
+            self.__data.update(arg[0])
+        if kw:
+            self.__data.update(kw)

-        super(watch_base, self).__init__(*arg, **kw)
+        if self.__data.get('default'):
+            del self.__data['default']

-        if self.get('default'):
-            del self['default']
+    def __getitem__(self, key):
+        return self.__data[key]
+
+    def __setitem__(self, key, value):
+        self.__data[key] = value
+
+    def __delitem__(self, key):
+        del self.__data[key]
+
+    def __iter__(self):
+        return iter(self.__data)
+
+    def __len__(self):
+        return len(self.__data)
+
+    def __contains__(self, key):
+        return key in self.__data
+
+    def __repr__(self):
+        return repr(self.__data)
+
+    def __str__(self):
+        return str(self.__data)
+
+    def keys(self):
+        return self.__data.keys()
+
+    def values(self):
+        return self.__data.values()
+
+    def items(self):
+        return self.__data.items()
+
+    def get(self, key, default=None):
+        return self.__data.get(key, default)
+
+    def pop(self, key, *args):
+        return self.__data.pop(key, *args)
+
+    def popitem(self):
+        return self.__data.popitem()
+
+    def clear(self):
+        self.__data.clear()
+
+    def update(self, *args, **kwargs):
+        self.__data.update(*args, **kwargs)
+
+    def setdefault(self, key, default=None):
+        return self.__data.setdefault(key, default)
+
+    def copy(self):
+        return self.__data.copy()
+
+    def __deepcopy__(self, memo):
+        from copy import deepcopy
+        new_instance = self.__class__()
+        new_instance.__data = deepcopy(self.__data, memo)
+        return new_instance
+
+    def __reduce__(self):
+        return (self.__class__, (self.__data,))
+
+    def to_dict(self):
+        return dict(self.__data)
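Swapping inheritance from `dict` for an internal `__data` dict plus forwarding methods keeps the mapping API (`keys()`, `items()`, `get()`, subscripting) while the object stops passing `isinstance(obj, dict)` checks; that is why `json.dumps`, `copy.deepcopy` and pickling each need explicit support above (`to_dict`/`WatchEncoder`, `__deepcopy__`, `__reduce__`). A short demonstration of the trade-off, using a hypothetical cut-down facade rather than the real class:

    from copy import deepcopy

    class Facade:
        def __init__(self, data=None):
            self.__data = dict(data or {})
        def keys(self):
            return self.__data.keys()
        def __getitem__(self, key):
            return self.__data[key]
        def to_dict(self):
            return dict(self.__data)
        def __deepcopy__(self, memo):
            new = self.__class__()
            new.__data = deepcopy(self.__data, memo)   # name-mangled, same class, so this works
            return new

    f = Facade({'a': {'b': 1}})
    print(isinstance(f, dict))      # False - no longer passes dict checks
    print(dict(f))                  # {'a': {'b': 1}} - mapping protocol still works
    print(deepcopy(f).to_dict())    # independent copy via __deepcopy__
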
@@ -42,10 +42,10 @@ class Restock(dict):

         # Update with any provided positional arguments (dictionaries)
         if args:
-            if len(args) == 1 and isinstance(args[0], dict):
+            if len(args) == 1 and (isinstance(args[0], dict) or hasattr(args[0], 'keys')):
                 self.update(args[0])
             else:
-                raise ValueError("Only one positional argument of type 'dict' is allowed")
+                raise ValueError("Only one positional argument of type 'dict' or dict-like is allowed")

     def __setitem__(self, key, value):
         # Custom logic to handle setting price and original_price
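`isinstance(args[0], dict)` no longer matches the watch object, so `Restock` falls back to duck typing: anything exposing `keys()` can be consumed by `dict.update()`. A hedged sketch of why the `hasattr(..., 'keys')` check is sufficient, with a hypothetical minimal mapping:

    class KeysOnly:
        """Hypothetical minimal mapping: just enough for dict.update()."""
        def keys(self):
            return ['price', 'in_stock']
        def __getitem__(self, key):
            return {'price': 9.99, 'in_stock': True}[key]

    d = {}
    d.update(KeysOnly())    # dict.update() accepts any object with keys()/__getitem__
    print(d)                # {'price': 9.99, 'in_stock': True}
    print(hasattr(KeysOnly(), 'keys'))   # the check Restock now performs
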
@@ -23,6 +23,13 @@ from blinker import signal
 from .processors import get_custom_watch_obj_for_processor
 from .processors.restock_diff import Restock

+class WatchEncoder(json.JSONEncoder):
+    def default(self, obj):
+        from .model import watch_base
+        if isinstance(obj, watch_base):
+            return dict(obj)
+        return super().default(obj)
+
 # Because the server will run as a daemon and wont know the URL for notification links when firing off a notification
 BASE_URL_NOT_SET_TEXT = '("Base URL" not set - see settings - notifications)'
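The encoder hook lets any `watch_base` instance nested inside the datastore serialize transparently: `json.JSONEncoder.default()` is only invoked for objects the encoder cannot already handle. A minimal usage sketch with a hypothetical stand-in class in place of the real `watch_base`:

    import json

    class WatchLike:
        """Hypothetical stand-in for watch_base."""
        def __init__(self, data):
            self._data = dict(data)
        def keys(self):
            return self._data.keys()
        def __getitem__(self, key):
            return self._data[key]

    class WatchLikeEncoder(json.JSONEncoder):
        def default(self, obj):
            if isinstance(obj, WatchLike):
                return dict(obj)            # same trick as WatchEncoder
            return super().default(obj)

    data = {'watching': {'uuid-1': WatchLike({'url': 'http://example.com'})}}
    print(json.dumps(data, cls=WatchLikeEncoder))
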
@@ -51,9 +58,6 @@ class ChangeDetectionStore:
         self.needs_write = False
         self.start_time = time.time()
         self.stop_thread = False
-        # Base definition for all watchers
-        # deepcopy part of #569 - not sure why its needed exactly
-        self.generic_definition = deepcopy(Watch.model(datastore_path = datastore_path, default={}))

         if path.isfile('changedetectionio/source.txt'):
             with open('changedetectionio/source.txt') as f:
@@ -174,6 +178,14 @@ class ChangeDetectionStore:
             self.__data['settings']['application']['password'] = False
             self.needs_write = True

+    def _deep_merge(self, target, source):
+        """Recursively merge source dict into target dict"""
+        for key, value in source.items():
+            if key in target and isinstance(target[key], dict) and isinstance(value, dict):
+                self._deep_merge(target[key], value)
+            else:
+                target[key] = value
+
     def update_watch(self, uuid, update_obj):

         # It's possible that the watch could be deleted before update
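The point of `_deep_merge()` over plain `dict.update()` is that a partial nested payload no longer wipes out sibling keys. A standalone worked example of the difference, using the same merge logic:

    def deep_merge(target, source):
        """Same logic as ChangeDetectionStore._deep_merge."""
        for key, value in source.items():
            if key in target and isinstance(target[key], dict) and isinstance(value, dict):
                deep_merge(target[key], value)
            else:
                target[key] = value

    flat = {'time_between_check': {'days': 1, 'hours': 6, 'minutes': 30}}
    flat.update({'time_between_check': {'hours': 2}})
    print(flat)    # {'time_between_check': {'hours': 2}} - days and minutes are lost

    merged = {'time_between_check': {'days': 1, 'hours': 6, 'minutes': 30}}
    deep_merge(merged, {'time_between_check': {'hours': 2}})
    print(merged)  # {'time_between_check': {'days': 1, 'hours': 2, 'minutes': 30}}
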
@@ -181,15 +193,8 @@
             return

         with self.lock:
-
-            # In python 3.9 we have the |= dict operator, but that still will lose data on nested structures...
-            for dict_key, d in self.generic_definition.items():
-                if isinstance(d, dict):
-                    if update_obj is not None and dict_key in update_obj:
-                        self.__data['watching'][uuid][dict_key].update(update_obj[dict_key])
-                        del (update_obj[dict_key])
-
-            self.__data['watching'][uuid].update(update_obj)
+            # Use recursive merge to handle nested dictionaries properly
+            self._deep_merge(self.__data['watching'][uuid], update_obj)
             self.needs_write = True

     @property
@@ -393,6 +398,42 @@ class ChangeDetectionStore:

         return False

+    def save_json_atomic(self, save_path: str | os.PathLike, data) -> None:
+        """
+        Atomically (re)write *save_path* with *data* encoded as JSON.
+        The original file is left untouched if anything fails.
+        """
+        import tempfile
+        from pathlib import Path  # just for nicer paths
+
+        JSON_INDENT = 2  # or None in production
+        ENCODER = WatchEncoder  # the custom encoder defined above
+
+        datapath = Path(save_path)
+        directory = datapath.parent
+
+        # 1. create a unique temp file in the same directory
+        fd, tmp_name = tempfile.mkstemp(
+            dir=directory,
+            prefix=f"{datapath.name}.",
+            suffix=".tmp",
+        )
+        try:
+            with os.fdopen(fd, "w", encoding="utf-8") as tmp:
+                json.dump(data, tmp, indent=JSON_INDENT, cls=ENCODER)
+                if os.getenv('JSON_SAVE_FORCE_FLUSH'):
+                    tmp.flush()  # push Python buffers
+                    os.fsync(tmp.fileno())  # force kernel to write to disk
+            os.replace(tmp_name, datapath)
+
+        except Exception as e:
+            logger.critical(f"Failed to write JSON to {datapath} - {str(e)}")
+            # if anything above blew up, ensure we don't leave junk lying around
+            try:
+                os.unlink(tmp_name)
+            finally:
+                raise
+
     def sync_to_json(self):
         logger.info("Saving JSON..")
         try:
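The temp-file-plus-`os.replace()` dance is the standard atomic-write pattern: `os.replace()` is atomic when source and destination are on the same filesystem, so readers see either the old complete file or the new complete file, never a half-written one. Creating the temp file in the destination directory (not the system temp dir) is what guarantees the same-filesystem condition. A standalone sketch of the same pattern, assuming a hypothetical `atomic_json_write` helper:

    import json
    import os
    import tempfile

    def atomic_json_write(path, data, fsync=False):
        """Write-then-rename so a crash mid-write never corrupts the original file."""
        fd, tmp_name = tempfile.mkstemp(dir=os.path.dirname(os.path.abspath(path)),
                                        suffix=".tmp")
        try:
            with os.fdopen(fd, "w", encoding="utf-8") as tmp:
                json.dump(data, tmp)
                if fsync:
                    tmp.flush()
                    os.fsync(tmp.fileno())   # bytes hit the disk, not just the page cache
            os.replace(tmp_name, path)       # atomic on the same filesystem
        except Exception:
            try:
                os.unlink(tmp_name)          # don't leave a stray .tmp behind
            except OSError:
                pass
            raise

    atomic_json_write("demo.json", {"ok": True})
    print(open("demo.json").read())
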
@@ -404,18 +454,7 @@ class ChangeDetectionStore:
                     self.sync_to_json()
                     return
                 else:
-
-                    try:
-                        # Re #286 - First write to a temp file, then confirm it looks OK and rename it
-                        # This is a fairly basic strategy to deal with the case that the file is corrupted,
-                        # system was out of memory, out of RAM etc
-                        with open(self.json_store_path+".tmp", 'w') as json_file:
-                            # Use compact JSON in production for better performance
-                            json.dump(data, json_file, indent=2)
-                        os.replace(self.json_store_path+".tmp", self.json_store_path)
-                    except Exception as e:
-                        logger.error(f"Error writing JSON!! (Main JSON file save was skipped) : {str(e)}")
-
+                    self.save_json_atomic(save_path=self.json_store_path, data=data)
                     self.needs_write = False
                     self.needs_write_urgent = False
@@ -0,0 +1,291 @@
+#!/usr/bin/env python3
+
+# run from dir above changedetectionio/ dir
+# python3 -m unittest changedetectionio.tests.unit.test_update_watch_deep_merge
+
+import unittest
+import os
+import tempfile
+import shutil
+from unittest.mock import patch
+
+from changedetectionio import store
+
+
+class TestUpdateWatchDeepMerge(unittest.TestCase):
+
+    def setUp(self):
+        # Create a temporary directory for test data
+        self.test_datastore_path = tempfile.mkdtemp()
+        self.datastore = store.ChangeDetectionStore(datastore_path=self.test_datastore_path, include_default_watches=False)
+
+        # Create a test watch with known nested structure
+        self.test_uuid = self.datastore.add_watch(url='http://example.com')
+
+        # Set up known initial nested structure
+        initial_data = {
+            'time_between_check': {'weeks': None, 'days': 1, 'hours': 6, 'minutes': 30, 'seconds': None},
+            'headers': {'user-agent': 'test-browser', 'accept': 'text/html'},
+            'time_schedule_limit': {
+                'enabled': True,
+                'monday': {
+                    'enabled': True,
+                    'start_time': '09:00',
+                    'duration': {'hours': '8', 'minutes': '00'}
+                },
+                'tuesday': {
+                    'enabled': False,
+                    'start_time': '10:00',
+                    'duration': {'hours': '6', 'minutes': '30'}
+                }
+            }
+        }
+        self.datastore.update_watch(self.test_uuid, initial_data)
+
+    def tearDown(self):
+        self.datastore.stop_thread = True
+        # Clean up the temporary directory
+        shutil.rmtree(self.test_datastore_path, ignore_errors=True)
+
+    def test_simple_flat_update(self):
+        """Test that simple flat updates work as before"""
+        update_obj = {'url': 'http://newexample.com', 'paused': True}
+        self.datastore.update_watch(self.test_uuid, update_obj)
+
+        watch = self.datastore.data['watching'][self.test_uuid]
+        self.assertEqual(watch['url'], 'http://newexample.com')
+        self.assertEqual(watch['paused'], True)
+
+    def test_time_between_check_partial_update(self):
+        """Test partial update of time_between_check preserves existing keys"""
+        # Update only hours, should preserve other existing values
+        update_obj = {'time_between_check': {'hours': 2}}
+        self.datastore.update_watch(self.test_uuid, update_obj)
+
+        watch = self.datastore.data['watching'][self.test_uuid]
+        time_check = watch['time_between_check']
+
+        # Updated value
+        self.assertEqual(time_check['hours'], 2)
+        # Preserved existing values
+        self.assertEqual(time_check['days'], 1)
+        self.assertEqual(time_check['minutes'], 30)
+        self.assertEqual(time_check['weeks'], None)
+        self.assertEqual(time_check['seconds'], None)
+
+    def test_time_between_check_multiple_partial_updates(self):
+        """Test multiple partial updates to time_between_check"""
+        # First update
+        update_obj1 = {'time_between_check': {'minutes': 45}}
+        self.datastore.update_watch(self.test_uuid, update_obj1)
+
+        # Second update
+        update_obj2 = {'time_between_check': {'seconds': 15}}
+        self.datastore.update_watch(self.test_uuid, update_obj2)
+
+        watch = self.datastore.data['watching'][self.test_uuid]
+        time_check = watch['time_between_check']
+
+        # Both updates should be preserved
+        self.assertEqual(time_check['minutes'], 45)
+        self.assertEqual(time_check['seconds'], 15)
+        # Original values should be preserved
+        self.assertEqual(time_check['days'], 1)
+        self.assertEqual(time_check['hours'], 6)
+
+    def test_headers_partial_update(self):
+        """Test partial update of headers preserves existing headers"""
+        update_obj = {'headers': {'authorization': 'Bearer token123'}}
+        self.datastore.update_watch(self.test_uuid, update_obj)
+
+        watch = self.datastore.data['watching'][self.test_uuid]
+        headers = watch['headers']
+
+        # New header added
+        self.assertEqual(headers['authorization'], 'Bearer token123')
+        # Existing headers preserved
+        self.assertEqual(headers['user-agent'], 'test-browser')
+        self.assertEqual(headers['accept'], 'text/html')
+
+    def test_headers_update_existing_key(self):
+        """Test updating an existing header key"""
+        update_obj = {'headers': {'user-agent': 'new-browser'}}
+        self.datastore.update_watch(self.test_uuid, update_obj)
+
+        watch = self.datastore.data['watching'][self.test_uuid]
+        headers = watch['headers']
+
+        # Updated existing header
+        self.assertEqual(headers['user-agent'], 'new-browser')
+        # Other headers preserved
+        self.assertEqual(headers['accept'], 'text/html')
+
+    def test_time_schedule_limit_deep_nested_update(self):
+        """Test deep nested update of time_schedule_limit structure"""
+        update_obj = {
+            'time_schedule_limit': {
+                'monday': {
+                    'duration': {'hours': '10'}  # Only update hours, preserve minutes
+                }
+            }
+        }
+        self.datastore.update_watch(self.test_uuid, update_obj)
+
+        watch = self.datastore.data['watching'][self.test_uuid]
+        schedule = watch['time_schedule_limit']
+
+        # Deep nested update applied
+        self.assertEqual(schedule['monday']['duration']['hours'], '10')
+        # Existing nested values preserved
+        self.assertEqual(schedule['monday']['duration']['minutes'], '00')
+        self.assertEqual(schedule['monday']['start_time'], '09:00')
+        self.assertEqual(schedule['monday']['enabled'], True)
+        # Other days preserved
+        self.assertEqual(schedule['tuesday']['enabled'], False)
+        self.assertEqual(schedule['enabled'], True)
+
+    def test_mixed_flat_and_nested_update(self):
+        """Test update with both flat and nested properties"""
+        update_obj = {
+            'url': 'http://mixed-update.com',
+            'paused': False,
+            'time_between_check': {'days': 2, 'minutes': 15},
+            'headers': {'cookie': 'session=abc123'}
+        }
+        self.datastore.update_watch(self.test_uuid, update_obj)
+
+        watch = self.datastore.data['watching'][self.test_uuid]
+
+        # Flat updates
+        self.assertEqual(watch['url'], 'http://mixed-update.com')
+        self.assertEqual(watch['paused'], False)
+
+        # Nested updates
+        time_check = watch['time_between_check']
+        self.assertEqual(time_check['days'], 2)
+        self.assertEqual(time_check['minutes'], 15)
+        self.assertEqual(time_check['hours'], 6)  # preserved
+
+        headers = watch['headers']
+        self.assertEqual(headers['cookie'], 'session=abc123')
+        self.assertEqual(headers['user-agent'], 'test-browser')  # preserved
+
+    def test_overwrite_nested_with_flat(self):
+        """Test that providing a non-dict value overwrites the entire nested structure"""
+        update_obj = {'time_between_check': 'invalid_value'}
+        self.datastore.update_watch(self.test_uuid, update_obj)
+
+        watch = self.datastore.data['watching'][self.test_uuid]
+        # Should completely replace the nested dict with the string
+        self.assertEqual(watch['time_between_check'], 'invalid_value')
+
+    def test_add_new_nested_structure(self):
+        """Test adding a completely new nested dictionary"""
+        update_obj = {
+            'custom_config': {
+                'option1': 'value1',
+                'nested': {
+                    'suboption': 'subvalue'
+                }
+            }
+        }
+        self.datastore.update_watch(self.test_uuid, update_obj)
+
+        watch = self.datastore.data['watching'][self.test_uuid]
+        self.assertEqual(watch['custom_config']['option1'], 'value1')
+        self.assertEqual(watch['custom_config']['nested']['suboption'], 'subvalue')
+
+    def test_empty_dict_update(self):
+        """Test updating with empty dictionaries"""
+        update_obj = {'headers': {}}
+        self.datastore.update_watch(self.test_uuid, update_obj)
+
+        watch = self.datastore.data['watching'][self.test_uuid]
+        # Empty dict should preserve existing headers (no keys to merge)
+        self.assertEqual(watch['headers']['user-agent'], 'test-browser')
+        self.assertEqual(watch['headers']['accept'], 'text/html')
+
+    def test_none_values_in_nested_update(self):
+        """Test handling None values in nested updates"""
+        update_obj = {
+            'time_between_check': {
+                'hours': None,
+                'days': 3
+            }
+        }
+        self.datastore.update_watch(self.test_uuid, update_obj)
+
+        watch = self.datastore.data['watching'][self.test_uuid]
+        time_check = watch['time_between_check']
+
+        self.assertEqual(time_check['hours'], None)
+        self.assertEqual(time_check['days'], 3)
+        self.assertEqual(time_check['minutes'], 30)  # preserved
+
+    def test_real_world_api_update_scenario(self):
+        """Test a real-world API update scenario from the codebase analysis"""
+        # Based on actual API call patterns found in the codebase
+        update_obj = {
+            "title": "Updated API Watch",
+            'time_between_check': {'minutes': 60},
+            'headers': {'authorization': 'Bearer api-token', 'user-agent': 'api-client'},
+            'notification_urls': ['https://webhook.example.com']
+        }
+        self.datastore.update_watch(self.test_uuid, update_obj)
+
+        watch = self.datastore.data['watching'][self.test_uuid]
+
+        # Verify all updates
+        self.assertEqual(watch['title'], 'Updated API Watch')
+        self.assertEqual(watch['time_between_check']['minutes'], 60)
+        self.assertEqual(watch['time_between_check']['days'], 1)  # preserved
+        self.assertEqual(watch['headers']['authorization'], 'Bearer api-token')
+        self.assertEqual(watch['headers']['user-agent'], 'api-client')  # overwrote existing
+        self.assertEqual(watch['headers']['accept'], 'text/html')  # preserved
+        self.assertEqual(watch['notification_urls'], ['https://webhook.example.com'])
+
+    def test_watch_not_found(self):
+        """Test update_watch with non-existent UUID"""
+        # Should not raise an error, just return silently
+        fake_uuid = 'non-existent-uuid'
+        update_obj = {'url': 'http://should-not-update.com'}
+
+        # Should not raise an exception
+        self.datastore.update_watch(fake_uuid, update_obj)
+
+        # Verify no changes were made to existing watch
+        watch = self.datastore.data['watching'][self.test_uuid]
+        self.assertNotEqual(watch['url'], 'http://should-not-update.com')
+
+    def test_processor_style_update(self):
+        """Test the type of updates made by processors during check operations"""
+        # Based on async_update_worker.py patterns
+        update_obj = {
+            'last_notification_error': False,
+            'last_error': False,
+            'previous_md5': 'abc123def456',
+            'content-type': 'application/json',
+            'consecutive_filter_failures': 0,
+            'fetch_time': 1.234,
+            'check_count': 42
+        }
+        self.datastore.update_watch(self.test_uuid, update_obj)
+
+        watch = self.datastore.data['watching'][self.test_uuid]
+
+        # Verify processor updates
+        self.assertEqual(watch['last_notification_error'], False)
+        self.assertEqual(watch['last_error'], False)
+        self.assertEqual(watch['previous_md5'], 'abc123def456')
+        self.assertEqual(watch['content-type'], 'application/json')
+        self.assertEqual(watch['consecutive_filter_failures'], 0)
+        self.assertEqual(watch['fetch_time'], 1.234)
+        self.assertEqual(watch['check_count'], 42)
+
+        # Verify nested structures weren't affected
+        self.assertEqual(watch['time_between_check']['days'], 1)
+        self.assertEqual(watch['headers']['user-agent'], 'test-browser')
+
+
+if __name__ == '__main__':
+    unittest.main()