From 8205fcc888f44ff3270b948be2d83ae0ebf2d4f8 Mon Sep 17 00:00:00 2001 From: Piero Toffanin Date: Thu, 7 Feb 2019 22:07:49 -0500 Subject: [PATCH] Replaced nodeodm ApiClient with pyodm --- app/models/task.py | 93 +++++++++++-------------- app/tests/test_api_task.py | 4 ++ app/testwatch.py | 8 +-- nodeodm/api_client.py | 139 ------------------------------------- nodeodm/exceptions.py | 10 --- nodeodm/models.py | 133 ++++++++++++----------------------- nodeodm/status_codes.py | 11 +-- nodeodm/tests.py | 137 ++++++++++++++++++------------------ requirements.txt | 1 + 9 files changed, 166 insertions(+), 370 deletions(-) delete mode 100644 nodeodm/api_client.py delete mode 100644 nodeodm/exceptions.py diff --git a/app/models/task.py b/app/models/task.py index 759f4a16..cbaf333c 100644 --- a/app/models/task.py +++ b/app/models/task.py @@ -1,7 +1,6 @@ import logging import os import shutil -import zipfile import time import uuid as uuid_module @@ -11,7 +10,7 @@ from shlex import quote import piexif import re -import requests +import zipfile from PIL import Image from django.contrib.gis.gdal import GDALRaster from django.contrib.gis.gdal import OGRGeometry @@ -21,20 +20,18 @@ from django.core.exceptions import ValidationError from django.db import models from django.db import transaction from django.utils import timezone -from requests.packages.urllib3.exceptions import ReadTimeoutError from app import pending_actions from django.contrib.gis.db.models.fields import GeometryField +from app.testwatch import testWatch from nodeodm import status_codes -from nodeodm.exceptions import ProcessingError, ProcessingTimeout, ProcessingException from nodeodm.models import ProcessingNode +from pyodm.exceptions import NodeResponseError, NodeConnectionError, NodeServerError, OdmError from webodm import settings from .project import Project from functools import partial -from multiprocessing import cpu_count -from concurrent.futures import ThreadPoolExecutor import subprocess logger = logging.getLogger('app.logger') @@ -380,7 +377,7 @@ class Task(models.Model): # We can't easily differentiate between the two, so we need # to notify the user because if it crashed due to low memory # the user might need to take action (or be stuck in an infinite loop) - raise ProcessingError("Processing node went offline. This could be due to insufficient memory or a network error.") + raise NodeServerError("Processing node went offline. This could be due to insufficient memory or a network error.") if self.processing_node: # Need to process some images (UUID not yet set and task doesn't have pending actions)? @@ -398,17 +395,22 @@ class Task(models.Model): time_has_elapsed = time.time() - last_update >= 2 if time_has_elapsed: + testWatch.manual_log_call("Task.process.callback") self.check_if_canceled() - - if time_has_elapsed or (progress >= 1.0 - 1e-6 and progress <= 1.0 + 1e-6): - Task.objects.filter(pk=self.id).update(upload_progress=progress) + Task.objects.filter(pk=self.id).update(upload_progress=float(progress) / 100.0) last_update = time.time() # This takes a while - uuid = self.processing_node.process_new_task(images, self.name, self.options, callback) + try: + uuid = self.processing_node.process_new_task(images, self.name, self.options, callback) + except NodeConnectionError as e: + # If we can't create a task because the node is offline + # We want to fail instead of trying again + raise NodeServerError('Connection error: ' + str(e)) # Refresh task object before committing change self.refresh_from_db() + self.upload_progress = 1.0 self.uuid = uuid self.save() @@ -423,14 +425,14 @@ class Task(models.Model): # We don't care if this fails (we tried) try: self.processing_node.cancel_task(self.uuid) - except ProcessingException: + except OdmError: logger.warning("Could not cancel {} on processing node. We'll proceed anyway...".format(self)) self.status = status_codes.CANCELED self.pending_action = None self.save() else: - raise ProcessingError("Cannot cancel a task that has no processing node or UUID") + raise NodeServerError("Cannot cancel a task that has no processing node or UUID") elif self.pending_action == pending_actions.RESTART: logger.info("Restarting {}".format(self)) @@ -443,8 +445,8 @@ class Task(models.Model): if self.uuid: try: info = self.processing_node.get_task_info(self.uuid) - uuid_still_exists = info['uuid'] == self.uuid - except ProcessingException: + uuid_still_exists = info.uuid == self.uuid + except OdmError: pass need_to_reprocess = False @@ -453,7 +455,7 @@ class Task(models.Model): # Good to go try: self.processing_node.restart_task(self.uuid, self.options) - except ProcessingError as e: + except (NodeServerError, NodeResponseError) as e: # Something went wrong logger.warning("Could not restart {}, will start a new one".format(self)) need_to_reprocess = True @@ -481,7 +483,7 @@ class Task(models.Model): self.running_progress = 0 self.save() else: - raise ProcessingError("Cannot restart a task that has no processing node") + raise NodeServerError("Cannot restart a task that has no processing node") elif self.pending_action == pending_actions.REMOVE: logger.info("Removing {}".format(self)) @@ -491,7 +493,7 @@ class Task(models.Model): # Are expected to be purged on their own after a set amount of time anyway try: self.processing_node.remove_task(self.uuid) - except ProcessingException: + except OdmError: pass # What's more important is that we delete our task properly here @@ -506,8 +508,8 @@ class Task(models.Model): # Update task info from processing node info = self.processing_node.get_task_info(self.uuid) - self.processing_time = info["processingTime"] - self.status = info["status"]["code"] + self.processing_time = info.processing_time + self.status = info.status.value current_lines_count = len(self.console_output.split("\n")) console_output = self.processing_node.get_task_console_output(self.uuid, current_lines_count) @@ -521,8 +523,8 @@ class Task(models.Model): self.running_progress = value break - if "errorMessage" in info["status"]: - self.last_error = info["status"]["errorMessage"] + if info.last_error != "": + self.last_error = info.last_error # Has the task just been canceled, failed, or completed? if self.status in [status_codes.FAILED, status_codes.COMPLETED, status_codes.CANCELED]: @@ -541,41 +543,27 @@ class Task(models.Model): logger.info("Downloading all.zip for {}".format(self)) # Download all assets - try: - zip_stream = self.processing_node.download_task_asset(self.uuid, "all.zip") - zip_path = os.path.join(assets_dir, "all.zip") + last_update = 0 - # Keep track of download progress (if possible) - content_length = zip_stream.headers.get('content-length') - total_length = int(content_length) if content_length is not None else None - downloaded = 0 - last_update = 0 + def callback(progress): + nonlocal last_update - with open(zip_path, 'wb') as fd: - for chunk in zip_stream.iter_content(4096): - downloaded += len(chunk) + time_has_elapsed = time.time() - last_update >= 2 - # Track progress if we know the content header length - # every 2 seconds - if total_length > 0 and time.time() - last_update >= 2: - Task.objects.filter(pk=self.id).update(running_progress=(self.TASK_OUTPUT_MILESTONES_LAST_VALUE + (float(downloaded) / total_length) * 0.1)) - last_update = time.time() + if time_has_elapsed or int(progress) == 100: + Task.objects.filter(pk=self.id).update(running_progress=( + self.TASK_OUTPUT_MILESTONES_LAST_VALUE + (float(progress) / 100.0) * 0.1)) + last_update = time.time() - fd.write(chunk) - except (requests.exceptions.Timeout, requests.exceptions.ConnectionError, ReadTimeoutError) as e: - raise ProcessingTimeout(e) + zip_path = self.processing_node.download_task_assets(self.uuid, assets_dir, progress_callback=callback) - logger.info("Done downloading all.zip for {}".format(self)) + logger.info("Extracting all.zip for {}".format(self)) - self.refresh_from_db() - self.console_output += "Extracting results. This could take a few minutes...\n"; - self.save() - - # Extract from zip with zipfile.ZipFile(zip_path, "r") as zip_h: zip_h.extractall(assets_dir) - logger.info("Extracted all.zip for {}".format(self)) + # Rename to all.zip + os.rename(zip_path, os.path.join(os.path.dirname(zip_path), 'all.zip')) # Populate *_extent fields extent_fields = [ @@ -602,7 +590,6 @@ class Task(models.Model): self.update_available_assets_field() self.running_progress = 1.0 self.console_output += "Done!\n" - self.status = status_codes.COMPLETED self.save() from app.plugins import signals as plugin_signals @@ -614,12 +601,10 @@ class Task(models.Model): # Still waiting... self.save() - except ProcessingError as e: + except (NodeServerError, NodeResponseError) as e: self.set_failure(str(e)) - except (ConnectionRefusedError, ConnectionError) as e: - logger.warning("{} cannot communicate with processing node: {}".format(self, str(e))) - except ProcessingTimeout as e: - logger.warning("{} timed out with error: {}. We'll try reprocessing at the next tick.".format(self, str(e))) + except NodeConnectionError as e: + logger.warning("{} connection/timeout error: {}. We'll try reprocessing at the next tick.".format(self, str(e))) except TaskInterruptedException as e: # Task was interrupted during image resize / upload logger.warning("{} interrupted".format(self, str(e))) diff --git a/app/tests/test_api_task.py b/app/tests/test_api_task.py index 376f89a0..f5e171ea 100644 --- a/app/tests/test_api_task.py +++ b/app/tests/test_api_task.py @@ -130,6 +130,7 @@ class TestApiTask(BootTransactionTestCase): self.assertTrue(im.size == img1.size) # Normal case with images[], GCP, name and processing node parameter and resize_to option + testWatch.clear() gcp = open("app/fixtures/gcp.txt", 'r') res = client.post("/api/projects/{}/tasks/".format(project.id), { 'images': [image1, image2, gcp], @@ -169,6 +170,9 @@ class TestApiTask(BootTransactionTestCase): # Upload progress is 100% self.assertEqual(resized_task.upload_progress, 1.0) + # Upload progress callback has been called + self.assertTrue(testWatch.get_calls_count("Task.process.callback") > 0) + # Case with malformed GCP file option with open("app/fixtures/gcp_malformed.txt", 'r') as malformed_gcp: res = client.post("/api/projects/{}/tasks/".format(project.id), { diff --git a/app/testwatch.py b/app/testwatch.py index 3f54a888..9da29873 100644 --- a/app/testwatch.py +++ b/app/testwatch.py @@ -56,10 +56,10 @@ class TestWatch: self.manual_log_call(fname, *args, **kwargs) def manual_log_call(self, fname, *args, **kwargs): - logger.info("{} called".format(fname)) - list = self.get_calls(fname) - list.append({'f': fname, 'args': args, 'kwargs': kwargs}) - self.set_calls(fname, list) + if settings.TESTING: + list = self.get_calls(fname) + list.append({'f': fname, 'args': args, 'kwargs': kwargs}) + self.set_calls(fname, list) def hook_pre(self, func, *args, **kwargs): if settings.TESTING and self.should_prevent_execution(func): diff --git a/nodeodm/api_client.py b/nodeodm/api_client.py deleted file mode 100644 index 173f283a..00000000 --- a/nodeodm/api_client.py +++ /dev/null @@ -1,139 +0,0 @@ -""" -An interface to NodeODM's API -https://github.com/pierotofy/NodeODM/blob/master/docs/index.adoc -""" -from requests.packages.urllib3.fields import RequestField -from requests_toolbelt.multipart import encoder -import requests -import mimetypes -import json -import os -from urllib.parse import urlunparse, urlencode -from app.testwatch import TestWatch - -# Extends class to support multipart form data -# fields with the same name -# https://github.com/requests/toolbelt/issues/225 -class MultipartEncoder(encoder.MultipartEncoder): - """Multiple files with the same name support, i.e. files[]""" - - def _iter_fields(self): - _fields = self.fields - if hasattr(self.fields, 'items'): - _fields = list(self.fields.items()) - for k, v in _fields: - for field in self._iter_field(k, v): - yield field - - @classmethod - def _iter_field(cls, field_name, field): - file_name = None - file_type = None - file_headers = None - if field and isinstance(field, (list, tuple)): - if all([isinstance(f, (list, tuple)) for f in field]): - for f in field: - yield next(cls._iter_field(field_name, f)) - else: - raise StopIteration() - if len(field) == 2: - file_name, file_pointer = field - elif len(field) == 3: - file_name, file_pointer, file_type = field - else: - file_name, file_pointer, file_type, file_headers = field - else: - file_pointer = field - - field = RequestField(name=field_name, - data=file_pointer, - filename=file_name, - headers=file_headers) - field.make_multipart(content_type=file_type) - yield field - -class ApiClient: - def __init__(self, host, port, token = "", timeout=30): - self.host = host - self.port = port - self.token = token - self.timeout = timeout - - def url(self, url, query = {}): - netloc = self.host if (self.port == 80 or self.port == 443) else "{}:{}".format(self.host, self.port) - proto = 'https' if self.port == 443 else 'http' - - if len(self.token) > 0: - query['token'] = self.token - - return urlunparse((proto, netloc, url, '', urlencode(query), '')) - - def info(self): - return requests.get(self.url('/info'), timeout=self.timeout).json() - - def options(self): - return requests.get(self.url('/options'), timeout=self.timeout).json() - - def task_info(self, uuid): - return requests.get(self.url('/task/{}/info').format(uuid), timeout=self.timeout).json() - - @TestWatch.watch() - def task_output(self, uuid, line = 0): - return requests.get(self.url('/task/{}/output', {'line': line}).format(uuid), timeout=self.timeout).json() - - def task_cancel(self, uuid): - return requests.post(self.url('/task/cancel'), data={'uuid': uuid}, timeout=self.timeout).json() - - def task_remove(self, uuid): - return requests.post(self.url('/task/remove'), data={'uuid': uuid}, timeout=self.timeout).json() - - def task_restart(self, uuid, options = None): - data = {'uuid': uuid} - if options is not None: data['options'] = json.dumps(options) - return requests.post(self.url('/task/restart'), data=data, timeout=self.timeout).json() - - def task_download(self, uuid, asset): - res = requests.get(self.url('/task/{}/download/{}').format(uuid, asset), stream=True, timeout=self.timeout) - if "Content-Type" in res.headers and "application/json" in res.headers['Content-Type']: - return res.json() - else: - return res - - def new_task(self, images, name=None, options=[], progress_callback=None): - """ - Starts processing of a new task - :param images: list of path images - :param name: name of the task - :param options: options to be used for processing ([{'name': optionName, 'value': optionValue}, ...]) - :param progress_callback: optional callback invoked during the upload images process to be used to report status. - :return: UUID or error - """ - - # Equivalent as passing the open file descriptor, since requests - # eventually calls read(), but this way we make sure to close - # the file prior to reading the next, so we don't run into open file OS limits - def read_file(path): - with open(path, 'rb') as f: - return f.read() - - fields = { - 'name': name, - 'options': json.dumps(options), - 'images': [(os.path.basename(image), read_file(image), (mimetypes.guess_type(image)[0] or "image/jpg")) for image in images] - } - - def create_callback(mpe): - total_bytes = mpe.len - - def callback(monitor): - if progress_callback is not None and total_bytes > 0: - progress_callback(monitor.bytes_read / total_bytes) - - return callback - - e = MultipartEncoder(fields=fields) - m = encoder.MultipartEncoderMonitor(e, create_callback(e)) - - return requests.post(self.url("/task/new"), - data=m, - headers={'Content-Type': m.content_type}).json() \ No newline at end of file diff --git a/nodeodm/exceptions.py b/nodeodm/exceptions.py deleted file mode 100644 index 313e5fbd..00000000 --- a/nodeodm/exceptions.py +++ /dev/null @@ -1,10 +0,0 @@ -class ProcessingException(Exception): - pass - - -class ProcessingError(ProcessingException): - pass - - -class ProcessingTimeout(ProcessingException): - pass \ No newline at end of file diff --git a/nodeodm/models.py b/nodeodm/models.py index 54a47b22..ffeab88f 100644 --- a/nodeodm/models.py +++ b/nodeodm/models.py @@ -1,6 +1,5 @@ from __future__ import unicode_literals -import requests from django.db import models from django.contrib.postgres import fields from django.utils import timezone @@ -8,30 +7,13 @@ from django.dispatch import receiver from guardian.models import GroupObjectPermissionBase from guardian.models import UserObjectPermissionBase -from .api_client import ApiClient import json +from pyodm import Node +from pyodm import exceptions from django.db.models import signals from datetime import timedelta -from .exceptions import ProcessingError, ProcessingTimeout -import simplejson -def api(func): - """ - Catches JSON decoding errors that might happen when the server - answers unexpectedly - """ - def wrapper(*args,**kwargs): - try: - return func(*args, **kwargs) - except (json.decoder.JSONDecodeError, simplejson.JSONDecodeError) as e: - raise ProcessingError(str(e)) - except (requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e: - raise ProcessingTimeout(str(e)) - - - return wrapper - OFFLINE_MINUTES = 5 # Number of minutes a node hasn't been seen before it should be considered offline class ProcessingNode(models.Model): @@ -65,7 +47,6 @@ class ProcessingNode(models.Model): return self.last_refreshed is not None and \ self.last_refreshed >= timezone.now() - timedelta(minutes=OFFLINE_MINUTES) - @api def update_node_info(self): """ Retrieves information and options from the node API @@ -76,27 +57,22 @@ class ProcessingNode(models.Model): api_client = self.api_client(timeout=5) try: info = api_client.info() - if 'error' in info: - return False - self.api_version = info['version'] - self.queue_count = info['taskQueueCount'] + self.api_version = info.version + self.queue_count = info.task_queue_count + self.max_images = info.max_images + self.odm_version = info.odm_version - if 'maxImages' in info: - self.max_images = info['maxImages'] - if 'odmVersion' in info: - self.odm_version = info['odmVersion'] - - options = api_client.options() + options = list(map(lambda o: o.__dict__, api_client.options())) self.available_options = options self.last_refreshed = timezone.now() self.save() return True - except (requests.exceptions.Timeout, requests.exceptions.ConnectionError, json.decoder.JSONDecodeError, simplejson.JSONDecodeError): + except exceptions.OdmError: return False def api_client(self, timeout=30): - return ApiClient(self.hostname, self.port, self.token, timeout) + return Node(self.hostname, self.port, self.token, timeout) def get_available_options_json(self, pretty=False): """ @@ -105,7 +81,20 @@ class ProcessingNode(models.Model): kwargs = dict(indent=4, separators=(',', ": ")) if pretty else dict() return json.dumps(self.available_options, **kwargs) - @api + def options_list_to_kv(self, options = []): + """ + Convers options formatted as a list ([{'name': optionName, 'value': optionValue}, ...]) + to a dictionary {optionName: value, ...} + :param options: options + :return: dict + """ + opts = {} + if options is not None: + for o in options: + opts[o['name']] = o['value'] + + return opts + def process_new_task(self, images, name=None, options=[], progress_callback=None): """ Sends a set of images (and optional GCP file) via the API @@ -118,22 +107,15 @@ class ProcessingNode(models.Model): :returns UUID of the newly created task """ - if len(images) < 2: raise ProcessingError("Need at least 2 images") + if len(images) < 2: raise exceptions.NodeServerError("Need at least 2 images") api_client = self.api_client() - try: - result = api_client.new_task(images, name, options, progress_callback) - except requests.exceptions.ConnectionError as e: - raise ProcessingError(e) - if isinstance(result, dict) and 'uuid' in result: - return result['uuid'] - elif isinstance(result, dict) and 'error' in result: - raise ProcessingError(result['error']) - else: - raise ProcessingError("Unexpected answer from server: {}".format(result)) + opts = self.options_list_to_kv(options) + + task = api_client.create_task(images, opts, name, progress_callback) + return task.uuid - @api def get_task_info(self, uuid): """ Gets information about this task, such as name, creation date, @@ -141,79 +123,50 @@ class ProcessingNode(models.Model): images being processed. """ api_client = self.api_client() - result = api_client.task_info(uuid) - if isinstance(result, dict) and 'uuid' in result: - return result - elif isinstance(result, dict) and 'error' in result: - raise ProcessingError(result['error']) - else: - raise ProcessingError("Unknown result from task info: {}".format(result)) + task = api_client.get_task(uuid) + return task.info() - @api def get_task_console_output(self, uuid, line): """ Retrieves the console output of the OpenDroneMap's process. Useful for monitoring execution and to provide updates to the user. """ api_client = self.api_client() - result = api_client.task_output(uuid, line) - if isinstance(result, dict) and 'error' in result: - raise ProcessingError(result['error']) - elif isinstance(result, list): - return result - else: - raise ProcessingError("Unknown response for console output: {}".format(result)) + task = api_client.get_task(uuid) + return task.output(line) - @api def cancel_task(self, uuid): """ Cancels a task (stops its execution, or prevents it from being executed) """ api_client = self.api_client() - return self.handle_generic_post_response(api_client.task_cancel(uuid)) + task = api_client.get_task(uuid) + return task.cancel() - @api def remove_task(self, uuid): """ Removes a task and deletes all of its assets """ api_client = self.api_client() - return self.handle_generic_post_response(api_client.task_remove(uuid)) + task = api_client.get_task(uuid) + return task.remove() - @api - def download_task_asset(self, uuid, asset): + def download_task_assets(self, uuid, destination, progress_callback): """ Downloads a task asset """ api_client = self.api_client() - res = api_client.task_download(uuid, asset) - if isinstance(res, dict) and 'error' in res: - raise ProcessingError(res['error']) - else: - return res + task = api_client.get_task(uuid) + return task.download_zip(destination, progress_callback) - @api def restart_task(self, uuid, options = None): """ Restarts a task that was previously canceled or that had failed to process """ - api_client = self.api_client() - return self.handle_generic_post_response(api_client.task_restart(uuid, options)) - @staticmethod - def handle_generic_post_response(result): - """ - Handles a POST response that has either a "success" flag, or an error message. - This is a common response in NodeODM POST calls. - :param result: result of API call - :return: True on success, raises ProcessingException otherwise - """ - if isinstance(result, dict) and 'error' in result: - raise ProcessingError(result['error']) - elif isinstance(result, dict) and 'success' in result: - return True - else: - raise ProcessingError("Unknown response: {}".format(result)) + api_client = self.api_client() + task = api_client.get_task(uuid) + return task.restart(self.options_list_to_kv(options)) def delete(self, using=None, keep_parents=False): pnode_id = self.id @@ -229,7 +182,7 @@ def auto_update_node_info(sender, instance, created, **kwargs): if created: try: instance.update_node_info() - except ProcessingError: + except exceptions.OdmError: pass class ProcessingNodeUserObjectPermission(UserObjectPermissionBase): diff --git a/nodeodm/status_codes.py b/nodeodm/status_codes.py index 71867326..4e891b51 100644 --- a/nodeodm/status_codes.py +++ b/nodeodm/status_codes.py @@ -1,5 +1,6 @@ -QUEUED = 10 -RUNNING = 20 -FAILED = 30 -COMPLETED = 40 -CANCELED = 50 +from pyodm.types import TaskStatus +QUEUED = TaskStatus.QUEUED.value +RUNNING = TaskStatus.RUNNING.value +FAILED = TaskStatus.FAILED.value +COMPLETED = TaskStatus.COMPLETED.value +CANCELED = TaskStatus.CANCELED.value \ No newline at end of file diff --git a/nodeodm/tests.py b/nodeodm/tests.py index 7ae2a1be..ece211c8 100644 --- a/nodeodm/tests.py +++ b/nodeodm/tests.py @@ -1,4 +1,5 @@ -from datetime import timedelta +import os +from datetime import timedelta, datetime import requests from django.test import TestCase @@ -6,10 +7,11 @@ from django.utils import six import subprocess, time from django.utils import timezone from os import path + +from pyodm import Node +from pyodm.exceptions import NodeConnectionError, NodeServerError, NodeResponseError +from webodm import settings from .models import ProcessingNode, OFFLINE_MINUTES -from .api_client import ApiClient -from requests.exceptions import ConnectionError -from .exceptions import ProcessingError from . import status_codes current_dir = path.dirname(path.realpath(__file__)) @@ -31,21 +33,21 @@ class TestClientApi(TestCase): cls.node_odm.terminate() def setUp(self): - self.api_client = ApiClient("localhost", 11223) + self.api_client = Node("localhost", 11223) def tearDown(self): pass def test_offline_api(self): - api = ApiClient("offline-host", 3000) - self.assertRaises(ConnectionError, api.info) - self.assertRaises(ConnectionError, api.options) + api = Node("offline-host", 3000) + self.assertRaises(NodeConnectionError, api.info) + self.assertRaises(NodeConnectionError, api.options) def test_info(self): info = self.api_client.info() - self.assertTrue(isinstance(info['version'], six.string_types), "Found version string") - self.assertTrue(isinstance(info['taskQueueCount'], int), "Found task queue count") - self.assertTrue(info['maxImages'] is None, "Found task max images") + self.assertTrue(isinstance(info.version, six.string_types), "Found version string") + self.assertTrue(isinstance(info.task_queue_count, int), "Found task queue count") + self.assertTrue(info.max_images is None, "Found task max images") def test_options(self): options = self.api_client.options() @@ -82,10 +84,10 @@ class TestClientApi(TestCase): retries = 0 while True: try: - task_info = api.task_info(uuid) - if task_info['status']['code'] == status: + task_info = api.get_task(uuid).info() + if task_info.status.value == status: return True - except ProcessingError: + except (NodeServerError, NodeResponseError): pass time.sleep(0.5) @@ -94,42 +96,43 @@ class TestClientApi(TestCase): self.assertTrue(False, error_description) return False - api = ApiClient("localhost", 11223) + api = Node("localhost", 11223) online_node = ProcessingNode.objects.get(pk=1) # Can call info(), options() - self.assertTrue(type(api.info()['version']) == str) + self.assertTrue(type(api.info().version) == str) self.assertTrue(len(api.options()) > 0) # Can call new_task() import glob - res = api.new_task( + res = api.create_task( glob.glob("nodeodm/fixtures/test_images/*.JPG"), - "test", - [{'name': 'force-ccd', 'value': 6.16}]) - uuid = res['uuid'] + {'force-ccd': 6.16}, + "test") + uuid = res.uuid self.assertTrue(uuid != None) # Can call task_info() - task_info = api.task_info(uuid) - self.assertTrue(isinstance(task_info['dateCreated'], int)) - self.assertTrue(isinstance(task_info['uuid'], str)) + task = api.get_task(uuid) + task_info = task.info() + self.assertTrue(isinstance(task_info.date_created, datetime)) + self.assertTrue(isinstance(task_info.uuid, str)) # Can download assets? # Here we are waiting for the task to be completed wait_for_status(api, uuid, status_codes.COMPLETED, 10, "Could not download assets") - asset = api.task_download(uuid, "all.zip") - self.assertTrue(isinstance(asset, requests.Response)) + asset = api.get_task(uuid).download_zip(settings.MEDIA_TMP) + self.assertTrue(os.path.exists(asset)) # task_output - self.assertTrue(isinstance(api.task_output(uuid, 0), list)) + self.assertTrue(isinstance(api.get_task(uuid).output(0), list)) self.assertTrue(isinstance(online_node.get_task_console_output(uuid, 0), list)) - self.assertRaises(ProcessingError, online_node.get_task_console_output, "wrong-uuid", 0) + self.assertRaises(NodeResponseError, online_node.get_task_console_output, "wrong-uuid", 0) # Can restart task self.assertTrue(online_node.restart_task(uuid)) - self.assertRaises(ProcessingError, online_node.restart_task, "wrong-uuid") + self.assertRaises(NodeResponseError, online_node.restart_task, "wrong-uuid") wait_for_status(api, uuid, status_codes.COMPLETED, 10, "Could not restart task") @@ -139,28 +142,28 @@ class TestClientApi(TestCase): wait_for_status(api, uuid, status_codes.COMPLETED, 10, "Could not restart task with options") # Verify that options have been updated after restarting the task - task_info = api.task_info(uuid) - self.assertTrue(len(task_info['options']) == 1) - self.assertTrue(task_info['options'][0]['name'] == 'mesh-size') - self.assertTrue(task_info['options'][0]['value'] == 12345) + task_info = api.get_task(uuid).info() + self.assertTrue(len(task_info.options) == 1) + self.assertTrue(task_info.options[0]['name'] == 'mesh-size') + self.assertTrue(task_info.options[0]['value'] == 12345) # Can cancel task (should work even if we completed the task) self.assertTrue(online_node.cancel_task(uuid)) - self.assertRaises(ProcessingError, online_node.cancel_task, "wrong-uuid") + self.assertRaises(NodeResponseError, online_node.cancel_task, "wrong-uuid") # Wait for task to be canceled wait_for_status(api, uuid, status_codes.CANCELED, 5, "Could not remove task") self.assertTrue(online_node.remove_task(uuid)) - self.assertRaises(ProcessingError, online_node.remove_task, "wrong-uuid") + self.assertRaises(NodeResponseError, online_node.remove_task, "wrong-uuid") # Cannot delete task again - self.assertRaises(ProcessingError, online_node.remove_task, uuid) + self.assertRaises(NodeResponseError, online_node.remove_task, uuid) # Task has been deleted - self.assertRaises(ProcessingError, online_node.get_task_info, uuid) + self.assertRaises(NodeResponseError, online_node.get_task_info, uuid) # Test URL building for HTTPS - sslApi = ApiClient("localhost", 443, 'abc') + sslApi = Node("localhost", 443, 'abc') self.assertEqual(sslApi.url('/info'), 'https://localhost/info?token=abc') def test_find_best_available_node_and_is_online(self): @@ -204,10 +207,10 @@ class TestClientApi(TestCase): retries = 0 while True: try: - task_info = api.task_info(uuid) - if task_info['status']['code'] == status: + task_info = api.get_task(uuid).info() + if task_info.status.value == status: return True - except ProcessingError: + except (NodeResponseError, NodeServerError): pass time.sleep(0.5) @@ -216,39 +219,37 @@ class TestClientApi(TestCase): self.assertTrue(False, error_description) return False - api = ApiClient("localhost", 11224, "test_token") + api = Node("localhost", 11224, "test_token") online_node = ProcessingNode.objects.get(pk=3) self.assertTrue(online_node.update_node_info(), "Could update info") # Cannot call info(), options() without tokens api.token = "invalid" - self.assertTrue(type(api.info()['error']) == str) - self.assertTrue(type(api.options()['error']) == str) + self.assertRaises(NodeResponseError, api.info) + self.assertRaises(NodeResponseError, api.options) - # Cannot call new_task() without token + # Cannot call create_task() without token import glob - res = api.new_task( - glob.glob("nodeodm/fixtures/test_images/*.JPG")) - self.assertTrue('error' in res) + self.assertRaises(NodeResponseError, api.create_task, glob.glob("nodeodm/fixtures/test_images/*.JPG")) - # Can call new_task() with token + # Can call create_task() with token api.token = "test_token" - res = api.new_task( + res = api.create_task( glob.glob("nodeodm/fixtures/test_images/*.JPG")) - self.assertTrue('uuid' in res) - self.assertFalse('error' in res) - uuid = res['uuid'] + uuid = res.uuid + self.assertTrue(uuid != None) # Can call task_info() with token - task_info = api.task_info(uuid) - self.assertTrue(isinstance(task_info['dateCreated'], int)) + task_info = api.get_task(uuid).info() + self.assertTrue(isinstance(task_info.date_created, datetime)) # Cannot call task_info() without token api.token = "invalid" - res = api.task_info(uuid) - self.assertTrue('error' in res) - self.assertTrue('token does not match' in res['error']) + try: + api.get_task(uuid).info() + except NodeResponseError as e: + self.assertTrue('token does not match' in str(e)) # Here we are waiting for the task to be completed api.token = "test_token" @@ -256,32 +257,32 @@ class TestClientApi(TestCase): # Cannot download assets without token api.token = "invalid" - res = api.task_download(uuid, "all.zip") - self.assertTrue('error' in res) + task = api.get_task(uuid) + self.assertRaises(NodeResponseError, task.download_assets, settings.MEDIA_TMP) api.token = "test_token" - asset = api.task_download(uuid, "all.zip") - self.assertTrue(isinstance(asset, requests.Response)) + asset_archive = task.download_zip(settings.MEDIA_TMP) + self.assertTrue(os.path.exists(asset_archive)) + os.unlink(asset_archive) # Cannot get task output without token api.token = "invalid" - res = api.task_output(uuid, 0) - self.assertTrue('error' in res) + self.assertRaises(NodeResponseError, task.output, 0) api.token = "test_token" - res = api.task_output(uuid, 0) + res = task.output() self.assertTrue(isinstance(res, list)) # Cannot restart task without token online_node.token = "invalid" - self.assertRaises(ProcessingError, online_node.restart_task, uuid) + self.assertRaises(NodeResponseError, online_node.restart_task, uuid) online_node.token = "test_token" self.assertTrue(online_node.restart_task(uuid)) # Cannot cancel task without token online_node.token = "invalid" - self.assertRaises(ProcessingError, online_node.cancel_task, uuid) + self.assertRaises(NodeResponseError, online_node.cancel_task, uuid) online_node.token = "test_token" self.assertTrue(online_node.cancel_task(uuid)) @@ -290,8 +291,8 @@ class TestClientApi(TestCase): # Cannot delete task without token online_node.token = "invalid" - self.assertRaises(ProcessingError, online_node.remove_task, "invalid token") + self.assertRaises(NodeResponseError, online_node.remove_task, "invalid token") online_node.token = "test_token" self.assertTrue(online_node.remove_task(uuid)) - node_odm.terminate(); + node_odm.terminate() diff --git a/requirements.txt b/requirements.txt index 051fa566..95d544d7 100644 --- a/requirements.txt +++ b/requirements.txt @@ -37,6 +37,7 @@ pip-autoremove==0.9.0 psycopg2==2.7.4 psycopg2-binary==2.7.4 PyJWT==1.5.3 +pyodm==1.4.0 pyparsing==2.1.10 pytz==2018.3 rcssmin==1.0.6