2016-09-20 22:18:10 +00:00
|
|
|
from __future__ import unicode_literals
|
|
|
|
|
|
|
|
from django.db import models
|
2016-09-21 20:04:47 +00:00
|
|
|
from django.contrib.postgres import fields
|
2016-09-21 22:07:19 +00:00
|
|
|
from django.utils import timezone
|
2016-10-24 18:34:37 +00:00
|
|
|
from django.dispatch import receiver
|
2016-09-21 20:54:22 +00:00
|
|
|
from .api_client import ApiClient
|
2016-09-22 22:18:35 +00:00
|
|
|
import json
|
2016-10-24 18:34:37 +00:00
|
|
|
from django.db.models import signals
|
2016-10-25 16:19:14 +00:00
|
|
|
from requests.exceptions import ConnectionError
|
2016-10-26 16:54:46 +00:00
|
|
|
from .exceptions import ProcessingException
|
2016-09-20 22:18:10 +00:00
|
|
|
|
2016-09-21 20:04:47 +00:00
|
|
|
class ProcessingNode(models.Model):
    """
    A remote processing node, addressed by hostname and port, that is
    reached through ApiClient.

    Besides the connection details, the model caches what the node last
    reported about itself (API version, queue size, available options).
    """
    hostname = models.CharField(max_length=255, help_text="Hostname where the node is located (can be an internal hostname as well)")
    port = models.PositiveIntegerField(help_text="Port that connects to the node's API")
    api_version = models.CharField(max_length=32, null=True, help_text="API version used by the node")
    last_refreshed = models.DateTimeField(null=True, help_text="When was the information about this node last retrieved?")
    queue_count = models.PositiveIntegerField(default=0, help_text="Number of tasks currently being processed by this node (as reported by the node itself)")
    # The default must be a callable: passing dict() would create one dict
    # object shared (and mutated) across every instance of the model.
    available_options = fields.JSONField(default=dict, help_text="Description of the options that can be used for processing")

    def __str__(self):
        return '{}:{}'.format(self.hostname, self.port)

    def update_node_info(self):
        """
        Retrieves information and options from the node API
        and saves it into the database.

        :returns: True if information could be updated, False if the node
            could not be reached (ConnectionError)
        """
        api_client = self.api_client()
        try:
            info = api_client.info()
            self.api_version = info['version']
            self.queue_count = info['taskQueueCount']

            options = api_client.options()
            self.available_options = options
            self.last_refreshed = timezone.now()
            self.save()
            return True
        except ConnectionError:
            return False

    def api_client(self):
        """
        :returns: a new ApiClient pointed at this node's hostname and port
        """
        return ApiClient(self.hostname, self.port)

    def get_available_options_json(self, pretty=False):
        """
        :param pretty: when True, indent the output by 4 spaces and use
            ',' / ': ' separators; otherwise use json.dumps defaults
        :returns available options in JSON string format
        """
        kwargs = dict(indent=4, separators=(',', ": ")) if pretty else dict()
        return json.dumps(self.available_options, **kwargs)

    @staticmethod
    def _checked_task_response(result):
        """
        Validates a task-related answer returned by the API client.

        :param result: decoded response (expected to be a dict)
        :returns: result unchanged when it carries a non-empty 'uuid'
        :raises ProcessingException: when the response carries an 'error',
            or when it has neither a 'uuid' nor an 'error'
        """
        if isinstance(result, dict):
            # .get() rather than indexing: an error response is not
            # guaranteed to contain a 'uuid' key (and vice versa).
            if result.get('uuid'):
                return result
            if result.get('error'):
                raise ProcessingException(result['error'])
        raise ProcessingException("Unexpected answer from server: {}".format(result))

    def process_new_task(self, images, name=None, options=None):
        """
        Sends a set of images (and optional GCP file) via the API
        to start processing.

        :param images: list of path images
        :param name: name of the task
        :param options: options to be used for processing ([{'name': optionName, 'value': optionValue}, ...]);
            defaults to an empty list

        :returns UUID of the newly created task
        :raises ProcessingException: if fewer than 2 images are given, if
            the node reports an error, or if the answer is not understood
        """
        if len(images) < 2:
            raise ProcessingException("Need at least 2 images")

        # Build a fresh list per call instead of sharing a mutable default.
        if options is None:
            options = []

        api_client = self.api_client()
        result = api_client.new_task(images, name, options)
        return self._checked_task_response(result)['uuid']

    def get_task_info(self, uuid):
        """
        Gets information about this task, such as name, creation date,
        processing time, status, command line options and number of
        images being processed.

        :param uuid: UUID of the task to look up
        :returns: the response returned by the API client for this task
        :raises ProcessingException: if the node reports an error or the
            answer is not understood
        """
        api_client = self.api_client()
        result = api_client.task_info(uuid)
        return self._checked_task_response(result)
|
2016-10-25 14:47:49 +00:00
|
|
|
|
2016-10-24 18:34:37 +00:00
|
|
|
# First time a processing node is created, automatically try to update
@receiver(signals.post_save, sender=ProcessingNode, dispatch_uid="update_processing_node_info")
def auto_update_node_info(sender, instance, created, **kwargs):
    """
    post_save handler for ProcessingNode.

    Calls update_node_info() on the saved instance, but only for the save
    that created the record; later saves are ignored.

    :param sender: model class that sent the signal
    :param instance: the ProcessingNode that was saved
    :param created: True when this save inserted a new record
    """
    if not created:
        return

    instance.update_node_info()
|