Merge pull request #39 from pierotofy/processing

Processing
pull/52/head
Piero Toffanin 2016-10-26 18:24:34 -04:00 zatwierdzone przez GitHub
commit 53b01aa7e2
32 zmienionych plików z 492 dodań i 143 usunięć

Wyświetl plik

@ -10,13 +10,8 @@ WORKDIR /webodm
# Install pip reqs
ADD requirements.txt /webodm/
RUN pip install --upgrade git+https://github.com/pierotofy/django-knockout
RUN pip install -r requirements.txt
# swagger_spec_validator is not up to date, fetch directly from github
# also install django-knockout
RUN pip install --upgrade git+https://github.com/Yelp/swagger_spec_validator
ADD . /webodm/
RUN git submodule init

Wyświetl plik

@ -62,7 +62,6 @@ Then:
```
pip install -r requirements.txt
pip install --upgrade git+git://github.com/Yelp/swagger_spec_validator
npm install -g webpack
npm install
webpack

Wyświetl plik

@ -3,12 +3,35 @@ from rest_framework.response import Response
from rest_framework.decorators import permission_classes
from rest_framework.permissions import DjangoModelPermissions
from rest_framework.filters import DjangoFilterBackend
from django_filters.rest_framework import FilterSet
from nodeodm.models import ProcessingNode
import django_filters
from django.utils import timezone
from datetime import timedelta
from django.db.models import Q
class ProcessingNodeSerializer(serializers.ModelSerializer):
class Meta:
model = ProcessingNode
fields = '__all__'
class ProcessingNodeFilter(FilterSet):
online = django_filters.MethodFilter()
def filter_online(self, queryset, value):
online_threshold = timezone.now() - timedelta(minutes=5)
if value.lower() in ['true', '1']:
return queryset.filter(last_refreshed__isnull=False, last_refreshed__gte=online_threshold)
elif value.lower() in ['false', '0']:
return queryset.filter(Q(last_refreshed__isnull=True) | Q(last_refreshed__lt=online_threshold))
return queryset
class Meta:
model = ProcessingNode
fields = ['online', 'id', 'hostname', 'port', 'api_version', 'queue_count', ]
class ProcessingNodeViewSet(viewsets.ModelViewSet):
"""
@ -18,7 +41,10 @@ class ProcessingNodeViewSet(viewsets.ModelViewSet):
# Don't need a "view node" permission. If you are logged-in, you can view nodes.
permission_classes = (DjangoModelPermissions, )
filter_backends = (DjangoFilterBackend, )
filter_class = ProcessingNodeFilter
pagination_class = None
serializer_class = ProcessingNodeSerializer
queryset = ProcessingNode.objects.all()
queryset = ProcessingNode.objects.all()

Wyświetl plik

@ -10,6 +10,7 @@ class ProjectSerializer(serializers.ModelSerializer):
class Meta:
model = models.Project
fields = '__all__'
class ProjectViewSet(viewsets.ModelViewSet):

Wyświetl plik

@ -3,7 +3,7 @@ from django.core.exceptions import ObjectDoesNotExist
from rest_framework import status, serializers, viewsets, filters, exceptions, permissions, parsers
from rest_framework.response import Response
from rest_framework.decorators import parser_classes, api_view
from app import models
from app import models, scheduler
from nodeodm.models import ProcessingNode
class TaskIDsSerializer(serializers.BaseSerializer):
@ -16,7 +16,7 @@ class TaskSerializer(serializers.ModelSerializer):
class Meta:
model = models.Task
fields = '__all__'
class TaskViewSet(viewsets.ViewSet):
"""
@ -83,6 +83,10 @@ class TaskViewSet(viewsets.ViewSet):
serializer = TaskSerializer(task, data=request.data, partial=partial)
serializer.is_valid(raise_exception=True)
serializer.save()
# Call the scheduler (speed things up)
scheduler.process_pending_tasks(background=True)
return Response(serializer.data)
def partial_update(self, request, *args, **kwargs):

Wyświetl plik

@ -1,11 +1,7 @@
from __future__ import unicode_literals
from django.apps import AppConfig
from .boot import boot
class MainConfig(AppConfig):
name = 'app'
verbose_name = 'Application'
def ready(self):
boot()

Wyświetl plik

@ -1,11 +1,13 @@
def boot():
from django.contrib.contenttypes.models import ContentType
from django.contrib.auth.models import Permission
from django.contrib.auth.models import User, Group
from django.db.utils import ProgrammingError
from . import signals
import logging
from django.contrib.contenttypes.models import ContentType
from django.contrib.auth.models import Permission
from django.contrib.auth.models import User, Group
from django.db.utils import ProgrammingError
from . import signals, scheduler
import logging, os
from .models import Task
from webodm import settings
def boot():
logger = logging.getLogger('app.logger')
# Check default group
@ -24,5 +26,15 @@ def boot():
if User.objects.filter(is_superuser=True).count() == 0:
User.objects.create_superuser('admin', 'admin@example.com', 'admin')
logger.info("Created superuser")
# Unlock any Task that might have been locked
Task.objects.filter(processing_lock=True).update(processing_lock=False)
if not settings.TESTING:
# Setup and start scheduler
scheduler.setup()
scheduler.update_nodes_info(background=True)
except ProgrammingError:
logger.warn("Could not create default group/user. If running a migration, this is expected.")
logger.warn("Could not touch the database. If running a migration, this is expected.")

Wyświetl plik

@ -6,10 +6,12 @@ from django.utils import timezone
from django.contrib.auth.models import User
from django.contrib.postgres import fields
from nodeodm.models import ProcessingNode
from django.dispatch import receiver
from guardian.shortcuts import get_perms_for_model, assign_perm
from guardian.models import UserObjectPermissionBase
from guardian.models import GroupObjectPermissionBase
from django.core.exceptions import ValidationError
from django.dispatch import receiver
from nodeodm.exceptions import ProcessingException
from django.db import transaction
def assets_directory_path(taskId, projectId, filename):
@ -55,6 +57,19 @@ class ProjectGroupObjectPermission(GroupObjectPermissionBase):
def gcp_directory_path(task, filename):
return assets_directory_path(task.id, task.project.id, filename)
def validate_task_options(value):
"""
Make sure that the format of this options field is valid
"""
if len(value) == 0: return
try:
for option in value:
if not option['name']: raise ValidationError("Name key not found in option")
if not option['value']: raise ValidationError("Value key not found in option")
except:
raise ValidationError("Invalid options")
class Task(models.Model):
STATUS_CODES = (
(10, 'QUEUED'),
@ -64,15 +79,18 @@ class Task(models.Model):
(50, 'CANCELED')
)
uuid = models.CharField(max_length=255, null=True, blank=True, help_text="Identifier of the task (as returned by OpenDroneMap's REST API)")
uuid = models.CharField(max_length=255, db_index=True, null=True, blank=True, help_text="Identifier of the task (as returned by OpenDroneMap's REST API)")
project = models.ForeignKey(Project, on_delete=models.CASCADE, help_text="Project that this task belongs to")
name = models.CharField(max_length=255, null=True, blank=True, help_text="A label for the task")
processing_lock = models.BooleanField(default=False, help_text="A flag indicating whether this task is currently locked for processing. When this flag is turned on, the task is in the middle of a processing step.")
processing_time = models.IntegerField(default=-1, help_text="Number of milliseconds that elapsed since the beginning of this task (-1 indicates that no information is available)")
processing_node = models.ForeignKey(ProcessingNode, null=True, blank=True, help_text="Processing node assigned to this task (or null if this task has not been associated yet)")
status = models.IntegerField(choices=STATUS_CODES, null=True, blank=True, help_text="Current status of the task")
options = fields.JSONField(default=dict(), blank=True, help_text="Options that are being used to process this task")
status = models.IntegerField(choices=STATUS_CODES, db_index=True, null=True, blank=True, help_text="Current status of the task")
last_error = models.TextField(null=True, blank=True, help_text="The last processing error received")
options = fields.JSONField(default=dict(), blank=True, help_text="Options that are being used to process this task", validators=[validate_task_options])
console_output = models.TextField(null=True, blank=True, help_text="Console output of the OpenDroneMap's process")
ground_control_points = models.FileField(null=True, blank=True, upload_to=gcp_directory_path, help_text="Optional Ground Control Points file to use for processing")
# georeferenced_model
# orthophoto
# textured_model
@ -82,6 +100,11 @@ class Task(models.Model):
def __str__(self):
return 'Task ID: {}'.format(self.id)
def save(self, *args, **kwargs):
# Autovalidate on save
self.full_clean()
super(Task, self).save(*args, **kwargs)
@staticmethod
def create_from_images(images, project):
'''
@ -100,6 +123,58 @@ class Task(models.Model):
# In case of error
return None
def process(self):
# Nothing to do if we don't have a processing node...
if not self.processing_node: return
# Need to process some images (UUID not yet set)?
if not self.uuid:
print("Processing... {}".format(self))
images = [image.path() for image in self.imageupload_set.all()]
try:
self.uuid = self.processing_node.process_new_task(images, self.name, self.options)
self.save()
# TODO: log process has started processing
except ProcessingException, e:
print("TASK ERROR: " + e.message)
# Need to update status (first time, queued or running?)
if self.uuid and self.status in [None, 10, 20]:
print("Have UUID: {}".format(self.uuid))
# Update task info from processing node
try:
info = self.processing_node.get_task_info(self.uuid)
self.processing_time = info["processingTime"]
self.status = info["status"]["code"]
if "errorMessage" in info["status"]:
self.last_error = info["status"]["errorMessage"]
# Has the task just been canceled, failed, or completed?
# Note that we don't save the status code right away,
# if the assets retrieval fails we want to retry again.
if self.status in [30, 40, 50]:
print("ALMOST DONE: " + str(self.status))
# Completed?
if self.status == 40:
# TODO: retrieve assets
pass
else:
self.save()
else:
# Still waiting...
self.save()
except ProcessingException, e:
print("TASK ERROR 2: " + e.message)
class Meta:
permissions = (
('view_task', 'Can view task'),
@ -115,3 +190,6 @@ class ImageUpload(models.Model):
def __str__(self):
return self.image.name
def path(self):
return self.image.path

81
app/scheduler.py 100644
Wyświetl plik

@ -0,0 +1,81 @@
import logging
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.schedulers import SchedulerAlreadyRunningError, SchedulerNotRunningError
from threading import Thread, Lock
from multiprocessing.dummy import Pool as ThreadPool
from nodeodm.models import ProcessingNode
from app.models import Task
from django.db.models import Q
import random
logger = logging.getLogger('app.logger')
scheduler = BackgroundScheduler()
def job(func):
"""
Adds background={True|False} param to any function
so that we can call update_nodes_info(background=True) from the outside
"""
def wrapper(*args,**kwargs):
background = kwargs.get('background', False)
if background:
t = Thread(target=func)
t.start()
return t
else:
return func(*args, **kwargs)
return wrapper
@job
def update_nodes_info():
processing_nodes = ProcessingNode.objects.all()
for processing_node in processing_nodes:
processing_node.update_node_info()
tasks_mutex = Lock()
@job
def process_pending_tasks():
tasks = []
try:
tasks_mutex.acquire()
# All tasks that have a processing node assigned
# but don't have a UUID
# and that are not locked (being processed by another thread)
tasks = Task.objects.filter(Q(uuid=None) | Q(status=10) | Q(status=20)).exclude(Q(processing_node=None) | Q(processing_lock=True))
for task in tasks:
logger.info("Acquiring lock: {}".format(task))
task.processing_lock = True
task.save()
finally:
tasks_mutex.release()
def process(task):
task.process()
task.processing_lock = False
task.save()
if tasks.count() > 0:
pool = ThreadPool(tasks.count())
pool.map(process, tasks, chunksize=1)
pool.close()
pool.join()
def setup():
logger.info("Starting background scheduler...")
try:
scheduler.start()
scheduler.add_job(update_nodes_info, 'interval', seconds=30)
scheduler.add_job(process_pending_tasks, 'interval', seconds=5)
except SchedulerAlreadyRunningError:
logger.warn("Scheduler already running (this is OK while testing)")
def teardown():
logger.info("Stopping scheduler...")
try:
scheduler.shutdown()
except SchedulerNotRunningError:
logger.warn("Scheduler not running")

Wyświetl plik

@ -10,6 +10,7 @@ class EditTaskPanel extends React.Component {
this.state = {
name: "",
error: "",
advancedOptions: false,
loadedProcessingNodes: false,
selectedNode: null,
@ -36,8 +37,14 @@ class EditTaskPanel extends React.Component {
setTimeout(loadProcessingNodes, 1000);
}
this.nodesRequest = $.getJSON("/api/processingnodes/", json => {
this.nodesRequest =
$.getJSON("/api/processingnodes/?online=True", json => {
if (Array.isArray(json)){
// All nodes offline?
if (json.length === 0){
this.setState({error: "There are no processing nodes online. Make sure at least one of them is reachable."});
return;
}
let nodes = json.map(node => {
return {
@ -138,6 +145,12 @@ class EditTaskPanel extends React.Component {
}
render() {
if (this.state.error){
return (<div className="alert alert-warning alert-dismissible">
{this.state.error}
</div>);
}
if (this.state.editing){
let processingNodesOptions = "";
if (this.state.loadedProcessingNodes){

Wyświetl plik

@ -16,7 +16,8 @@ class ProjectList extends React.Component {
componentDidMount(){
// Load projects from API
this.serverRequest = $.getJSON(this.props.source, json => {
this.serverRequest =
$.getJSON(this.props.source, json => {
if (json.results){
this.setState({
projects: json.results,

Wyświetl plik

@ -1,7 +1,7 @@
import '../css/ProjectListItem.scss';
import React from 'react';
import update from 'react-addons-update';
import ProjectListItemPanel from './ProjectListItemPanel';
import TaskList from './TaskList';
import EditTaskPanel from './EditTaskPanel';
import UploadProgressBar from './UploadProgressBar';
import Dropzone from '../vendor/dropzone';
@ -13,12 +13,12 @@ class ProjectListItem extends React.Component {
super(props);
this.state = {
showPanel: false,
showTaskList: false,
updatingTask: false,
upload: this.getDefaultUploadState()
};
this.togglePanel = this.togglePanel.bind(this);
this.toggleTaskList = this.toggleTaskList.bind(this);
this.handleUpload = this.handleUpload.bind(this);
this.closeUploadError = this.closeUploadError.bind(this);
this.cancelUpload = this.cancelUpload.bind(this);
@ -133,7 +133,8 @@ class ProjectListItem extends React.Component {
this.setUploadState({showEditTask: false});
this.setState({updatingTask: true});
this.updateTaskRequest = $.ajax({
this.updateTaskRequest =
$.ajax({
url: `/api/projects/${this.props.data.id}/tasks/${this.state.upload.taskId}/`,
contentType: 'application/json',
data: JSON.stringify({
@ -144,10 +145,10 @@ class ProjectListItem extends React.Component {
dataType: 'json',
type: 'PATCH'
}).done(() => {
if (this.state.showPanel){
this.projectListItemPanel.refresh();
if (this.state.showTaskList){
this.taskList.refresh();
}else{
this.setState({showPanel: true});
this.setState({showTaskList: true});
}
}).fail(() => {
this.setUploadState({error: "Could not update task information. Plese try again."});
@ -162,9 +163,9 @@ class ProjectListItem extends React.Component {
}
}
togglePanel(){
toggleTaskList(){
this.setState({
showPanel: !this.state.showPanel
showTaskList: !this.state.showTaskList
});
}
@ -222,10 +223,18 @@ class ProjectListItem extends React.Component {
</ul>
</div>
<i style={{width: 14}} className={'fa ' + (this.state.showPanel ? 'fa-caret-down' : 'fa-caret-right')}>
</i> <a href="javascript:void(0);" onClick={this.togglePanel}>
<span className="project-name">
{this.props.data.name}
</a>
</span>
<div className="project-description">
{this.props.data.description}
</div>
<div className="row project-links">
<i className='fa fa-tasks'>
</i> <a href="javascript:void(0);" onClick={this.toggleTaskList}>
{(this.state.showTaskList ? 'Hide' : 'Show')} Tasks
</a>
</div>
</div>
<div className="row">
<div className="dropzone" ref={this.setRef("dropzone")}>
@ -233,7 +242,7 @@ class ProjectListItem extends React.Component {
</div>
</div>
{this.state.showPanel ? <ProjectListItemPanel ref={this.setRef("projectListItemPanel")}/> : ""}
{this.state.showTaskList ? <TaskList ref={this.setRef("taskList")}/> : ""}
{this.state.upload.showEditTask ? <UploadProgressBar {...this.state.upload}/> : ""}

Wyświetl plik

@ -1,25 +0,0 @@
import React from 'react';
class ProjectListItemPanel extends React.Component {
constructor(){
super();
}
componentDidMount(){
console.log("DISPLAY");
}
refresh(){
console.log("REFRESH");
}
render() {
return (
<div className="project-list-item-panel">
TODO
</div>
);
}
}
export default ProjectListItemPanel;

Wyświetl plik

@ -0,0 +1,26 @@
import React from 'react';
import '../css/TaskList.scss';
class TaskList extends React.Component {
constructor(){
super();
}
componentDidMount(){
console.log("DISPLAY");
}
refresh(){
console.log("REFRESH");
}
render() {
return (
<div className="task-list">
<span>Updating task list... <i className="fa fa-refresh fa-spin fa-fw"></i></span>
</div>
);
}
}
export default TaskList;

Wyświetl plik

@ -1,8 +1,4 @@
#dashboard-app{
.project-list-item-panel{
min-height: 100px;
}
.row{
&.no-margin{
margin: 0;

Wyświetl plik

@ -1,5 +1,16 @@
.project-list-item{
min-height: 60px;
.project-name{
font-weight: bold;
}
.project-links{
font-size: 90%;
i{
margin-right: 4px;
}
}
.dz-preview{
display: none;

Wyświetl plik

@ -0,0 +1,5 @@
.task-list{
background-color: #ecf0f1;
padding: 10px;
min-height: 100px;
}

Wyświetl plik

@ -10,7 +10,7 @@ class BootTestCase(TestCase):
module should derive from this class instead of TestCase.
We don't use fixtures because we have signal initialization login
for some models, which doesn't play well with them, and this: http://blog.namis.me/2012/04/21/burn-your-fixtures/
for some models, which doesn't play well with them.
'''
@classmethod
def setUpClass(cls):
@ -46,3 +46,7 @@ class BootTestCase(TestCase):
boot()
setupUsers()
setupProjects()
@classmethod
def tearDownClass(cls):
super(BootTestCase, cls).tearDownClass()

Wyświetl plik

@ -137,11 +137,26 @@ class TestApi(BootTestCase):
self.assertTrue(len(res.data) == 1)
self.assertTrue(res.data[0]["hostname"] == "localhost")
# Can use filters
res = client.get('/api/processingnodes/?id={}'.format(pnode.id))
self.assertEqual(res.status_code, status.HTTP_200_OK)
self.assertTrue(len(res.data) == 1)
# Can filter online
res = client.get('/api/processingnodes/?online=true')
self.assertEqual(res.status_code, status.HTTP_200_OK)
self.assertTrue(len(res.data) == 0)
res = client.get('/api/processingnodes/?online=false')
self.assertEqual(res.status_code, status.HTTP_200_OK)
self.assertTrue(len(res.data) == 1)
# Can get single processing node as normal user
res = client.get('/api/processingnodes/{}/'.format(pnode.id))
self.assertEqual(res.status_code, status.HTTP_200_OK)
self.assertTrue(res.data["hostname"] == "localhost")
# Cannot delete a processing node as normal user
res = client.delete('/api/processingnodes/{}/'.format(pnode.id))
self.assertTrue(res.status_code, status.HTTP_403_FORBIDDEN)

Wyświetl plik

@ -4,6 +4,8 @@ from django.test import Client
from app.models import Project, Task
from .classes import BootTestCase
from app import scheduler
from django.core.exceptions import ValidationError
class TestApp(BootTestCase):
fixtures = ['test_processingnodes', ]
@ -119,3 +121,30 @@ class TestApp(BootTestCase):
# Should not have permission
self.assertFalse(anotherUser.has_perm("delete_project", p))
def test_tasks(self):
# Create a new task
p = Project.objects.create(owner=User.objects.get(username="testuser"), name="test")
task = Task.objects.create(project=p)
# Test options validation
task.options = [{'name': 'test', 'value': 1}]
self.assertTrue(task.save() == None)
task.options = {'test': 1}
self.assertRaises(ValidationError, task.save)
task.options = [{'name': 'test', 'value': 1}, {"invalid": 1}]
self.assertRaises(ValidationError, task.save)
def test_scheduler(self):
self.assertTrue(scheduler.setup() == None)
# Can call update_nodes_info()
self.assertTrue(scheduler.update_nodes_info() == None)
# Can call function in background
self.assertTrue(scheduler.update_nodes_info(background=True).join() == None)
self.assertTrue(scheduler.teardown() == None)

Wyświetl plik

@ -1,5 +1,7 @@
from django.conf.urls import url, include
from . import views
from app.boot import boot
from webodm import settings
urlpatterns = [
url(r'^$', views.index, name='index'),
@ -7,4 +9,8 @@ urlpatterns = [
url(r'^processingnode/([\d]+)/$', views.processing_node, name='processing_node'),
url(r'^api/', include("app.api.urls")),
]
]
# Test cases call boot() independently
if not settings.TESTING:
boot()

Wyświetl plik

@ -1,35 +1,45 @@
"""
A wrapper around Bravado to communicate with a node-OpenDroneMap node.
An interface to node-OpenDroneMap's API
https://github.com/pierotofy/node-OpenDroneMap/blob/master/docs/index.adoc
"""
from bravado.client import SwaggerClient
from bravado.exception import HTTPError
from requests import ConnectionError
import requests
import mimetypes
import json
import os
from urlparse import urlunparse
class ApiClient:
def check_client(func):
def check(self, *args, **kwargs):
"""
Makes sure that the client has been instantiated.
Sometimes this will fail (rest endpoint might be offline),
so we need to handle it gracefully...
"""
if not hasattr(self, 'client'):
try:
self.client = SwaggerClient.from_url('http://{}:{}/swagger.json'.format(self.host, self.port))
except (ConnectionError, HTTPError) as err:
return None
return func(self, *args, **kwargs)
return check
def __init__(self, host, port):
self.host = host
self.port = port
@check_client
def info(self):
return self.client.server.get_info().result()
def url(self, url):
netloc = self.host if self.port == 80 else "{}:{}".format(self.host, self.port)
# TODO: https support
return urlunparse(('http', netloc, url, '', '', ''))
def info(self):
return requests.get(self.url('/info')).json()
@check_client
def options(self):
return self.client.server.get_options().result()
return requests.get(self.url('/options')).json()
def task_info(self, uuid):
return requests.get(self.url('/task/{}/info').format(uuid)).json()
def new_task(self, images, name=None, options=[]):
"""
Starts processing of a new task
:param images: list of path images
:param name: name of the task
:param options: options to be used for processing ([{'name': optionName, 'value': optionValue}, ...])
:return: UUID or error
"""
files = [('images',
(os.path.basename(image), open(image, 'rb'), (mimetypes.guess_type(image)[0] or "image/jpg"))
) for image in images]
return requests.post(self.url("/task/new"),
files=files,
data={'name': name, 'options': json.dumps(options)}).json()

Wyświetl plik

@ -0,0 +1,2 @@
class ProcessingException(Exception):
pass

Plik binarny nie jest wyświetlany.

Po

Szerokość:  |  Wysokość:  |  Rozmiar: 4.9 MiB

Plik binarny nie jest wyświetlany.

Po

Szerokość:  |  Wysokość:  |  Rozmiar: 5.0 MiB

Plik binarny nie jest wyświetlany.

Po

Szerokość:  |  Wysokość:  |  Rozmiar: 4.9 MiB

Plik binarny nie jest wyświetlany.

Po

Szerokość:  |  Wysokość:  |  Rozmiar: 5.0 MiB

Plik binarny nie jest wyświetlany.

Po

Szerokość:  |  Wysokość:  |  Rozmiar: 4.9 MiB

Wyświetl plik

@ -3,8 +3,12 @@ from __future__ import unicode_literals
from django.db import models
from django.contrib.postgres import fields
from django.utils import timezone
from django.dispatch import receiver
from .api_client import ApiClient
import json
from django.db.models import signals
from requests.exceptions import ConnectionError
from .exceptions import ProcessingException
class ProcessingNode(models.Model):
hostname = models.CharField(max_length=255, help_text="Hostname where the node is located (can be an internal hostname as well)")
@ -14,12 +18,6 @@ class ProcessingNode(models.Model):
queue_count = models.PositiveIntegerField(default=0, help_text="Number of tasks currently being processed by this node (as reported by the node itself)")
available_options = fields.JSONField(default=dict(), help_text="Description of the options that can be used for processing")
def __init__(self, *args, **kwargs):
super(ProcessingNode, self).__init__(*args, **kwargs)
# Initialize api client
self.api_client = ApiClient(self.hostname, self.port)
def __str__(self):
return '{}:{}'.format(self.hostname, self.port)
@ -30,22 +28,64 @@ class ProcessingNode(models.Model):
:returns: True if information could be updated, False otherwise
"""
info = self.api_client.info()
if info != None:
api_client = self.api_client()
try:
info = api_client.info()
self.api_version = info['version']
self.queue_count = info['taskQueueCount']
options = self.api_client.options()
if options != None:
self.available_options = options
self.last_refreshed = timezone.now()
options = api_client.options()
self.available_options = options
self.last_refreshed = timezone.now()
self.save()
return True
except ConnectionError:
return False
self.save()
return True
return False
def api_client(self):
return ApiClient(self.hostname, self.port)
def get_available_options_json(self):
"""
:returns available options in JSON string format
"""
return json.dumps(self.available_options)
def process_new_task(self, images, name=None, options=[]):
"""
Sends a set of images (and optional GCP file) via the API
to start processing.
:param images: list of path images
:param name: name of the task
:param options: options to be used for processing ([{'name': optionName, 'value': optionValue}, ...])
:returns UUID of the newly created task
"""
if len(images) < 2: raise ProcessingException("Need at least 2 images")
api_client = self.api_client()
result = api_client.new_task(images, name, options)
if result['uuid']:
return result['uuid']
elif result['error']:
raise ProcessingException(result['error'])
def get_task_info(self, uuid):
"""
Gets information about this task, such as name, creation date,
processing time, status, command line options and number of
images being processed.
"""
api_client = self.api_client()
result = api_client.task_info(uuid)
if result['uuid']:
return result
elif result['error']:
raise ProcessingException(result['error'])
# First time a processing node is created, automatically try to update
@receiver(signals.post_save, sender=ProcessingNode, dispatch_uid="update_processing_node_info")
def auto_update_node_info(sender, instance, created, **kwargs):
if created:
instance.update_node_info()

Wyświetl plik

@ -4,6 +4,7 @@ import subprocess, time
from os import path
from .models import ProcessingNode
from .api_client import ApiClient
from requests.exceptions import ConnectionError
current_dir = path.dirname(path.realpath(__file__))
@ -31,8 +32,8 @@ class TestClientApi(TestCase):
def test_offline_api(self):
api = ApiClient("offline-host", 3000)
self.assertTrue(api.info() == None)
self.assertTrue(api.options() == None)
self.assertRaises(ConnectionError, api.info)
self.assertRaises(ConnectionError, api.options)
def test_info(self):
info = self.api_client.info()
@ -46,12 +47,12 @@ class TestClientApi(TestCase):
def test_online_processing_node(self):
online_node = ProcessingNode.objects.get(pk=1)
self.assertTrue(str(online_node) == "localhost:11223", "Formatting string works")
self.assertTrue(online_node.last_refreshed != 0, "Last refreshed not yet set")
self.assertTrue(online_node.last_refreshed == None, "Last refreshed not yet set")
self.assertTrue(len(online_node.available_options) == 0, "Available options not yet set")
self.assertTrue(online_node.api_version == "", "API version is not set")
self.assertTrue(online_node.update_node_info(), "Could update info")
self.assertTrue(online_node.last_refreshed != 0, "Last refreshed is set")
self.assertTrue(online_node.last_refreshed != None, "Last refreshed is set")
self.assertTrue(len(online_node.available_options) > 0, "Available options are set")
self.assertTrue(online_node.api_version != "", "API version is set")
@ -61,3 +62,28 @@ class TestClientApi(TestCase):
offline_node = ProcessingNode.objects.get(pk=2)
self.assertFalse(offline_node.update_node_info(), "Could not update info (offline)")
self.assertTrue(offline_node.api_version == "", "API version is not set")
def test_auto_update_node_info(self):
online_node = ProcessingNode.objects.create(hostname="localhost", port=11223)
self.assertTrue(online_node.last_refreshed != None, "Last refreshed info is here (update_node_info() was called)")
def test_client_api(self):
api = ApiClient("localhost", 11223)
# Can call info(), options()
self.assertTrue(type(api.info()['version']) in [str, unicode])
self.assertTrue(len(api.options()) > 0)
# Can call new_task()
import glob
res = api.new_task(
glob.glob("nodeodm/fixtures/test_images/*.JPG"),
"test",
[{'name': 'cmvs-maxImages', 'value': 5}])
uuid = res['uuid']
self.assertTrue(uuid != None)
# Can call task_info()
task_info = api.task_info(uuid)
self.assertTrue(isinstance(task_info['dateCreated'], (int, long)))
self.assertTrue(isinstance(task_info['uuid'], (str, unicode)))

Wyświetl plik

@ -1,39 +1,22 @@
anyjson==0.3.3
attrs==16.2.0
bravado==8.3.0
bravado-core==4.5.0
cffi==1.8.3
crochet==1.5.0
cryptography==1.5
APScheduler==3.2.0
Django==1.10
django-common-helpers==0.8.0
django-filter==0.15.2
django-filter==0.15.3
django-guardian==1.4.6
django-webpack-loader==0.3.3
djangorestframework==3.4.7
djangorestframework==3.5.1
drf-nested-routers==0.11.1
enum34==1.1.6
fido==3.2.0
functools32==3.2.3.post2
idna==2.1
ipaddress==1.0.17
jsonschema==2.5.1
funcsigs==1.0.2
futures==3.0.5
Markdown==2.6.7
pillow==3.3.1
pip-autoremove==0.9.0
psycopg2==2.6.2
pyasn1==0.1.9
pyasn1-modules==0.0.8
pycparser==2.14
pyOpenSSL==16.1.0
python-dateutil==2.5.3
pytz==2016.6.1
PyYAML==3.12
requests==2.11.1
service-identity==16.0.0
simplejson==3.8.2
rfc3987==1.3.7
six==1.10.0
swagger-spec-validator==2.0.2
Twisted==16.4.1
yelp-bytes==0.3.0
yelp-encodings==0.1.3
zope.interface==4.3.2
strict-rfc3339==0.7
tzlocal==1.3
webcolors==1.5

Wyświetl plik

@ -10,7 +10,7 @@ For the full list of settings and their values, see
https://docs.djangoproject.com/en/1.10/ref/settings/
"""
import os
import os, sys
from django.contrib.messages import constants as messages
# Build paths inside the project like this: os.path.join(BASE_DIR, ...)
@ -181,6 +181,10 @@ LOGGING = {
'app.logger': {
'handlers': ['console'],
'level': 'INFO',
},
'apscheduler.executors.default': {
'handlers': ['console'],
'level': 'WARN',
}
}
}
@ -212,6 +216,8 @@ REST_FRAMEWORK = {
'PAGE_SIZE': 10,
}
TESTING = sys.argv[1:2] == ['test']
try:
from .local_settings import *
except ImportError: