Task locking monitor

pull/673/head
Piero Toffanin 2019-06-14 11:15:24 -04:00
rodzic 9558cc804c
commit 5a5ba195e9
1 zmienionych plików z 34 dodań i 10 usunięć

Wyświetl plik

@ -3,6 +3,7 @@ import shutil
import traceback
import time
from threading import Event, Thread
from celery.utils.log import get_task_logger
from django.core.exceptions import ObjectDoesNotExist
from django.db.models import Count
@ -58,21 +59,42 @@ def cleanup_tmp_directory():
logger.info('Cleaned up: %s (%s)' % (f, modified))
# Based on https://stackoverflow.com/questions/22498038/improve-current-implementation-of-a-setinterval-python/22498708#22498708
def setInterval(interval, func, *args):
stopped = Event()
def loop():
while not stopped.wait(interval):
func(*args)
t = Thread(target=loop)
t.daemon = True
t.start()
return stopped.set
@app.task
def process_task(taskId):
have_lock = False
lock_id = 'task_lock_{}'.format(taskId)
cancel_monitor = None
try:
lock = redis_client.lock('task_lock_{}'.format(taskId))
have_lock = lock.acquire(blocking=False)
task_lock_last_update = redis_client.getset(lock_id, time.time())
if task_lock_last_update is not None:
# Check if lock has expired
if time.time() - float(task_lock_last_update) <= 30:
# Locked
return
else:
# Expired
logger.warning("Task {} has an expired lock! This could mean that WebODM is running out of memory. Check your server configuration.")
if not have_lock:
return
# Set lock
def update_lock():
redis_client.set(lock_id, time.time())
cancel_monitor = setInterval(10, update_lock)
try:
task = Task.objects.get(pk=taskId)
except ObjectDoesNotExist:
logger.info("Task id {} has already been deleted.".format(taskId))
logger.info("Task {} has already been deleted.".format(taskId))
return
try:
@ -84,12 +106,14 @@ def process_task(taskId):
if settings.TESTING: raise e
finally:
try:
if have_lock:
lock.release()
except redis.exceptions.LockError:
# A lock could have expired
redis_client.delete(lock_id)
except redis.exceptions.RedisError:
# Ignore errors, the lock will expire at some point
pass
if cancel_monitor is not None:
cancel_monitor()
def get_pending_tasks():
# All tasks that have a processing node assigned
# Or that need one assigned (via auto)