Merge pull request #428 from pierotofy/redis-lock

Redis lock
pull/432/head
Piero Toffanin 2018-04-08 16:41:41 -04:00 zatwierdzone przez GitHub
commit 1ad79614b6
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: 4AEE18F83AFDEB23
10 zmienionych plików z 65 dodań i 34 usunięć

Wyświetl plik

@ -51,7 +51,7 @@ class TaskSerializer(serializers.ModelSerializer):
class Meta:
model = models.Task
exclude = ('processing_lock', 'console_output', 'orthophoto_extent', 'dsm_extent', 'dtm_extent', )
exclude = ('console_output', 'orthophoto_extent', 'dsm_extent', 'dtm_extent', )
read_only_fields = ('processing_time', 'status', 'last_error', 'created_at', 'pending_action', 'available_assets', )
class TaskViewSet(viewsets.ViewSet):

Wyświetl plik

@ -81,9 +81,6 @@ def boot():
logger.info("Created settings")
# Unlock any Task that might have been locked
Task.objects.filter(processing_lock=True).update(processing_lock=False)
register_plugins()
if not settings.TESTING:

Wyświetl plik

@ -0,0 +1,17 @@
# Generated by Django 2.0.3 on 2018-04-08 16:47
from django.db import migrations
class Migration(migrations.Migration):
    """Drop the ``Task.processing_lock`` column.

    DB-level locking via this boolean flag is replaced by a Redis lock
    (key ``task_lock_<task id>``) acquired in worker/tasks.py, so the
    field is no longer needed.
    """

    dependencies = [
        ('app', '0018_auto_20180311_1028'),
    ]

    operations = [
        # Remove the flag previously used to keep two workers from
        # processing the same task concurrently.
        migrations.RemoveField(
            model_name='task',
            name='processing_lock',
        ),
    ]

Wyświetl plik

@ -145,7 +145,6 @@ class Task(models.Model):
uuid = models.CharField(max_length=255, db_index=True, default='', blank=True, help_text="Identifier of the task (as returned by OpenDroneMap's REST API)")
project = models.ForeignKey(Project, on_delete=models.CASCADE, help_text="Project that this task belongs to")
name = models.CharField(max_length=255, null=True, blank=True, help_text="A label for the task")
processing_lock = models.BooleanField(default=False, help_text="A flag indicating whether this task is currently locked for processing. When this flag is turned on, the task is in the middle of a processing step.")
processing_time = models.IntegerField(default=-1, help_text="Number of milliseconds that elapsed since the beginning of this task (-1 indicates that no information is available)")
processing_node = models.ForeignKey(ProcessingNode, on_delete=models.SET_NULL, null=True, blank=True, help_text="Processing node assigned to this task (or null if this task has not been associated yet)")
auto_processing_node = models.BooleanField(default=True, help_text="A flag indicating whether this task should be automatically assigned a processing node")

Wyświetl plik

@ -0,0 +1,13 @@
# Release the Redis lock of every pending task so workers can pick them
# up again after a restart. Intended to be piped into the Django shell:
#   cat app/scripts/unlock_all_tasks.py | python manage.py shell
from worker import tasks
import redis

from webodm import settings

# ``from_url`` is a classmethod: call it on the class, consistent with
# worker/tasks.py. The original ``redis.Redis().from_url(...)`` first
# built a throwaway client (with an unused default-localhost connection
# pool) only to invoke the classmethod through it.
redis_client = redis.Redis.from_url(settings.CELERY_BROKER_URL)

for task in tasks.get_pending_tasks():
    msg = "Unlocking {}... ".format(task)
    # DEL returns the number of keys removed; 0 means the lock key
    # was not present (the task was not locked).
    res = redis_client.delete('task_lock_{}'.format(task.id))
    print(msg + ("OK" if res else "Already unlocked"))

Wyświetl plik

@ -139,11 +139,11 @@ class Console extends React.Component {
onMouseOut={this.handleMouseOut}
ref={this.setRef}
>
{this.state.lines.map(line => {
if (this.props.lang) return (<div key={i++} dangerouslySetInnerHTML={prettyLine(line)}></div>);
else return line + "\n";
})}
{"\n"}
</pre>
);
}

Wyświetl plik

@ -144,7 +144,7 @@ class ProcessingNode(models.Model):
if isinstance(result, dict) and 'error' in result:
raise ProcessingError(result['error'])
elif isinstance(result, list):
return "".join(result)
return "\n".join(result)
else:
raise ProcessingError("Unknown response for console output: {}".format(result))

Wyświetl plik

@ -1,6 +1,6 @@
{
"name": "WebODM",
"version": "0.5.1",
"version": "0.5.2",
"description": "Open Source Drone Image Processing",
"main": "index.js",
"scripts": {

Wyświetl plik

@ -69,6 +69,7 @@ if [ "$WO_SSL" = "YES" ]; then
proto="https"
fi
cat app/scripts/unlock_all_tasks.py | python manage.py shell
./worker.sh scheduler start
congrats(){

Wyświetl plik

@ -13,8 +13,10 @@ from nodeodm import status_codes
from nodeodm.models import ProcessingNode
from webodm import settings
from .celery import app
import redis
logger = get_task_logger(__name__)
redis_client = redis.Redis.from_url(settings.CELERY_BROKER_URL)
@app.task
def update_nodes_info():
@ -36,45 +38,47 @@ def cleanup_projects():
@app.task
def process_task(taskId):
# TODO: would a redis lock perform better here?
with transaction.atomic():
try:
lock = redis_client.lock('task_lock_{}'.format(taskId))
have_lock = lock.acquire(blocking=False)
if not have_lock:
return
try:
task = Task.objects.filter(pk=taskId).select_for_update().get()
task = Task.objects.get(pk=taskId)
except ObjectDoesNotExist:
logger.info("Task id {} has already been deleted.".format(taskId))
return
if not task.processing_lock:
task.processing_lock = True
task.save()
else:
return # Another worker beat us to it
try:
task.process()
except Exception as e:
logger.error(
"Uncaught error! This is potentially bad. Please report it to http://github.com/OpenDroneMap/WebODM/issues: {} {}".format(
e, traceback.format_exc()))
if settings.TESTING: raise e
try:
task.process()
except Exception as e:
logger.error(
"Uncaught error! This is potentially bad. Please report it to http://github.com/OpenDroneMap/WebODM/issues: {} {}".format(
e, traceback.format_exc()))
if settings.TESTING: raise e
finally:
# Might have been deleted
if task.pk is not None:
task.processing_lock = False
task.save()
try:
if have_lock:
lock.release()
except redis.exceptions.LockError:
# A lock could have expired
pass
@app.task
def process_pending_tasks():
def get_pending_tasks():
# All tasks that have a processing node assigned
# Or that need one assigned (via auto)
# or tasks that need a status update
# or tasks that have a pending action
# and that are not locked (being processed by another thread)
tasks = Task.objects.filter(Q(processing_node__isnull=True, auto_processing_node=True) |
return Task.objects.filter(Q(processing_node__isnull=True, auto_processing_node=True) |
Q(Q(status=None) | Q(status__in=[status_codes.QUEUED, status_codes.RUNNING]),
processing_node__isnull=False) |
Q(pending_action__isnull=False)).exclude(Q(processing_lock=True))
Q(pending_action__isnull=False))
@app.task
def process_pending_tasks():
tasks = get_pending_tasks()
for task in tasks:
process_task.delay(task.id)