api client additions, new task processing, get task info, testing

pull/39/head
Piero Toffanin 2016-10-26 12:54:46 -04:00
rodzic b121e65d5c
commit f7d519e52c
7 zmienionych plików z 121 dodań i 17 usunięć

Wyświetl plik

@ -6,10 +6,12 @@ from django.utils import timezone
from django.contrib.auth.models import User
from django.contrib.postgres import fields
from nodeodm.models import ProcessingNode
from django.dispatch import receiver
from guardian.shortcuts import get_perms_for_model, assign_perm
from guardian.models import UserObjectPermissionBase
from guardian.models import GroupObjectPermissionBase
from django.core.exceptions import ValidationError
from django.dispatch import receiver
from nodeodm.exceptions import ProcessingException
from django.db import transaction
def assets_directory_path(taskId, projectId, filename):
@ -55,6 +57,19 @@ class ProjectGroupObjectPermission(GroupObjectPermissionBase):
def gcp_directory_path(task, filename):
return assets_directory_path(task.id, task.project.id, filename)
def validate_task_options(value):
"""
Make sure that the format of this options field is valid
"""
if len(value) == 0: return
try:
for option in value:
if not option['name']: raise ValidationError("Name key not found in option")
if not option['value']: raise ValidationError("Value key not found in option")
except:
raise ValidationError("Invalid options")
class Task(models.Model):
STATUS_CODES = (
(10, 'QUEUED'),
@ -71,7 +86,7 @@ class Task(models.Model):
processing_time = models.IntegerField(default=-1, help_text="Number of milliseconds that elapsed since the beginning of this task (-1 indicates that no information is available)")
processing_node = models.ForeignKey(ProcessingNode, null=True, blank=True, help_text="Processing node assigned to this task (or null if this task has not been associated yet)")
status = models.IntegerField(choices=STATUS_CODES, null=True, blank=True, help_text="Current status of the task")
options = fields.JSONField(default=dict(), blank=True, help_text="Options that are being used to process this task")
options = fields.JSONField(default=dict(), blank=True, help_text="Options that are being used to process this task", validators=[validate_task_options])
console_output = models.TextField(null=True, blank=True, help_text="Console output of the OpenDroneMap's process")
ground_control_points = models.FileField(null=True, blank=True, upload_to=gcp_directory_path, help_text="Optional Ground Control Points file to use for processing")
@ -84,6 +99,11 @@ class Task(models.Model):
def __str__(self):
return 'Task ID: {}'.format(self.id)
def save(self, *args, **kwargs):
# Autovalidate on save
self.full_clean()
super(Task, self).save(*args, **kwargs)
@staticmethod
def create_from_images(images, project):
'''
@ -103,11 +123,36 @@ class Task(models.Model):
return None
def process(self):
if not self.uuid and self.processing_node:
# Nothing to do if we don't have a processing node...
if not self.processing_node: return
# Need to process some images (UUID not yet set)?
if not self.uuid:
print("Processing... {}".format(self))
import time
time.sleep(30)
print("Done! {}".format(self))
images = [image.path() for image in self.imageupload_set.all()]
try:
self.uuid = self.processing_node.process_new_task(images, self.name, self.options)
self.save()
# TODO: log process has started processing
except ProcessingException, e:
print("TASK ERROR: " + e.message)
# Need to update status (first time, queued or running?)
if self.uuid and self.status in [None, 10, 20]:
print("Have UUID: {}".format(self.uuid))
# Update task info from processing node
# TODO!
# Canceled, failed, or completed
if self.uuid and self.status in [30, 40, 50]:
print("DONE: " + str(self.status))
class Meta:
permissions = (
@ -124,3 +169,6 @@ class ImageUpload(models.Model):
def __str__(self):
return self.image.name
def path(self):
return self.image.path

Wyświetl plik

@ -71,7 +71,7 @@ def setup():
scheduler = BackgroundScheduler()
scheduler.start()
scheduler.add_job(update_nodes_info, 'interval', seconds=30)
scheduler.add_job(process_pending_tasks, 'interval', seconds=3)
scheduler.add_job(process_pending_tasks, 'interval', seconds=15)
except SchedulerAlreadyRunningError:
logger.warn("Scheduler already running (this is OK while testing)")

Wyświetl plik

@ -5,6 +5,7 @@ from django.test import Client
from app.models import Project, Task
from .classes import BootTestCase
from app import scheduler
from django.core.exceptions import ValidationError
class TestApp(BootTestCase):
fixtures = ['test_processingnodes', ]
@ -121,9 +122,29 @@ class TestApp(BootTestCase):
# Should not have permission
self.assertFalse(anotherUser.has_perm("delete_project", p))
def test_tasks(self):
# Create a new task
p = Project.objects.create(owner=User.objects.get(username="testuser"), name="test")
task = Task.objects.create(project=p)
# Test options validation
task.options = [{'name': 'test', 'value': 1}]
self.assertTrue(task.save() == None)
task.options = {'test': 1}
self.assertRaises(ValidationError, task.save)
task.options = [{'name': 'test', 'value': 1}, {"invalid": 1}]
self.assertRaises(ValidationError, task.save)
def test_scheduler(self):
self.assertTrue(scheduler.setup() == None)
# Can call update_nodes_info()
self.assertTrue(scheduler.update_nodes_info() == None)
# Can call function in background
self.assertTrue(scheduler.update_nodes_info(background=True).join() == None)
self.assertTrue(scheduler.teardown() == None)

Wyświetl plik

@ -21,6 +21,9 @@ class ApiClient:
def options(self):
return requests.get(self.url('/options')).json()
def task_info(self, uuid):
return requests.get(self.url('/task/{}/info').format(uuid)).json()
def new_task(self, images, name=None, options=[]):
"""
Starts processing of a new task
@ -29,6 +32,7 @@ class ApiClient:
:param options: options to be used for processing ([{'name': optionName, 'value': optionValue}, ...])
:return: UUID or error
"""
print(options)
files = [('images',
(os.path.basename(image), open(image, 'rb'), (mimetypes.guess_type(image)[0] or "image/jpg"))
) for image in images]

Wyświetl plik

@ -1,2 +1,2 @@
class NewTaskException(Exception):
class ProcessingException(Exception):
pass

Wyświetl plik

@ -8,7 +8,7 @@ from .api_client import ApiClient
import json
from django.db.models import signals
from requests.exceptions import ConnectionError
from .exceptions import NewTaskException
from .exceptions import ProcessingException
class ProcessingNode(models.Model):
hostname = models.CharField(max_length=255, help_text="Hostname where the node is located (can be an internal hostname as well)")
@ -56,15 +56,33 @@ class ProcessingNode(models.Model):
Sends a set of images (and optional GCP file) via the API
to start processing.
:param images: list of path images
:param name: name of the task
:param options: options to be used for processing ([{'name': optionName, 'value': optionValue}, ...])
:returns UUID of the newly created task
"""
if len(images) < 2: raise ProcessingException("Need at least 2 images")
api_client = self.api_client()
result = api_client.new_task(images, name, options)
if result['uuid']:
return result['uuid']
elif result['error']:
raise NewTaskException(result['error'])
raise ProcessingException(result['error'])
def get_task_info(self, uuid):
"""
Gets information about this task, such as name, creation date,
processing time, status, command line options and number of
images being processed.
"""
api_client = self.api_client()
result = api_client.task_info(uuid)
if result['uuid']:
return result
elif result['error']:
raise ProcessingException(result['error'])
# First time a processing node is created, automatically try to update
@receiver(signals.post_save, sender=ProcessingNode, dispatch_uid="update_processing_node_info")

Wyświetl plik

@ -67,10 +67,23 @@ class TestClientApi(TestCase):
online_node = ProcessingNode.objects.create(hostname="localhost", port=11223)
self.assertTrue(online_node.last_refreshed != None, "Last refreshed info is here (update_node_info() was called)")
def test_add_new_task(self):
pass #TODO
def test_client_api(self):
api = ApiClient("localhost", 11223)
# import glob
# a = ApiClient("localhost", 3000)
# print(a.info())
# print(a.new_task(glob.glob("fixtures/test_images/*.JPG"), "test", [{'name': 'cmvs-maxImages', 'value': 5}]))
# Can call info(), options()
self.assertTrue(type(api.info()['version']) in [str, unicode])
self.assertTrue(len(api.options()) > 0)
# Can call new_task()
import glob
res = api.new_task(
glob.glob("nodeodm/fixtures/test_images/*.JPG"),
"test",
[{'name': 'cmvs-maxImages', 'value': 5}])
uuid = res['uuid']
self.assertTrue(uuid != None)
# Can call task_info()
task_info = api.task_info(uuid)
self.assertTrue(type(task_info['dateCreated']) == long)
self.assertTrue(type(task_info['uuid']) in [str, unicode])