kopia lustrzana https://github.com/dgtlmoon/changedetection.io
108 wiersze
3.8 KiB
Python
108 wiersze
3.8 KiB
Python
from flask_expects_json import expects_json
|
|
from flask_restful import Resource, abort
|
|
from flask import request
|
|
from . import auth, validate_openapi_request
|
|
from . import schema_create_notification_urls, schema_delete_notification_urls
|
|
|
|
class Notifications(Resource):
    """REST resource for the application-wide list of notification URLs.

    The list is read from and written to
    ``datastore.data['settings']['application']['notification_urls']``.
    """

    def __init__(self, **kwargs):
        """Keep a reference to the datastore passed as the ``datastore`` keyword."""
        # datastore is a black box dependency
        self.datastore = kwargs['datastore']

    @staticmethod
    def _validation_error(notification_urls):
        """Return an error message for an unusable payload, or ``None`` if it passes.

        The list check runs first so that the validator is only ever handed a
        list; previously ``put`` checked the type after validating and ``post``
        did not check it at all.
        """
        if not isinstance(notification_urls, list):
            return "Invalid input format"

        # Imported at call time, as the handlers originally did.
        from wtforms import ValidationError
        try:
            validate_notification_urls(notification_urls)
        except ValidationError as e:
            return str(e)
        return None

    @auth.check_token
    @validate_openapi_request('getNotifications')
    def get(self):
        """Return Notification URL List."""
        # Every level falls back to an empty container, so a datastore with no
        # settings yet yields an empty list rather than a KeyError.
        settings = self.datastore.data.get('settings', {})
        notification_urls = settings.get('application', {}).get('notification_urls', [])

        return {
            'notification_urls': notification_urls,
        }, 200

    @auth.check_token
    @validate_openapi_request('addNotifications')
    @expects_json(schema_create_notification_urls)
    def post(self):
        """Create Notification URLs.

        Returns 201 with the URLs the datastore reported as added, or 400 when
        the payload is rejected or nothing was added.
        """
        json_data = request.get_json()
        notification_urls = json_data.get("notification_urls", [])

        error = self._validation_error(notification_urls)
        # Compare against None: an empty error string must still produce a 400.
        if error is not None:
            return error, 400

        added_urls = []
        for url in notification_urls:
            added_url = self.datastore.add_notification_url(url.strip())
            # Only truthy results from the datastore are reported back.
            if added_url:
                added_urls.append(added_url)

        if not added_urls:
            return "No valid notification URLs were added", 400

        return {'notification_urls': added_urls}, 201

    @auth.check_token
    @validate_openapi_request('replaceNotifications')
    @expects_json(schema_create_notification_urls)
    def put(self):
        """Replace Notification URLs.

        Returns 200 with the stored list, or 400 when the payload is rejected.
        """
        json_data = request.get_json()
        notification_urls = json_data.get("notification_urls", [])

        error = self._validation_error(notification_urls)
        if error is not None:
            return error, 400

        clean_urls = [url.strip() for url in notification_urls if isinstance(url, str)]
        self.datastore.data['settings']['application']['notification_urls'] = clean_urls
        # NOTE(review): presumably tells the datastore to persist itself - confirm.
        self.datastore.needs_write = True

        return {'notification_urls': clean_urls}, 200

    @auth.check_token
    @validate_openapi_request('deleteNotifications')
    @expects_json(schema_delete_notification_urls)
    def delete(self):
        """Delete Notification URLs.

        Aborts with 400 when the payload is not a list or when none of the
        given URLs is currently stored; otherwise responds with 204.
        """
        json_data = request.get_json()
        urls_to_delete = json_data.get("notification_urls", [])
        if not isinstance(urls_to_delete, list):
            abort(400, message="Expected a list of notification URLs.")

        notification_urls = self.datastore.data['settings']['application'].get('notification_urls', [])
        deleted = []

        for url in urls_to_delete:
            # Skip non-string entries instead of failing on .strip(); put()
            # applies the same filter to its payload.
            if not isinstance(url, str):
                continue
            clean_url = url.strip()
            if clean_url in notification_urls:
                notification_urls.remove(clean_url)
                deleted.append(clean_url)

        if not deleted:
            abort(400, message="No matching notification URLs found.")

        self.datastore.data['settings']['application']['notification_urls'] = notification_urls
        self.datastore.needs_write = True

        return 'OK', 204
|
|
|
|
def validate_notification_urls(notification_urls):
    """Run ``ValidateAppRiseServers`` over ``notification_urls``.

    The validator is invoked with a form and a field, so bare stand-ins for
    both are built here. Nothing is returned; any exception raised by the
    validator propagates to the caller.
    """
    from changedetectionio.forms import ValidateAppRiseServers

    class Field:
        # Stand-in field: carries the payload and a pass-through gettext.
        data = notification_urls

        def gettext(self, x):
            return x

    # Stand-in form with no attributes of its own.
    form_stub = type("DummyForm", (), {})()

    ValidateAppRiseServers()(form_stub, Field())