2016-11-15 16:51:19 +00:00
import logging
import os
import shutil
import zipfile
2017-02-08 16:27:23 +00:00
import requests
2016-11-07 17:10:01 +00:00
2016-11-15 16:51:19 +00:00
from django . contrib . auth . models import User
2016-11-07 22:25:33 +00:00
from django . contrib . gis . gdal import GDALRaster
2016-11-15 16:51:19 +00:00
from django . contrib . postgres import fields
from django . core . exceptions import ValidationError
2016-08-10 20:23:17 +00:00
from django . db import models
2016-11-15 16:51:19 +00:00
from django . db import transaction
2016-10-04 20:36:08 +00:00
from django . db . models import signals
2016-11-15 16:51:19 +00:00
from django . dispatch import receiver
2016-09-09 21:37:04 +00:00
from django . utils import timezone
2016-10-04 20:36:08 +00:00
from guardian . models import GroupObjectPermissionBase
2016-11-15 16:51:19 +00:00
from guardian . models import UserObjectPermissionBase
from guardian . shortcuts import get_perms_for_model , assign_perm
from app import pending_actions
2016-12-12 15:32:17 +00:00
from app . postgis import OffDbRasterField
2016-11-02 22:32:24 +00:00
from nodeodm import status_codes
2016-11-15 16:51:19 +00:00
from nodeodm . exceptions import ProcessingException
from nodeodm . models import ProcessingNode
2016-11-07 17:10:01 +00:00
from webodm import settings
2016-11-02 22:32:24 +00:00
logger = logging . getLogger ( ' app.logger ' )
2016-09-10 15:24:16 +00:00
2016-11-09 21:13:43 +00:00
def task_directory_path ( taskId , projectId ) :
return ' project/ {0} /task/ {1} / ' . format ( projectId , taskId )
2016-09-10 15:24:16 +00:00
def assets_directory_path ( taskId , projectId , filename ) :
2016-10-17 17:19:32 +00:00
# files will be uploaded to MEDIA_ROOT/project/<id>/task/<id>/<filename>
2016-11-09 21:13:43 +00:00
return ' {0} {1} ' . format ( task_directory_path ( taskId , projectId ) , filename )
2016-09-10 15:24:16 +00:00
2016-09-09 21:37:04 +00:00
class Project ( models . Model ) :
owner = models . ForeignKey ( User , on_delete = models . PROTECT , help_text = " The person who created the project " )
name = models . CharField ( max_length = 255 , help_text = " A label used to describe the project " )
2016-10-04 20:36:08 +00:00
description = models . TextField ( null = True , blank = True , help_text = " More in-depth description of the project " )
2016-09-09 21:37:04 +00:00
created_at = models . DateTimeField ( default = timezone . now , help_text = " Creation date " )
2016-11-15 16:51:19 +00:00
deleting = models . BooleanField ( db_index = True , default = False ,
help_text = " Whether this project has been marked for deletion. Projects that have running tasks need to wait for tasks to be properly cleaned up before they can be deleted. " )
def delete ( self , * args ) :
# No tasks?
if self . task_set . count ( ) == 0 :
# Just delete normally
logger . info ( " Deleted project {} " . format ( self . id ) )
super ( ) . delete ( * args )
else :
# Need to remove all tasks before we can remove this project
# which will be deleted on the scheduler after pending actions
# have been completed
self . task_set . update ( pending_action = pending_actions . REMOVE )
self . deleting = True
self . save ( )
logger . info ( " Tasks pending, set project {} deleting flag " . format ( self . id ) )
2016-09-09 21:37:04 +00:00
def __str__ ( self ) :
return self . name
2016-11-15 20:55:48 +00:00
def tasks ( self ) :
2016-11-15 16:51:19 +00:00
return self . task_set . only ( ' id ' )
2016-10-12 22:18:37 +00:00
2016-11-21 21:32:37 +00:00
def get_tile_json_data ( self ) :
return [ task . get_tile_json_data ( ) for task in self . task_set . filter (
2016-11-16 18:02:43 +00:00
status = status_codes . COMPLETED
) . only ( ' id ' , ' project_id ' ) ]
2016-10-04 20:36:08 +00:00
class Meta :
permissions = (
( ' view_project ' , ' Can view project ' ) ,
)
@receiver ( signals . post_save , sender = Project , dispatch_uid = " project_post_save " )
def project_post_save ( sender , instance , created , * * kwargs ) :
"""
Automatically assigns all permissions to the owner . If the owner changes
it ' s up to the user/developer to remove the previous owner ' s permissions .
"""
for perm in get_perms_for_model ( sender ) . all ( ) :
assign_perm ( perm . codename , instance . owner , instance )
class ProjectUserObjectPermission ( UserObjectPermissionBase ) :
content_object = models . ForeignKey ( Project )
2016-11-07 17:10:01 +00:00
2016-10-04 20:36:08 +00:00
class ProjectGroupObjectPermission ( GroupObjectPermissionBase ) :
content_object = models . ForeignKey ( Project )
2016-09-10 15:24:16 +00:00
def gcp_directory_path ( task , filename ) :
return assets_directory_path ( task . id , task . project . id , filename )
2016-11-07 17:10:01 +00:00
2016-10-26 16:54:46 +00:00
def validate_task_options ( value ) :
"""
Make sure that the format of this options field is valid
"""
if len ( value ) == 0 : return
try :
for option in value :
if not option [ ' name ' ] : raise ValidationError ( " Name key not found in option " )
if not option [ ' value ' ] : raise ValidationError ( " Value key not found in option " )
except :
raise ValidationError ( " Invalid options " )
2016-11-02 22:32:24 +00:00
2016-09-09 21:37:04 +00:00
class Task ( models . Model ) :
2016-09-10 15:24:16 +00:00
STATUS_CODES = (
2016-11-02 22:32:24 +00:00
( status_codes . QUEUED , ' QUEUED ' ) ,
( status_codes . RUNNING , ' RUNNING ' ) ,
( status_codes . FAILED , ' FAILED ' ) ,
( status_codes . COMPLETED , ' COMPLETED ' ) ,
( status_codes . CANCELED , ' CANCELED ' )
)
PENDING_ACTIONS = (
2016-11-15 16:51:19 +00:00
( pending_actions . CANCEL , ' CANCEL ' ) ,
( pending_actions . REMOVE , ' REMOVE ' ) ,
( pending_actions . RESTART , ' RESTART ' ) ,
2016-09-10 15:24:16 +00:00
)
2016-10-29 16:09:35 +00:00
uuid = models . CharField ( max_length = 255 , db_index = True , default = ' ' , blank = True , help_text = " Identifier of the task (as returned by OpenDroneMap ' s REST API) " )
2016-09-10 15:24:16 +00:00
project = models . ForeignKey ( Project , on_delete = models . CASCADE , help_text = " Project that this task belongs to " )
2016-10-12 22:18:37 +00:00
name = models . CharField ( max_length = 255 , null = True , blank = True , help_text = " A label for the task " )
2016-10-25 20:04:24 +00:00
processing_lock = models . BooleanField ( default = False , help_text = " A flag indicating whether this task is currently locked for processing. When this flag is turned on, the task is in the middle of a processing step. " )
2016-09-09 21:37:04 +00:00
processing_time = models . IntegerField ( default = - 1 , help_text = " Number of milliseconds that elapsed since the beginning of this task (-1 indicates that no information is available) " )
2016-10-12 22:18:37 +00:00
processing_node = models . ForeignKey ( ProcessingNode , null = True , blank = True , help_text = " Processing node assigned to this task (or null if this task has not been associated yet) " )
2016-10-26 20:56:32 +00:00
status = models . IntegerField ( choices = STATUS_CODES , db_index = True , null = True , blank = True , help_text = " Current status of the task " )
last_error = models . TextField ( null = True , blank = True , help_text = " The last processing error received " )
2016-10-26 16:54:46 +00:00
options = fields . JSONField ( default = dict ( ) , blank = True , help_text = " Options that are being used to process this task " , validators = [ validate_task_options ] )
2016-11-01 21:12:13 +00:00
console_output = models . TextField ( null = False , default = " " , blank = True , help_text = " Console output of the OpenDroneMap ' s process " )
2016-10-12 22:18:37 +00:00
ground_control_points = models . FileField ( null = True , blank = True , upload_to = gcp_directory_path , help_text = " Optional Ground Control Points file to use for processing " )
2016-10-25 20:04:24 +00:00
2016-09-09 21:37:04 +00:00
# georeferenced_model
2016-12-12 15:32:17 +00:00
orthophoto = OffDbRasterField ( null = True , blank = True , srid = 4326 , help_text = " Orthophoto created by OpenDroneMap " )
2016-09-09 21:37:04 +00:00
# textured_model
# mission
created_at = models . DateTimeField ( default = timezone . now , help_text = " Creation date " )
2016-11-15 16:51:19 +00:00
pending_action = models . IntegerField ( choices = PENDING_ACTIONS , db_index = True , null = True , blank = True , help_text = " A requested action to be performed on the task. The selected action will be performed by the scheduler at the next iteration. " )
2016-08-10 20:23:17 +00:00
2016-09-09 21:37:04 +00:00
def __str__ ( self ) :
2017-02-08 16:27:23 +00:00
name = self . name if self . name is not None else " unnamed "
return ' Task {} ( {} ) ' . format ( name , self . id )
2016-10-13 20:28:32 +00:00
2016-10-26 16:54:46 +00:00
def save ( self , * args , * * kwargs ) :
# Autovalidate on save
self . full_clean ( )
super ( Task , self ) . save ( * args , * * kwargs )
2016-11-11 17:55:56 +00:00
def assets_path ( self , * args ) :
2016-11-09 21:13:43 +00:00
"""
2016-11-11 17:55:56 +00:00
Get a path relative to the place where assets are stored
2016-11-09 21:13:43 +00:00
"""
return os . path . join ( settings . MEDIA_ROOT ,
2016-11-11 17:55:56 +00:00
assets_directory_path ( self . id , self . project . id , " " ) ,
" assets " ,
* args )
2016-11-09 21:13:43 +00:00
2016-10-13 20:28:32 +00:00
@staticmethod
def create_from_images ( images , project ) :
'''
Create a new task from a set of input images ( such as the ones coming from request . FILES ) .
This will happen inside a transaction so if one of the images
fails to load , the task will not be created .
'''
with transaction . atomic ( ) :
task = Task . objects . create ( project = project )
for image in images :
ImageUpload . objects . create ( task = task , image = image )
return task
2016-09-10 15:24:16 +00:00
2016-10-25 20:04:24 +00:00
def process ( self ) :
2016-11-02 22:32:24 +00:00
"""
This method contains the logic for processing tasks asynchronously
from a background thread or from the scheduler . Here tasks that are
ready to be processed execute some logic . This could be communication
with a processing node or executing a pending action .
"""
2016-12-10 18:28:34 +00:00
try :
if self . processing_node :
2017-02-07 22:09:30 +00:00
# Need to process some images (UUID not yet set and task doesn't have pending actions)?
if not self . uuid and self . pending_action is None :
2016-12-10 18:28:34 +00:00
logger . info ( " Processing... {} " . format ( self ) )
2016-11-02 22:32:24 +00:00
2016-12-10 18:28:34 +00:00
images = [ image . path ( ) for image in self . imageupload_set . all ( ) ]
2016-11-02 22:32:24 +00:00
2016-12-10 18:28:34 +00:00
try :
# This takes a while
uuid = self . processing_node . process_new_task ( images , self . name , self . options )
# Refresh task object before committing change
self . refresh_from_db ( )
self . uuid = uuid
self . save ( )
# TODO: log process has started processing
except ProcessingException as e :
self . set_failure ( str ( e ) )
if self . pending_action is not None :
2016-11-07 22:25:33 +00:00
try :
2016-12-10 18:28:34 +00:00
if self . pending_action == pending_actions . CANCEL :
# Do we need to cancel the task on the processing node?
logger . info ( " Canceling task {} " . format ( self ) )
if self . processing_node and self . uuid :
self . processing_node . cancel_task ( self . uuid )
self . pending_action = None
2017-02-07 19:28:50 +00:00
self . status = None
2016-12-10 18:28:34 +00:00
self . save ( )
else :
raise ProcessingException ( " Cannot cancel a task that has no processing node or UUID " )
2016-10-26 16:54:46 +00:00
2016-12-10 18:28:34 +00:00
elif self . pending_action == pending_actions . RESTART :
2017-02-07 22:09:30 +00:00
logger . info ( " Restarting {} " . format ( self ) )
if self . processing_node :
2016-10-26 16:54:46 +00:00
2016-12-10 18:28:34 +00:00
# Check if the UUID is still valid, as processing nodes purge
# results after a set amount of time, the UUID might have eliminated.
2017-02-07 22:09:30 +00:00
uuid_still_exists = False
if self . uuid :
try :
info = self . processing_node . get_task_info ( self . uuid )
uuid_still_exists = info [ ' uuid ' ] == self . uuid
except ProcessingException :
pass
2016-12-10 18:28:34 +00:00
if uuid_still_exists :
# Good to go
self . processing_node . restart_task ( self . uuid )
else :
# Task has been purged (or processing node is offline)
# Process this as a new task
# Removing its UUID will cause the scheduler
# to process this the next tick
2016-12-12 17:25:54 +00:00
self . uuid = ' '
2016-12-10 18:28:34 +00:00
self . console_output = " "
self . processing_time = - 1
self . status = None
self . last_error = None
self . pending_action = None
self . save ( )
else :
raise ProcessingException ( " Cannot restart a task that has no processing node or UUID " )
elif self . pending_action == pending_actions . REMOVE :
logger . info ( " Removing task {} " . format ( self ) )
if self . processing_node and self . uuid :
# Attempt to delete the resources on the processing node
# We don't care if this fails, as resources on processing nodes
# Are expected to be purged on their own after a set amount of time anyway
try :
self . processing_node . remove_task ( self . uuid )
except ProcessingException :
pass
# What's more important is that we delete our task properly here
self . delete ( )
# Stop right here!
return
2016-10-26 16:54:46 +00:00
2016-11-07 22:25:33 +00:00
except ProcessingException as e :
2016-12-10 18:28:34 +00:00
self . last_error = str ( e )
self . save ( )
2016-11-07 17:10:01 +00:00
2016-12-10 18:28:34 +00:00
if self . processing_node :
# Need to update status (first time, queued or running?)
if self . uuid and self . status in [ None , status_codes . QUEUED , status_codes . RUNNING ] :
# Update task info from processing node
try :
info = self . processing_node . get_task_info ( self . uuid )
2016-11-07 17:10:01 +00:00
2016-12-10 18:28:34 +00:00
self . processing_time = info [ " processingTime " ]
self . status = info [ " status " ] [ " code " ]
2016-11-01 21:12:13 +00:00
2016-12-10 18:28:34 +00:00
current_lines_count = len ( self . console_output . split ( " \n " ) ) - 1
self . console_output + = self . processing_node . get_task_console_output ( self . uuid , current_lines_count )
2016-10-26 20:56:32 +00:00
2016-12-10 18:28:34 +00:00
if " errorMessage " in info [ " status " ] :
self . last_error = info [ " status " ] [ " errorMessage " ]
2016-11-01 21:12:13 +00:00
2016-12-10 18:28:34 +00:00
# Has the task just been canceled, failed, or completed?
if self . status in [ status_codes . FAILED , status_codes . COMPLETED , status_codes . CANCELED ] :
logger . info ( " Processing status: {} for {} " . format ( self . status , self ) )
if self . status == status_codes . COMPLETED :
try :
assets_dir = self . assets_path ( " " )
if not os . path . exists ( assets_dir ) :
os . makedirs ( assets_dir )
logger . info ( " Downloading all.zip for {} " . format ( self ) )
# Download all assets
2016-12-12 17:25:54 +00:00
zip_stream = self . processing_node . download_task_asset ( self . uuid , " all.zip " )
2016-12-10 18:28:34 +00:00
zip_path = os . path . join ( assets_dir , " all.zip " )
2016-12-12 17:25:54 +00:00
with open ( zip_path , ' wb ' ) as fd :
for chunk in zip_stream . iter_content ( 4096 ) :
fd . write ( chunk )
2016-12-10 18:28:34 +00:00
logger . info ( " Done downloading all.zip for {} " . format ( self ) )
2016-10-26 20:56:32 +00:00
2016-12-10 18:28:34 +00:00
# Extract from zip
with zipfile . ZipFile ( zip_path , " r " ) as zip_h :
zip_h . extractall ( assets_dir )
logger . info ( " Extracted all.zip for {} " . format ( self ) )
# Add to database orthophoto
2016-12-12 17:25:54 +00:00
orthophoto_path = os . path . realpath ( self . assets_path ( " odm_orthophoto " , " odm_orthophoto.tif " ) )
2016-12-10 18:28:34 +00:00
if os . path . exists ( orthophoto_path ) :
2016-12-12 17:25:54 +00:00
orthophoto = GDALRaster ( orthophoto_path , write = True )
# We need to transform to 4326 before we can store it
# as an offdb raster field
orthophoto_4326_path = os . path . realpath ( self . assets_path ( " odm_orthophoto " , " odm_orthophoto_4326.tif " ) )
self . orthophoto = orthophoto . transform ( 4326 , ' GTiff ' , orthophoto_4326_path )
logger . info ( " Imported orthophoto {} for {} " . format ( orthophoto_4326_path , self ) )
2016-12-10 18:28:34 +00:00
self . save ( )
except ProcessingException as e :
self . set_failure ( str ( e ) )
else :
# FAILED, CANCELED
2016-11-07 17:10:01 +00:00
self . save ( )
2016-11-02 22:32:24 +00:00
else :
2016-12-10 18:28:34 +00:00
# Still waiting...
2016-11-02 22:32:24 +00:00
self . save ( )
2016-12-10 18:28:34 +00:00
except ProcessingException as e :
self . set_failure ( str ( e ) )
2017-02-08 16:27:23 +00:00
except ( ConnectionRefusedError , ConnectionError ) as e :
logger . warning ( " {} cannot communicate with processing node: {} " . format ( self , str ( e ) ) )
except requests . exceptions . ConnectTimeout as e :
logger . warning ( " {} timed out with error: {} . We ' ll try reprocessing at the next tick. " . format ( self , str ( e ) ) )
2016-12-10 18:28:34 +00:00
2016-11-07 22:25:33 +00:00
2016-11-09 21:13:43 +00:00
def get_tile_path ( self , z , x , y ) :
2016-11-11 17:55:56 +00:00
return self . assets_path ( " orthophoto_tiles " , z , x , " {} .png " . format ( y ) )
2016-11-09 21:13:43 +00:00
2016-11-21 21:32:37 +00:00
def get_tile_json_url ( self ) :
2016-11-16 18:02:43 +00:00
return " /api/projects/ {} /tasks/ {} /tiles.json " . format ( self . project . id , self . id )
2016-11-21 21:32:37 +00:00
def get_tile_json_data ( self ) :
2016-11-16 18:02:43 +00:00
return {
2016-11-21 21:32:37 +00:00
' url ' : self . get_tile_json_url ( ) ,
2016-11-16 18:02:43 +00:00
' meta ' : {
' task ' : self . id ,
' project ' : self . project . id
}
}
2016-11-09 21:13:43 +00:00
def delete ( self , using = None , keep_parents = False ) :
directory_to_delete = os . path . join ( settings . MEDIA_ROOT ,
task_directory_path ( self . id , self . project . id ) )
super ( Task , self ) . delete ( using , keep_parents )
# Remove files related to this task
try :
shutil . rmtree ( directory_to_delete )
except FileNotFoundError as e :
2017-02-07 19:28:50 +00:00
logger . warning ( e )
2016-11-09 21:13:43 +00:00
2016-10-26 16:54:46 +00:00
2016-11-01 21:12:13 +00:00
def set_failure ( self , error_message ) :
2017-02-08 16:27:23 +00:00
logger . error ( " FAILURE FOR {} : {} " . format ( self , error_message ) )
2016-11-01 21:12:13 +00:00
self . last_error = error_message
2016-11-02 22:32:24 +00:00
self . status = status_codes . FAILED
2016-11-01 21:12:13 +00:00
self . save ( )
2016-10-25 20:04:24 +00:00
2016-10-13 16:21:12 +00:00
class Meta :
permissions = (
( ' view_task ' , ' Can view task ' ) ,
)
2016-09-10 15:24:16 +00:00
2016-11-11 17:55:56 +00:00
def image_directory_path ( image_upload , filename ) :
return assets_directory_path ( image_upload . task . id , image_upload . task . project . id , filename )
2016-09-10 15:24:16 +00:00
class ImageUpload ( models . Model ) :
task = models . ForeignKey ( Task , on_delete = models . CASCADE , help_text = " Task this image belongs to " )
image = models . ImageField ( upload_to = image_directory_path , help_text = " File uploaded by a user " )
def __str__ ( self ) :
return self . image . name
2016-10-26 16:54:46 +00:00
def path ( self ) :
return self . image . path