diff --git a/.gitignore b/.gitignore index 733dcac1..e18a5757 100644 --- a/.gitignore +++ b/.gitignore @@ -89,4 +89,5 @@ ENV/ .ropeproject node_modules/ -webpack-stats.json \ No newline at end of file +webpack-stats.json +pip-selfcheck.json diff --git a/app/api/projects.py b/app/api/projects.py index 16ac2f69..8bae133a 100644 --- a/app/api/projects.py +++ b/app/api/projects.py @@ -1,5 +1,5 @@ from django.contrib.auth.models import User -from rest_framework import serializers, viewsets, filters +from rest_framework import serializers, viewsets from app import models from .tasks import TaskIDsSerializer diff --git a/app/models.py b/app/models.py index 9062c93a..6d50037a 100644 --- a/app/models.py +++ b/app/models.py @@ -1,7 +1,11 @@ from __future__ import unicode_literals +import time, os +import traceback + from django.db import models from django.db.models import signals +from django.contrib.gis.db import models as gismodels from django.utils import timezone from django.contrib.auth.models import User from django.contrib.postgres import fields @@ -14,6 +18,7 @@ from django.dispatch import receiver from nodeodm.exceptions import ProcessingException from django.db import transaction from nodeodm import status_codes +from webodm import settings import logging logger = logging.getLogger('app.logger') @@ -54,6 +59,7 @@ def project_post_save(sender, instance, created, **kwargs): class ProjectUserObjectPermission(UserObjectPermissionBase): content_object = models.ForeignKey(Project) + class ProjectGroupObjectPermission(GroupObjectPermissionBase): content_object = models.ForeignKey(Project) @@ -61,6 +67,7 @@ class ProjectGroupObjectPermission(GroupObjectPermissionBase): def gcp_directory_path(task, filename): return assets_directory_path(task.id, task.project.id, filename) + def validate_task_options(value): """ Make sure that the format of this options field is valid @@ -108,7 +115,7 @@ class Task(models.Model): ground_control_points = models.FileField(null=True, blank=True, upload_to=gcp_directory_path, help_text="Optional Ground Control Points file to use for processing") # georeferenced_model - # orthophoto + orthophoto = gismodels.RasterField(null=True, blank=True, srid=4326, help_text="Orthophoto created by OpenDroneMap") # textured_model # mission created_at = models.DateTimeField(default=timezone.now, help_text="Creation date") @@ -147,129 +154,143 @@ class Task(models.Model): ready to be processed execute some logic. This could be communication with a processing node or executing a pending action. """ + try: + if self.processing_node: + # Need to process some images (UUID not yet set)? + if not self.uuid: + logger.info("Processing... {}".format(self)) - if self.processing_node: - # Need to process some images (UUID not yet set)? - if not self.uuid: - logger.info("Processing... {}".format(self)) + images = [image.path() for image in self.imageupload_set.all()] - images = [image.path() for image in self.imageupload_set.all()] + try: + # This takes a while + uuid = self.processing_node.process_new_task(images, self.name, self.options) + # Refresh task object before committing change + self.refresh_from_db() + self.uuid = uuid + self.save() + + # TODO: log process has started processing + + except ProcessingException as e: + self.set_failure(e.message) + + + if self.pending_action is not None: try: - # This takes a while - uuid = self.processing_node.process_new_task(images, self.name, self.options) + if self.pending_action == self.PendingActions.CANCEL: + # Do we need to cancel the task on the processing node? + logger.info("Canceling task {}".format(self)) + if self.processing_node and self.uuid: + self.processing_node.cancel_task(self.uuid) + self.pending_action = None + self.save() + else: + raise ProcessingException("Cannot cancel a task that has no processing node or UUID") - # Refresh task object before committing change - self.refresh_from_db() - self.uuid = uuid + elif self.pending_action == self.PendingActions.RESTART: + logger.info("Restarting task {}".format(self)) + if self.processing_node and self.uuid: + + # Check if the UUID is still valid, as processing nodes purge + # results after a set amount of time, the UUID might have eliminated. + try: + info = self.processing_node.get_task_info(self.uuid) + uuid_still_exists = info['uuid'] == self.uuid + except ProcessingException: + uuid_still_exists = False + + if uuid_still_exists: + # Good to go + self.processing_node.restart_task(self.uuid) + else: + # Task has been purged (or processing node is offline) + # TODO: what if processing node went offline? + + # Process this as a new task + # Removing its UUID will cause the scheduler + # to process this the next tick + self.uuid = None + + self.console_output = "" + self.processing_time = -1 + self.status = None + self.last_error = None + self.pending_action = None + self.save() + else: + raise ProcessingException("Cannot restart a task that has no processing node or UUID") + + elif self.pending_action == self.PendingActions.REMOVE: + logger.info("Removing task {}".format(self)) + if self.processing_node and self.uuid: + # Attempt to delete the resources on the processing node + # We don't care if this fails, as resources on processing nodes + # Are expected to be purged on their own after a set amount of time anyway + try: + self.processing_node.remove_task(self.uuid) + except ProcessingException: + pass + + # What's more important is that we delete our task properly here + self.delete() + + # Stop right here! + return + + except ProcessingException as e: + self.last_error = e.message self.save() - # TODO: log process has started processing - except ProcessingException as e: - self.set_failure(e.message) + if self.processing_node: + # Need to update status (first time, queued or running?) + if self.uuid and self.status in [None, status_codes.QUEUED, status_codes.RUNNING]: + # Update task info from processing node + try: + info = self.processing_node.get_task_info(self.uuid) + self.processing_time = info["processingTime"] + self.status = info["status"]["code"] - if self.pending_action is not None: - try: - if self.pending_action == self.PendingActions.CANCEL: - # Do we need to cancel the task on the processing node? - logger.info("Canceling task {}".format(self)) - if self.processing_node and self.uuid: - self.processing_node.cancel_task(self.uuid) - self.pending_action = None - self.save() - else: - raise ProcessingException("Cannot cancel a task that has no processing node or UUID") + current_lines_count = len(self.console_output.split("\n")) - 1 + self.console_output += self.processing_node.get_task_console_output(self.uuid, current_lines_count) - elif self.pending_action == self.PendingActions.RESTART: - logger.info("Restarting task {}".format(self)) - if self.processing_node and self.uuid: + if "errorMessage" in info["status"]: + self.last_error = info["status"]["errorMessage"] - # Check if the UUID is still valid, as processing nodes purge - # results after a set amount of time, the UUID might have eliminated. - try: - info = self.processing_node.get_task_info(self.uuid) - uuid_still_exists = info['uuid'] == self.uuid - except ProcessingException: - uuid_still_exists = False + # Has the task just been canceled, failed, or completed? + if self.status in [status_codes.FAILED, status_codes.COMPLETED, status_codes.CANCELED]: + logger.info("Processing status: {} for {}".format(self.status, self)) - if uuid_still_exists: - # Good to go - self.processing_node.restart_task(self.uuid) - else: - # Task has been purged (or processing node is offline) - # TODO: what if processing node went offline? - - # Process this as a new task - # Removing its UUID will cause the scheduler - # to process this the next tick - self.uuid = None - - self.console_output = "" - self.processing_time = -1 - self.status = None - self.last_error = None - self.pending_action = None - self.save() - else: - raise ProcessingException("Cannot restart a task that has no processing node or UUID") - - elif self.pending_action == self.PendingActions.REMOVE: - logger.info("Removing task {}".format(self)) - if self.processing_node and self.uuid: - # Attempt to delete the resources on the processing node - # We don't care if this fails, as resources on processing nodes - # Are expected to be purged on their own after a set amount of time anyway - try: - self.processing_node.remove_task(self.uuid) - except ProcessingException: - pass - - # What's more important is that we delete our task properly here - self.delete() - - # Stop right here! - return - - except ProcessingException as e: - self.last_error = e.message - self.save() - - - if self.processing_node: - # Need to update status (first time, queued or running?) - if self.uuid and self.status in [None, status_codes.QUEUED, status_codes.RUNNING]: - # Update task info from processing node - try: - info = self.processing_node.get_task_info(self.uuid) - - self.processing_time = info["processingTime"] - self.status = info["status"]["code"] - - current_lines_count = len(self.console_output.split("\n")) - 1 - self.console_output += self.processing_node.get_task_console_output(self.uuid, current_lines_count) - - if "errorMessage" in info["status"]: - self.last_error = info["status"]["errorMessage"] - - # Has the task just been canceled, failed, or completed? - # Note that we don't save the status code right away, - # if the assets retrieval fails we want to retry again. - if self.status in [status_codes.FAILED, status_codes.COMPLETED, status_codes.CANCELED]: - logger.info("Processing status: {} for {}".format(self.status, self)) - - if self.status == status_codes.COMPLETED: - # TODO: retrieve assets - pass + if self.status == status_codes.COMPLETED: + try: + orthophoto_stream = self.processing_node.download_task_asset(self.uuid, "orthophoto.tif") + orthophoto_filename = "orthophoto_{}.tif".format(int(time.time())) + orthophoto_path = os.path.join(settings.MEDIA_ROOT, + assets_directory_path(self.id, self.project.id, orthophoto_filename)) + + # Save to disk + with open(orthophoto_path, 'wb') as fd: + for chunk in orthophoto_stream.iter_content(4096): + fd.write(chunk) + + # Create raster layer + self.orthophoto = raster_models.RasterLayer.objects.create(rasterfile=orthophoto_path) + self.save() + except ProcessingException as e: + self.set_failure(e.message) + else: + # FAILED, CANCELED + self.save() else: + # Still waiting... self.save() - else: - # Still waiting... - self.save() - except ProcessingException as e: - self.set_failure(e.message) - + except ProcessingException as e: + self.set_failure(e.message) + except Exception as e: + logger.error("Uncaught error: {} {}".format(e.message, traceback.format_exc())) def set_failure(self, error_message): logger.error("{} ERROR: {}".format(self, error_message)) diff --git a/app/tests/test_app.py b/app/tests/test_app.py index ef26a820..dcf723a2 100644 --- a/app/tests/test_app.py +++ b/app/tests/test_app.py @@ -1,5 +1,4 @@ from django.contrib.auth.models import User, Group -from django.contrib import messages from django.test import Client from app.models import Project, Task diff --git a/nodeodm/api_client.py b/nodeodm/api_client.py index fbb08be4..4382300e 100644 --- a/nodeodm/api_client.py +++ b/nodeodm/api_client.py @@ -6,7 +6,7 @@ import requests import mimetypes import json import os -from urlparse import urlunparse +from urllib.parse import urlunparse class ApiClient: def __init__(self, host, port): @@ -41,11 +41,11 @@ class ApiClient: return requests.post(self.url('/task/restart'), data={'uuid': uuid}).json() def task_download(self, uuid, asset): - res = requests.get(self.url('/task/{}/download/{}').format(uuid, asset)) + res = requests.get(self.url('/task/{}/download/{}').format(uuid, asset), stream=True) if "Content-Type" in res.headers and "application/json" in res.headers['Content-Type']: return res.json() else: - return res.content + return res def new_task(self, images, name=None, options=[]): """ diff --git a/webodm/settings.py b/webodm/settings.py index 8a740b79..ba962fa3 100644 --- a/webodm/settings.py +++ b/webodm/settings.py @@ -38,6 +38,7 @@ INSTALLED_APPS = [ 'django.contrib.sessions', 'django.contrib.messages', 'django.contrib.staticfiles', + 'django.contrib.gis', 'guardian', 'rest_framework', 'rest_framework_nested', @@ -86,7 +87,7 @@ WSGI_APPLICATION = 'webodm.wsgi.application' DATABASES = { 'default': { - 'ENGINE': 'django.db.backends.postgresql', + 'ENGINE': 'django.contrib.gis.db.backends.postgis', 'NAME': 'webodm_dev', 'USER': 'postgres', 'PASSWORD': 'postgres', @@ -217,6 +218,10 @@ REST_FRAMEWORK = { 'PAGE_SIZE': 10, } +# Raster +RASTER_USE_CELERY = False + + TESTING = sys.argv[1:2] == ['test'] try: