Strengthened timeout error handling; added server-side automatic processing node selection and automatic processing node reassignment; added unit tests

pull/103/head
Piero Toffanin 2017-02-15 14:20:41 -05:00
rodzic a741d789f8
commit 482502dea1
12 zmienionych plików z 246 dodań i 45 usunięć

Wyświetl plik

@ -4,6 +4,7 @@ import os
from django.contrib.gis.db.models import GeometryField
from django.contrib.gis.db.models.functions import Envelope
from django.core.exceptions import ObjectDoesNotExist, SuspiciousFileOperation
from django.db import transaction
from django.db.models.functions import Cast
from django.http import HttpResponse
from wsgiref.util import FileWrapper
@ -124,17 +125,24 @@ class TaskViewSet(viewsets.ViewSet):
if len(files) <= 1:
raise exceptions.ValidationError(detail="Cannot create task, you need at least 2 images")
task = models.Task.create_from_images(files, project)
if task is not None:
with transaction.atomic():
task = models.Task.objects.create(project=project)
for image in files:
models.ImageUpload.objects.create(task=task, image=image)
# Update other parameters such as processing node, task name, etc.
serializer = TaskSerializer(task, data=request.data, partial=True)
serializer.is_valid(raise_exception=True)
serializer.save()
# Call the scheduler (speed things up)
scheduler.process_pending_tasks(background=True)
return Response({"id": task.id}, status=status.HTTP_201_CREATED)
else:
raise exceptions.ValidationError(detail="Cannot create task, input provided is not valid.")
# on transaction fail
raise exceptions.ValidationError(detail="Cannot create task, input provided is not valid.")
def update(self, request, pk=None, project_pk=None, partial=False):
get_and_check_project(request, project_pk, ('change_project', ))

Wyświetl plik

@ -2,7 +2,6 @@ from threading import Thread
import logging
from django import db
from webodm import settings
from app.testwatch import testWatch
logger = logging.getLogger('app.logger')

Wyświetl plik

@ -0,0 +1,20 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.10.5 on 2017-02-15 15:33
from __future__ import unicode_literals
from django.db import migrations, models
# Schema migration for the 'app' application: adds the boolean
# 'auto_processing_node' field to the 'task' model.
class Migration(migrations.Migration):
# This migration builds on the app's initial migration.
dependencies = [
('app', '0001_initial'),
]
# Single operation: add the new flag, declared with default=True.
operations = [
migrations.AddField(
model_name='task',
name='auto_processing_node',
field=models.BooleanField(default=True, help_text='A flag indicating whether this task should be automatically assigned a processing node'),
),
]

Wyświetl plik

@ -2,9 +2,9 @@ import logging
import os
import shutil
import zipfile
import requests
from django.contrib.auth.models import User
from django.contrib.gis.gdal import GDALException
from django.contrib.gis.gdal import GDALRaster
from django.contrib.postgres import fields
from django.core.exceptions import ValidationError
@ -133,6 +133,7 @@ class Task(models.Model):
processing_lock = models.BooleanField(default=False, help_text="A flag indicating whether this task is currently locked for processing. When this flag is turned on, the task is in the middle of a processing step.")
processing_time = models.IntegerField(default=-1, help_text="Number of milliseconds that elapsed since the beginning of this task (-1 indicates that no information is available)")
processing_node = models.ForeignKey(ProcessingNode, null=True, blank=True, help_text="Processing node assigned to this task (or null if this task has not been associated yet)")
auto_processing_node = models.BooleanField(default=True, help_text="A flag indicating whether this task should be automatically assigned a processing node")
status = models.IntegerField(choices=STATUS_CODES, db_index=True, null=True, blank=True, help_text="Current status of the task")
last_error = models.TextField(null=True, blank=True, help_text="The last processing error received")
options = fields.JSONField(default=dict(), blank=True, help_text="Options that are being used to process this task", validators=[validate_task_options])
@ -153,7 +154,12 @@ class Task(models.Model):
def save(self, *args, **kwargs):
# Autovalidate on save
self.full_clean()
try:
self.full_clean()
except GDALException as e:
logger.warning("Problem while handling GDAL raster: {}. We're going to attempt to remove the reference to it...".format(e))
self.orthophoto = None
super(Task, self).save(*args, **kwargs)
@ -166,22 +172,6 @@ class Task(models.Model):
"assets",
*args)
@staticmethod
def create_from_images(images, project):
    """
    Build a Task for ``project`` and attach one ImageUpload per item in
    ``images`` (for example the files found in request.FILES).

    Everything runs inside a single database transaction, so if any image
    fails to be stored the task itself is not created either.

    :param images: iterable of uploaded image files
    :param project: project the new task belongs to
    :return: the newly created Task
    """
    with transaction.atomic():
        new_task = Task.objects.create(project=project)
        for uploaded_image in images:
            ImageUpload.objects.create(task=new_task, image=uploaded_image)
        return new_task
def process(self):
"""
This method contains the logic for processing tasks asynchronously
@ -191,6 +181,26 @@ class Task(models.Model):
"""
try:
if self.auto_processing_node and self.last_error is None:
# No processing node assigned and need to auto assign
if self.processing_node is None:
# Assign first online node with lowest queue count
self.processing_node = ProcessingNode.find_best_available_node()
if self.processing_node:
self.processing_node.queue_count += 1 # Doesn't have to be accurate, it will get overriden later
self.processing_node.save()
logger.info("Automatically assigned processing node {} to {}".format(self.processing_node, self))
self.save()
# Processing node assigned, but is offline and no errors
if self.processing_node and not self.processing_node.is_online():
# Detach processing node, will be processed at the next tick
logger.info("Processing node {} went offline, reassigning {}...".format(self.processing_node, self))
self.uuid = ''
self.processing_node = None
self.save()
if self.processing_node:
# Need to process some images (UUID not yet set and task doesn't have pending actions)?
if not self.uuid and self.pending_action is None:

Wyświetl plik

@ -35,10 +35,12 @@ def process_pending_tasks():
tasks_mutex.acquire()
# All tasks that have a processing node assigned
# Or that need one assigned (via auto)
# but don't have a UUID
# or tasks that have a pending action
# and that are not locked (being processed by another thread)
tasks = Task.objects.filter(Q(uuid='', last_error__isnull=True, processing_node__isnull=False) |
tasks = Task.objects.filter(Q(processing_node__isnull=True, auto_processing_node=True) |
Q(uuid='', last_error__isnull=True, processing_node__isnull=False) |
Q(status__in=[status_codes.QUEUED, status_codes.RUNNING], processing_node__isnull=False) |
Q(status=None, processing_node__isnull=False) |
Q(pending_action__isnull=False)).exclude(Q(processing_lock=True))

Wyświetl plik

@ -66,7 +66,8 @@ class EditTaskForm extends React.Component {
that you have granted the current user sufficient permissions to view
the processing node (by going to Administration -- Processing Nodes -- Select Node -- Object Permissions -- Add User/Group and check CAN VIEW PROCESSING NODE).
If you are bringing a node back online, it will take about 30 seconds for WebODM to recognize it.`});
}
};
if (json.length === 0){
noProcessingNodesError();
return;
@ -89,17 +90,27 @@ class EditTaskForm extends React.Component {
};
});
// Find a node with lowest queue count
let minQueueCount = Math.min(...nodes.filter(node => node.enabled).map(node => node.queue_count));
let minQueueCountNodes = nodes.filter(node => node.enabled && node.queue_count === minQueueCount);
let autoNode = null;
if (minQueueCountNodes.length === 0){
noProcessingNodesError(nodes);
return;
// If the user has selected auto, and a processing node has been assigned,
// we need to make the "auto" node the one that has been assigned
if (this.props.task && this.props.task.processing_node && this.props.task.auto_processing_node){
autoNode = nodes.find(node => node.id === this.props.task.processing_node);
}
// Choose at random
let autoNode = minQueueCountNodes[~~(Math.random() * minQueueCountNodes.length)];
if (!autoNode){
// Find a node with lowest queue count
let minQueueCount = Math.min(...nodes.filter(node => node.enabled).map(node => node.queue_count));
let minQueueCountNodes = nodes.filter(node => node.enabled && node.queue_count === minQueueCount);
if (minQueueCountNodes.length === 0){
noProcessingNodesError(nodes);
return;
}
// Choose at random
autoNode = minQueueCountNodes[~~(Math.random() * minQueueCountNodes.length)];
}
nodes.unshift({
id: autoNode.id,
@ -116,9 +127,13 @@ class EditTaskForm extends React.Component {
// Have we specified a node?
if (this.props.task && this.props.task.processing_node){
this.selectNodeByKey(this.props.task.processing_node);
if (this.props.task.auto_processing_node){
this.selectNodeByKey("auto");
}else{
this.selectNodeByKey(this.props.task.processing_node);
}
}else{
this.selectNodeByKey(nodes[0].key);
this.selectNodeByKey("auto");
}
if (this.props.onFormLoaded) this.props.onFormLoaded();

Wyświetl plik

@ -155,6 +155,9 @@ class ProjectListItem extends React.Component {
})
.on("dragenter", () => {
this.resetUploadState();
})
.on("sending", (file, xhr, formData) => {
formData.append('auto_processing_node', "false");
});
}
@ -172,7 +175,8 @@ class ProjectListItem extends React.Component {
data: JSON.stringify({
name: taskInfo.name,
options: taskInfo.options,
processing_node: taskInfo.selectedNode.id
processing_node: taskInfo.selectedNode.id,
auto_processing_node: taskInfo.selectedNode.key == "auto"
}),
dataType: 'json',
type: 'PATCH'

Wyświetl plik

@ -189,6 +189,7 @@ class TaskListItem extends React.Component {
taskInfo.uuid = ""; // TODO: we could reuse the UUID so that images don't need to be re-uploaded! This needs changes on node-odm as well.
taskInfo.processing_node = taskInfo.selectedNode.id;
taskInfo.auto_processing_node = taskInfo.selectedNode.key == "auto";
delete(taskInfo.selectedNode);
$.ajax({

Wyświetl plik

@ -61,9 +61,8 @@ class BootTransactionTestCase(TransactionTestCase):
'''
Same as above, but inherits from TransactionTestCase
'''
@classmethod
def setUpClass(cls):
super(BootTransactionTestCase, cls).setUpClass()
def setUp(self):
super().setUp()
boot()
setupUsers()
setupProjects()

Wyświetl plik

@ -5,6 +5,7 @@ import time
import shutil
import logging
from datetime import timedelta
import requests
from django.contrib.auth.models import User
@ -13,10 +14,11 @@ from rest_framework.test import APIClient
from app import pending_actions
from app import scheduler
from django.utils import timezone
from app.models import Project, Task, ImageUpload, task_directory_path
from app.tests.classes import BootTransactionTestCase
from nodeodm import status_codes
from nodeodm.models import ProcessingNode
from nodeodm.models import ProcessingNode, OFFLINE_MINUTES
from app.testwatch import testWatch
# We need to test the task API in a TransactionTestCase because
@ -37,6 +39,8 @@ def start_processing_node():
class TestApiTask(BootTransactionTestCase):
def setUp(self):
super().setUp()
# We need to clear previous media_root content
# This points to the test directory, but just in case
# we double check that the directory is indeed a test directory
@ -323,4 +327,91 @@ class TestApiTask(BootTransactionTestCase):
image2.close()
node_odm.terminate()
def test_task_auto_processing_node(self):
# Exercises automatic processing node assignment and reassignment by
# running the scheduler after each change to the task / node state.
project = Project.objects.get(name="User Test Project")
task = Task.objects.create(project=project, name="Test")
# Two nodes pointing at invalid hosts; neither has last_refreshed set here
pnode = ProcessingNode.objects.create(hostname="invalid-host", port=11223)
another_pnode = ProcessingNode.objects.create(hostname="invalid-host-2", port=11223)
# By default, tasks are flagged for auto assignment and have no node yet
self.assertTrue(task.auto_processing_node)
self.assertTrue(task.processing_node is None)
# Simulate an error
task.last_error = "Test error"
task.save()
scheduler.process_pending_tasks()
# A processing node should not have been assigned
task.refresh_from_db()
self.assertTrue(task.processing_node is None)
# Remove error
task.last_error = None
task.save()
scheduler.process_pending_tasks()
# A processing node should not have been assigned because no processing nodes are online
task.refresh_from_db()
self.assertTrue(task.processing_node is None)
# Bring a processing node online
pnode.last_refreshed = timezone.now()
pnode.save()
self.assertTrue(pnode.is_online())
# A processing node has been assigned
scheduler.process_pending_tasks()
task.refresh_from_db()
self.assertTrue(task.processing_node.id == pnode.id)
# Task should have failed (no images provided, invalid host...)
self.assertTrue(task.last_error is not None)
# Bring another processing node online, and bring the old one offline
# (a last_refreshed that is OFFLINE_MINUTES old is already past the online cutoff)
pnode.last_refreshed = timezone.now() - timedelta(minutes=OFFLINE_MINUTES)
pnode.save()
another_pnode.last_refreshed = timezone.now()
another_pnode.save()
# Remove error
task.last_error = None
task.save()
scheduler.process_pending_tasks()
# Processing node is now cleared and a new one will be assigned on the next tick
task.refresh_from_db()
self.assertTrue(task.processing_node is None)
# Second tick: the only online node should now be picked
scheduler.process_pending_tasks()
task.refresh_from_db()
self.assertTrue(task.processing_node.id == another_pnode.id)
def test_task_manual_processing_node(self):
# Verifies that a task created with auto_processing_node=False is left
# without a processing node even when an online node is available.
user = User.objects.get(username="testuser")
project = Project.objects.create(name="User Test Project", owner=user)
task = Task.objects.create(project=project, name="Test", auto_processing_node=False)
# Bring a processing node online
pnode = ProcessingNode.objects.create(hostname="invalid-host", port=11223)
pnode.last_refreshed = timezone.now()
pnode.save()
self.assertTrue(pnode.is_online())
scheduler.process_pending_tasks()
# A processing node should not have been assigned because we asked
# not to via auto_processing_node = false
task.refresh_from_db()
self.assertTrue(task.processing_node is None)

Wyświetl plik

@ -11,9 +11,12 @@ from guardian.models import UserObjectPermissionBase
from .api_client import ApiClient
import json
from django.db.models import signals
from requests.exceptions import ConnectionError
from datetime import datetime, timedelta
from .exceptions import ProcessingError, ProcessingTimeout
import simplejson
import django.utils.timezone
def api(func):
"""
Catches JSON decoding errors that might happen when the server
@ -24,12 +27,14 @@ def api(func):
return func(*args, **kwargs)
except (json.decoder.JSONDecodeError, simplejson.JSONDecodeError) as e:
raise ProcessingError(str(e))
except (requests.exceptions.ConnectTimeout, requests.exceptions.ConnectionError) as e:
except (requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e:
raise ProcessingTimeout(str(e))
return wrapper
OFFLINE_MINUTES = 5 # Number of minutes a node hasn't been seen before it should be considered offline
class ProcessingNode(models.Model):
hostname = models.CharField(max_length=255, help_text="Hostname or IP address where the node is located (can be an internal hostname as well). If you are using Docker, this is never 127.0.0.1 or localhost. Find the IP address of your host machine by running ifconfig on Linux or by checking your network settings.")
port = models.PositiveIntegerField(help_text="Port that connects to the node's API")
@ -41,6 +46,19 @@ class ProcessingNode(models.Model):
def __str__(self):
return '{}:{}'.format(self.hostname, self.port)
@staticmethod
def find_best_available_node():
    """
    Look up the most suitable node to hand a task to.

    A node qualifies when its ``last_refreshed`` timestamp is no older than
    OFFLINE_MINUTES minutes; among the qualifying nodes, the first one when
    ordered by ascending ``queue_count`` is chosen.

    :return: ProcessingNode | None (None when no node qualifies)
    """
    online_cutoff = timezone.now() - timedelta(minutes=OFFLINE_MINUTES)
    recently_seen = ProcessingNode.objects.filter(last_refreshed__gte=online_cutoff)
    return recently_seen.order_by('queue_count').first()
def is_online(self):
    """
    Tell whether this node has been seen recently enough to count as online.

    :return: False when ``last_refreshed`` is unset; otherwise True only if
        ``last_refreshed`` is no older than OFFLINE_MINUTES minutes.
    """
    if self.last_refreshed is None:
        return False

    online_cutoff = timezone.now() - timedelta(minutes=OFFLINE_MINUTES)
    return self.last_refreshed >= online_cutoff
@api
def update_node_info(self):
"""
@ -60,7 +78,7 @@ class ProcessingNode(models.Model):
self.last_refreshed = timezone.now()
self.save()
return True
except (ConnectionError, json.decoder.JSONDecodeError, simplejson.JSONDecodeError):
except (requests.exceptions.Timeout, requests.exceptions.ConnectionError, json.decoder.JSONDecodeError, simplejson.JSONDecodeError):
return False
def api_client(self):

Wyświetl plik

@ -1,9 +1,12 @@
from datetime import timedelta
import requests
from django.test import TestCase
from django.utils import six
import subprocess, time
from django.utils import timezone
from os import path
from .models import ProcessingNode
from .models import ProcessingNode, OFFLINE_MINUTES
from .api_client import ApiClient
from requests.exceptions import ConnectionError
from .exceptions import ProcessingError
@ -141,4 +144,35 @@ class TestClientApi(TestCase):
self.assertRaises(ProcessingError, online_node.remove_task, uuid)
# Task has been deleted
self.assertRaises(ProcessingError, online_node.get_task_info, uuid)
self.assertRaises(ProcessingError, online_node.get_task_info, uuid)
def test_find_best_available_node_and_is_online(self):
# Checks ProcessingNode.find_best_available_node() and is_online()
# as nodes are brought online / offline with different queue counts.
# Fixtures are all offline
self.assertTrue(ProcessingNode.find_best_available_node() is None)
# Bring one online
pnode = ProcessingNode.objects.get(pk=1)
self.assertFalse(pnode.is_online())
pnode.last_refreshed = timezone.now()
pnode.queue_count = 2
pnode.save()
self.assertTrue(pnode.is_online())
self.assertTrue(ProcessingNode.find_best_available_node().id == pnode.id)
# Bring another online with lower queue count
another_pnode = ProcessingNode.objects.get(pk=2)
another_pnode.last_refreshed = pnode.last_refreshed
another_pnode.queue_count = 1
another_pnode.save()
# The node with the lower queue count is now preferred
self.assertTrue(ProcessingNode.find_best_available_node().id == another_pnode.id)
# Bring it offline
another_pnode.last_refreshed -= timedelta(minutes=OFFLINE_MINUTES)
another_pnode.save()
self.assertFalse(another_pnode.is_online())
# Best choice now is original processing node
self.assertTrue(ProcessingNode.find_best_available_node().id == pnode.id)