OpenDroneMap-WebODM/app/tests/test_api_task.py

481 wiersze
19 KiB
Python

import os
import subprocess
import time
import shutil
import logging
from datetime import timedelta
import json
import requests
from django.contrib.auth.models import User
from django.contrib.gis.gdal import GDALRaster
from django.contrib.gis.gdal import OGRGeometry
from rest_framework import status
from rest_framework.test import APIClient
from app import pending_actions
from app import scheduler
from django.utils import timezone
from app.models import Project, Task, ImageUpload, task_directory_path, full_task_directory_path
from app.tests.classes import BootTransactionTestCase
from nodeodm import status_codes
from nodeodm.models import ProcessingNode, OFFLINE_MINUTES
from app.testwatch import testWatch
# We need to test the task API in a TransactionTestCase because
# task processing happens on a separate thread, and normal TestCases
# do not commit changes to the DB, so spawning a new thread will show no
# data in it.
from webodm import settings
logger = logging.getLogger('app.logger')
DELAY = 2 # time to sleep for during process launch, background processing, etc.
def start_processing_node(*args):
current_dir = os.path.dirname(os.path.realpath(__file__))
node_odm = subprocess.Popen(['node', 'index.js', '--port', '11223', '--test'] + list(args), shell=False,
cwd=os.path.join(current_dir, "..", "..", "nodeodm", "external", "node-OpenDroneMap"))
time.sleep(DELAY) # Wait for the server to launch
return node_odm
class TestApiTask(BootTransactionTestCase):
def setUp(self):
super().setUp()
# We need to clear previous media_root content
# This points to the test directory, but just in case
# we double check that the directory is indeed a test directory
if "_test" in settings.MEDIA_ROOT:
if os.path.exists(settings.MEDIA_ROOT):
logger.info("Cleaning up {}".format(settings.MEDIA_ROOT))
shutil.rmtree(settings.MEDIA_ROOT)
else:
logger.warning("We did not remove MEDIA_ROOT because we couldn't find a _test suffix in its path.")
def test_task(self):
client = APIClient()
node_odm = start_processing_node()
user = User.objects.get(username="testuser")
self.assertFalse(user.is_superuser)
other_user = User.objects.get(username="testuser2")
project = Project.objects.create(
owner=user,
name="test project"
)
other_project = Project.objects.create(
owner=other_user,
name="another test project"
)
other_task = Task.objects.create(project=other_project)
# Start processing node
# Create processing node
pnode = ProcessingNode.objects.create(hostname="localhost", port=11223)
# Verify that it's working
self.assertTrue(pnode.api_version is not None)
# task creation via file upload
image1 = open("app/fixtures/tiny_drone_image.jpg", 'rb')
image2 = open("app/fixtures/tiny_drone_image_2.jpg", 'rb')
# Not authenticated?
res = client.post("/api/projects/{}/tasks/".format(project.id), {
'images': [image1, image2]
}, format="multipart")
self.assertTrue(res.status_code == status.HTTP_403_FORBIDDEN);
client.login(username="testuser", password="test1234")
# Cannot create a task for a project that does not exist
res = client.post("/api/projects/0/tasks/", {
'images': [image1, image2]
}, format="multipart")
self.assertTrue(res.status_code == status.HTTP_404_NOT_FOUND)
# Cannot create a task for a project for which we have no access to
res = client.post("/api/projects/{}/tasks/".format(other_project.id), {
'images': [image1, image2]
}, format="multipart")
self.assertTrue(res.status_code == status.HTTP_404_NOT_FOUND)
# Cannot create a task without images
res = client.post("/api/projects/{}/tasks/".format(project.id), {
'images': []
}, format="multipart")
self.assertTrue(res.status_code == status.HTTP_400_BAD_REQUEST)
# Cannot create a task with just 1 image
res = client.post("/api/projects/{}/tasks/".format(project.id), {
'images': image1
}, format="multipart")
self.assertTrue(res.status_code == status.HTTP_400_BAD_REQUEST)
# Normal case with images[], name and processing node parameter
res = client.post("/api/projects/{}/tasks/".format(project.id), {
'images': [image1, image2],
'name': 'test_task',
'processing_node': pnode.id
}, format="multipart")
self.assertTrue(res.status_code == status.HTTP_201_CREATED)
multiple_param_task = Task.objects.latest('created_at')
self.assertTrue(multiple_param_task.name == 'test_task')
self.assertTrue(multiple_param_task.processing_node.id == pnode.id)
# Cannot create a task with images[], name, but invalid processing node parameter
res = client.post("/api/projects/{}/tasks/".format(project.id), {
'images': [image1, image2],
'name': 'test_task',
'processing_node': 9999
}, format="multipart")
self.assertTrue(res.status_code == status.HTTP_400_BAD_REQUEST)
# Normal case with images[] parameter
res = client.post("/api/projects/{}/tasks/".format(project.id), {
'images': [image1, image2],
'auto_processing_node': 'false'
}, format="multipart")
self.assertTrue(res.status_code == status.HTTP_201_CREATED)
# Should have returned the id of the newly created task
task = Task.objects.latest('created_at')
self.assertTrue('id' in res.data)
self.assertTrue(task.id == res.data['id'])
# Two images should have been uploaded
self.assertTrue(ImageUpload.objects.filter(task=task).count() == 2)
# No processing node is set
self.assertTrue(task.processing_node is None)
# tiles.json should not be accessible at this point
res = client.get("/api/projects/{}/tasks/{}/tiles.json".format(project.id, task.id))
self.assertTrue(res.status_code == status.HTTP_400_BAD_REQUEST)
# Neither should an individual tile
# Z/X/Y coords are choosen based on node-odm test dataset for orthophoto_tiles/
res = client.get("/api/projects/{}/tasks/{}/tiles/16/16020/42443.png".format(project.id, task.id))
self.assertTrue(res.status_code == status.HTTP_404_NOT_FOUND)
# Cannot access a tiles.json we have no access to
res = client.get("/api/projects/{}/tasks/{}/tiles.json".format(other_project.id, other_task.id))
self.assertTrue(res.status_code == status.HTTP_404_NOT_FOUND)
# Cannot access an individual tile we have no access to
res = client.get("/api/projects/{}/tasks/{}/tiles/16/16020/42443.png".format(other_project.id, other_task.id))
self.assertTrue(res.status_code == status.HTTP_404_NOT_FOUND)
# Cannot download assets (they don't exist yet)
for asset in list(task.ASSETS_MAP.keys()):
res = client.get("/api/projects/{}/tasks/{}/download/{}".format(project.id, task.id, asset))
self.assertTrue(res.status_code == status.HTTP_404_NOT_FOUND)
# Cannot access raw assets (they don't exist yet)
res = client.get("/api/projects/{}/tasks/{}/assets/odm_orthophoto/odm_orthophoto.tif".format(project.id, task.id))
self.assertTrue(res.status_code == status.HTTP_404_NOT_FOUND)
# Cannot assign processing node to a task we have no access to
res = client.patch("/api/projects/{}/tasks/{}/".format(other_project.id, other_task.id), {
'processing_node': pnode.id
})
self.assertTrue(res.status_code == status.HTTP_404_NOT_FOUND)
testWatch.clear()
# No UUID at this point
self.assertTrue(len(task.uuid) == 0)
# Assign processing node to task via API
res = client.patch("/api/projects/{}/tasks/{}/".format(project.id, task.id), {
'processing_node': pnode.id
})
self.assertTrue(res.status_code == status.HTTP_200_OK)
# On update scheduler.processing_pending_tasks should have been called in the background
testWatch.wait_until_call("app.scheduler.process_pending_tasks", timeout=5)
# Processing should have started and a UUID is assigned
task.refresh_from_db()
self.assertTrue(task.status in [status_codes.RUNNING, status_codes.COMPLETED]) # Sometimes the task finishes and we can't test for RUNNING state
self.assertTrue(len(task.uuid) > 0)
time.sleep(DELAY)
# Calling process pending tasks should finish the process
scheduler.process_pending_tasks()
task.refresh_from_db()
self.assertTrue(task.status == status_codes.COMPLETED)
# Can download assets
for asset in list(task.ASSETS_MAP.keys()):
res = client.get("/api/projects/{}/tasks/{}/download/{}".format(project.id, task.id, asset))
self.assertTrue(res.status_code == status.HTTP_200_OK)
# A textured mesh archive file should exist
self.assertTrue(os.path.exists(task.assets_path(task.ASSETS_MAP["textured_model.zip"]["deferred_path"])))
# Can download raw assets
res = client.get("/api/projects/{}/tasks/{}/assets/odm_orthophoto/odm_orthophoto.tif".format(project.id, task.id))
self.assertTrue(res.status_code == status.HTTP_200_OK)
# Can access tiles.json
res = client.get("/api/projects/{}/tasks/{}/tiles.json".format(project.id, task.id))
self.assertTrue(res.status_code == status.HTTP_200_OK)
# Bounds are what we expect them to be
# (4 coords in lat/lon)
tiles = json.loads(res.content)
self.assertTrue(len(tiles['bounds']) == 4)
self.assertTrue(tiles['bounds'][0] == -91.99451323800884)
# Can access individual tiles
res = client.get("/api/projects/{}/tasks/{}/tiles/16/16020/42443.png".format(project.id, task.id))
self.assertTrue(res.status_code == status.HTTP_200_OK)
# Restart a task
testWatch.clear()
res = client.post("/api/projects/{}/tasks/{}/restart/".format(project.id, task.id))
self.assertTrue(res.status_code == status.HTTP_200_OK)
testWatch.wait_until_call("app.scheduler.process_pending_tasks", timeout=5)
task.refresh_from_db()
self.assertTrue(task.status in [status_codes.RUNNING, status_codes.COMPLETED])
# Cancel a task
testWatch.clear()
res = client.post("/api/projects/{}/tasks/{}/cancel/".format(project.id, task.id))
self.assertTrue(res.status_code == status.HTTP_200_OK)
testWatch.wait_until_call("app.scheduler.process_pending_tasks", timeout=5)
# Should have been canceled
task.refresh_from_db()
self.assertTrue(task.status == status_codes.CANCELED)
# Remove a task
res = client.post("/api/projects/{}/tasks/{}/remove/".format(project.id, task.id))
self.assertTrue(res.status_code == status.HTTP_200_OK)
testWatch.wait_until_call("app.scheduler.process_pending_tasks", 2, timeout=5)
# Has been removed along with assets
self.assertFalse(Task.objects.filter(pk=task.id).exists())
self.assertFalse(ImageUpload.objects.filter(task=task).exists())
task_assets_path = os.path.join(settings.MEDIA_ROOT, task_directory_path(task.id, task.project.id))
self.assertFalse(os.path.exists(task_assets_path))
testWatch.clear()
testWatch.intercept("app.scheduler.process_pending_tasks")
# Create a task, then kill the processing node
res = client.post("/api/projects/{}/tasks/".format(project.id), {
'images': [image1, image2],
'name': 'test_task_offline',
'processing_node': pnode.id,
'auto_processing_node': 'false'
}, format="multipart")
self.assertTrue(res.status_code == status.HTTP_201_CREATED)
task = Task.objects.get(pk=res.data['id'])
# Stop processing node
node_odm.terminate()
task.refresh_from_db()
self.assertTrue(task.last_error is None)
scheduler.process_pending_tasks()
# Processing should fail and set an error
task.refresh_from_db()
self.assertTrue(task.last_error is not None)
self.assertTrue(task.status == status_codes.FAILED)
# Now bring it back online
node_odm = start_processing_node()
# Restart
res = client.post("/api/projects/{}/tasks/{}/restart/".format(project.id, task.id))
self.assertTrue(res.status_code == status.HTTP_200_OK)
task.refresh_from_db()
self.assertTrue(task.pending_action == pending_actions.RESTART)
# After processing, the task should have restarted, and have no UUID or status
scheduler.process_pending_tasks()
task.refresh_from_db()
self.assertTrue(task.status is None)
self.assertTrue(len(task.uuid) == 0)
# Another step and it should have acquired a UUID
scheduler.process_pending_tasks()
task.refresh_from_db()
self.assertTrue(task.status in [status_codes.RUNNING, status_codes.COMPLETED])
self.assertTrue(len(task.uuid) > 0)
# Another step and it should be completed
time.sleep(DELAY)
scheduler.process_pending_tasks()
task.refresh_from_db()
self.assertTrue(task.status == status_codes.COMPLETED)
# Test connection, timeout errors
res = client.post("/api/projects/{}/tasks/{}/restart/".format(project.id, task.id))
def connTimeout(*args, **kwargs):
raise requests.exceptions.ConnectTimeout("Simulated timeout")
testWatch.intercept("nodeodm.api_client.task_output", connTimeout)
scheduler.process_pending_tasks()
# Timeout errors should be handled by retrying again at a later time
# and not fail
task.refresh_from_db()
self.assertTrue(task.last_error is None)
# Reassigning the task to another project should move its assets
self.assertTrue(os.path.exists(full_task_directory_path(task.id, project.id)))
self.assertTrue(len(task.imageupload_set.all()) == 2)
for image in task.imageupload_set.all():
self.assertTrue('project/{}/'.format(project.id) in image.image.path)
task.project = other_project
task.save()
task.refresh_from_db()
self.assertFalse(os.path.exists(full_task_directory_path(task.id, project.id)))
self.assertTrue(os.path.exists(full_task_directory_path(task.id, other_project.id)))
for image in task.imageupload_set.all():
self.assertTrue('project/{}/'.format(other_project.id) in image.image.path)
node_odm.terminate()
# Restart node-odm as to not generate orthophotos
testWatch.clear()
node_odm = start_processing_node("--test_skip_orthophotos")
res = client.post("/api/projects/{}/tasks/".format(project.id), {
'images': [image1, image2],
'name': 'test_task_no_orthophoto',
'processing_node': pnode.id,
'auto_processing_node': 'false'
}, format="multipart")
self.assertTrue(res.status_code == status.HTTP_201_CREATED)
scheduler.process_pending_tasks()
time.sleep(DELAY)
scheduler.process_pending_tasks()
task = Task.objects.get(pk=res.data['id'])
self.assertTrue(task.status == status_codes.COMPLETED)
# Orthophoto files/directories should be missing
self.assertFalse(os.path.exists(task.assets_path("odm_orthophoto", "odm_orthophoto.tif")))
self.assertFalse(os.path.exists(task.assets_path("orthophoto_tiles")))
# orthophoto_extent should be none
self.assertTrue(task.orthophoto_extent is None)
# Available assets should be missing orthophoto.tif type
# but others such as textured_model.zip should be available
res = client.get("/api/projects/{}/tasks/{}/".format(project.id, task.id))
self.assertFalse('orthophoto.tif' in res.data['available_assets'])
self.assertTrue('textured_model.zip' in res.data['available_assets'])
image1.close()
image2.close()
node_odm.terminate()
def test_task_auto_processing_node(self):
project = Project.objects.get(name="User Test Project")
task = Task.objects.create(project=project, name="Test")
pnode = ProcessingNode.objects.create(hostname="invalid-host", port=11223)
another_pnode = ProcessingNode.objects.create(hostname="invalid-host-2", port=11223)
# By default
self.assertTrue(task.auto_processing_node)
self.assertTrue(task.processing_node is None)
# Simulate an error
task.last_error = "Test error"
task.save()
scheduler.process_pending_tasks()
# A processing node should not have been assigned
task.refresh_from_db()
self.assertTrue(task.processing_node is None)
# Remove error
task.last_error = None
task.save()
scheduler.process_pending_tasks()
# A processing node should not have been assigned because no processing nodes are online
task.refresh_from_db()
self.assertTrue(task.processing_node is None)
# Bring a proessing node online
pnode.last_refreshed = timezone.now()
pnode.save()
self.assertTrue(pnode.is_online())
# A processing node has been assigned
scheduler.process_pending_tasks()
task.refresh_from_db()
self.assertTrue(task.processing_node.id == pnode.id)
# Task should have failed (no images provided, invalid host...)
self.assertTrue(task.last_error is not None)
# Bring another processing node online, and bring the old one offline
pnode.last_refreshed = timezone.now() - timedelta(minutes=OFFLINE_MINUTES)
pnode.save()
another_pnode.last_refreshed = timezone.now()
another_pnode.save()
# Remove error
task.last_error = None
task.status = None
task.save()
scheduler.process_pending_tasks()
# Processing node is now cleared and a new one will be assigned on the next tick
task.refresh_from_db()
self.assertTrue(task.processing_node is None)
scheduler.process_pending_tasks()
task.refresh_from_db()
self.assertTrue(task.processing_node.id == another_pnode.id)
def test_task_manual_processing_node(self):
user = User.objects.get(username="testuser")
project = Project.objects.create(name="User Test Project", owner=user)
task = Task.objects.create(project=project, name="Test", auto_processing_node=False)
# Bring a processing node online
pnode = ProcessingNode.objects.create(hostname="invalid-host", port=11223)
pnode.last_refreshed = timezone.now()
pnode.save()
self.assertTrue(pnode.is_online())
scheduler.process_pending_tasks()
# A processing node should not have been assigned because we asked
# not to via auto_processing_node = false
task.refresh_from_db()
self.assertTrue(task.processing_node is None)