kopia lustrzana https://github.com/dgtlmoon/changedetection.io
Better text/plain detection and refactor tests (#443)
rodzic
615fa2c5b2
commit
96664ffb10
|
@ -838,6 +838,7 @@ def changedetection_app(config=None, datastore_o=None):
|
||||||
logs=notification_debug_log if len(notification_debug_log) else ["No errors or warnings detected"])
|
logs=notification_debug_log if len(notification_debug_log) else ["No errors or warnings detected"])
|
||||||
|
|
||||||
return output
|
return output
|
||||||
|
|
||||||
@app.route("/api/<string:uuid>/snapshot/current", methods=['GET'])
|
@app.route("/api/<string:uuid>/snapshot/current", methods=['GET'])
|
||||||
@login_required
|
@login_required
|
||||||
def api_snapshot(uuid):
|
def api_snapshot(uuid):
|
||||||
|
|
|
@ -86,8 +86,13 @@ class perform_site_check():
|
||||||
if is_html:
|
if is_html:
|
||||||
# CSS Filter, extract the HTML that matches and feed that into the existing inscriptis::get_text
|
# CSS Filter, extract the HTML that matches and feed that into the existing inscriptis::get_text
|
||||||
html_content = fetcher.content
|
html_content = fetcher.content
|
||||||
if not fetcher.headers.get('Content-Type', '') == 'text/plain':
|
|
||||||
|
|
||||||
|
# If not JSON, and if it's not text/plain..
|
||||||
|
if 'text/plain' in fetcher.headers.get('Content-Type', '').lower():
|
||||||
|
# Don't run get_text or xpath/css filters on plaintext
|
||||||
|
stripped_text_from_html = html_content
|
||||||
|
else:
|
||||||
|
# Then we assume HTML
|
||||||
if has_filter_rule:
|
if has_filter_rule:
|
||||||
# For HTML/XML we offer xpath as an option, just start a regular xPath "/.."
|
# For HTML/XML we offer xpath as an option, just start a regular xPath "/.."
|
||||||
if css_filter_rule[0] == '/':
|
if css_filter_rule[0] == '/':
|
||||||
|
@ -98,9 +103,7 @@ class perform_site_check():
|
||||||
|
|
||||||
# get_text() via inscriptis
|
# get_text() via inscriptis
|
||||||
stripped_text_from_html = get_text(html_content)
|
stripped_text_from_html = get_text(html_content)
|
||||||
else:
|
|
||||||
# Don't run get_text or xpath/css filters on plaintext
|
|
||||||
stripped_text_from_html = html_content
|
|
||||||
|
|
||||||
# Re #340 - return the content before the 'ignore text' was applied
|
# Re #340 - return the content before the 'ignore text' was applied
|
||||||
text_content_before_ignored_filter = stripped_text_from_html.encode('utf-8')
|
text_content_before_ignored_filter = stripped_text_from_html.encode('utf-8')
|
||||||
|
|
|
@ -14,7 +14,6 @@ def set_response_data(test_return_data):
|
||||||
|
|
||||||
|
|
||||||
def test_snapshot_api_detects_change(client, live_server):
|
def test_snapshot_api_detects_change(client, live_server):
|
||||||
|
|
||||||
test_return_data = "Some initial text"
|
test_return_data = "Some initial text"
|
||||||
|
|
||||||
test_return_data_modified = "Some NEW nice initial text"
|
test_return_data_modified = "Some NEW nice initial text"
|
||||||
|
@ -27,7 +26,7 @@ def test_snapshot_api_detects_change(client, live_server):
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
|
|
||||||
# Add our URL to the import page
|
# Add our URL to the import page
|
||||||
test_url = url_for('test_endpoint', _external=True)
|
test_url = url_for('test_endpoint', content_type="text/plain", _external=True)
|
||||||
res = client.post(
|
res = client.post(
|
||||||
url_for("import_page"),
|
url_for("import_page"),
|
||||||
data={"urls": test_url},
|
data={"urls": test_url},
|
||||||
|
|
|
@ -7,6 +7,13 @@ from . util import set_original_response, set_modified_response, live_server_set
|
||||||
|
|
||||||
sleep_time_for_fetch_thread = 3
|
sleep_time_for_fetch_thread = 3
|
||||||
|
|
||||||
|
# Basic test to check inscriptus is not adding return line chars, basically works etc
|
||||||
|
def test_inscriptus():
|
||||||
|
from inscriptis import get_text
|
||||||
|
html_content="<html><body>test!<br/>ok man</body></html>"
|
||||||
|
stripped_text_from_html = get_text(html_content)
|
||||||
|
assert stripped_text_from_html == 'test!\nok man'
|
||||||
|
|
||||||
|
|
||||||
def test_check_basic_change_detection_functionality(client, live_server):
|
def test_check_basic_change_detection_functionality(client, live_server):
|
||||||
set_original_response()
|
set_original_response()
|
||||||
|
|
|
@ -162,7 +162,7 @@ def test_check_json_without_filter(client, live_server):
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
|
|
||||||
# Add our URL to the import page
|
# Add our URL to the import page
|
||||||
test_url = url_for('test_endpoint_json', _external=True)
|
test_url = url_for('test_endpoint', content_type="application/json", _external=True)
|
||||||
client.post(
|
client.post(
|
||||||
url_for("import_page"),
|
url_for("import_page"),
|
||||||
data={"urls": test_url},
|
data={"urls": test_url},
|
||||||
|
@ -193,7 +193,7 @@ def test_check_json_filter(client, live_server):
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
|
|
||||||
# Add our URL to the import page
|
# Add our URL to the import page
|
||||||
test_url = url_for('test_endpoint', _external=True)
|
test_url = url_for('test_endpoint', content_type="application/json", _external=True)
|
||||||
res = client.post(
|
res = client.post(
|
||||||
url_for("import_page"),
|
url_for("import_page"),
|
||||||
data={"urls": test_url},
|
data={"urls": test_url},
|
||||||
|
@ -258,7 +258,7 @@ def test_check_json_filter_bool_val(client, live_server):
|
||||||
# Give the endpoint time to spin up
|
# Give the endpoint time to spin up
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
|
|
||||||
test_url = url_for('test_endpoint', _external=True)
|
test_url = url_for('test_endpoint', content_type="application/json", _external=True)
|
||||||
|
|
||||||
res = client.post(
|
res = client.post(
|
||||||
url_for("import_page"),
|
url_for("import_page"),
|
||||||
|
@ -313,7 +313,7 @@ def test_check_json_ext_filter(client, live_server):
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
|
|
||||||
# Add our URL to the import page
|
# Add our URL to the import page
|
||||||
test_url = url_for('test_endpoint', _external=True)
|
test_url = url_for('test_endpoint', content_type="application/json", _external=True)
|
||||||
res = client.post(
|
res = client.post(
|
||||||
url_for("import_page"),
|
url_for("import_page"),
|
||||||
data={"urls": test_url},
|
data={"urls": test_url},
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
#!/usr/bin/python3
|
#!/usr/bin/python3
|
||||||
|
|
||||||
|
from flask import make_response, request
|
||||||
|
|
||||||
def set_original_response():
|
def set_original_response():
|
||||||
test_return_data = """<html>
|
test_return_data = """<html>
|
||||||
|
@ -40,24 +41,16 @@ def live_server_setup(live_server):
|
||||||
|
|
||||||
@live_server.app.route('/test-endpoint')
|
@live_server.app.route('/test-endpoint')
|
||||||
def test_endpoint():
|
def test_endpoint():
|
||||||
|
ctype = request.args.get('content_type')
|
||||||
|
|
||||||
# Tried using a global var here but didn't seem to work, so reading from a file instead.
|
# Tried using a global var here but didn't seem to work, so reading from a file instead.
|
||||||
with open("test-datastore/endpoint-content.txt", "r") as f:
|
|
||||||
return f.read()
|
|
||||||
|
|
||||||
@live_server.app.route('/test-endpoint-json')
|
|
||||||
def test_endpoint_json():
|
|
||||||
|
|
||||||
from flask import make_response
|
|
||||||
|
|
||||||
with open("test-datastore/endpoint-content.txt", "r") as f:
|
with open("test-datastore/endpoint-content.txt", "r") as f:
|
||||||
resp = make_response(f.read())
|
resp = make_response(f.read())
|
||||||
resp.headers['Content-Type'] = 'application/json'
|
resp.headers['Content-Type'] = ctype if ctype else 'text/html'
|
||||||
return resp
|
return resp
|
||||||
|
|
||||||
@live_server.app.route('/test-403')
|
@live_server.app.route('/test-403')
|
||||||
def test_endpoint_403_error():
|
def test_endpoint_403_error():
|
||||||
|
|
||||||
from flask import make_response
|
|
||||||
resp = make_response('', 403)
|
resp = make_response('', 403)
|
||||||
return resp
|
return resp
|
||||||
|
|
||||||
|
@ -65,7 +58,6 @@ def live_server_setup(live_server):
|
||||||
@live_server.app.route('/test-headers')
|
@live_server.app.route('/test-headers')
|
||||||
def test_headers():
|
def test_headers():
|
||||||
|
|
||||||
from flask import request
|
|
||||||
output= []
|
output= []
|
||||||
|
|
||||||
for header in request.headers:
|
for header in request.headers:
|
||||||
|
@ -76,24 +68,16 @@ def live_server_setup(live_server):
|
||||||
# Just return the body in the request
|
# Just return the body in the request
|
||||||
@live_server.app.route('/test-body', methods=['POST', 'GET'])
|
@live_server.app.route('/test-body', methods=['POST', 'GET'])
|
||||||
def test_body():
|
def test_body():
|
||||||
|
|
||||||
from flask import request
|
|
||||||
|
|
||||||
return request.data
|
return request.data
|
||||||
|
|
||||||
# Just return the verb in the request
|
# Just return the verb in the request
|
||||||
@live_server.app.route('/test-method', methods=['POST', 'GET', 'PATCH'])
|
@live_server.app.route('/test-method', methods=['POST', 'GET', 'PATCH'])
|
||||||
def test_method():
|
def test_method():
|
||||||
|
|
||||||
from flask import request
|
|
||||||
|
|
||||||
return request.method
|
return request.method
|
||||||
|
|
||||||
# Where we POST to as a notification
|
# Where we POST to as a notification
|
||||||
@live_server.app.route('/test_notification_endpoint', methods=['POST', 'GET'])
|
@live_server.app.route('/test_notification_endpoint', methods=['POST', 'GET'])
|
||||||
def test_notification_endpoint():
|
def test_notification_endpoint():
|
||||||
from flask import request
|
|
||||||
|
|
||||||
with open("test-datastore/notification.txt", "wb") as f:
|
with open("test-datastore/notification.txt", "wb") as f:
|
||||||
# Debug method, dump all POST to file also, used to prove #65
|
# Debug method, dump all POST to file also, used to prove #65
|
||||||
data = request.stream.read()
|
data = request.stream.read()
|
||||||
|
@ -107,8 +91,6 @@ def live_server_setup(live_server):
|
||||||
# Just return the verb in the request
|
# Just return the verb in the request
|
||||||
@live_server.app.route('/test-basicauth', methods=['GET'])
|
@live_server.app.route('/test-basicauth', methods=['GET'])
|
||||||
def test_basicauth_method():
|
def test_basicauth_method():
|
||||||
|
|
||||||
from flask import request
|
|
||||||
auth = request.authorization
|
auth = request.authorization
|
||||||
ret = " ".join([auth.username, auth.password, auth.type])
|
ret = " ".join([auth.username, auth.password, auth.type])
|
||||||
return ret
|
return ret
|
||||||
|
|
Ładowanie…
Reference in New Issue