kopia lustrzana https://github.com/dgtlmoon/changedetection.io
Co-authored-by: Ara Hayrabedian <ara.hayrabedian@gmail.com>pull/487/head
rodzic
5483f5d694
commit
942c3f021f
|
@ -508,13 +508,13 @@ def changedetection_app(config=None, datastore_o=None):
|
||||||
'headers': form.headers.data,
|
'headers': form.headers.data,
|
||||||
'body': form.body.data,
|
'body': form.body.data,
|
||||||
'method': form.method.data,
|
'method': form.method.data,
|
||||||
|
'ignore_status_codes': form.ignore_status_codes.data,
|
||||||
'fetch_backend': form.fetch_backend.data,
|
'fetch_backend': form.fetch_backend.data,
|
||||||
'trigger_text': form.trigger_text.data,
|
'trigger_text': form.trigger_text.data,
|
||||||
'notification_title': form.notification_title.data,
|
'notification_title': form.notification_title.data,
|
||||||
'notification_body': form.notification_body.data,
|
'notification_body': form.notification_body.data,
|
||||||
'notification_format': form.notification_format.data,
|
'notification_format': form.notification_format.data,
|
||||||
'extract_title_as_title': form.extract_title_as_title.data
|
'extract_title_as_title': form.extract_title_as_title.data,
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
# Notification URLs
|
# Notification URLs
|
||||||
|
|
|
@ -32,7 +32,13 @@ class Fetcher():
|
||||||
return self.error
|
return self.error
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def run(self, url, timeout, request_headers, request_body, request_method):
|
def run(self,
|
||||||
|
url,
|
||||||
|
timeout,
|
||||||
|
request_headers,
|
||||||
|
request_body,
|
||||||
|
request_method,
|
||||||
|
ignore_status_codes=False):
|
||||||
# Should set self.error, self.status_code and self.content
|
# Should set self.error, self.status_code and self.content
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
@ -99,7 +105,13 @@ class html_webdriver(Fetcher):
|
||||||
if proxy_args:
|
if proxy_args:
|
||||||
self.proxy = SeleniumProxy(raw=proxy_args)
|
self.proxy = SeleniumProxy(raw=proxy_args)
|
||||||
|
|
||||||
def run(self, url, timeout, request_headers, request_body, request_method):
|
def run(self,
|
||||||
|
url,
|
||||||
|
timeout,
|
||||||
|
request_headers,
|
||||||
|
request_body,
|
||||||
|
request_method,
|
||||||
|
ignore_status_codes=False):
|
||||||
|
|
||||||
# request_body, request_method unused for now, until some magic in the future happens.
|
# request_body, request_method unused for now, until some magic in the future happens.
|
||||||
|
|
||||||
|
@ -147,7 +159,13 @@ class html_webdriver(Fetcher):
|
||||||
class html_requests(Fetcher):
|
class html_requests(Fetcher):
|
||||||
fetcher_description = "Basic fast Plaintext/HTTP Client"
|
fetcher_description = "Basic fast Plaintext/HTTP Client"
|
||||||
|
|
||||||
def run(self, url, timeout, request_headers, request_body, request_method):
|
def run(self,
|
||||||
|
url,
|
||||||
|
timeout,
|
||||||
|
request_headers,
|
||||||
|
request_body,
|
||||||
|
request_method,
|
||||||
|
ignore_status_codes=False):
|
||||||
|
|
||||||
r = requests.request(method=request_method,
|
r = requests.request(method=request_method,
|
||||||
data=request_body,
|
data=request_body,
|
||||||
|
@ -167,7 +185,7 @@ class html_requests(Fetcher):
|
||||||
|
|
||||||
# @todo test this
|
# @todo test this
|
||||||
# @todo maybe you really want to test zero-byte return pages?
|
# @todo maybe you really want to test zero-byte return pages?
|
||||||
if not r or not r.content or not len(r.content):
|
if (not ignore_status_codes and not r) or not r.content or not len(r.content):
|
||||||
raise EmptyReply(url=url, status_code=r.status_code)
|
raise EmptyReply(url=url, status_code=r.status_code)
|
||||||
|
|
||||||
self.status_code = r.status_code
|
self.status_code = r.status_code
|
||||||
|
|
|
@ -53,6 +53,7 @@ class perform_site_check():
|
||||||
url = self.datastore.get_val(uuid, 'url')
|
url = self.datastore.get_val(uuid, 'url')
|
||||||
request_body = self.datastore.get_val(uuid, 'body')
|
request_body = self.datastore.get_val(uuid, 'body')
|
||||||
request_method = self.datastore.get_val(uuid, 'method')
|
request_method = self.datastore.get_val(uuid, 'method')
|
||||||
|
ignore_status_code = self.datastore.get_val(uuid, 'ignore_status_codes')
|
||||||
|
|
||||||
# Pluggable content fetcher
|
# Pluggable content fetcher
|
||||||
prefer_backend = watch['fetch_backend']
|
prefer_backend = watch['fetch_backend']
|
||||||
|
@ -64,7 +65,7 @@ class perform_site_check():
|
||||||
|
|
||||||
|
|
||||||
fetcher = klass()
|
fetcher = klass()
|
||||||
fetcher.run(url, timeout, request_headers, request_body, request_method)
|
fetcher.run(url, timeout, request_headers, request_body, request_method, ignore_status_code)
|
||||||
# Fetching complete, now filters
|
# Fetching complete, now filters
|
||||||
# @todo move to class / maybe inside of fetcher abstract base?
|
# @todo move to class / maybe inside of fetcher abstract base?
|
||||||
|
|
||||||
|
|
|
@ -325,6 +325,7 @@ class watchForm(commonSettingsForm):
|
||||||
headers = StringDictKeyValue('Request Headers')
|
headers = StringDictKeyValue('Request Headers')
|
||||||
body = TextAreaField('Request Body', [validators.Optional()])
|
body = TextAreaField('Request Body', [validators.Optional()])
|
||||||
method = SelectField('Request Method', choices=valid_method, default=default_method)
|
method = SelectField('Request Method', choices=valid_method, default=default_method)
|
||||||
|
ignore_status_codes = BooleanField('Ignore Status Codes (process non-2xx status codes as normal)', default=False)
|
||||||
trigger_text = StringListField('Trigger/wait for text', [validators.Optional(), ValidateListRegex()])
|
trigger_text = StringListField('Trigger/wait for text', [validators.Optional(), ValidateListRegex()])
|
||||||
|
|
||||||
save_button = SubmitField('Save', render_kw={"class": "pure-button pure-button-primary"})
|
save_button = SubmitField('Save', render_kw={"class": "pure-button pure-button-primary"})
|
||||||
|
|
|
@ -81,6 +81,9 @@ User-Agent: wonderbra 1.0") }}
|
||||||
\"car\":null
|
\"car\":null
|
||||||
}") }}
|
}") }}
|
||||||
</div>
|
</div>
|
||||||
|
<div>
|
||||||
|
{{ render_field(form.ignore_status_codes) }}
|
||||||
|
</div>
|
||||||
</fieldset>
|
</fieldset>
|
||||||
<br/>
|
<br/>
|
||||||
</div>
|
</div>
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
#!/usr/bin/python3
|
#!/usr/bin/python3
|
||||||
|
|
||||||
import time
|
import time
|
||||||
|
|
||||||
from flask import url_for
|
from flask import url_for
|
||||||
from . util import live_server_setup
|
from . util import live_server_setup
|
||||||
|
|
||||||
|
@ -17,7 +18,9 @@ def test_error_handler(client, live_server):
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
|
|
||||||
# Add our URL to the import page
|
# Add our URL to the import page
|
||||||
test_url = url_for('test_endpoint_403_error', _external=True)
|
test_url = url_for('test_endpoint',
|
||||||
|
status_code=403,
|
||||||
|
_external=True)
|
||||||
res = client.post(
|
res = client.post(
|
||||||
url_for("import_page"),
|
url_for("import_page"),
|
||||||
data={"urls": test_url},
|
data={"urls": test_url},
|
||||||
|
|
|
@ -0,0 +1,190 @@
|
||||||
|
#!/usr/bin/python3
|
||||||
|
|
||||||
|
import time
|
||||||
|
from flask import url_for
|
||||||
|
from . util import live_server_setup
|
||||||
|
|
||||||
|
|
||||||
|
def test_setup(live_server):
|
||||||
|
live_server_setup(live_server)
|
||||||
|
|
||||||
|
|
||||||
|
def set_original_response():
|
||||||
|
test_return_data = """<html>
|
||||||
|
<body>
|
||||||
|
Some initial text</br>
|
||||||
|
<p>Which is across multiple lines</p>
|
||||||
|
</br>
|
||||||
|
So let's see what happens. </br>
|
||||||
|
</body>
|
||||||
|
</html>
|
||||||
|
"""
|
||||||
|
|
||||||
|
with open("test-datastore/endpoint-content.txt", "w") as f:
|
||||||
|
f.write(test_return_data)
|
||||||
|
|
||||||
|
|
||||||
|
def set_some_changed_response():
|
||||||
|
test_return_data = """<html>
|
||||||
|
<body>
|
||||||
|
Some initial text</br>
|
||||||
|
<p>Which is across multiple lines, and a new thing too.</p>
|
||||||
|
</br>
|
||||||
|
So let's see what happens. </br>
|
||||||
|
</body>
|
||||||
|
</html>
|
||||||
|
"""
|
||||||
|
|
||||||
|
with open("test-datastore/endpoint-content.txt", "w") as f:
|
||||||
|
f.write(test_return_data)
|
||||||
|
|
||||||
|
|
||||||
|
def test_normal_page_check_works_with_ignore_status_code(client, live_server):
|
||||||
|
sleep_time_for_fetch_thread = 3
|
||||||
|
|
||||||
|
# Give the endpoint time to spin up
|
||||||
|
time.sleep(1)
|
||||||
|
|
||||||
|
set_original_response()
|
||||||
|
|
||||||
|
# Goto the settings page, add our ignore text
|
||||||
|
res = client.post(
|
||||||
|
url_for("settings_page"),
|
||||||
|
data={
|
||||||
|
"minutes_between_check": 180,
|
||||||
|
"ignore_status_codes": "y",
|
||||||
|
'fetch_backend': "html_requests"
|
||||||
|
},
|
||||||
|
follow_redirects=True
|
||||||
|
)
|
||||||
|
assert b"Settings updated." in res.data
|
||||||
|
|
||||||
|
# Add our URL to the import page
|
||||||
|
test_url = url_for('test_endpoint', _external=True)
|
||||||
|
res = client.post(
|
||||||
|
url_for("import_page"),
|
||||||
|
data={"urls": test_url},
|
||||||
|
follow_redirects=True
|
||||||
|
)
|
||||||
|
assert b"1 Imported" in res.data
|
||||||
|
|
||||||
|
time.sleep(sleep_time_for_fetch_thread)
|
||||||
|
# Trigger a check
|
||||||
|
client.get(url_for("api_watch_checknow"), follow_redirects=True)
|
||||||
|
|
||||||
|
set_some_changed_response()
|
||||||
|
time.sleep(sleep_time_for_fetch_thread)
|
||||||
|
# Trigger a check
|
||||||
|
client.get(url_for("api_watch_checknow"), follow_redirects=True)
|
||||||
|
|
||||||
|
# Give the thread time to pick it up
|
||||||
|
time.sleep(sleep_time_for_fetch_thread)
|
||||||
|
|
||||||
|
# It should report nothing found (no new 'unviewed' class)
|
||||||
|
res = client.get(url_for("index"))
|
||||||
|
assert b'unviewed' in res.data
|
||||||
|
assert b'/test-endpoint' in res.data
|
||||||
|
|
||||||
|
|
||||||
|
# Tests the whole stack works with staus codes ignored
|
||||||
|
def test_403_page_check_works_with_ignore_status_code(client, live_server):
|
||||||
|
sleep_time_for_fetch_thread = 3
|
||||||
|
|
||||||
|
set_original_response()
|
||||||
|
|
||||||
|
# Give the endpoint time to spin up
|
||||||
|
time.sleep(1)
|
||||||
|
|
||||||
|
# Add our URL to the import page
|
||||||
|
test_url = url_for('test_endpoint', status_code=403, _external=True)
|
||||||
|
res = client.post(
|
||||||
|
url_for("import_page"),
|
||||||
|
data={"urls": test_url},
|
||||||
|
follow_redirects=True
|
||||||
|
)
|
||||||
|
assert b"1 Imported" in res.data
|
||||||
|
|
||||||
|
# Trigger a check
|
||||||
|
client.get(url_for("api_watch_checknow"), follow_redirects=True)
|
||||||
|
|
||||||
|
# Give the thread time to pick it up
|
||||||
|
time.sleep(sleep_time_for_fetch_thread)
|
||||||
|
|
||||||
|
# Goto the edit page, check our ignore option
|
||||||
|
# Add our URL to the import page
|
||||||
|
res = client.post(
|
||||||
|
url_for("edit_page", uuid="first"),
|
||||||
|
data={"ignore_status_codes": "y", "url": test_url, "tag": "", "headers": "", 'fetch_backend': "html_requests"},
|
||||||
|
follow_redirects=True
|
||||||
|
)
|
||||||
|
assert b"Updated watch." in res.data
|
||||||
|
|
||||||
|
# Trigger a check
|
||||||
|
client.get(url_for("api_watch_checknow"), follow_redirects=True)
|
||||||
|
|
||||||
|
# Give the thread time to pick it up
|
||||||
|
time.sleep(sleep_time_for_fetch_thread)
|
||||||
|
# Make a change
|
||||||
|
set_some_changed_response()
|
||||||
|
|
||||||
|
# Trigger a check
|
||||||
|
client.get(url_for("api_watch_checknow"), follow_redirects=True)
|
||||||
|
# Give the thread time to pick it up
|
||||||
|
time.sleep(sleep_time_for_fetch_thread)
|
||||||
|
|
||||||
|
# It should have 'unviewed' still
|
||||||
|
# Because it should be looking at only that 'sametext' id
|
||||||
|
res = client.get(url_for("index"))
|
||||||
|
assert b'unviewed' in res.data
|
||||||
|
|
||||||
|
|
||||||
|
# Tests the whole stack works with staus codes ignored
|
||||||
|
def test_403_page_check_fails_without_ignore_status_code(client, live_server):
|
||||||
|
sleep_time_for_fetch_thread = 3
|
||||||
|
|
||||||
|
set_original_response()
|
||||||
|
|
||||||
|
# Give the endpoint time to spin up
|
||||||
|
time.sleep(1)
|
||||||
|
|
||||||
|
# Add our URL to the import page
|
||||||
|
test_url = url_for('test_endpoint', status_code=403, _external=True)
|
||||||
|
res = client.post(
|
||||||
|
url_for("import_page"),
|
||||||
|
data={"urls": test_url},
|
||||||
|
follow_redirects=True
|
||||||
|
)
|
||||||
|
assert b"1 Imported" in res.data
|
||||||
|
|
||||||
|
# Trigger a check
|
||||||
|
client.get(url_for("api_watch_checknow"), follow_redirects=True)
|
||||||
|
|
||||||
|
# Give the thread time to pick it up
|
||||||
|
time.sleep(sleep_time_for_fetch_thread)
|
||||||
|
|
||||||
|
# Goto the edit page, check our ignore option
|
||||||
|
# Add our URL to the import page
|
||||||
|
res = client.post(
|
||||||
|
url_for("edit_page", uuid="first"),
|
||||||
|
data={"url": test_url, "tag": "", "headers": "", 'fetch_backend': "html_requests"},
|
||||||
|
follow_redirects=True
|
||||||
|
)
|
||||||
|
assert b"Updated watch." in res.data
|
||||||
|
|
||||||
|
# Trigger a check
|
||||||
|
client.get(url_for("api_watch_checknow"), follow_redirects=True)
|
||||||
|
|
||||||
|
# Give the thread time to pick it up
|
||||||
|
time.sleep(sleep_time_for_fetch_thread)
|
||||||
|
# Make a change
|
||||||
|
set_some_changed_response()
|
||||||
|
|
||||||
|
# Trigger a check
|
||||||
|
client.get(url_for("api_watch_checknow"), follow_redirects=True)
|
||||||
|
# Give the thread time to pick it up
|
||||||
|
time.sleep(sleep_time_for_fetch_thread)
|
||||||
|
|
||||||
|
# It should have 'unviewed' still
|
||||||
|
# Because it should be looking at only that 'sametext' id
|
||||||
|
res = client.get(url_for("index"))
|
||||||
|
assert b'Status Code 403' in res.data
|
|
@ -38,21 +38,19 @@ def set_modified_response():
|
||||||
|
|
||||||
def live_server_setup(live_server):
|
def live_server_setup(live_server):
|
||||||
|
|
||||||
|
|
||||||
@live_server.app.route('/test-endpoint')
|
@live_server.app.route('/test-endpoint')
|
||||||
def test_endpoint():
|
def test_endpoint():
|
||||||
ctype = request.args.get('content_type')
|
ctype = request.args.get('content_type')
|
||||||
|
status_code = request.args.get('status_code')
|
||||||
|
|
||||||
# Tried using a global var here but didn't seem to work, so reading from a file instead.
|
try:
|
||||||
with open("test-datastore/endpoint-content.txt", "r") as f:
|
# Tried using a global var here but didn't seem to work, so reading from a file instead.
|
||||||
resp = make_response(f.read())
|
with open("test-datastore/endpoint-content.txt", "r") as f:
|
||||||
resp.headers['Content-Type'] = ctype if ctype else 'text/html'
|
resp = make_response(f.read(), status_code)
|
||||||
return resp
|
resp.headers['Content-Type'] = ctype if ctype else 'text/html'
|
||||||
|
return resp
|
||||||
@live_server.app.route('/test-403')
|
except FileNotFoundError:
|
||||||
def test_endpoint_403_error():
|
return make_response('', status_code)
|
||||||
resp = make_response('', 403)
|
|
||||||
return resp
|
|
||||||
|
|
||||||
# Just return the headers in the request
|
# Just return the headers in the request
|
||||||
@live_server.app.route('/test-headers')
|
@live_server.app.route('/test-headers')
|
||||||
|
|
Ładowanie…
Reference in New Issue