Adds streaming multipart form support, callback for monitoring upload progress

pull/571/head
Piero Toffanin 2018-12-04 16:29:54 -05:00
rodzic b310453f32
commit 37ae888e76
5 zmienionych plików z 76 dodań i 9 usunięć

Wyświetl plik

@ -333,8 +333,11 @@ class Task(models.Model):
images = [image.path() for image in self.imageupload_set.all()]
def callback(progress):
logger.info(progress)
# This takes a while
uuid = self.processing_node.process_new_task(images, self.name, self.options)
uuid = self.processing_node.process_new_task(images, self.name, self.options, callback)
# Refresh task object before committing change
self.refresh_from_db()

Wyświetl plik

@ -4,3 +4,6 @@ services:
entrypoint: /bin/bash -c "chmod +x /webodm/*.sh && /bin/bash -c \"/webodm/wait-for-postgres.sh db /webodm/wait-for-it.sh -t 0 broker:6379 -- /webodm/start.sh --create-default-pnode --setup-devenv\""
volumes:
- .:/webodm
worker:
volumes:
- .:/webodm

Wyświetl plik

@ -2,6 +2,8 @@
An interface to NodeODM's API
https://github.com/pierotofy/NodeODM/blob/master/docs/index.adoc
"""
from requests.packages.urllib3.fields import RequestField
from requests_toolbelt.multipart import encoder
import requests
import mimetypes
import json
@ -9,6 +11,47 @@ import os
from urllib.parse import urlunparse, urlencode
from app.testwatch import TestWatch
# Extends class to support multipart form data
# fields with the same name
# https://github.com/requests/toolbelt/issues/225
class MultipartEncoder(encoder.MultipartEncoder):
"""Multiple files with the same name support, i.e. files[]"""
def _iter_fields(self):
_fields = self.fields
if hasattr(self.fields, 'items'):
_fields = list(self.fields.items())
for k, v in _fields:
for field in self._iter_field(k, v):
yield field
@classmethod
def _iter_field(cls, field_name, field):
file_name = None
file_type = None
file_headers = None
if field and isinstance(field, (list, tuple)):
if all([isinstance(f, (list, tuple)) for f in field]):
for f in field:
yield next(cls._iter_field(field_name, f))
else:
raise StopIteration()
if len(field) == 2:
file_name, file_pointer = field
elif len(field) == 3:
file_name, file_pointer, file_type = field
else:
file_name, file_pointer, file_type, file_headers = field
else:
file_pointer = field
field = RequestField(name=field_name,
data=file_pointer,
filename=file_name,
headers=file_headers)
field.make_multipart(content_type=file_type)
yield field
class ApiClient:
def __init__(self, host, port, token = "", timeout=30):
self.host = host
@ -56,12 +99,13 @@ class ApiClient:
else:
return res
def new_task(self, images, name=None, options=[]):
def new_task(self, images, name=None, options=[], progress_callback=None):
"""
Starts processing of a new task
:param images: list of path images
:param name: name of the task
:param options: options to be used for processing ([{'name': optionName, 'value': optionValue}, ...])
:param progress_callback: optional callback invoked during the upload images process to be used to report status.
:return: UUID or error
"""
@ -72,9 +116,24 @@ class ApiClient:
with open(path, 'rb') as f:
return f.read()
files = [('images',
(os.path.basename(image), read_file(image), (mimetypes.guess_type(image)[0] or "image/jpg"))
) for image in images]
fields = {
'name': name,
'options': json.dumps(options),
'images': [(os.path.basename(image), read_file(image), (mimetypes.guess_type(image)[0] or "image/jpg")) for image in images]
}
def create_callback(mpe):
total_bytes = mpe.len
def callback(monitor):
if progress_callback is not None and total_bytes > 0:
progress_callback(monitor.bytes_read / total_bytes)
return callback
e = MultipartEncoder(fields=fields)
m = encoder.MultipartEncoderMonitor(e, create_callback(e))
return requests.post(self.url("/task/new"),
files=files,
data={'name': name, 'options': json.dumps(options)}).json()
data=m,
headers={'Content-Type': m.content_type}).json()

Wyświetl plik

@ -99,7 +99,7 @@ class ProcessingNode(models.Model):
return json.dumps(self.available_options, **kwargs)
@api
def process_new_task(self, images, name=None, options=[]):
def process_new_task(self, images, name=None, options=[], progress_callback=None):
"""
Sends a set of images (and optional GCP file) via the API
to start processing.
@ -107,6 +107,7 @@ class ProcessingNode(models.Model):
:param images: list of path images
:param name: name of the task
:param options: options to be used for processing ([{'name': optionName, 'value': optionValue}, ...])
:param progress_callback: optional callback invoked during the upload images process to be used to report status.
:returns UUID of the newly created task
"""
@ -114,7 +115,7 @@ class ProcessingNode(models.Model):
api_client = self.api_client()
try:
result = api_client.new_task(images, name, options)
result = api_client.new_task(images, name, options, progress_callback)
except requests.exceptions.ConnectionError as e:
raise ProcessingError(e)

Wyświetl plik

@ -41,6 +41,7 @@ pyparsing==2.1.10
pytz==2018.3
rcssmin==1.0.6
redis==2.10.6
requests-toolbelt==0.8.0
requests==2.20.0
rfc3987==1.3.7
rjsmin==1.0.12