Removed task lock from db model, using redis locks instead

pull/428/head
Piero Toffanin 2018-04-08 12:48:54 -04:00
rodzic 5b229ce4ed
commit a6ff7b8998
5 zmienionych plików z 42 dodań i 27 usunięć

Wyświetl plik

@ -51,7 +51,7 @@ class TaskSerializer(serializers.ModelSerializer):
class Meta:
model = models.Task
exclude = ('processing_lock', 'console_output', 'orthophoto_extent', 'dsm_extent', 'dtm_extent', )
exclude = ('console_output', 'orthophoto_extent', 'dsm_extent', 'dtm_extent', )
read_only_fields = ('processing_time', 'status', 'last_error', 'created_at', 'pending_action', 'available_assets', )
class TaskViewSet(viewsets.ViewSet):

Wyświetl plik

@ -81,9 +81,6 @@ def boot():
logger.info("Created settings")
# Unlock any Task that might have been locked
Task.objects.filter(processing_lock=True).update(processing_lock=False)
register_plugins()
if not settings.TESTING:

Wyświetl plik

@ -0,0 +1,17 @@
# Generated by Django 2.0.3 on 2018-04-08 16:47
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
('app', '0018_auto_20180311_1028'),
]
operations = [
migrations.RemoveField(
model_name='task',
name='processing_lock',
),
]

Wyświetl plik

@ -145,7 +145,6 @@ class Task(models.Model):
uuid = models.CharField(max_length=255, db_index=True, default='', blank=True, help_text="Identifier of the task (as returned by OpenDroneMap's REST API)")
project = models.ForeignKey(Project, on_delete=models.CASCADE, help_text="Project that this task belongs to")
name = models.CharField(max_length=255, null=True, blank=True, help_text="A label for the task")
processing_lock = models.BooleanField(default=False, help_text="A flag indicating whether this task is currently locked for processing. When this flag is turned on, the task is in the middle of a processing step.")
processing_time = models.IntegerField(default=-1, help_text="Number of milliseconds that elapsed since the beginning of this task (-1 indicates that no information is available)")
processing_node = models.ForeignKey(ProcessingNode, on_delete=models.SET_NULL, null=True, blank=True, help_text="Processing node assigned to this task (or null if this task has not been associated yet)")
auto_processing_node = models.BooleanField(default=True, help_text="A flag indicating whether this task should be automatically assigned a processing node")

Wyświetl plik

@ -13,8 +13,10 @@ from nodeodm import status_codes
from nodeodm.models import ProcessingNode
from webodm import settings
from .celery import app
import redis
logger = get_task_logger(__name__)
redis_client = redis.Redis.from_url(settings.CELERY_BROKER_URL)
@app.task
def update_nodes_info():
@ -36,32 +38,33 @@ def cleanup_projects():
@app.task
def process_task(taskId):
# TODO: would a redis lock perform better here?
with transaction.atomic():
try:
lock = redis_client.lock('task_lock_{}'.format(taskId), timeout=360)
have_lock = lock.acquire(blocking=False)
if not have_lock:
return
try:
task = Task.objects.filter(pk=taskId).select_for_update().get()
task = Task.objects.get(pk=taskId)
except ObjectDoesNotExist:
logger.info("Task id {} has already been deleted.".format(taskId))
return
if not task.processing_lock:
task.processing_lock = True
task.save()
else:
return # Another worker beat us to it
try:
task.process()
except Exception as e:
logger.error(
"Uncaught error! This is potentially bad. Please report it to http://github.com/OpenDroneMap/WebODM/issues: {} {}".format(
e, traceback.format_exc()))
if settings.TESTING: raise e
try:
task.process()
except Exception as e:
logger.error(
"Uncaught error! This is potentially bad. Please report it to http://github.com/OpenDroneMap/WebODM/issues: {} {}".format(
e, traceback.format_exc()))
if settings.TESTING: raise e
finally:
# Might have been deleted
if task.pk is not None:
task.processing_lock = False
task.save()
try:
if have_lock:
lock.release()
except redis.exceptions.LockError:
# A lock could have expired
pass
@app.task
@ -70,11 +73,10 @@ def process_pending_tasks():
# Or that need one assigned (via auto)
# or tasks that need a status update
# or tasks that have a pending action
# and that are not locked (being processed by another thread)
tasks = Task.objects.filter(Q(processing_node__isnull=True, auto_processing_node=True) |
Q(Q(status=None) | Q(status__in=[status_codes.QUEUED, status_codes.RUNNING]),
processing_node__isnull=False) |
Q(pending_action__isnull=False)).exclude(Q(processing_lock=True))
Q(pending_action__isnull=False))
for task in tasks:
process_task.delay(task.id)