Mirror of https://github.com/dgtlmoon/changedetection.io

When new ignore text is specified, reprocess the checksum

parent 8bc7b5be40
commit f1da8f96b6
@@ -190,7 +190,7 @@ def changedetection_app(config=None, datastore_o=None):
                     ignore_text.append(text)

             # Reset the previous_md5 so we process a new snapshot including stripping ignore text.
-            update_obj['previous_md5'] = ""
+            update_obj['previous_md5'] = "reprocess previous"

             update_obj['ignore_text'] = ignore_text

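Note on the changed line above: previous_md5 normally holds a 32-character md5 hex digest, so a human-readable sentinel such as "reprocess previous" can never collide with a real checksum, which is what lets the check thread recognise it later. A quick illustration of that property (not part of the commit, just a sanity check):

    import hashlib

    # An md5 hex digest is always 32 hex characters, so a plain-English
    # sentinel like "reprocess previous" cannot equal a real digest.
    sentinel = "reprocess previous"
    digest = hashlib.md5(b"any snapshot content").hexdigest()
    assert len(digest) == 32
    assert sentinel != digest
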
@@ -12,13 +12,20 @@ class perform_site_check():
         self.datastore = datastore

     def strip_ignore_text(self, content, list_ignore_text):
+        ignore = []
+        for k in list_ignore_text:
+            ignore.append(k.encode('utf8'))
+
-        output=[]
+        output = []
         for line in content.splitlines():
-            if not any(skip_text in line for skip_text in list_ignore_text):
-                output.append(line)
+            line = line.encode('utf8')
+            # Always ignore blank lines in this mode. (when this function gets called)
+            if len(line.strip()):
+                if not any(skip_text in line for skip_text in ignore):
+                    output.append(line)

-        return "\n".join(output)
+        return "\n".encode('utf8').join(output)

     def run(self, uuid):
         timestamp = int(time.time())  # used for storage etc too

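For reference, the rewritten helper can be exercised on its own. The sketch below mirrors the new behaviour shown in the hunk (ignore terms pre-encoded to UTF-8, blank lines always dropped, case-sensitive substring matching, bytes returned); it is a standalone approximation, not a copy of the module:

    def strip_ignore_text(content, list_ignore_text):
        # Pre-encode the ignore terms once so the membership test below is bytes vs bytes.
        ignore = [k.encode('utf8') for k in list_ignore_text]

        output = []
        for line in content.splitlines():
            line = line.encode('utf8')
            # Blank lines are always dropped in this mode.
            if len(line.strip()):
                if not any(skip_text in line for skip_text in ignore):
                    output.append(line)

        return "\n".encode('utf8').join(output)

    print(strip_ignore_text("Some content\nwhich sometimes changes\n\nbut not always", ["sometimes"]))
    # b'Some content\nbut not always'

Because the function now returns bytes rather than str, the updated unit test further down asserts against b"sometimes" and b"Some content".
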
@@ -85,16 +92,32 @@ class perform_site_check():
         if not len(r.text):
             update_obj["last_error"] = "Empty reply"

-        content = stripped_text_from_html.encode('utf-8')

         # If there's text to skip
         # @todo we could abstract out the get_text() to handle this cleaner
         if len(self.datastore.data['watching'][uuid]['ignore_text']):
-            content = self.strip_ignore_text(content, self.datastore.data['watching'][uuid]['ignore_text'])
+            content = self.strip_ignore_text(stripped_text_from_html,
+                                             self.datastore.data['watching'][uuid]['ignore_text'])
+        else:
+            content = stripped_text_from_html

         fetched_md5 = hashlib.md5(content).hexdigest()

+        # If they edited an existing watch, we need to know to reset the current/previous md5 to include
+        # the excluded text.
+        if self.datastore.data['watching'][uuid]['previous_md5'] == "reprocess previous":
+            # Get the most recent one
+            newest_history_key = self.datastore.get_newest_history_key(uuid)
+            if newest_history_key:
+                with open(self.datastore.data['watching'][uuid]['history'][newest_history_key],
+                          encoding='utf-8') as file:
+                    raw_content = file.read()
+
+                stripped_content = self.strip_ignore_text(raw_content,
+                                                          self.datastore.data['watching'][uuid]['ignore_text'])
+
+                checksum = hashlib.md5(stripped_content).hexdigest()
+                self.datastore.data['watching'][uuid]['previous_md5'] = checksum

         # could be None or False depending on JSON type
         if self.datastore.data['watching'][uuid]['previous_md5'] != fetched_md5:

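The block added above is the consumer of the "reprocess previous" sentinel written by the settings handler: before comparing checksums it rebuilds previous_md5 from the newest stored snapshot with the current ignore rules applied, so enlarging the ignore list does not by itself flag the next fetch as a change. A condensed sketch of that flow, reusing the strip_ignore_text sketch above (the watch dict and snapshot path here are stand-ins for the datastore structures in the diff):

    import hashlib

    def rebuild_previous_md5(watch, newest_snapshot_path, strip_ignore_text):
        # Re-read the latest saved snapshot and hash it with the ignore text stripped,
        # mirroring how the next fetched page will be hashed.
        with open(newest_snapshot_path, encoding='utf-8') as f:
            raw_content = f.read()

        stripped = strip_ignore_text(raw_content, watch['ignore_text'])
        watch['previous_md5'] = hashlib.md5(stripped).hexdigest()

    # Hypothetical usage:
    # watch = {'ignore_text': ['sometimes'], 'previous_md5': 'reprocess previous'}
    # if watch['previous_md5'] == "reprocess previous":
    #     rebuild_previous_md5(watch, "datastore/<uuid>/<timestamp>.txt", strip_ignore_text)
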
@@ -64,6 +64,7 @@ class ChangeDetectionStore:
             self.__data['build_sha'] = f.read()

         try:
+            # @todo retest with ", encoding='utf-8'"
             with open(self.json_store_path) as json_file:
                 from_disk = json.load(json_file)

@@ -139,7 +140,6 @@ class ChangeDetectionStore:
     @property
     def data(self):

         return self.__data

     def get_all_tags(self):

@@ -161,6 +161,7 @@ class ChangeDetectionStore:
             self.__data['watching'] = {}
         else:
             del (self.__data['watching'][uuid])

         self.needs_write = True

     def url_exists(self, url):

@@ -6,7 +6,20 @@ from urllib.request import urlopen
 import pytest


+def test_setup_liveserver(live_server):
+    @live_server.app.route('/test-endpoint')
+    def test_endpoint():
+        # Tried using a global var here but didn't seem to work, so reading from a file instead.
+        with open("test-datastore/output.txt", "r") as f:
+            return f.read()
+
+    live_server.start()
+
+    assert 1 == 1
+
+
 # Unit test of the stripper
+# Always we are dealing in utf-8
 def test_strip_text_func():
     from backend import fetch_site_status

@@ -18,19 +31,13 @@ def test_strip_text_func():
     but not always."""

-    original_length = len(test_content.splitlines())
-
-    fetcher = fetch_site_status.perform_site_check(datastore=False)
-
     ignore_lines = ["sometimes"]

+    fetcher = fetch_site_status.perform_site_check(datastore=False)
     stripped_content = fetcher.strip_ignore_text(test_content, ignore_lines)

-    # Should be one line shorter
-    assert len(stripped_content.splitlines()) == original_length - 1
-
-    assert "sometimes" not in stripped_content
-    assert "Some content" in stripped_content
+    assert b"sometimes" not in stripped_content
+    assert b"Some content" in stripped_content


 def set_original_ignore_response():

@@ -49,6 +56,22 @@ def set_original_ignore_response():
         f.write(test_return_data)


+def set_modified_original_ignore_response():
+    test_return_data = """<html>
+       <body>
+     Some NEW nice initial text</br>
+     <p>Which is across multiple lines</p>
+     </br>
+     So let's see what happens. </br>
+     </body>
+     </html>
+
+    """
+
+    with open("test-datastore/output.txt", "w") as f:
+        f.write(test_return_data)
+
+
 # Is the same but includes ZZZZZ, 'ZZZZZ' is the last line in ignore_text
 def set_modified_ignore_response():
     test_return_data = """<html>

@@ -68,7 +91,7 @@ def set_modified_ignore_response():


 def test_check_ignore_text_functionality(client, live_server):
-    sleep_time_for_fetch_thread = 5
+    sleep_time_for_fetch_thread = 3

     ignore_text = "XXXXX\nYYYYY\nZZZZZ"
     set_original_ignore_response()

@@ -111,11 +134,11 @@ def test_check_ignore_text_functionality(client, live_server):
     assert b'unviewed' not in res.data
     assert b'/test-endpoint' in res.data

-
+    # Make a change
     set_modified_ignore_response()

     # Trigger a check
     client.get(url_for("api_watch_checknow"), follow_redirects=True)

     # Give the thread time to pick it up
     time.sleep(sleep_time_for_fetch_thread)

@@ -124,5 +147,12 @@ def test_check_ignore_text_functionality(client, live_server):
     assert b'unviewed' not in res.data
     assert b'/test-endpoint' in res.data

+    # Just to be sure.. set a regular modified change..
+    set_modified_original_ignore_response()
+    client.get(url_for("api_watch_checknow"), follow_redirects=True)
+    time.sleep(sleep_time_for_fetch_thread)
+    res = client.get(url_for("index"))
+    assert b'unviewed' in res.data
+
     res = client.get(url_for("api_delete", uuid="all"), follow_redirects=True)
     assert b'Deleted' in res.data