From a6ff7b8998f1e928efc93a5b526a604eded1e602 Mon Sep 17 00:00:00 2001 From: Piero Toffanin Date: Sun, 8 Apr 2018 12:48:54 -0400 Subject: [PATCH] Removed task lock from db model, using redis locks instead --- app/api/tasks.py | 2 +- app/boot.py | 3 -- .../0019_remove_task_processing_lock.py | 17 +++++++ app/models/task.py | 1 - worker/tasks.py | 46 ++++++++++--------- 5 files changed, 42 insertions(+), 27 deletions(-) create mode 100644 app/migrations/0019_remove_task_processing_lock.py diff --git a/app/api/tasks.py b/app/api/tasks.py index e5d3b1cd..bd14f7cd 100644 --- a/app/api/tasks.py +++ b/app/api/tasks.py @@ -51,7 +51,7 @@ class TaskSerializer(serializers.ModelSerializer): class Meta: model = models.Task - exclude = ('processing_lock', 'console_output', 'orthophoto_extent', 'dsm_extent', 'dtm_extent', ) + exclude = ('console_output', 'orthophoto_extent', 'dsm_extent', 'dtm_extent', ) read_only_fields = ('processing_time', 'status', 'last_error', 'created_at', 'pending_action', 'available_assets', ) class TaskViewSet(viewsets.ViewSet): diff --git a/app/boot.py b/app/boot.py index ea898715..9f45817e 100644 --- a/app/boot.py +++ b/app/boot.py @@ -81,9 +81,6 @@ def boot(): logger.info("Created settings") - # Unlock any Task that might have been locked - Task.objects.filter(processing_lock=True).update(processing_lock=False) - register_plugins() if not settings.TESTING: diff --git a/app/migrations/0019_remove_task_processing_lock.py b/app/migrations/0019_remove_task_processing_lock.py new file mode 100644 index 00000000..8b692e5d --- /dev/null +++ b/app/migrations/0019_remove_task_processing_lock.py @@ -0,0 +1,17 @@ +# Generated by Django 2.0.3 on 2018-04-08 16:47 + +from django.db import migrations + + +class Migration(migrations.Migration): + + dependencies = [ + ('app', '0018_auto_20180311_1028'), + ] + + operations = [ + migrations.RemoveField( + model_name='task', + name='processing_lock', + ), + ] diff --git a/app/models/task.py b/app/models/task.py index 569df118..f57ceb71 100644 --- a/app/models/task.py +++ b/app/models/task.py @@ -145,7 +145,6 @@ class Task(models.Model): uuid = models.CharField(max_length=255, db_index=True, default='', blank=True, help_text="Identifier of the task (as returned by OpenDroneMap's REST API)") project = models.ForeignKey(Project, on_delete=models.CASCADE, help_text="Project that this task belongs to") name = models.CharField(max_length=255, null=True, blank=True, help_text="A label for the task") - processing_lock = models.BooleanField(default=False, help_text="A flag indicating whether this task is currently locked for processing. When this flag is turned on, the task is in the middle of a processing step.") processing_time = models.IntegerField(default=-1, help_text="Number of milliseconds that elapsed since the beginning of this task (-1 indicates that no information is available)") processing_node = models.ForeignKey(ProcessingNode, on_delete=models.SET_NULL, null=True, blank=True, help_text="Processing node assigned to this task (or null if this task has not been associated yet)") auto_processing_node = models.BooleanField(default=True, help_text="A flag indicating whether this task should be automatically assigned a processing node") diff --git a/worker/tasks.py b/worker/tasks.py index ad84d38d..26f331e3 100644 --- a/worker/tasks.py +++ b/worker/tasks.py @@ -13,8 +13,10 @@ from nodeodm import status_codes from nodeodm.models import ProcessingNode from webodm import settings from .celery import app +import redis logger = get_task_logger(__name__) +redis_client = redis.Redis.from_url(settings.CELERY_BROKER_URL) @app.task def update_nodes_info(): @@ -36,32 +38,33 @@ def cleanup_projects(): @app.task def process_task(taskId): - # TODO: would a redis lock perform better here? - with transaction.atomic(): + try: + lock = redis_client.lock('task_lock_{}'.format(taskId), timeout=360) + have_lock = lock.acquire(blocking=False) + + if not have_lock: + return + try: - task = Task.objects.filter(pk=taskId).select_for_update().get() + task = Task.objects.get(pk=taskId) except ObjectDoesNotExist: logger.info("Task id {} has already been deleted.".format(taskId)) return - if not task.processing_lock: - task.processing_lock = True - task.save() - else: - return # Another worker beat us to it - - try: - task.process() - except Exception as e: - logger.error( - "Uncaught error! This is potentially bad. Please report it to http://github.com/OpenDroneMap/WebODM/issues: {} {}".format( - e, traceback.format_exc())) - if settings.TESTING: raise e + try: + task.process() + except Exception as e: + logger.error( + "Uncaught error! This is potentially bad. Please report it to http://github.com/OpenDroneMap/WebODM/issues: {} {}".format( + e, traceback.format_exc())) + if settings.TESTING: raise e finally: - # Might have been deleted - if task.pk is not None: - task.processing_lock = False - task.save() + try: + if have_lock: + lock.release() + except redis.exceptions.LockError: + # A lock could have expired + pass @app.task @@ -70,11 +73,10 @@ def process_pending_tasks(): # Or that need one assigned (via auto) # or tasks that need a status update # or tasks that have a pending action - # and that are not locked (being processed by another thread) tasks = Task.objects.filter(Q(processing_node__isnull=True, auto_processing_node=True) | Q(Q(status=None) | Q(status__in=[status_codes.QUEUED, status_codes.RUNNING]), processing_node__isnull=False) | - Q(pending_action__isnull=False)).exclude(Q(processing_lock=True)) + Q(pending_action__isnull=False)) for task in tasks: process_task.delay(task.id)