kopia lustrzana https://github.com/dgtlmoon/changedetection.io
				
				
				
			hack of pytest implementation - doesnt work yet
							rodzic
							
								
									93ee65fe53
								
							
						
					
					
						commit
						87f4347fe5
					
				|  | @ -39,8 +39,14 @@ def main(argv): | |||
|             datastore_path = arg | ||||
| 
 | ||||
| 
 | ||||
|     # Kinda weird to tell them both where `datastore_path` is right.. | ||||
| 
 | ||||
|     # threads can read from disk every x seconds right? | ||||
|     # front end can just save | ||||
|     # We just need to know which threads are looking at which UUIDs | ||||
| 
 | ||||
|     # isnt there some @thingy to attach to each route to tell it, that this route needs a datastore | ||||
|     app_config = {'datastore_path': datastore_path} | ||||
| 
 | ||||
|     datastore = store.ChangeDetectionStore(datastore_path=app_config['datastore_path']) | ||||
|     app = backend.changedetection_app(app_config, datastore) | ||||
| 
 | ||||
|  |  | |||
|  | @ -238,11 +238,14 @@ def changedetection_app(config=None, datastore_o=None): | |||
| 
 | ||||
|             messages.append({'class': 'ok', 'message': "{} Imported, {} Skipped.".format(good, len(remaining_urls))}) | ||||
| 
 | ||||
|         output = render_template("import.html", | ||||
|                                  messages=messages, | ||||
|                                  remaining="\n".join(remaining_urls) | ||||
|                                  ) | ||||
|         messages = [] | ||||
|         if len(remaining_urls) == 0: | ||||
|             return redirect(url_for('main_page')) | ||||
|         else: | ||||
|             output = render_template("import.html", | ||||
|                                      messages=messages, | ||||
|                                      remaining="\n".join(remaining_urls) | ||||
|                                      ) | ||||
|             messages = [] | ||||
|         return output | ||||
| 
 | ||||
| 
 | ||||
|  | @ -328,21 +331,6 @@ def changedetection_app(config=None, datastore_o=None): | |||
|                          attachment_filename=backupname) | ||||
| 
 | ||||
| 
 | ||||
| 
 | ||||
|     # A few self sanity checks, mostly for developer/bug check | ||||
|     @app.route("/self-check", methods=['GET']) | ||||
|     def selfcheck(): | ||||
|         output = "All fine" | ||||
|         # In earlier versions before a single threaded write of the JSON store, sometimes histories could get mixed. | ||||
|         # Could also maybe affect people who manually fiddle with their JSON store? | ||||
|         for uuid, watch in datastore.data['watching'].items(): | ||||
|             for timestamp, path in watch['history'].items(): | ||||
|                 # Each history snapshot should include a full path, which contains the {uuid} | ||||
|                 if not uuid in path: | ||||
|                     output = "Something weird in {}, suspected incorrect snapshot path.".format(uuid) | ||||
| 
 | ||||
|         return output | ||||
| 
 | ||||
|     @app.route("/static/<string:group>/<string:filename>", methods=['GET']) | ||||
|     def static_content(group, filename): | ||||
|         # These files should be in our subdirectory | ||||
|  | @ -380,6 +368,7 @@ def changedetection_app(config=None, datastore_o=None): | |||
| 
 | ||||
|     @app.route("/api/checknow", methods=['GET']) | ||||
|     def api_watch_checknow(): | ||||
| 
 | ||||
|         global messages | ||||
| 
 | ||||
|         tag = request.args.get('tag') | ||||
|  | @ -404,11 +393,14 @@ def changedetection_app(config=None, datastore_o=None): | |||
|         messages.append({'class': 'ok', 'message': "{} watches are rechecking.".format(i)}) | ||||
|         return redirect(url_for('main_page', tag=tag)) | ||||
| 
 | ||||
| 
 | ||||
|     # for pytest flask | ||||
|     @app.route("/timestamp", methods=['GET']) | ||||
|     def api_test_rand_int(): | ||||
|         return str(time.time()) | ||||
| 
 | ||||
|     # @todo handle ctrl break | ||||
|     ticker_thread = threading.Thread(target=ticker_thread_check_time_launch_checks).start() | ||||
|     save_data_thread = threading.Thread(target=save_datastore).start() | ||||
| 
 | ||||
| 
 | ||||
|     return app | ||||
| 
 | ||||
|  | @ -434,6 +426,8 @@ class Worker(threading.Thread): | |||
|                 uuid = self.q.get(block=True, timeout=1)  # Blocking | ||||
|             except queue.Empty: | ||||
|                 # We have a chance to kill this thread that needs to monitor for new jobs.. | ||||
|                 # Delays here would be caused by a current response object pending | ||||
|                 # @todo switch to threaded response handler | ||||
|                 if app.config['STOP_THREADS']: | ||||
|                     return | ||||
|             else: | ||||
|  | @ -442,14 +436,23 @@ class Worker(threading.Thread): | |||
|                 if uuid in list(datastore.data['watching'].keys()): | ||||
| 
 | ||||
|                     try: | ||||
|                         result = update_handler.run(uuid) | ||||
|                         result, contents = update_handler.run(uuid) | ||||
| 
 | ||||
|                     except PermissionError as s: | ||||
|                         print ("File permission error updating", uuid,str(s)) | ||||
|                         app.logger.error("File permission error updating", uuid, str(s)) | ||||
|                     else: | ||||
|                         if result: | ||||
|                             datastore.update_watch(uuid=uuid, update_obj=result) | ||||
| 
 | ||||
|                             if contents: | ||||
|                                 # A change was detected | ||||
|                                 datastore.save_history_text(uuid=uuid, contents=contents, result_obj=result) | ||||
| 
 | ||||
|                         else: | ||||
|                             # No change | ||||
|                             x = 1 | ||||
| 
 | ||||
| 
 | ||||
|                 self.current_uuid = None  # Done | ||||
|                 self.q.task_done() | ||||
| 
 | ||||
|  | @ -459,7 +462,6 @@ def ticker_thread_check_time_launch_checks(): | |||
| 
 | ||||
|     # Spin up Workers. | ||||
|     for _ in range(datastore.data['settings']['requests']['workers']): | ||||
|         print ("...") | ||||
|         new_worker = Worker(update_q) | ||||
|         running_update_threads.append(new_worker) | ||||
|         new_worker.start() | ||||
|  | @ -473,19 +475,6 @@ def ticker_thread_check_time_launch_checks(): | |||
| 
 | ||||
|         if app.config['STOP_THREADS']: | ||||
|             return | ||||
|         time.sleep(1) | ||||
| 
 | ||||
| 
 | ||||
| # Thread runner, this helps with thread/write issues when there are many operations that want to update the JSON | ||||
| # by just running periodically in one thread, according to python, dict updates are threadsafe. | ||||
| def save_datastore(): | ||||
| 
 | ||||
|     global stop_threads | ||||
| 
 | ||||
|     while True: | ||||
|         if app.config['STOP_THREADS']: | ||||
|             return | ||||
|         if datastore.needs_write: | ||||
|             datastore.sync_to_json() | ||||
| 
 | ||||
|         time.sleep(1) | ||||
| 
 | ||||
|  |  | |||
|  | @ -15,37 +15,17 @@ class perform_site_check(): | |||
|         super().__init__(*args, **kwargs) | ||||
|         self.datastore = datastore | ||||
| 
 | ||||
|     def save_firefox_screenshot(self, uuid, output): | ||||
|         # @todo call selenium or whatever | ||||
|         return | ||||
| 
 | ||||
|     def ensure_output_path(self): | ||||
|         try: | ||||
|             os.mkdir(self.output_path) | ||||
|         except FileExistsError: | ||||
|             print (self.output_path, "already exists.") | ||||
| 
 | ||||
| 
 | ||||
|     def save_response_stripped_output(self, output, fname): | ||||
| 
 | ||||
|         with open(fname, 'w') as f: | ||||
|             f.write(output) | ||||
|             f.close() | ||||
| 
 | ||||
|         return fname | ||||
| 
 | ||||
|     def run(self, uuid): | ||||
| 
 | ||||
|         timestamp = int(time.time())  # used for storage etc too | ||||
|         stripped_text_from_html = False | ||||
| 
 | ||||
|         update_obj = {'previous_md5': self.datastore.data['watching'][uuid]['previous_md5'], | ||||
|                       'history': {}, | ||||
|                       "last_checked": timestamp | ||||
|                       } | ||||
| 
 | ||||
|         self.output_path = "{}/{}".format(self.datastore.datastore_path,uuid) | ||||
|         self.ensure_output_path() | ||||
| 
 | ||||
|         extra_headers = self.datastore.get_val(uuid, 'headers') | ||||
| 
 | ||||
|         # Tweak the base config with the per-watch ones | ||||
|  | @ -111,13 +91,5 @@ class perform_site_check(): | |||
|                     update_obj["last_changed"] = timestamp | ||||
| 
 | ||||
|                 update_obj["previous_md5"] = fetched_md5 | ||||
|                 fname = "{}/{}.stripped.txt".format(self.output_path, fetched_md5) | ||||
|                 with open(fname, 'w') as f: | ||||
|                     f.write(stripped_text_from_html) | ||||
|                     f.close() | ||||
| 
 | ||||
|                 # Update history with the stripped text for future reference, this will also mean we save the first | ||||
|                 # Should always be keyed by string(timestamp) | ||||
|                 update_obj.update({"history": {str(timestamp): fname}}) | ||||
| 
 | ||||
|         return update_obj | ||||
|         return update_obj, stripped_text_from_html | ||||
|  |  | |||
|  | @ -7,6 +7,9 @@ from threading import Lock, Thread | |||
| 
 | ||||
| from copy import deepcopy | ||||
| 
 | ||||
| import logging | ||||
| import time | ||||
| import threading | ||||
| 
 | ||||
| # Is there an existing library to ensure some data store (JSON etc) is in sync with CRUD methods? | ||||
| # Open a github issue if you know something :) | ||||
|  | @ -17,7 +20,8 @@ class ChangeDetectionStore: | |||
|     def __init__(self, datastore_path="/datastore"): | ||||
|         self.needs_write = False | ||||
|         self.datastore_path = datastore_path | ||||
| 
 | ||||
|         self.json_store_path = "{}/url-watches.json".format(self.datastore_path) | ||||
|         self.stop_thread = False | ||||
|         self.__data = { | ||||
|             'note': "Hello! If you change this file manually, please be sure to restart your changedetection.io instance!", | ||||
|             'watching': {}, | ||||
|  | @ -59,7 +63,7 @@ class ChangeDetectionStore: | |||
|                 self.__data['build_sha'] = f.read() | ||||
| 
 | ||||
|         try: | ||||
|             with open("{}/url-watches.json".format(self.datastore_path)) as json_file: | ||||
|             with open(self.json_store_path) as json_file: | ||||
|                 from_disk = json.load(json_file) | ||||
| 
 | ||||
|                 # @todo isnt there a way todo this dict.update recursively? | ||||
|  | @ -92,6 +96,9 @@ class ChangeDetectionStore: | |||
|             self.add_watch(url='https://www.gov.uk/coronavirus', tag='Covid') | ||||
|             self.add_watch(url='https://changedetection.io', tag='Tech news') | ||||
| 
 | ||||
|         # Finally start the thread that will manage periodic data saves to JSON | ||||
|         save_data_thread = threading.Thread(target=self.save_datastore).start() | ||||
| 
 | ||||
|     # Returns the newest key, but if theres only 1 record, then it's counted as not being new, so return 0. | ||||
|     def get_newest_history_key(self, uuid): | ||||
|         if len(self.__data['watching'][uuid]['history']) == 1: | ||||
|  | @ -176,16 +183,51 @@ class ChangeDetectionStore: | |||
| 
 | ||||
|             self.data['watching'][new_uuid] = _blank | ||||
| 
 | ||||
|         self.needs_write = True | ||||
|         # Get the directory ready | ||||
|         output_path = "{}/{}".format(self.datastore_path, new_uuid) | ||||
|         try: | ||||
|             os.mkdir(output_path) | ||||
|         except FileExistsError: | ||||
|             print(output_path, "already exists.") | ||||
| 
 | ||||
|         self.sync_to_json() | ||||
|         return new_uuid | ||||
| 
 | ||||
|     def sync_to_json(self): | ||||
|     # Save some text file to the appropriate path and bump the history | ||||
|     # result_obj from fetch_site_status.run() | ||||
|     def save_history_text(self, uuid, result_obj, contents): | ||||
| 
 | ||||
|         with open("{}/url-watches.json".format(self.datastore_path), 'w') as json_file: | ||||
|         output_path = "{}/{}".format(self.datastore_path, uuid) | ||||
|         fname = "{}/{}.stripped.txt".format(output_path, result_obj['previous_md5']) | ||||
|         with open(fname, 'w') as f: | ||||
|             f.write(contents) | ||||
|             f.close() | ||||
| 
 | ||||
|         # Update history with the stripped text for future reference, this will also mean we save the first | ||||
|         # Should always be keyed by string(timestamp) | ||||
|         self.update_watch(uuid, {"history": {str(result_obj["last_checked"]): fname}}) | ||||
| 
 | ||||
|         return fname | ||||
| 
 | ||||
|     def sync_to_json(self): | ||||
|         print ("Saving..") | ||||
|         with open(self.json_store_path, 'w') as json_file: | ||||
|             json.dump(self.__data, json_file, indent=4) | ||||
|             print("Re-saved index") | ||||
|             logging.info("Re-saved index") | ||||
| 
 | ||||
|         self.needs_write = False | ||||
| 
 | ||||
|     # Thread runner, this helps with thread/write issues when there are many operations that want to update the JSON | ||||
|     # by just running periodically in one thread, according to python, dict updates are threadsafe. | ||||
|     def save_datastore(self): | ||||
| 
 | ||||
|         while True: | ||||
|             if self.stop_thread: | ||||
|                 print ("Shutting down datastore thread") | ||||
|                 return | ||||
|             if self.needs_write: | ||||
|                 self.sync_to_json() | ||||
|             time.sleep(1) | ||||
| 
 | ||||
| 
 | ||||
| # body of the constructor | ||||
|  |  | |||
|  | @ -1,44 +0,0 @@ | |||
| #!/usr/bin/python3 | ||||
| 
 | ||||
| import pytest | ||||
| import backend | ||||
| from backend import store | ||||
| import os | ||||
| # https://github.com/pallets/flask/blob/1.1.2/examples/tutorial/tests/test_auth.py | ||||
| 
 | ||||
| # Much better boilerplate than the docs | ||||
| # https://www.python-boilerplate.com/py3+flask+pytest/ | ||||
| 
 | ||||
| @pytest.fixture | ||||
| def app(request): | ||||
| 
 | ||||
| 
 | ||||
|     datastore_path ="./test-datastore" | ||||
|     try: | ||||
|         os.mkdir(datastore_path) | ||||
|     except FileExistsError: | ||||
|         pass | ||||
| 
 | ||||
|     # Kinda weird to tell them both where `datastore_path` is right.. | ||||
|     app_config = {'datastore_path': datastore_path} | ||||
|     datastore = store.ChangeDetectionStore(datastore_path=app_config['datastore_path']) | ||||
|     app = backend.changedetection_app(app_config, datastore) | ||||
| 
 | ||||
| 
 | ||||
|     app.debug = True | ||||
| 
 | ||||
|     def teardown(): | ||||
|         app.config['STOP_THREADS']=True | ||||
|         print("teardown") | ||||
| 
 | ||||
|     request.addfinalizer(teardown) | ||||
| 
 | ||||
|     return app.test_client() | ||||
| 
 | ||||
| 
 | ||||
| def test_hello_world(app): | ||||
|     res = app.get("/") | ||||
|     # print(dir(res), res.status_code) | ||||
|     assert res.status_code == 200 | ||||
|     assert b"IMPORT" in res.data | ||||
| 
 | ||||
|  | @ -0,0 +1,39 @@ | |||
| #!/usr/bin/python3 | ||||
| 
 | ||||
| import pytest | ||||
| import backend | ||||
| from backend import store | ||||
| import os | ||||
| import time | ||||
| import requests | ||||
| # https://github.com/pallets/flask/blob/1.1.2/examples/tutorial/tests/test_auth.py | ||||
| 
 | ||||
| # Much better boilerplate than the docs | ||||
| # https://www.python-boilerplate.com/py3+flask+pytest/ | ||||
| 
 | ||||
| 
 | ||||
| def test_import(session): | ||||
|     res = session.get("/") | ||||
|     assert b"IMPORT" in res.data | ||||
|     assert res.status_code == 200 | ||||
| 
 | ||||
|     test_url_list = ["https://slashdot.org"] | ||||
|     res = session.post('/import', data={'urls': "\n".join(test_url_list)}, follow_redirects=True) | ||||
|     s = "{} Imported".format(len(test_url_list)) | ||||
| 
 | ||||
|     #p= url_for('test_endpoint', _external=True | ||||
| 
 | ||||
|     assert bytes(s.encode('utf-8')) in res.data | ||||
| 
 | ||||
|     for url in test_url_list: | ||||
|         assert bytes(url.encode('utf-8')) in res.data | ||||
| 
 | ||||
|     #response = requests.get('http://localhost:5000/random_string') | ||||
|     #assert response.status_code == 200 | ||||
|     #assert response.json() == [{'id': 1}] | ||||
| 
 | ||||
| 
 | ||||
| def test_import_a(session): | ||||
|     res = session.get("/") | ||||
|     assert b"IMPORT" in res.data | ||||
|     assert res.status_code == 200 | ||||
		Ładowanie…
	
		Reference in New Issue
	
	 Leigh Morresi
						Leigh Morresi