From 22c3e66c02f9bc8b9adf39c132ee47b2534e4bfa Mon Sep 17 00:00:00 2001 From: Piero Toffanin Date: Thu, 15 Feb 2018 16:23:29 -0500 Subject: [PATCH] Removed scheduler code, background decorator, added Celery workers, migrated code, added SharedTestWatch --- .gitignore | 1 + app/api/tasks.py | 11 +++-- app/background.py | 40 --------------- app/boot.py | 10 ++-- app/models/project.py | 2 +- app/models/task.py | 4 +- app/scheduler.py | 96 ------------------------------------ app/tests/test_api_task.py | 1 - app/tests/test_app.py | 12 +---- app/tests/test_testwatch.py | 74 +++++++++++++-------------- app/testwatch.py | 71 +++++++++++++++++++++----- app/urls.py | 5 +- docker-compose.yml | 2 + webodm/settings.py | 10 ++++ worker/start.sh => worker.sh | 28 ++++++++--- worker/Dockerfile | 21 -------- worker/celery.py | 31 +++++++++++- worker/celeryconfig.py | 9 ---- worker/requirements.txt | 2 - worker/tasks.py | 80 +++++++++++++++++++++++++++--- 20 files changed, 256 insertions(+), 254 deletions(-) delete mode 100644 app/background.py delete mode 100644 app/scheduler.py rename worker/start.sh => worker.sh (72%) delete mode 100644 worker/Dockerfile delete mode 100644 worker/celeryconfig.py delete mode 100644 worker/requirements.txt diff --git a/.gitignore b/.gitignore index d7660349..8b6d21f9 100644 --- a/.gitignore +++ b/.gitignore @@ -75,6 +75,7 @@ target/ # celery beat schedule file celerybeat-schedule +celerybeat.pid # dotenv .env diff --git a/app/api/tasks.py b/app/api/tasks.py index 87d7b469..f35b4204 100644 --- a/app/api/tasks.py +++ b/app/api/tasks.py @@ -17,8 +17,9 @@ from rest_framework.views import APIView from nodeodm import status_codes from .common import get_and_check_project, get_tile_json, path_traversal_check -from app import models, scheduler, pending_actions +from app import models, pending_actions from nodeodm.models import ProcessingNode +from worker import tasks as worker_tasks class TaskIDsSerializer(serializers.BaseSerializer): @@ -84,8 +85,8 @@ class TaskViewSet(viewsets.ViewSet): task.last_error = None task.save() - # Call the scheduler (speed things up) - scheduler.process_pending_tasks(background=True) + # Process pending tasks without waiting for the scheduler (speed things up) + worker_tasks.process_pending_tasks.delay() return Response({'success': True}) @@ -180,8 +181,8 @@ class TaskViewSet(viewsets.ViewSet): serializer.is_valid(raise_exception=True) serializer.save() - # Call the scheduler (speed things up) - scheduler.process_pending_tasks(background=True) + # Process pending tasks without waiting for the scheduler (speed things up) + worker_tasks.process_pending_tasks.delay() return Response(serializer.data) diff --git a/app/background.py b/app/background.py deleted file mode 100644 index 6f1745cf..00000000 --- a/app/background.py +++ /dev/null @@ -1,40 +0,0 @@ -from threading import Thread - -import logging -from django import db -from app.testwatch import testWatch - -logger = logging.getLogger('app.logger') - -def background(func): - """ - Adds background={True|False} param to any function - so that we can call update_nodes_info(background=True) from the outside - """ - def wrapper(*args,**kwargs): - background = kwargs.get('background', False) - if 'background' in kwargs: del kwargs['background'] - - if background: - if testWatch.hook_pre(func, *args, **kwargs): return - - # Create a function that closes all - # db connections at the end of the thread - # This is necessary to make sure we don't leave - # open connections lying around. 
- def execute_and_close_db(): - ret = None - try: - ret = func(*args, **kwargs) - finally: - db.connections.close_all() - testWatch.hook_post(func, *args, **kwargs) - return ret - - t = Thread(target=execute_and_close_db) - t.daemon = True - t.start() - return t - else: - return func(*args, **kwargs) - return wrapper \ No newline at end of file diff --git a/app/boot.py b/app/boot.py index 8d30e825..6054924a 100644 --- a/app/boot.py +++ b/app/boot.py @@ -7,13 +7,14 @@ from django.core.files import File from django.db.utils import ProgrammingError from guardian.shortcuts import assign_perm +from worker import tasks as worker_tasks from app.models import Preset from app.models import Theme from app.plugins import register_plugins from nodeodm.models import ProcessingNode # noinspection PyUnresolvedReferences from webodm.settings import MEDIA_ROOT -from . import scheduler, signals +from . import signals import logging from .models import Task, Setting from webodm import settings @@ -22,7 +23,7 @@ from webodm.wsgi import booted def boot(): # booted is a shared memory variable to keep track of boot status - # as multiple workers could trigger the boot sequence twice + # as multiple gunicorn workers could trigger the boot sequence twice if not settings.DEBUG and booted.value: return booted.value = True @@ -92,10 +93,7 @@ def boot(): register_plugins() if not settings.TESTING: - # Setup and start scheduler - scheduler.setup() - - scheduler.update_nodes_info(background=True) + worker_tasks.update_nodes_info.delay() except ProgrammingError: logger.warning("Could not touch the database. If running a migration, this is expected.") \ No newline at end of file diff --git a/app/models/project.py b/app/models/project.py index 51cf4047..a8b1ab18 100644 --- a/app/models/project.py +++ b/app/models/project.py @@ -32,7 +32,7 @@ class Project(models.Model): super().delete(*args) else: # Need to remove all tasks before we can remove this project - # which will be deleted on the scheduler after pending actions + # which will be deleted by workers after pending actions # have been completed self.task_set.update(pending_action=pending_actions.REMOVE) self.deleting = True diff --git a/app/models/task.py b/app/models/task.py index 16ed2954..4808b561 100644 --- a/app/models/task.py +++ b/app/models/task.py @@ -109,7 +109,7 @@ class Task(models.Model): # mission created_at = models.DateTimeField(default=timezone.now, help_text="Creation date") - pending_action = models.IntegerField(choices=PENDING_ACTIONS, db_index=True, null=True, blank=True, help_text="A requested action to be performed on the task. The selected action will be performed by the scheduler at the next iteration.") + pending_action = models.IntegerField(choices=PENDING_ACTIONS, db_index=True, null=True, blank=True, help_text="A requested action to be performed on the task. The selected action will be performed by the worker at the next iteration.") public = models.BooleanField(default=False, help_text="A flag indicating whether this task is available to the public") @@ -221,7 +221,7 @@ class Task(models.Model): def process(self): """ This method contains the logic for processing tasks asynchronously - from a background thread or from the scheduler. Here tasks that are + from a background thread or from a worker. Here tasks that are ready to be processed execute some logic. This could be communication with a processing node or executing a pending action. 
""" diff --git a/app/scheduler.py b/app/scheduler.py deleted file mode 100644 index 20337138..00000000 --- a/app/scheduler.py +++ /dev/null @@ -1,96 +0,0 @@ -import logging -import traceback -from multiprocessing.dummy import Pool as ThreadPool -from threading import Lock - -from apscheduler.schedulers import SchedulerAlreadyRunningError, SchedulerNotRunningError -from apscheduler.schedulers.background import BackgroundScheduler -from django import db -from django.db.models import Q, Count -from webodm import settings - -from app.models import Task, Project -from nodeodm import status_codes -from nodeodm.models import ProcessingNode -from app.background import background - -logger = logging.getLogger('app.logger') -scheduler = BackgroundScheduler({ - 'apscheduler.job_defaults.coalesce': 'true', - 'apscheduler.job_defaults.max_instances': '3', -}) - -@background -def update_nodes_info(): - processing_nodes = ProcessingNode.objects.all() - for processing_node in processing_nodes: - processing_node.update_node_info() - -tasks_mutex = Lock() - -@background -def process_pending_tasks(): - tasks = [] - try: - tasks_mutex.acquire() - - # All tasks that have a processing node assigned - # Or that need one assigned (via auto) - # or tasks that need a status update - # or tasks that have a pending action - # and that are not locked (being processed by another thread) - tasks = Task.objects.filter(Q(processing_node__isnull=True, auto_processing_node=True) | - Q(Q(status=None) | Q(status__in=[status_codes.QUEUED, status_codes.RUNNING]), processing_node__isnull=False) | - Q(pending_action__isnull=False)).exclude(Q(processing_lock=True)) - for task in tasks: - task.processing_lock = True - task.save() - finally: - tasks_mutex.release() - - def process(task): - try: - task.process() - except Exception as e: - logger.error("Uncaught error! This is potentially bad. 
Please report it to http://github.com/OpenDroneMap/WebODM/issues: {} {}".format(e, traceback.format_exc())) - if settings.TESTING: raise e - finally: - # Might have been deleted - if task.pk is not None: - task.processing_lock = False - task.save() - - db.connections.close_all() - - if tasks.count() > 0: - pool = ThreadPool(tasks.count()) - pool.map(process, tasks, chunksize=1) - pool.close() - pool.join() - - -def cleanup_projects(): - # Delete all projects that are marked for deletion - # and that have no tasks left - total, count_dict = Project.objects.filter(deleting=True).annotate( - tasks_count=Count('task') - ).filter(tasks_count=0).delete() - if total > 0 and 'app.Project' in count_dict: - logger.info("Deleted {} projects".format(count_dict['app.Project'])) - -def setup(): - try: - scheduler.start() - scheduler.add_job(update_nodes_info, 'interval', seconds=30) - scheduler.add_job(process_pending_tasks, 'interval', seconds=5) - scheduler.add_job(cleanup_projects, 'interval', seconds=60) - except SchedulerAlreadyRunningError: - logger.warning("Scheduler already running (this is OK while testing)") - -def teardown(): - logger.info("Stopping scheduler...") - try: - scheduler.shutdown() - logger.info("Scheduler stopped") - except SchedulerNotRunningError: - logger.warning("Scheduler not running") diff --git a/app/tests/test_api_task.py b/app/tests/test_api_task.py index 4e7d6ef7..0e8d8afe 100644 --- a/app/tests/test_api_task.py +++ b/app/tests/test_api_task.py @@ -16,7 +16,6 @@ from rest_framework import status from rest_framework.test import APIClient from app import pending_actions -from app import scheduler from django.utils import timezone from app.models import Project, Task, ImageUpload from app.models.task import task_directory_path, full_task_directory_path diff --git a/app/tests/test_app.py b/app/tests/test_app.py index caacc30f..e9c51b4a 100644 --- a/app/tests/test_app.py +++ b/app/tests/test_app.py @@ -201,15 +201,7 @@ class TestApp(BootTestCase): self.assertRaises(ValidationError, task.save) - def test_scheduler(self): - self.assertTrue(scheduler.setup() is None) - - # Can call update_nodes_info() - self.assertTrue(scheduler.update_nodes_info() is None) - - # Can call function in background - self.assertTrue(scheduler.update_nodes_info(background=True).join() is None) - - self.assertTrue(scheduler.teardown() is None) + def test_worker(self): + self.assertTrue(True) # TODO!!! 
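One possible shape for the test_worker TODO above, sketched here under the assumption that Celery task objects can be invoked directly as plain callables (which runs the task body synchronously in-process, with no broker or worker running); the choice of tasks to exercise is illustrative only:

    # Hypothetical replacement for the test_worker stub above.
    # Calling a task object directly executes its body in-process,
    # so the test needs no running broker.
    def test_worker(self):
        from worker import tasks as worker_tasks

        # update_nodes_info() and cleanup_projects() both return None
        # when they complete without raising.
        self.assertIsNone(worker_tasks.update_nodes_info())
        self.assertIsNone(worker_tasks.cleanup_projects())
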
diff --git a/app/tests/test_testwatch.py b/app/tests/test_testwatch.py index 969b6a41..6a286913 100644 --- a/app/tests/test_testwatch.py +++ b/app/tests/test_testwatch.py @@ -1,6 +1,7 @@ from django.test import TestCase +from webodm.settings import CELERY_BROKER_URL -from app.testwatch import TestWatch +from app.testwatch import TestWatch, SharedTestWatch def test(a, b): @@ -8,50 +9,51 @@ def test(a, b): class TestTestWatch(TestCase): def test_methods(self): - tw = TestWatch() - self.assertTrue(tw.get_calls_count("app.tests.test_testwatch.test") == 0) - self.assertTrue(tw.get_calls_count("app.tests.test_testwatch.nonexistent") == 0) + def test_watch_instance(tw): + self.assertTrue(tw.get_calls_count("app.tests.test_testwatch.test") == 0) + self.assertTrue(tw.get_calls_count("app.tests.test_testwatch.nonexistent") == 0) - # Test watch count - tw.hook_pre(test, 1, 2) - test(1, 2) - tw.hook_post(test, 1, 2) + # Test watch count + tw.hook_pre(test, 1, 2) + test(1, 2) + tw.hook_post(test, 1, 2) - self.assertTrue(tw.get_calls_count("app.tests.test_testwatch.test") == 1) + self.assertTrue(tw.get_calls_count("app.tests.test_testwatch.test") == 1) - tw.hook_pre(test, 1, 2) - test(1, 2) - tw.hook_post(test, 1, 2) + tw.hook_pre(test, 1, 2) + test(1, 2) + tw.hook_post(test, 1, 2) - self.assertTrue(tw.get_calls_count("app.tests.test_testwatch.test") == 2) + self.assertTrue(tw.get_calls_count("app.tests.test_testwatch.test") == 2) - @TestWatch.watch(testWatch=tw) - def test2(d): - d['flag'] = not d['flag'] + @TestWatch.watch(testWatch=tw) + def test2(d): + d['flag'] = not d['flag'] - # Test intercept - tw.intercept("app.tests.test_testwatch.test2") - d = {'flag': True} - test2(d) - self.assertTrue(d['flag']) + # Test intercept + tw.intercept("app.tests.test_testwatch.test2") + d = {'flag': True} + test2(d) + self.assertTrue(d['flag']) - # Test function replacement intercept - d = { - 'a': False, - 'b': False - } - @TestWatch.watch(testWatch=tw) - def test3(d): - d['a'] = True + # Test function replacement intercept + d = { + 'a': False, + 'b': False + } + @TestWatch.watch(testWatch=tw) + def test3(d): + d['a'] = True - def replacement(d): - d['b'] = True - - tw.intercept("app.tests.test_testwatch.test3", replacement) - test3(d) - self.assertFalse(d['a']) - self.assertTrue(d['b']) + def replacement(d): + d['b'] = True + tw.intercept("app.tests.test_testwatch.test3", replacement) + test3(d) + self.assertFalse(d['a']) + self.assertTrue(d['b']) + test_watch_instance(TestWatch()) + test_watch_instance(SharedTestWatch(CELERY_BROKER_URL)) diff --git a/app/testwatch.py b/app/testwatch.py index 4631eef1..46c2de71 100644 --- a/app/testwatch.py +++ b/app/testwatch.py @@ -1,7 +1,12 @@ -import time +import time, redis import logging +import marshal +import types + +import json + from webodm import settings logger = logging.getLogger('app.logger') @@ -10,26 +15,32 @@ class TestWatch: def __init__(self): self.clear() + def func_to_name(f): + return "{}.{}".format(f.__module__, f.__name__) + def clear(self): self._calls = {} self._intercept_list = {} - def func_to_name(f): - return "{}.{}".format(f.__module__, f.__name__) - def intercept(self, fname, f = None): self._intercept_list[fname] = f if f is not None else True - def execute_intercept_function_replacement(self, fname, *args, **kwargs): - if fname in self._intercept_list and callable(self._intercept_list[fname]): - (self._intercept_list[fname])(*args, **kwargs) + def intercept_list_has(self, fname): + return fname in self._intercept_list - def 
should_prevent_execution(self, func):
-        return TestWatch.func_to_name(func) in self._intercept_list
+    def execute_intercept_function_replacement(self, fname, *args, **kwargs):
+        if self.intercept_list_has(fname) and callable(self._intercept_list[fname]):
+            (self._intercept_list[fname])(*args, **kwargs)
 
     def get_calls(self, fname):
         return self._calls[fname] if fname in self._calls else []
 
+    def set_calls(self, fname, value):
+        self._calls[fname] = value
+
+    def should_prevent_execution(self, func):
+        return self.intercept_list_has(TestWatch.func_to_name(func))
+
     def get_calls_count(self, fname):
         return len(self.get_calls(fname))
 
@@ -49,9 +60,9 @@ class TestWatch:
     def log_call(self, func, *args, **kwargs):
         fname = TestWatch.func_to_name(func)
         logger.info("{} called".format(fname))
-        list = self._calls[fname] if fname in self._calls else []
+        list = self.get_calls(fname)
         list.append({'f': fname, 'args': args, 'kwargs': kwargs})
-        self._calls[fname] = list
+        self.set_calls(fname, list)
 
     def hook_pre(self, func, *args, **kwargs):
         if settings.TESTING and self.should_prevent_execution(func):
@@ -80,4 +91,42 @@ class TestWatch:
             return wrapper
         return outer
 
+"""
+Redis-backed test watch
+suitable for cross-machine/cross-process
+test watching
+"""
+class SharedTestWatch(TestWatch):
+    """
+    :param redis_url same as the celery broker URL, for ex. redis://localhost:1234
+    """
+    def __init__(self, redis_url):
+        self.r = redis.from_url(redis_url)
+        super().__init__()
+
+    def clear(self):
+        self.r.delete('testwatch:calls', 'testwatch:intercept_list')
+
+    def intercept(self, fname, f = None):
+        self.r.hmset('testwatch:intercept_list', {fname: marshal.dumps(f.__code__) if f is not None else 1})
+
+    def intercept_list_has(self, fname):
+        return self.r.hget('testwatch:intercept_list', fname) is not None
+
+    def execute_intercept_function_replacement(self, fname, *args, **kwargs):
+        if self.intercept_list_has(fname) and self.r.hget('testwatch:intercept_list', fname) != b'1':
+            # Rebuild the replacement function from its marshalled code object
+            fcode = self.r.hget('testwatch:intercept_list', fname)
+            f = types.FunctionType(marshal.loads(fcode), globals())
+            f(*args, **kwargs)
+
+    def get_calls(self, fname):
+        value = self.r.hget('testwatch:calls', fname)
+        if value is None: return []
+        else:
+            return json.loads(value.decode('utf-8'))
+
+    def set_calls(self, fname, value):
+        self.r.hmset('testwatch:calls', {fname: json.dumps(value)})
+
 testWatch = TestWatch()
\ No newline at end of file
diff --git a/app/urls.py b/app/urls.py
index 82b72e40..2f3f2e2a 100644
--- a/app/urls.py
+++ b/app/urls.py
@@ -1,3 +1,4 @@
+import sys
 from django.conf.urls import url, include
 
 from .views import app as app_views, public as public_views
@@ -30,5 +31,7 @@ urlpatterns = [
 urlpatterns += get_url_patterns()
 
 # Test cases call boot() independently
-if not settings.TESTING:
+# Also don't execute boot when running as a celery worker
+celery_running = sys.argv[2:3] == ["worker"]
+if not celery_running and not settings.TESTING:
     boot()
\ No newline at end of file
diff --git a/docker-compose.yml b/docker-compose.yml
index efe5d9bf..9a1af100 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -37,6 +37,8 @@ services:
     image: opendronemap/webodm_worker
     container_name: worker
     entrypoint: /bin/bash -c "/webodm/wait-for-it.sh broker:6379 -- /webodm/worker.sh start"
+    volumes:
+      - ${WO_MEDIA_DIR}:/worker/app/media
     depends_on:
       - broker
     environment:
diff --git a/webodm/settings.py b/webodm/settings.py
index 9aa7f580..2a297ddb 100644
--- a/webodm/settings.py
+++ b/webodm/settings.py
@@ -313,6 +313,16 @@ LIBSASS_CUSTOM_FUNCTIONS = {
     'scalebyiv': scalebyiv
 }
 
+# Celery
+CELERY_BROKER_URL = os.environ.get('WO_BROKER', 'redis://localhost')
+CELERY_RESULT_BACKEND = os.environ.get('WO_BROKER', 'redis://localhost')
+
+CELERY_TASK_SERIALIZER = 'json'
+CELERY_RESULT_SERIALIZER = 'json'
+CELERY_ACCEPT_CONTENT = ['json']
+CELERY_INCLUDE=['worker.tasks']
+
+
 if TESTING:
     MEDIA_ROOT = os.path.join(BASE_DIR, 'app', 'media_test')
diff --git a/worker/start.sh b/worker.sh
similarity index 72%
rename from worker/start.sh
rename to worker.sh
index 55510c7f..033a6d3d 100755
--- a/worker/start.sh
+++ b/worker.sh
@@ -3,6 +3,16 @@ set -eo pipefail
 __dirname=$(cd $(dirname "$0"); pwd -P)
 cd ${__dirname}
 
+usage(){
+  echo "Usage: $0 <command>"
+  echo
+  echo "This program manages the background worker processes. WebODM requires at least one background worker process to be running at all times."
+  echo
+  echo "Command list:"
+  echo "	start	Start background worker"
+  exit
+}
+
 check_command(){
 	check_msg_prefix="Checking for $1... "
 	check_msg_result="\033[92m\033[1m OK\033[0m\033[39m"
@@ -36,11 +46,17 @@ environment_check(){
 	fi
 }
 
-environment_check
-
-echo "Starting worker using broker at $WO_BROKER"
-
-# Switch to parent directory
-# so that celery recognizes the package name
-cd ${__dirname}/../
+start(){
+	action=$1
 
-celery -A worker worker --loglevel=info
\ No newline at end of file
+	echo "Starting worker using broker at $WO_BROKER"
+	celery -A worker worker --loglevel=info
+}
+
+if [[ $1 = "start" ]]; then
+	environment_check
+	start
+else
+	usage
+fi
diff --git a/worker/Dockerfile b/worker/Dockerfile
deleted file mode 100644
index c49abc01..00000000
--- a/worker/Dockerfile
+++ /dev/null
@@ -1,21 +0,0 @@
-FROM ubuntu:16.04
-MAINTAINER Piero Toffanin
-
-WO_BROKER=redis://broker
-
-RUN apt-get update && \
-    apt-get install -y software-properties-common && \
-    add-apt-repository -y ppa:ubuntugis/ubuntugis-unstable && \
-    apt-get install update && \
-    apt-get install -y grass-core python-pip
-
-COPY requirements.txt /worker/
-COPY ../wait-for-it.sh /worker/
-WORKDIR /worker
-
-RUN pip install -U pip && pip install -r requirements.txt
-
-RUN apt-get clean && \
-    rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*
-
-RUN chmod 644 /docker-entrypoint-initdb.d/init-db.sql
diff --git a/worker/celery.py b/worker/celery.py
index 7a3fd59d..4bdd8fe8 100644
--- a/worker/celery.py
+++ b/worker/celery.py
@@ -1,8 +1,37 @@
 from celery import Celery
 import os
 
+os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'webodm.settings')
+
 app = Celery('tasks')
 
-app.config_from_object('worker.celeryconfig');
+app.config_from_object('django.conf:settings', namespace='CELERY')
+
+app.conf.beat_schedule = {
+    'update-nodes-info': {
+        'task': 'worker.tasks.update_nodes_info',
+        'schedule': 30,
+        'options': {
+            'expires': 14,
+            'retry': False
+        }
+    },
+    'cleanup-projects': {
+        'task': 'worker.tasks.cleanup_projects',
+        'schedule': 60,
+        'options': {
+            'expires': 29,
+            'retry': False
+        }
+    },
+    'process-pending-tasks': {
+        'task': 'worker.tasks.process_pending_tasks',
+        'schedule': 5,
+        'options': {
+            'expires': 2,
+            'retry': False
+        }
+    },
+}
 
 if __name__ == '__main__':
     app.start()
\ No newline at end of file
diff --git a/worker/celeryconfig.py b/worker/celeryconfig.py
deleted file mode 100644
index 64372f79..00000000
--- a/worker/celeryconfig.py
+++ /dev/null
@@ -1,9 +0,0 @@
-import os
-
-broker_url = os.environ.get('WO_BROKER', 'redis://localhost')
-result_backend = os.environ.get('WO_BROKER', 'redis://localhost')
-
-task_serializer = 'json'
-result_serializer = 'json'
-accept_content = ['json']
-include=['worker.tasks']
diff --git a/worker/requirements.txt b/worker/requirements.txt
deleted file mode 100644
index d3323c83..00000000
--- a/worker/requirements.txt
+++ /dev/null
@@ -1,2 +0,0 @@
-celery
-redis
diff --git a/worker/tasks.py b/worker/tasks.py
index 0af84c19..6bd9846d 100644
--- a/worker/tasks.py
+++ b/worker/tasks.py
@@ -1,15 +1,83 @@
+import traceback
+
+from django.core.exceptions import ObjectDoesNotExist
+from django.db.models import Count
+from django.db.models import Q
+
+from app.models import Project
+from app.models import Task
+from webodm import settings
+from nodeodm import status_codes
+from nodeodm.models import ProcessingNode
 from .celery import app
+from celery.utils.log import get_task_logger
+from django.db import transaction
+
+logger = get_task_logger(__name__)
 
 @app.task
-def add(x, y):
-    return x + y
+def update_nodes_info():
+    processing_nodes = ProcessingNode.objects.all()
+    for processing_node in processing_nodes:
+        processing_node.update_node_info()
 
 @app.task
-def mul(x, y):
-    return x * y
+def cleanup_projects():
+    # Delete all projects that are marked for deletion
+    # and that have no tasks left
+    total, count_dict = Project.objects.filter(deleting=True).annotate(
+        tasks_count=Count('task')
+    ).filter(tasks_count=0).delete()
+    if total > 0 and 'app.Project' in count_dict:
+        logger.info("Deleted {} projects".format(count_dict['app.Project']))
 
 @app.task
-def xsum(numbers):
-    return sum(numbers)
\ No newline at end of file
+def process_task(taskId):
+    # TODO: would a redis lock perform better here?
+    with transaction.atomic():
+        try:
+            task = Task.objects.filter(pk=taskId).select_for_update().get()
+        except ObjectDoesNotExist:
+            logger.info("Task id {} has already been deleted.".format(taskId))
+            return
+
+        if not task.processing_lock:
+            task.processing_lock = True
+            task.save()
+        else:
+            return # Another worker beat us to it
+
+    try:
+        task.process()
+    except Exception as e:
+        logger.error(
+            "Uncaught error! This is potentially bad. Please report it to http://github.com/OpenDroneMap/WebODM/issues: {} {}".format(
+                e, traceback.format_exc()))
+        if settings.TESTING: raise e
+    finally:
+        # Might have been deleted
+        if task.pk is not None:
+            task.processing_lock = False
+            task.save()
+
+
+@app.task
+def process_pending_tasks():
+    # All tasks that have a processing node assigned
+    # Or that need one assigned (via auto)
+    # or tasks that need a status update
+    # or tasks that have a pending action
+    # and that are not locked (being processed by another worker)
+    qs = Task.objects.filter(Q(processing_node__isnull=True, auto_processing_node=True) |
+                             Q(Q(status=None) | Q(status__in=[status_codes.QUEUED, status_codes.RUNNING]),
+                               processing_node__isnull=False) |
+                             Q(pending_action__isnull=False)).exclude(Q(processing_lock=True))
+
+    tasks = list(qs)
+
+    if len(tasks) > 0:
+        for task in tasks:
+            process_task.delay(task.id)
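On the "would a redis lock perform better here?" TODO in process_task: a minimal sketch of that alternative, assuming redis-py (already available, since Redis is the broker) and reusing the CELERY_BROKER_URL setting; the key name, timeout, and helper names are illustrative, not part of this patch:

# Hypothetical redis-based task lock, as an alternative to the
# select_for_update() + processing_lock flag used in process_task above.
import redis
from webodm import settings

redis_client = redis.from_url(settings.CELERY_BROKER_URL)

def try_acquire_task_lock(task_id, timeout=300):
    # SET with nx=True/ex=timeout is atomic: only the first worker gets True,
    # and the key expires on its own if that worker dies mid-task, so no
    # stale processing_lock flag is left behind in the database.
    return bool(redis_client.set('task_lock:{}'.format(task_id), 1, nx=True, ex=timeout))

def release_task_lock(task_id):
    redis_client.delete('task_lock:{}'.format(task_id))

The trade-off against the select_for_update() approach: the row lock keeps lock state and the task row transactionally consistent, while the redis lock avoids holding a database transaction and expires automatically, at the cost of a second source of truth.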