2016-09-20 22:18:10 +00:00
from __future__ import unicode_literals
2016-12-13 14:52:15 +00:00
import requests
2016-09-20 22:18:10 +00:00
from django . db import models
2016-09-21 20:04:47 +00:00
from django . contrib . postgres import fields
2016-09-21 22:07:19 +00:00
from django . utils import timezone
2016-10-24 18:34:37 +00:00
from django . dispatch import receiver
2016-11-17 23:51:07 +00:00
from guardian . models import GroupObjectPermissionBase
from guardian . models import UserObjectPermissionBase
2016-09-21 20:54:22 +00:00
from . api_client import ApiClient
2016-09-22 22:18:35 +00:00
import json
2016-10-24 18:34:37 +00:00
from django . db . models import signals
2018-02-16 20:07:53 +00:00
from datetime import timedelta
2017-02-08 17:30:11 +00:00
from . exceptions import ProcessingError , ProcessingTimeout
2017-01-21 17:50:53 +00:00
import simplejson
2017-02-15 19:20:41 +00:00
2016-11-09 23:08:41 +00:00
def api ( func ) :
"""
Catches JSON decoding errors that might happen when the server
answers unexpectedly
"""
def wrapper ( * args , * * kwargs ) :
try :
return func ( * args , * * kwargs )
2017-01-21 17:50:53 +00:00
except ( json . decoder . JSONDecodeError , simplejson . JSONDecodeError ) as e :
2017-02-08 17:30:11 +00:00
raise ProcessingError ( str ( e ) )
2017-02-15 19:20:41 +00:00
except ( requests . exceptions . Timeout , requests . exceptions . ConnectionError ) as e :
2017-02-08 17:30:11 +00:00
raise ProcessingTimeout ( str ( e ) )
2016-11-09 23:08:41 +00:00
2017-01-21 17:50:53 +00:00
2016-11-09 23:08:41 +00:00
return wrapper
2017-02-15 19:20:41 +00:00
OFFLINE_MINUTES = 5 # Number of minutes a node hasn't been seen before it should be considered offline
2016-09-21 20:04:47 +00:00
class ProcessingNode ( models . Model ) :
2016-12-01 13:32:45 +00:00
hostname = models . CharField ( max_length = 255 , help_text = " Hostname or IP address where the node is located (can be an internal hostname as well). If you are using Docker, this is never 127.0.0.1 or localhost. Find the IP address of your host machine by running ifconfig on Linux or by checking your network settings. " )
2016-09-21 20:04:47 +00:00
port = models . PositiveIntegerField ( help_text = " Port that connects to the node ' s API " )
2016-10-18 20:23:10 +00:00
api_version = models . CharField ( max_length = 32 , null = True , help_text = " API version used by the node " )
2016-09-21 20:04:47 +00:00
last_refreshed = models . DateTimeField ( null = True , help_text = " When was the information about this node last retrieved? " )
queue_count = models . PositiveIntegerField ( default = 0 , help_text = " Number of tasks currently being processed by this node (as reported by the node itself) " )
2019-01-16 15:47:35 +00:00
available_options = fields . JSONField ( default = dict , help_text = " Description of the options that can be used for processing " )
2018-06-25 17:31:42 +00:00
token = models . CharField ( max_length = 1024 , blank = True , default = " " , help_text = " Token to use for authentication. If the node doesn ' t have authentication, you can leave this field blank. " )
2018-12-04 17:11:22 +00:00
max_images = models . PositiveIntegerField ( help_text = " Maximum number of images accepted by this node. " , blank = True , null = True )
2019-01-15 14:06:22 +00:00
odm_version = models . CharField ( max_length = 32 , null = True , help_text = " OpenDroneMap version used by the node " )
label = models . CharField ( max_length = 255 , default = " " , blank = True , help_text = " Optional label for this node. When set, this label will be shown instead of the hostname:port name. " )
2018-06-25 17:31:42 +00:00
2016-09-21 20:04:47 +00:00
def __str__ ( self ) :
2019-01-15 14:06:22 +00:00
if self . label != " " :
return self . label
else :
return ' {} : {} ' . format ( self . hostname , self . port )
2016-09-21 20:54:22 +00:00
2017-02-15 19:20:41 +00:00
@staticmethod
def find_best_available_node ( ) :
"""
Attempts to find an available node ( seen in the last 5 minutes , and with lowest queue count )
: return : ProcessingNode | None
"""
return ProcessingNode . objects . filter ( last_refreshed__gte = timezone . now ( ) - timedelta ( minutes = OFFLINE_MINUTES ) ) \
. order_by ( ' queue_count ' ) . first ( )
def is_online ( self ) :
return self . last_refreshed is not None and \
self . last_refreshed > = timezone . now ( ) - timedelta ( minutes = OFFLINE_MINUTES )
2016-11-09 23:08:41 +00:00
@api
2016-09-21 22:07:19 +00:00
def update_node_info ( self ) :
2016-09-21 20:54:22 +00:00
"""
2016-09-22 22:18:35 +00:00
Retrieves information and options from the node API
and saves it into the database .
: returns : True if information could be updated , False otherwise
2016-09-21 20:54:22 +00:00
"""
2018-04-22 18:49:20 +00:00
api_client = self . api_client ( timeout = 5 )
2016-10-25 16:19:14 +00:00
try :
info = api_client . info ( )
2018-12-04 15:45:09 +00:00
if ' error ' in info :
return False
2016-09-21 22:07:19 +00:00
self . api_version = info [ ' version ' ]
self . queue_count = info [ ' taskQueueCount ' ]
2018-12-04 17:11:22 +00:00
if ' maxImages ' in info :
self . max_images = info [ ' maxImages ' ]
2019-01-15 14:06:22 +00:00
if ' odmVersion ' in info :
self . odm_version = info [ ' odmVersion ' ]
2018-12-04 17:11:22 +00:00
2016-10-25 14:47:49 +00:00
options = api_client . options ( )
2016-10-25 16:19:14 +00:00
self . available_options = options
self . last_refreshed = timezone . now ( )
self . save ( )
return True
2017-02-15 19:20:41 +00:00
except ( requests . exceptions . Timeout , requests . exceptions . ConnectionError , json . decoder . JSONDecodeError , simplejson . JSONDecodeError ) :
2016-10-25 16:19:14 +00:00
return False
2016-09-22 22:18:35 +00:00
2018-04-22 18:49:20 +00:00
def api_client ( self , timeout = 30 ) :
2018-06-25 17:31:42 +00:00
return ApiClient ( self . hostname , self . port , self . token , timeout )
2016-10-25 14:47:49 +00:00
2016-10-28 19:40:03 +00:00
def get_available_options_json ( self , pretty = False ) :
2016-09-22 22:18:35 +00:00
"""
: returns available options in JSON string format
"""
2016-10-28 19:40:03 +00:00
kwargs = dict ( indent = 4 , separators = ( ' , ' , " : " ) ) if pretty else dict ( )
return json . dumps ( self . available_options , * * kwargs )
2016-10-24 18:34:37 +00:00
2016-11-09 23:08:41 +00:00
@api
2018-12-04 21:29:54 +00:00
def process_new_task ( self , images , name = None , options = [ ] , progress_callback = None ) :
2016-10-25 14:47:49 +00:00
"""
Sends a set of images ( and optional GCP file ) via the API
to start processing .
2016-10-26 16:54:46 +00:00
: param images : list of path images
: param name : name of the task
: param options : options to be used for processing ( [ { ' name ' : optionName , ' value ' : optionValue } , . . . ] )
2018-12-04 21:29:54 +00:00
: param progress_callback : optional callback invoked during the upload images process to be used to report status .
2016-10-26 16:54:46 +00:00
2016-10-25 20:04:24 +00:00
: returns UUID of the newly created task
2016-10-25 14:47:49 +00:00
"""
2017-02-08 17:30:11 +00:00
if len ( images ) < 2 : raise ProcessingError ( " Need at least 2 images " )
2016-10-26 16:54:46 +00:00
2016-10-25 14:47:49 +00:00
api_client = self . api_client ( )
2016-12-13 14:52:15 +00:00
try :
2018-12-04 21:29:54 +00:00
result = api_client . new_task ( images , name , options , progress_callback )
2016-12-13 14:52:15 +00:00
except requests . exceptions . ConnectionError as e :
2017-02-08 17:30:11 +00:00
raise ProcessingError ( e )
2016-12-13 14:52:15 +00:00
2017-02-08 16:27:23 +00:00
if isinstance ( result , dict ) and ' uuid ' in result :
2016-10-25 20:04:24 +00:00
return result [ ' uuid ' ]
2017-02-08 16:27:23 +00:00
elif isinstance ( result , dict ) and ' error ' in result :
2017-02-08 17:30:11 +00:00
raise ProcessingError ( result [ ' error ' ] )
2017-02-01 23:11:39 +00:00
else :
2017-02-08 17:30:11 +00:00
raise ProcessingError ( " Unexpected answer from server: {} " . format ( result ) )
2016-10-26 16:54:46 +00:00
2016-11-09 23:08:41 +00:00
@api
2016-10-26 16:54:46 +00:00
def get_task_info ( self , uuid ) :
"""
Gets information about this task , such as name , creation date ,
processing time , status , command line options and number of
images being processed .
"""
api_client = self . api_client ( )
result = api_client . task_info ( uuid )
2016-11-04 18:19:18 +00:00
if isinstance ( result , dict ) and ' uuid ' in result :
2016-10-26 16:54:46 +00:00
return result
2016-11-04 18:19:18 +00:00
elif isinstance ( result , dict ) and ' error ' in result :
2017-02-08 17:30:11 +00:00
raise ProcessingError ( result [ ' error ' ] )
2016-11-04 18:19:18 +00:00
else :
2017-02-08 17:30:11 +00:00
raise ProcessingError ( " Unknown result from task info: {} " . format ( result ) )
2016-10-25 14:47:49 +00:00
2016-11-09 23:08:41 +00:00
@api
2016-11-01 21:12:13 +00:00
def get_task_console_output ( self , uuid , line ) :
"""
Retrieves the console output of the OpenDroneMap ' s process.
Useful for monitoring execution and to provide updates to the user .
"""
api_client = self . api_client ( )
result = api_client . task_output ( uuid , line )
if isinstance ( result , dict ) and ' error ' in result :
2017-02-08 17:30:11 +00:00
raise ProcessingError ( result [ ' error ' ] )
2016-11-01 21:12:13 +00:00
elif isinstance ( result , list ) :
2018-12-07 20:13:49 +00:00
return result
2016-11-01 21:12:13 +00:00
else :
2017-02-08 17:30:11 +00:00
raise ProcessingError ( " Unknown response for console output: {} " . format ( result ) )
2016-11-01 21:12:13 +00:00
2016-11-09 23:08:41 +00:00
@api
2016-11-02 22:32:24 +00:00
def cancel_task ( self , uuid ) :
"""
Cancels a task ( stops its execution , or prevents it from being executed )
"""
api_client = self . api_client ( )
return self . handle_generic_post_response ( api_client . task_cancel ( uuid ) )
2016-11-04 18:19:18 +00:00
2016-11-09 23:08:41 +00:00
@api
2016-11-04 18:19:18 +00:00
def remove_task ( self , uuid ) :
"""
Removes a task and deletes all of its assets
"""
api_client = self . api_client ( )
return self . handle_generic_post_response ( api_client . task_remove ( uuid ) )
2016-11-09 23:08:41 +00:00
@api
2016-11-05 20:23:54 +00:00
def download_task_asset ( self , uuid , asset ) :
"""
Downloads a task asset
"""
api_client = self . api_client ( )
res = api_client . task_download ( uuid , asset )
if isinstance ( res , dict ) and ' error ' in res :
2017-02-08 17:30:11 +00:00
raise ProcessingError ( res [ ' error ' ] )
2016-11-05 20:23:54 +00:00
else :
return res
2016-11-09 23:08:41 +00:00
@api
2018-01-24 14:41:57 +00:00
def restart_task ( self , uuid , options = None ) :
2016-11-04 18:19:18 +00:00
"""
Restarts a task that was previously canceled or that had failed to process
"""
api_client = self . api_client ( )
2018-01-24 14:41:57 +00:00
return self . handle_generic_post_response ( api_client . task_restart ( uuid , options ) )
2016-11-04 18:19:18 +00:00
2016-11-02 22:32:24 +00:00
@staticmethod
def handle_generic_post_response ( result ) :
"""
Handles a POST response that has either a " success " flag , or an error message .
2018-12-04 15:02:13 +00:00
This is a common response in NodeODM POST calls .
2016-11-02 22:32:24 +00:00
: param result : result of API call
: return : True on success , raises ProcessingException otherwise
"""
if isinstance ( result , dict ) and ' error ' in result :
2017-02-08 17:30:11 +00:00
raise ProcessingError ( result [ ' error ' ] )
2016-11-02 22:32:24 +00:00
elif isinstance ( result , dict ) and ' success ' in result :
return True
else :
2017-02-08 17:30:11 +00:00
raise ProcessingError ( " Unknown response: {} " . format ( result ) )
2016-11-02 22:32:24 +00:00
2018-12-31 19:35:55 +00:00
def delete ( self , using = None , keep_parents = False ) :
pnode_id = self . id
super ( ProcessingNode , self ) . delete ( using , keep_parents )
from app . plugins import signals as plugin_signals
plugin_signals . processing_node_removed . send_robust ( sender = self . __class__ , processing_node_id = pnode_id )
2016-10-24 18:34:37 +00:00
# First time a processing node is created, automatically try to update
@receiver ( signals . post_save , sender = ProcessingNode , dispatch_uid = " update_processing_node_info " )
def auto_update_node_info ( sender , instance , created , * * kwargs ) :
if created :
2017-01-21 17:50:53 +00:00
try :
instance . update_node_info ( )
2017-02-08 17:30:11 +00:00
except ProcessingError :
2017-01-21 17:50:53 +00:00
pass
2016-11-17 23:51:07 +00:00
class ProcessingNodeUserObjectPermission ( UserObjectPermissionBase ) :
2018-03-11 14:06:09 +00:00
content_object = models . ForeignKey ( ProcessingNode , on_delete = models . CASCADE )
2016-11-17 23:51:07 +00:00
2017-01-29 20:29:25 +00:00
2016-11-17 23:51:07 +00:00
class ProcessingNodeGroupObjectPermission ( GroupObjectPermissionBase ) :
2018-03-11 14:06:09 +00:00
content_object = models . ForeignKey ( ProcessingNode , on_delete = models . CASCADE )