From 26339e0c326b435b9866472814cb7d918c97d80e Mon Sep 17 00:00:00 2001 From: Piero Toffanin Date: Sun, 5 Feb 2017 22:23:02 -0500 Subject: [PATCH] Moved @background decorator in separate module, added proof of concept intercept class --- app/background.py | 55 ++++++++++++++++++++++++++++++++++++++ app/scheduler.py | 46 +++++++------------------------ app/tests/classes.py | 1 + app/tests/test_api_task.py | 12 ++++++--- nodeodm/api_client.py | 5 ++-- 5 files changed, 77 insertions(+), 42 deletions(-) create mode 100644 app/background.py diff --git a/app/background.py b/app/background.py new file mode 100644 index 00000000..0d28f424 --- /dev/null +++ b/app/background.py @@ -0,0 +1,55 @@ +from threading import Thread +from django import db +from webodm import settings + + +# TODO: design class such that: +# 1. test cases can choose which functions to intercept (prevent from executing) +# 2. test cases can see how many times a function has been called (and with which parameters) +# 3. test cases can pause until a function has been called +class TestWatch: + stats = {} + + def called(self, func, *args, **kwargs): + list = TestWatch.stats[func] if func in TestWatch.stats else [] + list.append({'f': func, 'args': args, 'kwargs': kwargs}) + print(list) + + def clear(self): + TestWatch.stats = {} + +testWatch = TestWatch() + +def background(func): + """ + Adds background={True|False} param to any function + so that we can call update_nodes_info(background=True) from the outside + """ + def wrapper(*args,**kwargs): + background = kwargs.get('background', False) + if 'background' in kwargs: del kwargs['background'] + + if background: + if settings.TESTING: + # During testing, intercept all background requests and execute them on the same thread + testWatch.called(func.__name__, *args, **kwargs) + + # Create a function that closes all + # db connections at the end of the thread + # This is necessary to make sure we don't leave + # open connections lying around. + def execute_and_close_db(): + ret = None + try: + ret = func(*args, **kwargs) + finally: + db.connections.close_all() + return ret + + t = Thread(target=execute_and_close_db) + t.daemon = True + t.start() + return t + else: + return func(*args, **kwargs) + return wrapper \ No newline at end of file diff --git a/app/scheduler.py b/app/scheduler.py index 12607acf..616a6731 100644 --- a/app/scheduler.py +++ b/app/scheduler.py @@ -1,15 +1,17 @@ import logging import traceback +from multiprocessing.dummy import Pool as ThreadPool +from threading import Lock -from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.schedulers import SchedulerAlreadyRunningError, SchedulerNotRunningError -from threading import Thread, Lock -from multiprocessing.dummy import Pool as ThreadPool -from nodeodm.models import ProcessingNode -from app.models import Task, Project -from django.db.models import Q, Count +from apscheduler.schedulers.background import BackgroundScheduler from django import db +from django.db.models import Q, Count + +from app.models import Task, Project from nodeodm import status_codes +from nodeodm.models import ProcessingNode +from app.background import background logger = logging.getLogger('app.logger') scheduler = BackgroundScheduler({ @@ -17,36 +19,6 @@ scheduler = BackgroundScheduler({ 'apscheduler.job_defaults.max_instances': '3', }) -def background(func): - """ - Adds background={True|False} param to any function - so that we can call update_nodes_info(background=True) from the outside - """ - def wrapper(*args,**kwargs): - background = kwargs.get('background', False) - if 'background' in kwargs: del kwargs['background'] - - if background: - # Create a function that closes all - # db connections at the end of the thread - # This is necessary to make sure we don't leave - # open connections lying around. - def execute_and_close_db(): - ret = None - try: - ret = func(*args, **kwargs) - finally: - db.connections.close_all() - return ret - - t = Thread(target=execute_and_close_db) - t.start() - return t - else: - return func(*args, **kwargs) - return wrapper - - @background def update_nodes_info(): processing_nodes = ProcessingNode.objects.all() @@ -86,6 +58,8 @@ def process_pending_tasks(): task.save() except Exception as e: logger.error("Uncaught error: {} {}".format(e, traceback.format_exc())) + finally: + db.connections.close_all() if tasks.count() > 0: pool = ThreadPool(tasks.count()) diff --git a/app/tests/classes.py b/app/tests/classes.py index fb5b139b..32435cf9 100644 --- a/app/tests/classes.py +++ b/app/tests/classes.py @@ -56,6 +56,7 @@ class BootTestCase(TestCase): def tearDownClass(cls): super(BootTestCase, cls).tearDownClass() + class BootTransactionTestCase(TransactionTestCase): ''' Same as above, but inherits from TransactionTestCase diff --git a/app/tests/test_api_task.py b/app/tests/test_api_task.py index b4726acd..c5b9f59d 100644 --- a/app/tests/test_api_task.py +++ b/app/tests/test_api_task.py @@ -15,8 +15,10 @@ from nodeodm import status_codes from nodeodm.models import ProcessingNode # We need to test the task API in a TransactionTestCase because -# processing happens on a separate thread. This is required by Django. -class TestApi(BootTransactionTestCase): +# task processing happens on a separate thread, and normal TestCases +# do not commit changes to the DB, so spawning a new thread will show no +# data in it. +class TestApiTask(BootTransactionTestCase): def test_task(self): DELAY = 1 # time to sleep for during process launch, background processing, etc. client = APIClient() @@ -167,6 +169,10 @@ class TestApi(BootTransactionTestCase): # On update scheduler.processing_pending_tasks should have been called in the background time.sleep(DELAY) + print("HERE") + from app.background import testWatch + print(testWatch.stats) + # Processing should have completed task.refresh_from_db() self.assertTrue(task.status == status_codes.RUNNING) @@ -190,4 +196,4 @@ class TestApi(BootTransactionTestCase): # Teardown processing node node_odm.terminate() - time.sleep(20) \ No newline at end of file + #time.sleep(20) diff --git a/nodeodm/api_client.py b/nodeodm/api_client.py index 13a060a2..046e9e01 100644 --- a/nodeodm/api_client.py +++ b/nodeodm/api_client.py @@ -43,7 +43,7 @@ class ApiClient: return requests.post(self.url('/task/restart'), data={'uuid': uuid}, timeout=TIMEOUT).json() def task_download(self, uuid, asset): - res = requests.get(self.url('/task/{}/download/{}').format(uuid, asset), stream=True, timeout=TIMEOUT) + res = requests.get(self.url('/task/{}/download/{}').format(uuid, asset), stream=True) if "Content-Type" in res.headers and "application/json" in res.headers['Content-Type']: return res.json() else: @@ -62,5 +62,4 @@ class ApiClient: ) for image in images] return requests.post(self.url("/task/new"), files=files, - data={'name': name, 'options': json.dumps(options)}, - timeout=TIMEOUT).json() + data={'name': name, 'options': json.dumps(options)}).json()