diff --git a/app/api/tasks.py b/app/api/tasks.py index 1649c342..d05aa706 100644 --- a/app/api/tasks.py +++ b/app/api/tasks.py @@ -4,6 +4,7 @@ import os from django.contrib.gis.db.models import GeometryField from django.contrib.gis.db.models.functions import Envelope from django.core.exceptions import ObjectDoesNotExist, SuspiciousFileOperation +from django.db import transaction from django.db.models.functions import Cast from django.http import HttpResponse from wsgiref.util import FileWrapper @@ -124,17 +125,24 @@ class TaskViewSet(viewsets.ViewSet): if len(files) <= 1: raise exceptions.ValidationError(detail="Cannot create task, you need at least 2 images") - task = models.Task.create_from_images(files, project) - if task is not None: + with transaction.atomic(): + task = models.Task.objects.create(project=project) + + for image in files: + models.ImageUpload.objects.create(task=task, image=image) # Update other parameters such as processing node, task name, etc. serializer = TaskSerializer(task, data=request.data, partial=True) serializer.is_valid(raise_exception=True) serializer.save() + # Call the scheduler (speed things up) + scheduler.process_pending_tasks(background=True) + return Response({"id": task.id}, status=status.HTTP_201_CREATED) - else: - raise exceptions.ValidationError(detail="Cannot create task, input provided is not valid.") + + # on transaction fail + raise exceptions.ValidationError(detail="Cannot create task, input provided is not valid.") def update(self, request, pk=None, project_pk=None, partial=False): get_and_check_project(request, project_pk, ('change_project', )) diff --git a/app/background.py b/app/background.py index 57b31cb8..6f1745cf 100644 --- a/app/background.py +++ b/app/background.py @@ -2,7 +2,6 @@ from threading import Thread import logging from django import db -from webodm import settings from app.testwatch import testWatch logger = logging.getLogger('app.logger') diff --git a/app/migrations/0002_task_auto_processing_node.py b/app/migrations/0002_task_auto_processing_node.py new file mode 100644 index 00000000..eb8129ce --- /dev/null +++ b/app/migrations/0002_task_auto_processing_node.py @@ -0,0 +1,20 @@ +# -*- coding: utf-8 -*- +# Generated by Django 1.10.5 on 2017-02-15 15:33 +from __future__ import unicode_literals + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('app', '0001_initial'), + ] + + operations = [ + migrations.AddField( + model_name='task', + name='auto_processing_node', + field=models.BooleanField(default=True, help_text='A flag indicating whether this task should be automatically assigned a processing node'), + ), + ] diff --git a/app/models.py b/app/models.py index 5ee7b84b..56cde86d 100644 --- a/app/models.py +++ b/app/models.py @@ -2,9 +2,9 @@ import logging import os import shutil import zipfile -import requests from django.contrib.auth.models import User +from django.contrib.gis.gdal import GDALException from django.contrib.gis.gdal import GDALRaster from django.contrib.postgres import fields from django.core.exceptions import ValidationError @@ -133,6 +133,7 @@ class Task(models.Model): processing_lock = models.BooleanField(default=False, help_text="A flag indicating whether this task is currently locked for processing. When this flag is turned on, the task is in the middle of a processing step.") processing_time = models.IntegerField(default=-1, help_text="Number of milliseconds that elapsed since the beginning of this task (-1 indicates that no information is available)") processing_node = models.ForeignKey(ProcessingNode, null=True, blank=True, help_text="Processing node assigned to this task (or null if this task has not been associated yet)") + auto_processing_node = models.BooleanField(default=True, help_text="A flag indicating whether this task should be automatically assigned a processing node") status = models.IntegerField(choices=STATUS_CODES, db_index=True, null=True, blank=True, help_text="Current status of the task") last_error = models.TextField(null=True, blank=True, help_text="The last processing error received") options = fields.JSONField(default=dict(), blank=True, help_text="Options that are being used to process this task", validators=[validate_task_options]) @@ -153,7 +154,12 @@ class Task(models.Model): def save(self, *args, **kwargs): # Autovalidate on save - self.full_clean() + try: + self.full_clean() + except GDALException as e: + logger.warning("Problem while handling GDAL raster: {}. We're going to attempt to remove the reference to it...".format(e)) + self.orthophoto = None + super(Task, self).save(*args, **kwargs) @@ -166,22 +172,6 @@ class Task(models.Model): "assets", *args) - @staticmethod - def create_from_images(images, project): - ''' - Create a new task from a set of input images (such as the ones coming from request.FILES). - This will happen inside a transaction so if one of the images - fails to load, the task will not be created. - ''' - with transaction.atomic(): - task = Task.objects.create(project=project) - - for image in images: - ImageUpload.objects.create(task=task, image=image) - - return task - - def process(self): """ This method contains the logic for processing tasks asynchronously @@ -191,6 +181,26 @@ class Task(models.Model): """ try: + if self.auto_processing_node and self.last_error is None: + # No processing node assigned and need to auto assign + if self.processing_node is None: + # Assign first online node with lowest queue count + self.processing_node = ProcessingNode.find_best_available_node() + if self.processing_node: + self.processing_node.queue_count += 1 # Doesn't have to be accurate, it will get overriden later + self.processing_node.save() + + logger.info("Automatically assigned processing node {} to {}".format(self.processing_node, self)) + self.save() + + # Processing node assigned, but is offline and no errors + if self.processing_node and not self.processing_node.is_online(): + # Detach processing node, will be processed at the next tick + logger.info("Processing node {} went offline, reassigning {}...".format(self.processing_node, self)) + self.uuid = '' + self.processing_node = None + self.save() + if self.processing_node: # Need to process some images (UUID not yet set and task doesn't have pending actions)? if not self.uuid and self.pending_action is None: diff --git a/app/scheduler.py b/app/scheduler.py index ebf1f962..a5cb8f94 100644 --- a/app/scheduler.py +++ b/app/scheduler.py @@ -35,10 +35,12 @@ def process_pending_tasks(): tasks_mutex.acquire() # All tasks that have a processing node assigned + # Or that need one assigned (via auto) # but don't have a UUID # or tasks that have a pending action # and that are not locked (being processed by another thread) - tasks = Task.objects.filter(Q(uuid='', last_error__isnull=True, processing_node__isnull=False) | + tasks = Task.objects.filter(Q(processing_node__isnull=True, auto_processing_node=True) | + Q(uuid='', last_error__isnull=True, processing_node__isnull=False) | Q(status__in=[status_codes.QUEUED, status_codes.RUNNING], processing_node__isnull=False) | Q(status=None, processing_node__isnull=False) | Q(pending_action__isnull=False)).exclude(Q(processing_lock=True)) diff --git a/app/static/app/js/components/EditTaskForm.jsx b/app/static/app/js/components/EditTaskForm.jsx index a12940af..904c8409 100644 --- a/app/static/app/js/components/EditTaskForm.jsx +++ b/app/static/app/js/components/EditTaskForm.jsx @@ -66,7 +66,8 @@ class EditTaskForm extends React.Component { that you have granted the current user sufficient permissions to view the processing node (by going to Administration -- Processing Nodes -- Select Node -- Object Permissions -- Add User/Group and check CAN VIEW PROCESSING NODE). If you are bringing a node back online, it will take about 30 seconds for WebODM to recognize it.`}); - } + }; + if (json.length === 0){ noProcessingNodesError(); return; @@ -89,17 +90,27 @@ class EditTaskForm extends React.Component { }; }); - // Find a node with lowest queue count - let minQueueCount = Math.min(...nodes.filter(node => node.enabled).map(node => node.queue_count)); - let minQueueCountNodes = nodes.filter(node => node.enabled && node.queue_count === minQueueCount); + let autoNode = null; - if (minQueueCountNodes.length === 0){ - noProcessingNodesError(nodes); - return; + // If the user has selected auto, and the a processing node has been assigned + // we need attempt to find the "auto" node to be the one that has been assigned + if (this.props.task && this.props.task.processing_node && this.props.task.auto_processing_node){ + autoNode = nodes.find(node => node.id === this.props.task.processing_node); } - // Choose at random - let autoNode = minQueueCountNodes[~~(Math.random() * minQueueCountNodes.length)]; + if (!autoNode){ + // Find a node with lowest queue count + let minQueueCount = Math.min(...nodes.filter(node => node.enabled).map(node => node.queue_count)); + let minQueueCountNodes = nodes.filter(node => node.enabled && node.queue_count === minQueueCount); + + if (minQueueCountNodes.length === 0){ + noProcessingNodesError(nodes); + return; + } + + // Choose at random + autoNode = minQueueCountNodes[~~(Math.random() * minQueueCountNodes.length)]; + } nodes.unshift({ id: autoNode.id, @@ -116,9 +127,13 @@ class EditTaskForm extends React.Component { // Have we specified a node? if (this.props.task && this.props.task.processing_node){ - this.selectNodeByKey(this.props.task.processing_node); + if (this.props.task.auto_processing_node){ + this.selectNodeByKey("auto"); + }else{ + this.selectNodeByKey(this.props.task.processing_node); + } }else{ - this.selectNodeByKey(nodes[0].key); + this.selectNodeByKey("auto"); } if (this.props.onFormLoaded) this.props.onFormLoaded(); diff --git a/app/static/app/js/components/ProjectListItem.jsx b/app/static/app/js/components/ProjectListItem.jsx index 7ecfb425..9af771cf 100644 --- a/app/static/app/js/components/ProjectListItem.jsx +++ b/app/static/app/js/components/ProjectListItem.jsx @@ -155,6 +155,9 @@ class ProjectListItem extends React.Component { }) .on("dragenter", () => { this.resetUploadState(); + }) + .on("sending", (file, xhr, formData) => { + formData.append('auto_processing_node', "false"); }); } @@ -172,7 +175,8 @@ class ProjectListItem extends React.Component { data: JSON.stringify({ name: taskInfo.name, options: taskInfo.options, - processing_node: taskInfo.selectedNode.id + processing_node: taskInfo.selectedNode.id, + auto_processing_node: taskInfo.selectedNode.key == "auto" }), dataType: 'json', type: 'PATCH' diff --git a/app/static/app/js/components/TaskListItem.jsx b/app/static/app/js/components/TaskListItem.jsx index 4d45984c..20462c56 100644 --- a/app/static/app/js/components/TaskListItem.jsx +++ b/app/static/app/js/components/TaskListItem.jsx @@ -189,6 +189,7 @@ class TaskListItem extends React.Component { taskInfo.uuid = ""; // TODO: we could reuse the UUID so that images don't need to be re-uploaded! This needs changes on node-odm as well. taskInfo.processing_node = taskInfo.selectedNode.id; + taskInfo.auto_processing_node = taskInfo.selectedNode.key == "auto"; delete(taskInfo.selectedNode); $.ajax({ diff --git a/app/tests/classes.py b/app/tests/classes.py index 32435cf9..d40cba01 100644 --- a/app/tests/classes.py +++ b/app/tests/classes.py @@ -61,9 +61,8 @@ class BootTransactionTestCase(TransactionTestCase): ''' Same as above, but inherits from TransactionTestCase ''' - @classmethod - def setUpClass(cls): - super(BootTransactionTestCase, cls).setUpClass() + def setUp(self): + super().setUp() boot() setupUsers() setupProjects() diff --git a/app/tests/test_api_task.py b/app/tests/test_api_task.py index d6495cc2..a3efbf2a 100644 --- a/app/tests/test_api_task.py +++ b/app/tests/test_api_task.py @@ -5,6 +5,7 @@ import time import shutil import logging +from datetime import timedelta import requests from django.contrib.auth.models import User @@ -13,10 +14,11 @@ from rest_framework.test import APIClient from app import pending_actions from app import scheduler +from django.utils import timezone from app.models import Project, Task, ImageUpload, task_directory_path from app.tests.classes import BootTransactionTestCase from nodeodm import status_codes -from nodeodm.models import ProcessingNode +from nodeodm.models import ProcessingNode, OFFLINE_MINUTES from app.testwatch import testWatch # We need to test the task API in a TransactionTestCase because @@ -37,6 +39,8 @@ def start_processing_node(): class TestApiTask(BootTransactionTestCase): def setUp(self): + super().setUp() + # We need to clear previous media_root content # This points to the test directory, but just in case # we double check that the directory is indeed a test directory @@ -323,4 +327,91 @@ class TestApiTask(BootTransactionTestCase): image2.close() node_odm.terminate() + def test_task_auto_processing_node(self): + project = Project.objects.get(name="User Test Project") + task = Task.objects.create(project=project, name="Test") + pnode = ProcessingNode.objects.create(hostname="invalid-host", port=11223) + another_pnode = ProcessingNode.objects.create(hostname="invalid-host-2", port=11223) + + # By default + self.assertTrue(task.auto_processing_node) + self.assertTrue(task.processing_node is None) + + # Simulate an error + task.last_error = "Test error" + task.save() + + scheduler.process_pending_tasks() + + # A processing node should not have been assigned + task.refresh_from_db() + self.assertTrue(task.processing_node is None) + + # Remove error + task.last_error = None + task.save() + + scheduler.process_pending_tasks() + + # A processing node should not have been assigned because no processing nodes are online + task.refresh_from_db() + self.assertTrue(task.processing_node is None) + + # Bring a proessing node online + pnode.last_refreshed = timezone.now() + pnode.save() + self.assertTrue(pnode.is_online()) + + # A processing node has been assigned + scheduler.process_pending_tasks() + task.refresh_from_db() + self.assertTrue(task.processing_node.id == pnode.id) + + # Task should have failed (no images provided, invalid host...) + self.assertTrue(task.last_error is not None) + + # Bring another processing node online, and bring the old one offline + pnode.last_refreshed = timezone.now() - timedelta(minutes=OFFLINE_MINUTES) + pnode.save() + + another_pnode.last_refreshed = timezone.now() + another_pnode.save() + + # Remove error + task.last_error = None + task.save() + + scheduler.process_pending_tasks() + + # Processing node is now cleared and a new one will be assigned on the next tick + task.refresh_from_db() + self.assertTrue(task.processing_node is None) + + scheduler.process_pending_tasks() + + task.refresh_from_db() + self.assertTrue(task.processing_node.id == another_pnode.id) + + def test_task_manual_processing_node(self): + user = User.objects.get(username="testuser") + project = Project.objects.create(name="User Test Project", owner=user) + task = Task.objects.create(project=project, name="Test", auto_processing_node=False) + + # Bring a processing node online + pnode = ProcessingNode.objects.create(hostname="invalid-host", port=11223) + pnode.last_refreshed = timezone.now() + pnode.save() + self.assertTrue(pnode.is_online()) + + scheduler.process_pending_tasks() + + # A processing node should not have been assigned because we asked + # not to via auto_processing_node = false + task.refresh_from_db() + self.assertTrue(task.processing_node is None) + + + + + diff --git a/nodeodm/models.py b/nodeodm/models.py index 17d5ab7e..27e8dfe7 100644 --- a/nodeodm/models.py +++ b/nodeodm/models.py @@ -11,9 +11,12 @@ from guardian.models import UserObjectPermissionBase from .api_client import ApiClient import json from django.db.models import signals -from requests.exceptions import ConnectionError +from datetime import datetime, timedelta from .exceptions import ProcessingError, ProcessingTimeout import simplejson +import django.utils.timezone + + def api(func): """ Catches JSON decoding errors that might happen when the server @@ -24,12 +27,14 @@ def api(func): return func(*args, **kwargs) except (json.decoder.JSONDecodeError, simplejson.JSONDecodeError) as e: raise ProcessingError(str(e)) - except (requests.exceptions.ConnectTimeout, requests.exceptions.ConnectionError) as e: + except (requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e: raise ProcessingTimeout(str(e)) return wrapper +OFFLINE_MINUTES = 5 # Number of minutes a node hasn't been seen before it should be considered offline + class ProcessingNode(models.Model): hostname = models.CharField(max_length=255, help_text="Hostname or IP address where the node is located (can be an internal hostname as well). If you are using Docker, this is never 127.0.0.1 or localhost. Find the IP address of your host machine by running ifconfig on Linux or by checking your network settings.") port = models.PositiveIntegerField(help_text="Port that connects to the node's API") @@ -41,6 +46,19 @@ class ProcessingNode(models.Model): def __str__(self): return '{}:{}'.format(self.hostname, self.port) + @staticmethod + def find_best_available_node(): + """ + Attempts to find an available node (seen in the last 5 minutes, and with lowest queue count) + :return: ProcessingNode | None + """ + return ProcessingNode.objects.filter(last_refreshed__gte=timezone.now() - timedelta(minutes=OFFLINE_MINUTES)) \ + .order_by('queue_count').first() + + def is_online(self): + return self.last_refreshed is not None and \ + self.last_refreshed >= timezone.now() - timedelta(minutes=OFFLINE_MINUTES) + @api def update_node_info(self): """ @@ -60,7 +78,7 @@ class ProcessingNode(models.Model): self.last_refreshed = timezone.now() self.save() return True - except (ConnectionError, json.decoder.JSONDecodeError, simplejson.JSONDecodeError): + except (requests.exceptions.Timeout, requests.exceptions.ConnectionError, json.decoder.JSONDecodeError, simplejson.JSONDecodeError): return False def api_client(self): diff --git a/nodeodm/tests.py b/nodeodm/tests.py index 3ff740ac..9151b156 100644 --- a/nodeodm/tests.py +++ b/nodeodm/tests.py @@ -1,9 +1,12 @@ +from datetime import timedelta + import requests from django.test import TestCase from django.utils import six import subprocess, time +from django.utils import timezone from os import path -from .models import ProcessingNode +from .models import ProcessingNode, OFFLINE_MINUTES from .api_client import ApiClient from requests.exceptions import ConnectionError from .exceptions import ProcessingError @@ -141,4 +144,35 @@ class TestClientApi(TestCase): self.assertRaises(ProcessingError, online_node.remove_task, uuid) # Task has been deleted - self.assertRaises(ProcessingError, online_node.get_task_info, uuid) \ No newline at end of file + self.assertRaises(ProcessingError, online_node.get_task_info, uuid) + + def test_find_best_available_node_and_is_online(self): + # Fixtures are all offline + self.assertTrue(ProcessingNode.find_best_available_node() is None) + + # Bring one online + pnode = ProcessingNode.objects.get(pk=1) + self.assertFalse(pnode.is_online()) + + pnode.last_refreshed = timezone.now() + pnode.queue_count = 2 + pnode.save() + + self.assertTrue(pnode.is_online()) + self.assertTrue(ProcessingNode.find_best_available_node().id == pnode.id) + + # Bring another online with lower queue count + another_pnode = ProcessingNode.objects.get(pk=2) + another_pnode.last_refreshed = pnode.last_refreshed + another_pnode.queue_count = 1 + another_pnode.save() + + self.assertTrue(ProcessingNode.find_best_available_node().id == another_pnode.id) + + # Bring it offline + another_pnode.last_refreshed -= timedelta(minutes=OFFLINE_MINUTES) + another_pnode.save() + self.assertFalse(another_pnode.is_online()) + + # Best choice now is original processing node + self.assertTrue(ProcessingNode.find_best_available_node().id == pnode.id)