diff --git a/app/api/tasks.py b/app/api/tasks.py index f35b4204..9f5bf227 100644 --- a/app/api/tasks.py +++ b/app/api/tasks.py @@ -1,25 +1,20 @@ import mimetypes import os +from wsgiref.util import FileWrapper -from django.contrib.gis.db.models import GeometryField -from django.contrib.gis.db.models.functions import Envelope from django.core.exceptions import ObjectDoesNotExist, SuspiciousFileOperation, ValidationError from django.db import transaction -from django.db.models.functions import Cast from django.http import HttpResponse -from wsgiref.util import FileWrapper from rest_framework import status, serializers, viewsets, filters, exceptions, permissions, parsers +from rest_framework.decorators import detail_route from rest_framework.permissions import IsAuthenticatedOrReadOnly from rest_framework.response import Response -from rest_framework.decorators import detail_route from rest_framework.views import APIView -from nodeodm import status_codes -from .common import get_and_check_project, get_tile_json, path_traversal_check - from app import models, pending_actions from nodeodm.models import ProcessingNode from worker import tasks as worker_tasks +from .common import get_and_check_project, get_tile_json, path_traversal_check class TaskIDsSerializer(serializers.BaseSerializer): diff --git a/app/tests/test_api_task.py b/app/tests/test_api_task.py index c2fd31f7..8c9e7427 100644 --- a/app/tests/test_api_task.py +++ b/app/tests/test_api_task.py @@ -178,14 +178,13 @@ class TestApiTask(BootTransactionTestCase): # No UUID at this point self.assertTrue(len(task.uuid) == 0) - # Assign processing node to task via API res = client.patch("/api/projects/{}/tasks/{}/".format(project.id, task.id), { 'processing_node': pnode.id }) self.assertTrue(res.status_code == status.HTTP_200_OK) - # On update scheduler.processing_pending_tasks should have been called in the background + # On update worker.tasks.process_pending_tasks should have been called in the background testWatch.wait_until_call("worker.tasks.process_pending_tasks", timeout=5) # Processing should have started and a UUID is assigned @@ -489,7 +488,7 @@ class TestApiTask(BootTransactionTestCase): task.refresh_from_db() self.assertTrue(task.processing_node is None) - # Bring a proessing node online + # Bring a processing node online pnode.last_refreshed = timezone.now() pnode.save() self.assertTrue(pnode.is_online()) diff --git a/app/testwatch.py b/app/testwatch.py index 1ae0c99a..9d811a7e 100644 --- a/app/testwatch.py +++ b/app/testwatch.py @@ -59,6 +59,9 @@ class TestWatch: def log_call(self, func, *args, **kwargs): fname = TestWatch.func_to_name(func) + self.manual_log_call(fname, *args, **kwargs) + + def manual_log_call(self, fname, *args, **kwargs): logger.info("{} called".format(fname)) list = self.get_calls(fname) list.append({'f': fname, 'args': args, 'kwargs': kwargs}) @@ -100,31 +103,19 @@ class SharedTestWatch(TestWatch): """ :param redis_url same as celery broker URL, for ex. redis://localhost:1234 """ - def needs_redis(func): - # Lazy evaluator for redis instance - def wrapper(self, *args): - if not hasattr(self, 'r'): - self.r = redis.from_url(self.redis_url) - return func(self, *args) - return wrapper - def __init__(self, redis_url): - self.redis_url = redis_url + self.r = redis.from_url(redis_url) super().__init__() - @needs_redis def clear(self): self.r.delete('testwatch:calls', 'testwatch:intercept_list') - @needs_redis def intercept(self, fname, f = None): self.r.hmset('testwatch:intercept_list', {fname: marshal.dumps(f.__code__) if f is not None else 1}) - @needs_redis def intercept_list_has(self, fname): return self.r.hget('testwatch:intercept_list', fname) is not None - @needs_redis def execute_intercept_function_replacement(self, fname, *args, **kwargs): if self.intercept_list_has(fname) and self.r.hget('testwatch:intercept_list', fname) != b'1': # Rebuild function @@ -132,19 +123,17 @@ class SharedTestWatch(TestWatch): f = types.FunctionType(marshal.loads(fcode), globals()) f(*args, **kwargs) - @needs_redis def get_calls(self, fname): value = self.r.hget('testwatch:calls', fname) if value is None: return [] else: return json.loads(value.decode('utf-8')) - @needs_redis def set_calls(self, fname, value): self.r.hmset('testwatch:calls', {fname: json.dumps(value)}) - def watch(**kwargs): - return TestWatch.watch(tw=sharedTestWatch, **kwargs) + # def watch(**kwargs): + # return TestWatch.watch(tw=sharedTestWatch, **kwargs) + testWatch = TestWatch() -sharedTestWatch = SharedTestWatch(settings.CELERY_BROKER_URL) \ No newline at end of file diff --git a/webodm/settings.py b/webodm/settings.py index 41778efb..d6e1ca62 100644 --- a/webodm/settings.py +++ b/webodm/settings.py @@ -322,6 +322,7 @@ CELERY_TASK_SERIALIZER = 'json' CELERY_RESULT_SERIALIZER = 'json' CELERY_ACCEPT_CONTENT = ['json'] CELERY_INCLUDE=['worker.tasks'] +CELERY_TASK_ALWAYS_EAGER = TESTING if TESTING: diff --git a/worker/celery.py b/worker/celery.py index 98871334..4bdd8fe8 100644 --- a/worker/celery.py +++ b/worker/celery.py @@ -33,12 +33,5 @@ app.conf.beat_schedule = { }, } -# We need this for tests -@app.task(name='celery.ping') -def ping(): - # type: () -> str - """Simple task that just returns 'pong'.""" - return 'pong' - if __name__ == '__main__': app.start() \ No newline at end of file diff --git a/worker/tasks.py b/worker/tasks.py index 6bd9846d..1a2189ae 100644 --- a/worker/tasks.py +++ b/worker/tasks.py @@ -12,6 +12,7 @@ from nodeodm.models import ProcessingNode from .celery import app from celery.utils.log import get_task_logger from django.db import transaction +from app.testwatch import testWatch logger = get_task_logger(__name__) @@ -65,6 +66,9 @@ def process_task(taskId): @app.task def process_pending_tasks(): + if settings.TESTING: + testWatch.manual_log_call('worker.tasks.process_pending_tasks') + # All tasks that have a processing node assigned # Or that need one assigned (via auto) # or tasks that need a status update