2016-09-20 22:18:10 +00:00
|
|
|
from __future__ import unicode_literals
|
|
|
|
|
|
|
|
from django.db import models
|
2016-09-21 20:04:47 +00:00
|
|
|
from django.contrib.postgres import fields
|
2016-09-21 22:07:19 +00:00
|
|
|
from django.utils import timezone
|
2016-10-24 18:34:37 +00:00
|
|
|
from django.dispatch import receiver
|
2016-11-17 23:51:07 +00:00
|
|
|
from guardian.models import GroupObjectPermissionBase
|
|
|
|
from guardian.models import UserObjectPermissionBase
|
|
|
|
|
2016-09-21 20:54:22 +00:00
|
|
|
from .api_client import ApiClient
|
2016-09-22 22:18:35 +00:00
|
|
|
import json
|
2016-10-24 18:34:37 +00:00
|
|
|
from django.db.models import signals
|
2016-10-25 16:19:14 +00:00
|
|
|
from requests.exceptions import ConnectionError
|
2016-10-26 16:54:46 +00:00
|
|
|
from .exceptions import ProcessingException
|
2016-09-20 22:18:10 +00:00
|
|
|
|
2016-11-09 23:08:41 +00:00
|
|
|
def api(func):
    """
    Decorator that catches JSON decoding errors that might happen when the
    server answers unexpectedly, and re-raises them as ProcessingException.

    :param func: callable to wrap
    :returns: a wrapper that forwards all arguments to func and returns its result
    :raises ProcessingException: (from the wrapper) when func raises json.decoder.JSONDecodeError
    """
    # Imported here so the decorator is self-contained.
    import functools

    # Preserve the wrapped callable's name and docstring, so decorated
    # methods do not all show up as "wrapper".
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except json.decoder.JSONDecodeError as e:
            # Chain explicitly so the original decoding error stays visible in tracebacks
            raise ProcessingException(str(e)) from e

    return wrapper
|
|
|
|
|
2016-09-21 20:04:47 +00:00
|
|
|
class ProcessingNode(models.Model):
    """
    A processing node reachable at hostname:port, together with the
    information most recently retrieved from its API.
    """
    hostname = models.CharField(max_length=255, help_text="Hostname where the node is located (can be an internal hostname as well)")
    port = models.PositiveIntegerField(help_text="Port that connects to the node's API")
    api_version = models.CharField(max_length=32, null=True, help_text="API version used by the node")
    last_refreshed = models.DateTimeField(null=True, help_text="When was the information about this node last retrieved?")
    queue_count = models.PositiveIntegerField(default=0, help_text="Number of tasks currently being processed by this node (as reported by the node itself)")
    # The default must be the callable, not dict(): a dict instance would be
    # one mutable object shared as the default value of every new node.
    available_options = fields.JSONField(default=dict, help_text="Description of the options that can be used for processing")

    def __str__(self):
        return '{}:{}'.format(self.hostname, self.port)

    @api
    def update_node_info(self):
        """
        Retrieves information and options from the node API
        and saves it into the database.

        :returns: True if information could be updated, False otherwise
        """
        api_client = self.api_client()
        try:
            # Fetch everything before touching the instance, so that a
            # connection failure half-way does not leave it partially updated.
            info = api_client.info()
            options = api_client.options()

            self.api_version = info['version']
            self.queue_count = info['taskQueueCount']
            self.available_options = options
            self.last_refreshed = timezone.now()
            self.save()
            return True
        except ConnectionError:
            return False

    def api_client(self):
        """
        :returns: a new ApiClient built from this node's hostname and port
        """
        return ApiClient(self.hostname, self.port)

    def get_available_options_json(self, pretty=False):
        """
        :param pretty: when True, indent the output by 4 spaces
        :returns available options in JSON string format
        """
        kwargs = dict(indent=4, separators=(',', ": ")) if pretty else dict()
        return json.dumps(self.available_options, **kwargs)

    @api
    def process_new_task(self, images, name=None, options=None):
        """
        Sends a set of images (and optional GCP file) via the API
        to start processing.

        :param images: list of path images
        :param name: name of the task
        :param options: options to be used for processing ([{'name': optionName, 'value': optionValue}, ...])

        :returns UUID of the newly created task
        :raises ProcessingException: if fewer than 2 images are given, if the
            node reports an error, or if the response is not recognized
        """
        if len(images) < 2:
            raise ProcessingException("Need at least 2 images")

        # None stands for "no options"; avoids sharing a mutable default list between calls
        if options is None:
            options = []

        api_client = self.api_client()
        result = api_client.new_task(images, name, options)

        # Use .get() so that a response carrying only 'error' (or only 'uuid')
        # does not fail with a KeyError before the error can be reported.
        if isinstance(result, dict) and result.get('uuid'):
            return result['uuid']
        elif isinstance(result, dict) and result.get('error'):
            raise ProcessingException(result['error'])
        else:
            raise ProcessingException("Unknown response from new task: {}".format(result))

    @api
    def get_task_info(self, uuid):
        """
        Gets information about this task, such as name, creation date,
        processing time, status, command line options and number of
        images being processed.

        :param uuid: UUID of the task
        :returns: the response dictionary from the node
        :raises ProcessingException: if the node reports an error or the response is not recognized
        """
        api_client = self.api_client()
        result = api_client.task_info(uuid)
        if isinstance(result, dict) and 'uuid' in result:
            return result
        elif isinstance(result, dict) and 'error' in result:
            raise ProcessingException(result['error'])
        else:
            raise ProcessingException("Unknown result from task info: {}".format(result))

    @api
    def get_task_console_output(self, uuid, line):
        """
        Retrieves the console output of the OpenDroneMap's process.
        Useful for monitoring execution and to provide updates to the user.

        :param uuid: UUID of the task
        :param line: forwarded as-is to the API client (presumably the line
            to start reading from; confirm against ApiClient.task_output)
        :returns: the list of output chunks returned by the node, joined into one string
        :raises ProcessingException: if the node reports an error or the response is not recognized
        """
        api_client = self.api_client()
        result = api_client.task_output(uuid, line)
        if isinstance(result, dict) and 'error' in result:
            raise ProcessingException(result['error'])
        elif isinstance(result, list):
            return "".join(result)
        else:
            raise ProcessingException("Unknown response for console output: {}".format(result))

    @api
    def cancel_task(self, uuid):
        """
        Cancels a task (stops its execution, or prevents it from being executed)

        :param uuid: UUID of the task
        :return: True on success, raises ProcessingException otherwise
        """
        api_client = self.api_client()
        return self.handle_generic_post_response(api_client.task_cancel(uuid))

    @api
    def remove_task(self, uuid):
        """
        Removes a task and deletes all of its assets

        :param uuid: UUID of the task
        :return: True on success, raises ProcessingException otherwise
        """
        api_client = self.api_client()
        return self.handle_generic_post_response(api_client.task_remove(uuid))

    @api
    def download_task_asset(self, uuid, asset):
        """
        Downloads a task asset

        :param uuid: UUID of the task
        :param asset: asset identifier, forwarded as-is to the API client
        :returns: whatever the API client returned for the download
        :raises ProcessingException: if the response is a dictionary containing an error
        """
        api_client = self.api_client()
        res = api_client.task_download(uuid, asset)
        if isinstance(res, dict) and 'error' in res:
            raise ProcessingException(res['error'])
        else:
            return res

    @api
    def restart_task(self, uuid):
        """
        Restarts a task that was previously canceled or that had failed to process

        :param uuid: UUID of the task
        :return: True on success, raises ProcessingException otherwise
        """
        api_client = self.api_client()
        return self.handle_generic_post_response(api_client.task_restart(uuid))

    @staticmethod
    def handle_generic_post_response(result):
        """
        Handles a POST response that has either a "success" flag, or an error message.
        This is a common response in node-OpenDroneMap POST calls.
        :param result: result of API call
        :return: True on success, raises ProcessingException otherwise
        """
        # An error takes precedence over a success flag if both are present
        if isinstance(result, dict) and 'error' in result:
            raise ProcessingException(result['error'])
        elif isinstance(result, dict) and 'success' in result:
            return True
        else:
            raise ProcessingException("Unknown response: {}".format(result))

    class Meta:
        permissions = (
            ('view_processingnode', 'Can view processing node'),
        )
|
|
|
|
|
|
|
|
|
2016-10-24 18:34:37 +00:00
|
|
|
# Right after a processing node is created for the first time,
# automatically try to retrieve its information
@receiver(signals.post_save, sender=ProcessingNode, dispatch_uid="update_processing_node_info")
def auto_update_node_info(sender, instance, created, **kwargs):
    """
    post_save handler for ProcessingNode.

    :param sender: the model class that sent the signal (unused)
    :param instance: the ProcessingNode that was saved
    :param created: True if the save created a new record
    """
    # Subsequent saves of an existing node are ignored
    if not created:
        return

    instance.update_node_info()
|
2016-11-17 23:51:07 +00:00
|
|
|
|
|
|
|
|
|
|
|
class ProcessingNodeUserObjectPermission(UserObjectPermissionBase):
    """
    Per-user object permissions for ProcessingNode, stored with a direct
    foreign key to the node.
    """
    # on_delete is spelled out: CASCADE is what Django applied implicitly
    # when the argument was omitted, and newer Django versions require it.
    content_object = models.ForeignKey(ProcessingNode, on_delete=models.CASCADE)
|
|
|
|
|
|
|
|
class ProcessingNodeGroupObjectPermission(GroupObjectPermissionBase):
    """
    Per-group object permissions for ProcessingNode, stored with a direct
    foreign key to the node.
    """
    # on_delete is spelled out: CASCADE is what Django applied implicitly
    # when the argument was omitted, and newer Django versions require it.
    content_object = models.ForeignKey(ProcessingNode, on_delete=models.CASCADE)
|