import os
from changedetectionio.strtobool import strtobool
from flask_expects_json import expects_json
from changedetectionio import queuedWatchMetaData
from changedetectionio import worker_handler
from flask_restful import abort, Resource
from flask import request, make_response, send_from_directory
import validators
from . import auth
import copy

# Import schemas from __init__.py
from . import schema, schema_create_watch, schema_update_watch, validate_openapi_request


def _queue_recheck(update_q, uuid):
    """Put a priority-1 recheck request for the watch `uuid` onto `update_q`."""
    worker_handler.queue_item_async_safe(
        update_q,
        queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}),
    )


class Watch(Resource):
    """REST resource operating on a single watch addressed by its UUID."""

    def __init__(self, **kwargs):
        # datastore is a black box dependency
        self.datastore = kwargs['datastore']
        self.update_q = kwargs['update_q']

    # Get information about a single watch, excluding the history list (can be large)
    # curl http://localhost:5000/api/v1/watch/<uuid>
    # @todo - version2 - ?muted and ?paused should be able to be called together, return the watch struct not "OK"
    # ?recheck=true
    @auth.check_token
    @validate_openapi_request('getWatch')
    def get(self, uuid):
        """Get information about a single watch, recheck, pause, or mute.

        Query arguments (the first one that matches wins and returns "OK"):
          * ``recheck``  - any truthy value queues the watch for a recheck.
          * ``paused``   - ``paused`` / ``unpaused`` toggles the paused state.
          * ``muted``    - ``muted`` / ``unmuted`` toggles the muted state.

        Without any of those, a copy of the watch is returned with the
        ``history_n``, ``last_changed`` and ``viewed`` properties added as keys.
        Aborts with 404 when no watch exists for ``uuid``.
        """
        live_watch = self.datastore.data['watching'].get(uuid)
        if not live_watch:
            abort(404, message='No watch exists with the UUID of {}'.format(uuid))

        if request.args.get('recheck'):
            _queue_recheck(self.update_q, uuid)
            return "OK", 200

        paused_arg = request.args.get('paused', '')
        if paused_arg == 'paused':
            live_watch.pause()
            return "OK", 200
        if paused_arg == 'unpaused':
            live_watch.unpause()
            return "OK", 200

        muted_arg = request.args.get('muted', '')
        if muted_arg == 'muted':
            live_watch.mute()
            return "OK", 200
        if muted_arg == 'unmuted':
            live_watch.unmute()
            return "OK", 200

        # Only the "return the struct" path needs a copy; the extra keys added
        # below must not be written into the stored watch.
        watch = copy.deepcopy(live_watch)

        # Return without history, get that via another API call
        # Properties are not returned as a JSON, so add the required props manually
        watch['history_n'] = watch.history_n
        # attr .last_changed will check for the last written text snapshot on change
        watch['last_changed'] = watch.last_changed
        watch['viewed'] = watch.viewed
        return watch

    @auth.check_token
    @validate_openapi_request('deleteWatch')
    def delete(self, uuid):
        """Delete a watch and related history.

        Returns ``('OK', 204)`` on success.
        """
        # NOTE(review): an unknown UUID yields 400 here while the sibling
        # handlers use 404; kept as-is because clients may depend on it.
        if not self.datastore.data['watching'].get(uuid):
            abort(400, message='No watch exists with the UUID of {}'.format(uuid))

        self.datastore.delete(uuid)
        return 'OK', 204

    @auth.check_token
    @validate_openapi_request('updateWatch')
    @expects_json(schema_update_watch)
    def put(self, uuid):
        """Update watch information.

        The JSON body is merged into the watch via ``watch.update()``.
        Aborts with 404 for an unknown UUID and returns 400 when a ``proxy``
        is supplied that is not in the datastore's proxy list.
        """
        watch = self.datastore.data['watching'].get(uuid)
        if not watch:
            abort(404, message='No watch exists with the UUID of {}'.format(uuid))

        proxy = request.json.get('proxy')
        if proxy:
            plist = self.datastore.proxy_list
            if proxy not in plist:
                return "Invalid proxy choice, currently supported proxies are '{}'".format(', '.join(plist)), 400

        watch.update(request.json)

        return "OK", 200


class WatchHistory(Resource):
    """REST resource listing the history entries of a watch."""

    def __init__(self, **kwargs):
        # datastore is a black box dependency
        self.datastore = kwargs['datastore']

    # Get a list of available history for a watch by UUID
    # curl http://localhost:5000/api/v1/watch/<uuid>/history
    @auth.check_token
    @validate_openapi_request('getWatchHistory')
    def get(self, uuid):
        """Get a list of all historical snapshots available for a watch.

        Returns ``watch.history`` with status 200, or aborts with 404 when
        no watch exists for ``uuid``.
        """
        watch = self.datastore.data['watching'].get(uuid)
        if not watch:
            abort(404, message='No watch exists with the UUID of {}'.format(uuid))
        return watch.history, 200


class WatchSingleHistory(Resource):
    """REST resource returning one snapshot of a watch."""

    def __init__(self, **kwargs):
        # datastore is a black box dependency
        self.datastore = kwargs['datastore']

    @auth.check_token
    @validate_openapi_request('getWatchSnapshot')
    def get(self, uuid, timestamp):
        """Get single snapshot from watch.

        ``timestamp`` may be the literal ``latest`` to select the last key of
        ``watch.history``. With the ``html`` query argument the fetched HTML
        is returned as ``text/html`` (404 ``text/plain`` if there is none);
        otherwise the history snapshot is returned as ``text/plain``.
        Aborts with 404 when the watch is unknown or has no history.
        """
        watch = self.datastore.data['watching'].get(uuid)
        if not watch:
            abort(404, message=f"No watch exists with the UUID of {uuid}")

        # Evaluate the history attribute once and reuse it below.
        history = watch.history
        if not history:
            abort(404, message=f"Watch found but no history exists for the UUID {uuid}")

        if timestamp == 'latest':
            timestamp = list(history.keys())[-1]

        if request.args.get('html'):
            content = watch.get_fetched_html(timestamp)
            if content:
                response = make_response(content, 200)
                response.mimetype = "text/html"
            else:
                response = make_response("No content found", 404)
                response.mimetype = "text/plain"
        else:
            content = watch.get_history_snapshot(timestamp)
            response = make_response(content, 200)
            response.mimetype = "text/plain"

        return response


class WatchFavicon(Resource):
    """REST resource serving the stored favicon of a watch."""

    def __init__(self, **kwargs):
        # datastore is a black box dependency
        self.datastore = kwargs['datastore']

    @auth.check_token
    @validate_openapi_request('getWatchFavicon')
    def get(self, uuid):
        """Get favicon for a watch.

        Serves the favicon file from the watch's data directory with a
        five-minute cache header. Aborts with 404 when the watch is unknown
        or has no favicon filename.
        """
        watch = self.datastore.data['watching'].get(uuid)
        if not watch:
            abort(404, message=f"No watch exists with the UUID of {uuid}")

        favicon_filename = watch.get_favicon_filename()
        if favicon_filename:
            try:
                import magic
                mime = magic.from_file(
                    os.path.join(watch.watch_data_dir, favicon_filename),
                    mime=True
                )
            except ImportError:
                # Fallback, no python-magic
                import mimetypes
                mime, _ = mimetypes.guess_type(favicon_filename)

            response = make_response(send_from_directory(watch.watch_data_dir, favicon_filename))
            # guess_type() yields None for unknown extensions; in that case keep
            # the Content-type chosen by send_from_directory rather than
            # writing a None value into the header.
            if mime:
                response.headers['Content-type'] = mime
            response.headers['Cache-Control'] = 'max-age=300, must-revalidate'  # Cache for 5 minutes, then revalidate
            return response

        abort(404, message=f'No Favicon available for {uuid}')


class CreateWatch(Resource):
    """REST resource for creating a watch and listing all watches."""

    def __init__(self, **kwargs):
        # datastore is a black box dependency
        self.datastore = kwargs['datastore']
        self.update_q = kwargs['update_q']

    @auth.check_token
    @validate_openapi_request('createWatch')
    @expects_json(schema_create_watch)
    def post(self):
        """Create a single watch.

        Validates the ``url`` (and ``proxy`` if given), adds the watch to the
        datastore with the remaining JSON fields as extras, and queues it for
        a first check. Returns ``({'uuid': ...}, 201)`` on success or a 400
        with an explanatory string otherwise.
        """
        json_data = request.get_json()
        url = json_data['url'].strip()

        # If hosts that only contain alphanumerics are allowed ("localhost" for example)
        allow_simplehost = not strtobool(os.getenv('BLOCK_SIMPLEHOSTS', 'False'))
        if not validators.url(url, simple_host=allow_simplehost):
            return "Invalid or unsupported URL", 400

        proxy = json_data.get('proxy')
        if proxy:
            plist = self.datastore.proxy_list
            if proxy not in plist:
                return "Invalid proxy choice, currently supported proxies are '{}'".format(', '.join(plist)), 400

        extras = copy.deepcopy(json_data)

        # Because we renamed 'tag' to 'tags' but don't want to change the API (can do this in v2 of the API)
        # Always strip the legacy key so an empty 'tag' is not passed on in extras.
        tags = extras.pop('tag', None) or None

        del extras['url']

        new_uuid = self.datastore.add_watch(url=url, extras=extras, tag=tags)
        if not new_uuid:
            return "Invalid or unsupported URL", 400

        _queue_recheck(self.update_q, new_uuid)
        return {'uuid': new_uuid}, 201

    @auth.check_token
    @validate_openapi_request('listWatches')
    def get(self):
        """List watches.

        Query arguments:
          * ``recheck_all`` - any truthy value queues every watch for a
            recheck and returns ``{'status': "OK"}`` instead of the listing.
          * ``tag``         - only list watches having a tag whose title
            matches this value case-insensitively.

        Returns a dict keyed by watch UUID with a summary of each watch.
        """
        watching = self.datastore.data['watching']

        # Handle the recheck request first: the listing is not part of that
        # response, so there is no point in building it.
        if request.args.get('recheck_all'):
            for uuid in watching.keys():
                _queue_recheck(self.update_q, uuid)
            return {'status': "OK"}, 200

        tag_limit = request.args.get('tag', '').lower()
        watch_list = {}
        for uuid, watch in watching.items():
            # Watch tags by name (replace the other calls?)
            tags = self.datastore.get_all_tags_for_watch(uuid=uuid)
            # A tag without a title is treated as an empty title, not an error.
            if tag_limit and not any((tag.get('title') or '').lower() == tag_limit for tag in tags.values()):
                continue

            watch_list[uuid] = {
                'last_changed': watch.last_changed,
                'last_checked': watch['last_checked'],
                'last_error': watch['last_error'],
                'title': watch['title'],
                'url': watch['url'],
                'viewed': watch.viewed
            }

        return watch_list, 200