diff --git a/app/scripts/unlock_all_tasks.py b/app/scripts/unlock_all_tasks.py new file mode 100644 index 00000000..9d878c16 --- /dev/null +++ b/app/scripts/unlock_all_tasks.py @@ -0,0 +1,13 @@ +from worker import tasks +import redis +from webodm import settings + +redis_client = redis.Redis().from_url(settings.CELERY_BROKER_URL) + +for task in tasks.get_pending_tasks(): + msg = "Unlocking {}... ".format(task) + res = redis_client.delete('task_lock_{}'.format(task.id)) + print(msg + ("OK" if res else "Already unlocked")) + + + diff --git a/start.sh b/start.sh index c52780ca..c0401fba 100755 --- a/start.sh +++ b/start.sh @@ -69,6 +69,7 @@ if [ "$WO_SSL" = "YES" ]; then proto="https" fi +cat app/scripts/unlock_all_tasks.py | python manage.py shell ./worker.sh scheduler start congrats(){ diff --git a/worker/tasks.py b/worker/tasks.py index 26f331e3..13228557 100644 --- a/worker/tasks.py +++ b/worker/tasks.py @@ -39,7 +39,7 @@ def cleanup_projects(): @app.task def process_task(taskId): try: - lock = redis_client.lock('task_lock_{}'.format(taskId), timeout=360) + lock = redis_client.lock('task_lock_{}'.format(taskId)) have_lock = lock.acquire(blocking=False) if not have_lock: @@ -66,18 +66,20 @@ def process_task(taskId): # A lock could have expired pass - -@app.task -def process_pending_tasks(): +def get_pending_tasks(): # All tasks that have a processing node assigned # Or that need one assigned (via auto) # or tasks that need a status update # or tasks that have a pending action - tasks = Task.objects.filter(Q(processing_node__isnull=True, auto_processing_node=True) | + return Task.objects.filter(Q(processing_node__isnull=True, auto_processing_node=True) | Q(Q(status=None) | Q(status__in=[status_codes.QUEUED, status_codes.RUNNING]), processing_node__isnull=False) | Q(pending_action__isnull=False)) +@app.task +def process_pending_tasks(): + tasks = get_pending_tasks() + for task in tasks: process_task.delay(task.id)