diff --git a/app/models/task.py b/app/models/task.py
index 957291d2..1b628703 100644
--- a/app/models/task.py
+++ b/app/models/task.py
@@ -333,8 +333,11 @@ class Task(models.Model):
 
                     images = [image.path() for image in self.imageupload_set.all()]
 
+                    def callback(progress):
+                        logger.info(progress)
+
                     # This takes a while
-                    uuid = self.processing_node.process_new_task(images, self.name, self.options)
+                    uuid = self.processing_node.process_new_task(images, self.name, self.options, callback)
 
                     # Refresh task object before committing change
                     self.refresh_from_db()
diff --git a/docker-compose.dev.yml b/docker-compose.dev.yml
index 225f0cfe..41d8bf3a 100644
--- a/docker-compose.dev.yml
+++ b/docker-compose.dev.yml
@@ -4,3 +4,6 @@ services:
     entrypoint: /bin/bash -c "chmod +x /webodm/*.sh && /bin/bash -c \"/webodm/wait-for-postgres.sh db /webodm/wait-for-it.sh -t 0 broker:6379 -- /webodm/start.sh --create-default-pnode --setup-devenv\""
     volumes:
       - .:/webodm
+  worker:
+    volumes:
+      - .:/webodm
\ No newline at end of file
diff --git a/nodeodm/api_client.py b/nodeodm/api_client.py
index 03773b01..49cdb184 100644
--- a/nodeodm/api_client.py
+++ b/nodeodm/api_client.py
@@ -2,6 +2,8 @@
 An interface to NodeODM's API
 https://github.com/pierotofy/NodeODM/blob/master/docs/index.adoc
 """
+from requests.packages.urllib3.fields import RequestField
+from requests_toolbelt.multipart import encoder
 import requests
 import mimetypes
 import json
@@ -9,6 +11,47 @@ import os
 from urllib.parse import urlunparse, urlencode
 from app.testwatch import TestWatch
 
+# Extends class to support multipart form data
+# fields with the same name
+# https://github.com/requests/toolbelt/issues/225
+class MultipartEncoder(encoder.MultipartEncoder):
+    """Multiple files with the same name support, i.e. files[]"""
+
+    def _iter_fields(self):
+        _fields = self.fields
+        if hasattr(self.fields, 'items'):
+            _fields = list(self.fields.items())
+        for k, v in _fields:
+            for field in self._iter_field(k, v):
+                yield field
+
+    @classmethod
+    def _iter_field(cls, field_name, field):
+        file_name = None
+        file_type = None
+        file_headers = None
+        if field and isinstance(field, (list, tuple)):
+            if all([isinstance(f, (list, tuple)) for f in field]):
+                for f in field:
+                    yield next(cls._iter_field(field_name, f))
+                else:
+                    raise StopIteration()
+            if len(field) == 2:
+                file_name, file_pointer = field
+            elif len(field) == 3:
+                file_name, file_pointer, file_type = field
+            else:
+                file_name, file_pointer, file_type, file_headers = field
+        else:
+            file_pointer = field
+
+        field = RequestField(name=field_name,
+                             data=file_pointer,
+                             filename=file_name,
+                             headers=file_headers)
+        field.make_multipart(content_type=file_type)
+        yield field
+
 class ApiClient:
     def __init__(self, host, port, token = "", timeout=30):
         self.host = host
@@ -56,12 +99,13 @@ class ApiClient:
         else:
             return res
 
-    def new_task(self, images, name=None, options=[]):
+    def new_task(self, images, name=None, options=[], progress_callback=None):
         """
         Starts processing of a new task
         :param images: list of path images
         :param name: name of the task
         :param options: options to be used for processing ([{'name': optionName, 'value': optionValue}, ...])
+        :param progress_callback: optional callback invoked during image upload to report progress.
         :return: UUID or error
         """
 
@@ -72,9 +116,24 @@
             with open(path, 'rb') as f:
                 return f.read()
 
-        files = [('images',
-                  (os.path.basename(image), read_file(image), (mimetypes.guess_type(image)[0] or "image/jpg"))
-                  ) for image in images]
+        fields = {
+            'name': name,
+            'options': json.dumps(options),
+            'images': [(os.path.basename(image), read_file(image), (mimetypes.guess_type(image)[0] or "image/jpg")) for image in images]
+        }
+
+        def create_callback(mpe):
+            total_bytes = mpe.len
+
+            def callback(monitor):
+                if progress_callback is not None and total_bytes > 0:
+                    progress_callback(monitor.bytes_read / total_bytes)
+
+            return callback
+
+        e = MultipartEncoder(fields=fields)
+        m = encoder.MultipartEncoderMonitor(e, create_callback(e))
+
         return requests.post(self.url("/task/new"),
-                             files=files,
-                             data={'name': name, 'options': json.dumps(options)}).json()
+                             data=m,
+                             headers={'Content-Type': m.content_type}).json()
\ No newline at end of file
diff --git a/nodeodm/models.py b/nodeodm/models.py
index d4618dd3..5f3a34d6 100644
--- a/nodeodm/models.py
+++ b/nodeodm/models.py
@@ -99,7 +99,7 @@ class ProcessingNode(models.Model):
         return json.dumps(self.available_options, **kwargs)
 
     @api
-    def process_new_task(self, images, name=None, options=[]):
+    def process_new_task(self, images, name=None, options=[], progress_callback=None):
         """
         Sends a set of images (and optional GCP file) via the API
         to start processing.
@@ -107,6 +107,7 @@
         :param images: list of path images
         :param name: name of the task
         :param options: options to be used for processing ([{'name': optionName, 'value': optionValue}, ...])
+        :param progress_callback: optional callback invoked during image upload to report progress.
         :returns UUID of the newly created task
         """
 
@@ -114,7 +115,7 @@
 
         api_client = self.api_client()
         try:
-            result = api_client.new_task(images, name, options)
+            result = api_client.new_task(images, name, options, progress_callback)
         except requests.exceptions.ConnectionError as e:
             raise ProcessingError(e)
diff --git a/requirements.txt b/requirements.txt
index 325f32a9..979f996f 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -41,6 +41,7 @@ pyparsing==2.1.10
 pytz==2018.3
 rcssmin==1.0.6
 redis==2.10.6
+requests-toolbelt==0.8.0
 requests==2.20.0
 rfc3987==1.3.7
 rjsmin==1.0.12