kopia lustrzana https://github.com/dgtlmoon/changedetection.io
rodzic
10b2bbea83
commit
289f118581
|
@ -285,8 +285,6 @@ class CreateWatch(Resource):
|
||||||
list = {}
|
list = {}
|
||||||
|
|
||||||
tag_limit = request.args.get('tag', '').lower()
|
tag_limit = request.args.get('tag', '').lower()
|
||||||
|
|
||||||
|
|
||||||
for uuid, watch in self.datastore.data['watching'].items():
|
for uuid, watch in self.datastore.data['watching'].items():
|
||||||
# Watch tags by name (replace the other calls?)
|
# Watch tags by name (replace the other calls?)
|
||||||
tags = self.datastore.get_all_tags_for_watch(uuid=uuid)
|
tags = self.datastore.get_all_tags_for_watch(uuid=uuid)
|
||||||
|
|
|
@ -11,22 +11,14 @@ def check_token(f):
|
||||||
datastore = args[0].datastore
|
datastore = args[0].datastore
|
||||||
|
|
||||||
config_api_token_enabled = datastore.data['settings']['application'].get('api_access_token_enabled')
|
config_api_token_enabled = datastore.data['settings']['application'].get('api_access_token_enabled')
|
||||||
if not config_api_token_enabled:
|
|
||||||
return
|
|
||||||
|
|
||||||
try:
|
|
||||||
api_key_header = request.headers['x-api-key']
|
|
||||||
except KeyError:
|
|
||||||
return make_response(
|
|
||||||
jsonify("No authorization x-api-key header."), 403
|
|
||||||
)
|
|
||||||
|
|
||||||
config_api_token = datastore.data['settings']['application'].get('api_access_token')
|
config_api_token = datastore.data['settings']['application'].get('api_access_token')
|
||||||
|
|
||||||
if api_key_header != config_api_token:
|
# config_api_token_enabled - a UI option in settings if access should obey the key or not
|
||||||
return make_response(
|
if config_api_token_enabled:
|
||||||
jsonify("Invalid access - API key invalid."), 403
|
if request.headers.get('x-api-key') != config_api_token:
|
||||||
)
|
return make_response(
|
||||||
|
jsonify("Invalid access - API key invalid."), 403
|
||||||
|
)
|
||||||
|
|
||||||
return f(*args, **kwargs)
|
return f(*args, **kwargs)
|
||||||
|
|
||||||
|
|
|
@ -4,7 +4,6 @@
|
||||||
import flask_login
|
import flask_login
|
||||||
import locale
|
import locale
|
||||||
import os
|
import os
|
||||||
import pytz
|
|
||||||
import queue
|
import queue
|
||||||
import threading
|
import threading
|
||||||
import time
|
import time
|
||||||
|
@ -244,6 +243,9 @@ def changedetection_app(config=None, datastore_o=None):
|
||||||
# RSS access with token is allowed
|
# RSS access with token is allowed
|
||||||
elif request.endpoint and 'rss.feed' in request.endpoint:
|
elif request.endpoint and 'rss.feed' in request.endpoint:
|
||||||
return None
|
return None
|
||||||
|
# API routes - use their own auth mechanism (@auth.check_token)
|
||||||
|
elif request.path.startswith('/api/'):
|
||||||
|
return None
|
||||||
else:
|
else:
|
||||||
return login_manager.unauthorized()
|
return login_manager.unauthorized()
|
||||||
|
|
||||||
|
|
|
@ -2,7 +2,7 @@
|
||||||
|
|
||||||
import time
|
import time
|
||||||
from flask import url_for
|
from flask import url_for
|
||||||
from .util import live_server_setup, extract_api_key_from_UI, wait_for_all_checks
|
from .util import live_server_setup, wait_for_all_checks
|
||||||
|
|
||||||
import json
|
import json
|
||||||
import uuid
|
import uuid
|
||||||
|
@ -57,16 +57,15 @@ def test_setup(client, live_server, measure_memory_usage):
|
||||||
|
|
||||||
|
|
||||||
def test_api_simple(client, live_server, measure_memory_usage):
|
def test_api_simple(client, live_server, measure_memory_usage):
|
||||||
# live_server_setup(live_server)
|
#live_server_setup(live_server)
|
||||||
|
|
||||||
api_key = extract_api_key_from_UI(client)
|
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
|
||||||
|
|
||||||
# Create a watch
|
# Create a watch
|
||||||
set_original_response()
|
set_original_response()
|
||||||
|
|
||||||
# Validate bad URL
|
# Validate bad URL
|
||||||
test_url = url_for('test_endpoint', _external=True,
|
test_url = url_for('test_endpoint', _external=True )
|
||||||
headers={'x-api-key': api_key}, )
|
|
||||||
res = client.post(
|
res = client.post(
|
||||||
url_for("createwatch"),
|
url_for("createwatch"),
|
||||||
data=json.dumps({"url": "h://xxxxxxxxxom"}),
|
data=json.dumps({"url": "h://xxxxxxxxxom"}),
|
||||||
|
@ -293,12 +292,11 @@ def test_access_denied(client, live_server, measure_memory_usage):
|
||||||
def test_api_watch_PUT_update(client, live_server, measure_memory_usage):
|
def test_api_watch_PUT_update(client, live_server, measure_memory_usage):
|
||||||
|
|
||||||
#live_server_setup(live_server)
|
#live_server_setup(live_server)
|
||||||
api_key = extract_api_key_from_UI(client)
|
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
|
||||||
|
|
||||||
# Create a watch
|
# Create a watch
|
||||||
set_original_response()
|
set_original_response()
|
||||||
test_url = url_for('test_endpoint', _external=True,
|
test_url = url_for('test_endpoint', _external=True)
|
||||||
headers={'x-api-key': api_key}, )
|
|
||||||
|
|
||||||
# Create new
|
# Create new
|
||||||
res = client.post(
|
res = client.post(
|
||||||
|
@ -374,7 +372,7 @@ def test_api_watch_PUT_update(client, live_server, measure_memory_usage):
|
||||||
|
|
||||||
def test_api_import(client, live_server, measure_memory_usage):
|
def test_api_import(client, live_server, measure_memory_usage):
|
||||||
#live_server_setup(live_server)
|
#live_server_setup(live_server)
|
||||||
api_key = extract_api_key_from_UI(client)
|
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
|
||||||
|
|
||||||
res = client.post(
|
res = client.post(
|
||||||
url_for("import") + "?tag=import-test",
|
url_for("import") + "?tag=import-test",
|
||||||
|
@ -392,4 +390,48 @@ def test_api_import(client, live_server, measure_memory_usage):
|
||||||
# Should see the new tag in the tag/groups list
|
# Should see the new tag in the tag/groups list
|
||||||
res = client.get(url_for('tags.tags_overview_page'))
|
res = client.get(url_for('tags.tags_overview_page'))
|
||||||
assert b'import-test' in res.data
|
assert b'import-test' in res.data
|
||||||
|
|
||||||
|
def test_api_conflict_UI_password(client, live_server, measure_memory_usage):
|
||||||
|
|
||||||
|
#live_server_setup(live_server)
|
||||||
|
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
|
||||||
|
|
||||||
|
# Enable password check and diff page access bypass
|
||||||
|
res = client.post(
|
||||||
|
url_for("settings.settings_page"),
|
||||||
|
data={"application-password": "foobar", # password is now set! API should still work!
|
||||||
|
"application-api_access_token_enabled": "y",
|
||||||
|
"requests-time_between_check-minutes": 180,
|
||||||
|
'application-fetch_backend': "html_requests"},
|
||||||
|
follow_redirects=True
|
||||||
|
)
|
||||||
|
|
||||||
|
assert b"Password protection enabled." in res.data
|
||||||
|
|
||||||
|
# Create a watch
|
||||||
|
set_original_response()
|
||||||
|
test_url = url_for('test_endpoint', _external=True)
|
||||||
|
|
||||||
|
# Create new
|
||||||
|
res = client.post(
|
||||||
|
url_for("createwatch"),
|
||||||
|
data=json.dumps({"url": test_url, "title": "My test URL" }),
|
||||||
|
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||||
|
follow_redirects=True
|
||||||
|
)
|
||||||
|
|
||||||
|
assert res.status_code == 201
|
||||||
|
|
||||||
|
|
||||||
|
wait_for_all_checks(client)
|
||||||
|
url = url_for("createwatch")
|
||||||
|
# Get a listing, it will be the first one
|
||||||
|
res = client.get(
|
||||||
|
url,
|
||||||
|
headers={'x-api-key': api_key}
|
||||||
|
)
|
||||||
|
assert res.status_code == 200
|
||||||
|
|
||||||
|
assert len(res.json)
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -2,7 +2,7 @@
|
||||||
|
|
||||||
import time
|
import time
|
||||||
from flask import url_for
|
from flask import url_for
|
||||||
from .util import live_server_setup, extract_UUID_from_client, extract_api_key_from_UI, wait_for_all_checks
|
from .util import live_server_setup, extract_UUID_from_client, wait_for_all_checks
|
||||||
|
|
||||||
|
|
||||||
def set_response_with_ldjson():
|
def set_response_with_ldjson():
|
||||||
|
@ -110,7 +110,7 @@ def test_check_ldjson_price_autodetect(client, live_server, measure_memory_usage
|
||||||
assert b'tracking-ldjson-price-data' in res.data
|
assert b'tracking-ldjson-price-data' in res.data
|
||||||
|
|
||||||
# and last snapshop (via API) should be just the price
|
# and last snapshop (via API) should be just the price
|
||||||
api_key = extract_api_key_from_UI(client)
|
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
|
||||||
res = client.get(
|
res = client.get(
|
||||||
url_for("watchsinglehistory", uuid=uuid, timestamp='latest'),
|
url_for("watchsinglehistory", uuid=uuid, timestamp='latest'),
|
||||||
headers={'x-api-key': api_key},
|
headers={'x-api-key': api_key},
|
||||||
|
|
|
@ -95,20 +95,6 @@ def wait_for_notification_endpoint_output():
|
||||||
|
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
# kinda funky, but works for now
|
|
||||||
def extract_api_key_from_UI(client):
|
|
||||||
import re
|
|
||||||
res = client.get(
|
|
||||||
url_for("settings.settings_page"),
|
|
||||||
)
|
|
||||||
# <span id="api-key">{{api_key}}</span>
|
|
||||||
|
|
||||||
m = re.search('<span id="api-key">(.+?)</span>', str(res.data))
|
|
||||||
api_key = m.group(1)
|
|
||||||
return api_key.strip()
|
|
||||||
|
|
||||||
|
|
||||||
# kinda funky, but works for now
|
# kinda funky, but works for now
|
||||||
def get_UUID_for_tag_name(client, name):
|
def get_UUID_for_tag_name(client, name):
|
||||||
app_config = client.application.config.get('DATASTORE').data
|
app_config = client.application.config.get('DATASTORE').data
|
||||||
|
|
Ładowanie…
Reference in New Issue