diff --git a/app/api/tasks.py b/app/api/tasks.py index 9f5bf227..a0bd0993 100644 --- a/app/api/tasks.py +++ b/app/api/tasks.py @@ -80,8 +80,8 @@ class TaskViewSet(viewsets.ViewSet): task.last_error = None task.save() - # Process pending tasks without waiting for the scheduler (speed things up) - worker_tasks.process_pending_tasks.delay() + # Process task right away + worker_tasks.process_task.delay(task.id) return Response({'success': True}) @@ -176,8 +176,8 @@ class TaskViewSet(viewsets.ViewSet): serializer.is_valid(raise_exception=True) serializer.save() - # Process pending tasks without waiting for the scheduler (speed things up) - worker_tasks.process_pending_tasks.delay() + # Process task right away + worker_tasks.process_task.delay(task.id) return Response(serializer.data) diff --git a/app/tests/test_api.py b/app/tests/test_api.py index ffddc3c9..0802b72f 100644 --- a/app/tests/test_api.py +++ b/app/tests/test_api.py @@ -182,14 +182,16 @@ class TestApi(BootTestCase): res = client.post('/api/projects/{}/tasks/{}/cancel/'.format(project.id, task.id)) self.assertTrue(res.data["success"]) task.refresh_from_db() - self.assertTrue(task.last_error is None) - self.assertTrue(task.pending_action == pending_actions.CANCEL) + + # Task should have failed to be canceled + self.assertTrue("has no processing node or UUID" in task.last_error) res = client.post('/api/projects/{}/tasks/{}/restart/'.format(project.id, task.id)) self.assertTrue(res.data["success"]) task.refresh_from_db() - self.assertTrue(task.last_error is None) - self.assertTrue(task.pending_action == pending_actions.RESTART) + + # Task should have failed to be restarted + self.assertTrue("has no processing node" in task.last_error) # Cannot cancel, restart or delete a task for which we don't have permission for action in ['cancel', 'remove', 'restart']: @@ -199,10 +201,9 @@ class TestApi(BootTestCase): # Can delete res = client.post('/api/projects/{}/tasks/{}/remove/'.format(project.id, task.id)) self.assertTrue(res.data["success"]) - task.refresh_from_db() - self.assertTrue(task.last_error is None) - self.assertTrue(task.pending_action == pending_actions.REMOVE) + self.assertFalse(Task.objects.filter(id=task.id).exists()) + task = Task.objects.create(project=project) temp_project = Project.objects.create(owner=user) # We have permissions to do anything on a project that we own diff --git a/app/tests/test_api_preset.py b/app/tests/test_api_preset.py index 3c0fd46c..d857b310 100644 --- a/app/tests/test_api_preset.py +++ b/app/tests/test_api_preset.py @@ -53,7 +53,7 @@ class TestApiPreset(BootTestCase): self.assertTrue(res.status_code == status.HTTP_200_OK) # Only ours and global presets are available - self.assertTrue(len(res.data) == 6) + self.assertTrue(len(res.data) == 7) self.assertTrue('My Local Preset' in [preset['name'] for preset in res.data]) self.assertTrue('High Quality' in [preset['name'] for preset in res.data]) self.assertTrue('Global Preset #1' in [preset['name'] for preset in res.data]) diff --git a/app/tests/test_api_task.py b/app/tests/test_api_task.py index 8c9e7427..c35f509a 100644 --- a/app/tests/test_api_task.py +++ b/app/tests/test_api_task.py @@ -12,7 +12,6 @@ from rest_framework import status from rest_framework.test import APIClient import worker -from app import pending_actions from django.utils import timezone from app.models import Project, Task, ImageUpload from app.models.task import task_directory_path, full_task_directory_path @@ -174,10 +173,9 @@ class TestApiTask(BootTransactionTestCase): }) self.assertTrue(res.status_code == status.HTTP_404_NOT_FOUND) - testWatch.clear() - # No UUID at this point self.assertTrue(len(task.uuid) == 0) + # Assign processing node to task via API res = client.patch("/api/projects/{}/tasks/{}/".format(project.id, task.id), { 'processing_node': pnode.id @@ -185,7 +183,7 @@ class TestApiTask(BootTransactionTestCase): self.assertTrue(res.status_code == status.HTTP_200_OK) # On update worker.tasks.process_pending_tasks should have been called in the background - testWatch.wait_until_call("worker.tasks.process_pending_tasks", timeout=5) + # (during tests this is sync) # Processing should have started and a UUID is assigned task.refresh_from_db() @@ -275,16 +273,15 @@ class TestApiTask(BootTransactionTestCase): testWatch.clear() res = client.post("/api/projects/{}/tasks/{}/restart/".format(project.id, task.id)) self.assertTrue(res.status_code == status.HTTP_200_OK) - testWatch.wait_until_call("worker.tasks.process_pending_tasks", timeout=5) + # process_task is called in the background task.refresh_from_db() self.assertTrue(task.status in [status_codes.RUNNING, status_codes.COMPLETED]) # Cancel a task - testWatch.clear() res = client.post("/api/projects/{}/tasks/{}/cancel/".format(project.id, task.id)) self.assertTrue(res.status_code == status.HTTP_200_OK) - testWatch.wait_until_call("worker.tasks.process_pending_tasks", timeout=5) + # task is processed right away # Should have been canceled task.refresh_from_db() @@ -293,7 +290,7 @@ class TestApiTask(BootTransactionTestCase): # Remove a task res = client.post("/api/projects/{}/tasks/{}/remove/".format(project.id, task.id)) self.assertTrue(res.status_code == status.HTTP_200_OK) - testWatch.wait_until_call("worker.tasks.process_pending_tasks", 2, timeout=5) + # task is processed right away # Has been removed along with assets self.assertFalse(Task.objects.filter(pk=task.id).exists()) @@ -302,9 +299,6 @@ class TestApiTask(BootTransactionTestCase): task_assets_path = os.path.join(settings.MEDIA_ROOT, task_directory_path(task.id, task.project.id)) self.assertFalse(os.path.exists(task_assets_path)) - testWatch.clear() - testWatch.intercept("worker.tasks.process_pending_tasks") - # Create a task, then kill the processing node res = client.post("/api/projects/{}/tasks/".format(project.id), { 'images': [image1, image2], @@ -334,11 +328,8 @@ class TestApiTask(BootTransactionTestCase): res = client.post("/api/projects/{}/tasks/{}/restart/".format(project.id, task.id)) self.assertTrue(res.status_code == status.HTTP_200_OK) task.refresh_from_db() - self.assertTrue(task.pending_action == pending_actions.RESTART) # After processing, the task should have restarted, and have no UUID or status - worker.tasks.process_pending_tasks() - task.refresh_from_db() self.assertTrue(task.status is None) self.assertTrue(len(task.uuid) == 0) @@ -368,12 +359,9 @@ class TestApiTask(BootTransactionTestCase): # 3. Restart the task res = client.post("/api/projects/{}/tasks/{}/restart/".format(project.id, task.id)) self.assertTrue(res.status_code == status.HTTP_200_OK) - task.refresh_from_db() - self.assertTrue(task.pending_action == pending_actions.RESTART) # 4. Check that the rerun_from parameter has been cleared # but the other parameters are still set - worker.tasks.process_pending_tasks() task.refresh_from_db() self.assertTrue(len(task.uuid) == 0) self.assertTrue(len(list(filter(lambda d: d['name'] == 'rerun-from', task.options))) == 0) diff --git a/app/tests/test_testwatch.py b/app/tests/test_testwatch.py index 6a286913..3268823d 100644 --- a/app/tests/test_testwatch.py +++ b/app/tests/test_testwatch.py @@ -1,7 +1,5 @@ from django.test import TestCase -from webodm.settings import CELERY_BROKER_URL - -from app.testwatch import TestWatch, SharedTestWatch +from app.testwatch import TestWatch def test(a, b): @@ -9,51 +7,47 @@ def test(a, b): class TestTestWatch(TestCase): def test_methods(self): + tw = TestWatch() + self.assertTrue(tw.get_calls_count("app.tests.test_testwatch.test") == 0) + self.assertTrue(tw.get_calls_count("app.tests.test_testwatch.nonexistent") == 0) - def test_watch_instance(tw): - self.assertTrue(tw.get_calls_count("app.tests.test_testwatch.test") == 0) - self.assertTrue(tw.get_calls_count("app.tests.test_testwatch.nonexistent") == 0) + # Test watch count + tw.hook_pre(test, 1, 2) + test(1, 2) + tw.hook_post(test, 1, 2) - # Test watch count - tw.hook_pre(test, 1, 2) - test(1, 2) - tw.hook_post(test, 1, 2) + self.assertTrue(tw.get_calls_count("app.tests.test_testwatch.test") == 1) - self.assertTrue(tw.get_calls_count("app.tests.test_testwatch.test") == 1) + tw.hook_pre(test, 1, 2) + test(1, 2) + tw.hook_post(test, 1, 2) - tw.hook_pre(test, 1, 2) - test(1, 2) - tw.hook_post(test, 1, 2) + self.assertTrue(tw.get_calls_count("app.tests.test_testwatch.test") == 2) - self.assertTrue(tw.get_calls_count("app.tests.test_testwatch.test") == 2) + @TestWatch.watch(testWatch=tw) + def test2(d): + d['flag'] = not d['flag'] - @TestWatch.watch(testWatch=tw) - def test2(d): - d['flag'] = not d['flag'] + # Test intercept + tw.intercept("app.tests.test_testwatch.test2") + d = {'flag': True} + test2(d) + self.assertTrue(d['flag']) - # Test intercept - tw.intercept("app.tests.test_testwatch.test2") - d = {'flag': True} - test2(d) - self.assertTrue(d['flag']) + # Test function replacement intercept + d = { + 'a': False, + 'b': False + } + @TestWatch.watch(testWatch=tw) + def test3(d): + d['a'] = True - # Test function replacement intercept - d = { - 'a': False, - 'b': False - } - @TestWatch.watch(testWatch=tw) - def test3(d): - d['a'] = True + def replacement(d): + d['b'] = True - def replacement(d): - d['b'] = True - - tw.intercept("app.tests.test_testwatch.test3", replacement) - test3(d) - self.assertFalse(d['a']) - self.assertTrue(d['b']) - - test_watch_instance(TestWatch()) - test_watch_instance(SharedTestWatch(CELERY_BROKER_URL)) + tw.intercept("app.tests.test_testwatch.test3", replacement) + test3(d) + self.assertFalse(d['a']) + self.assertTrue(d['b']) diff --git a/app/testwatch.py b/app/testwatch.py index 9d811a7e..3f54a888 100644 --- a/app/testwatch.py +++ b/app/testwatch.py @@ -1,12 +1,6 @@ -import time, redis +import time import logging - -import marshal -import types - -import json - from webodm import settings logger = logging.getLogger('app.logger') @@ -94,46 +88,4 @@ class TestWatch: return wrapper return outer -""" -Redis-backed test watch -suitable for cross-machine/cross-process -test watching -""" -class SharedTestWatch(TestWatch): - """ - :param redis_url same as celery broker URL, for ex. redis://localhost:1234 - """ - def __init__(self, redis_url): - self.r = redis.from_url(redis_url) - super().__init__() - - def clear(self): - self.r.delete('testwatch:calls', 'testwatch:intercept_list') - - def intercept(self, fname, f = None): - self.r.hmset('testwatch:intercept_list', {fname: marshal.dumps(f.__code__) if f is not None else 1}) - - def intercept_list_has(self, fname): - return self.r.hget('testwatch:intercept_list', fname) is not None - - def execute_intercept_function_replacement(self, fname, *args, **kwargs): - if self.intercept_list_has(fname) and self.r.hget('testwatch:intercept_list', fname) != b'1': - # Rebuild function - fcode = self.r.hget('testwatch:intercept_list', fname) - f = types.FunctionType(marshal.loads(fcode), globals()) - f(*args, **kwargs) - - def get_calls(self, fname): - value = self.r.hget('testwatch:calls', fname) - if value is None: return [] - else: - return json.loads(value.decode('utf-8')) - - def set_calls(self, fname, value): - self.r.hmset('testwatch:calls', {fname: json.dumps(value)}) - - # def watch(**kwargs): - # return TestWatch.watch(tw=sharedTestWatch, **kwargs) - - testWatch = TestWatch() diff --git a/webodm/settings.py b/webodm/settings.py index d6e1ca62..594c95e7 100644 --- a/webodm/settings.py +++ b/webodm/settings.py @@ -322,8 +322,8 @@ CELERY_TASK_SERIALIZER = 'json' CELERY_RESULT_SERIALIZER = 'json' CELERY_ACCEPT_CONTENT = ['json'] CELERY_INCLUDE=['worker.tasks'] -CELERY_TASK_ALWAYS_EAGER = TESTING - +if TESTING: + CELERY_TASK_ALWAYS_EAGER = True if TESTING: MEDIA_ROOT = os.path.join(BASE_DIR, 'app', 'media_test') diff --git a/worker/tasks.py b/worker/tasks.py index 1a2189ae..c39b7ab3 100644 --- a/worker/tasks.py +++ b/worker/tasks.py @@ -12,7 +12,6 @@ from nodeodm.models import ProcessingNode from .celery import app from celery.utils.log import get_task_logger from django.db import transaction -from app.testwatch import testWatch logger = get_task_logger(__name__) @@ -66,22 +65,16 @@ def process_task(taskId): @app.task def process_pending_tasks(): - if settings.TESTING: - testWatch.manual_log_call('worker.tasks.process_pending_tasks') - # All tasks that have a processing node assigned # Or that need one assigned (via auto) # or tasks that need a status update # or tasks that have a pending action # and that are not locked (being processed by another thread) - qs = Task.objects.filter(Q(processing_node__isnull=True, auto_processing_node=True) | + tasks = Task.objects.filter(Q(processing_node__isnull=True, auto_processing_node=True) | Q(Q(status=None) | Q(status__in=[status_codes.QUEUED, status_codes.RUNNING]), processing_node__isnull=False) | Q(pending_action__isnull=False)).exclude(Q(processing_lock=True)) - tasks = list(qs) - - if len(qs) > 0: - for task in tasks: - process_task.delay(task.id) + for task in tasks: + process_task.delay(task.id)