diff --git a/worker/tasks.py b/worker/tasks.py index 70b09cd7..473e399a 100644 --- a/worker/tasks.py +++ b/worker/tasks.py @@ -3,6 +3,7 @@ import shutil import traceback import time +from threading import Event, Thread from celery.utils.log import get_task_logger from django.core.exceptions import ObjectDoesNotExist from django.db.models import Count @@ -58,21 +59,42 @@ def cleanup_tmp_directory(): logger.info('Cleaned up: %s (%s)' % (f, modified)) +# Based on https://stackoverflow.com/questions/22498038/improve-current-implementation-of-a-setinterval-python/22498708#22498708 +def setInterval(interval, func, *args): + stopped = Event() + def loop(): + while not stopped.wait(interval): + func(*args) + t = Thread(target=loop) + t.daemon = True + t.start() + return stopped.set + @app.task def process_task(taskId): - have_lock = False + lock_id = 'task_lock_{}'.format(taskId) + cancel_monitor = None try: - lock = redis_client.lock('task_lock_{}'.format(taskId)) - have_lock = lock.acquire(blocking=False) + task_lock_last_update = redis_client.getset(lock_id, time.time()) + if task_lock_last_update is not None: + # Check if lock has expired + if time.time() - float(task_lock_last_update) <= 30: + # Locked + return + else: + # Expired + logger.warning("Task {} has an expired lock! This could mean that WebODM is running out of memory. Check your server configuration.") - if not have_lock: - return + # Set lock + def update_lock(): + redis_client.set(lock_id, time.time()) + cancel_monitor = setInterval(10, update_lock) try: task = Task.objects.get(pk=taskId) except ObjectDoesNotExist: - logger.info("Task id {} has already been deleted.".format(taskId)) + logger.info("Task {} has already been deleted.".format(taskId)) return try: @@ -84,12 +106,14 @@ def process_task(taskId): if settings.TESTING: raise e finally: try: - if have_lock: - lock.release() - except redis.exceptions.LockError: - # A lock could have expired + redis_client.delete(lock_id) + except redis.exceptions.RedisError: + # Ignore errors, the lock will expire at some point pass + if cancel_monitor is not None: + cancel_monitor() + def get_pending_tasks(): # All tasks that have a processing node assigned # Or that need one assigned (via auto)