Async task worker refactoring

pull/782/head
Piero Toffanin 2020-01-17 13:19:03 -05:00
rodzic 7874d21a8e
commit 692c9264b3
10 zmienionych plików z 222 dodań i 60 usunięć

Wyświetl plik

@ -11,6 +11,7 @@ from rio_tiler.profiles import img_profiles
import numpy as np
from app.raster_utils import export_raster_index
from .hsvblend import hsv_blend
from .hillshade import LightSource
from .formulas import lookup_formula, get_algorithm_list
@ -406,4 +407,47 @@ class Tiles(TaskNestedView):
return HttpResponse(
array_to_image(rgb, rmask, img_format=driver, **options),
content_type="image/{}".format(ext)
)
)
class Export(TaskNestedView):
def get(self, request, pk=None, project_pk=None):
"""
Export an orthophoto after applying a formula
"""
task = self.get_and_check_task(request, pk)
nodata = None
formula = self.request.query_params.get('formula')
bands = self.request.query_params.get('bands')
rescale = self.request.query_params.get('rescale')
if formula == '': formula = None
if bands == '': bands = None
if rescale == '': rescale = None
if not formula:
raise exceptions.ValidationError("You need to specify a formula parameter")
if not bands:
raise exceptions.ValidationError("You need to specify a bands parameter")
try:
expr, _ = lookup_formula(formula, bands)
except ValueError as e:
raise exceptions.ValidationError(str(e))
if formula is not None and rescale is None:
rescale = "-1,1"
if nodata is not None:
nodata = np.nan if nodata == "nan" else float(nodata)
url = get_raster_path(task, "orthophoto")
if not os.path.isfile(url):
raise exceptions.NotFound()
export_raster_index(url, expr, "/webodm/app/media/project/2/task/5392337b-cd3f-42ef-879d-b36149ef442f/assets/odm_orthophoto/export.tif")
return HttpResponse("OK")

Wyświetl plik

@ -8,7 +8,8 @@ from .processingnodes import ProcessingNodeViewSet, ProcessingNodeOptionsView
from .admin import UserViewSet, GroupViewSet
from rest_framework_nested import routers
from rest_framework_jwt.views import obtain_jwt_token
from .tiler import TileJson, Bounds, Metadata, Tiles
from .tiler import TileJson, Bounds, Metadata, Tiles, Export
from .workers import CheckTask, GetTaskResult
router = routers.DefaultRouter()
router.register(r'projects', ProjectViewSet)
@ -37,11 +38,15 @@ urlpatterns = [
url(r'projects/(?P<project_pk>[^/.]+)/tasks/(?P<pk>[^/.]+)/(?P<tile_type>orthophoto|dsm|dtm)/metadata$', Metadata.as_view()),
url(r'projects/(?P<project_pk>[^/.]+)/tasks/(?P<pk>[^/.]+)/(?P<tile_type>orthophoto|dsm|dtm)/tiles/(?P<z>[\d]+)/(?P<x>[\d]+)/(?P<y>[\d]+)\.png$', Tiles.as_view()),
url(r'projects/(?P<project_pk>[^/.]+)/tasks/(?P<pk>[^/.]+)/(?P<tile_type>orthophoto|dsm|dtm)/tiles/(?P<z>[\d]+)/(?P<x>[\d]+)/(?P<y>[\d]+)@(?P<scale>[\d]+)x\.png$', Tiles.as_view()),
url(r'projects/(?P<project_pk>[^/.]+)/tasks/(?P<pk>[^/.]+)/orthophoto/export$', Export.as_view()),
url(r'projects/(?P<project_pk>[^/.]+)/tasks/(?P<pk>[^/.]+)/download/(?P<asset>.+)$', TaskDownloads.as_view()),
url(r'projects/(?P<project_pk>[^/.]+)/tasks/(?P<pk>[^/.]+)/assets/(?P<unsafe_asset_path>.+)$', TaskAssets.as_view()),
url(r'projects/(?P<project_pk>[^/.]+)/tasks/import$', TaskAssetsImport.as_view()),
url(r'workers/check/(?P<celery_task_id>.+)', CheckTask.as_view()),
url(r'workers/get/(?P<celery_task_id>.+)', GetTaskResult.as_view()),
url(r'^auth/', include('rest_framework.urls')),
url(r'^token-auth/', obtain_jwt_token),
]

74
app/api/workers.py 100644
Wyświetl plik

@ -0,0 +1,74 @@
import os
import mimetypes
from worker.celery import app as celery
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import status, permissions
from django.http import FileResponse
from django.http import HttpResponse
from wsgiref.util import FileWrapper
class CheckTask(APIView):
permission_classes = (permissions.AllowAny,)
def get(self, request, celery_task_id=None):
res = celery.AsyncResult(celery_task_id)
if not res.ready():
return Response({'ready': False}, status=status.HTTP_200_OK)
else:
result = res.get()
if result.get('error', None) is not None:
msg = self.on_error(result)
return Response({'ready': True, 'error': msg})
if self.error_check(result) is not None:
msg = self.on_error(result)
return Response({'ready': True, 'error': msg})
return Response({'ready': True})
def on_error(self, result):
return result['error']
def error_check(self, result):
pass
class GetTaskResult(APIView):
permission_classes = (permissions.AllowAny,)
def get(self, request, celery_task_id=None):
res = celery.AsyncResult(celery_task_id)
if res.ready():
result = res.get()
file = result.get('file', None) # File path
output = result.get('output', None) # String/object
else:
return Response({'error': 'Task not ready'})
if file is not None:
filename = os.path.basename(file)
filesize = os.stat(file).st_size
f = open(file, "rb")
# More than 100mb, normal http response, otherwise stream
# Django docs say to avoid streaming when possible
stream = filesize > 1e8
if stream:
response = FileResponse(f)
else:
response = HttpResponse(FileWrapper(f),
content_type=(mimetypes.guess_type(filename)[0] or "application/zip"))
response['Content-Type'] = mimetypes.guess_type(filename)[0] or "application/zip"
response['Content-Disposition'] = "attachment; filename={}".format(filename)
response['Content-Length'] = filesize
return response
elif output is not None:
return Response({'output': output})
else:
return Response({'error': 'Invalid task output (cannot find valid key)'})

Wyświetl plik

@ -1 +1,3 @@
from app.api.tasks import TaskNestedView as TaskView
from app.api.workers import CheckTask as CheckTask
from app.api.workers import GetTaskResult as GetTaskResult

Wyświetl plik

@ -0,0 +1,43 @@
# Export a raster index after applying a band expression
import rasterio
import re
import numpy as np
import numexpr as ne
from rasterio.enums import ColorInterp
from rio_tiler.utils import has_alpha_band
def export_raster_index(input, expression, output):
with rasterio.open(input) as src:
profile = src.profile
profile.update(
dtype=rasterio.float32,
count=1,
nodata=-9999
)
data = src.read().astype(np.float32)
alpha_index = None
if has_alpha_band(src):
try:
alpha_index = src.colorinterp.index(ColorInterp.alpha)
except ValueError:
pass
bands_names = ["b{}".format(b) for b in tuple(set(re.findall(r"b(?P<bands>[0-9]{1,2})", expression)))]
rgb = expression.split(",")
arr = dict(zip(bands_names, data))
arr = np.array([np.nan_to_num(ne.evaluate(bloc.strip(), local_dict=arr)) for bloc in rgb])
# Set nodata values
index_band = arr[0]
if alpha_index is not None:
index_band[data[alpha_index] == 0] = -9999
# Remove infinity values
index_band[index_band>1e+30] = -9999
index_band[index_band<-1e+30] = -9999
with rasterio.open(output, 'w', **profile) as dst:
dst.write(arr)

Wyświetl plik

@ -38,7 +38,8 @@ export default class LayersControlLayer extends React.Component {
formula: params.formula || "",
bands: params.bands || "",
hillshade: params.hillshade || "",
histogramLoading: false
histogramLoading: false,
exportLoading: false
};
this.rescale = params.rescale || "";
@ -186,8 +187,14 @@ export default class LayersControlLayer extends React.Component {
this.updateLayer();
}
handleExport = e => {
this.setState({exportLoading: true});
}
render(){
const { colorMap, bands, hillshade, formula, histogramLoading } = this.state;
const { colorMap, bands, hillshade, formula, histogramLoading, exportLoading } = this.state;
const { meta, tmeta } = this;
const { color_maps, algorithms } = tmeta;
const algo = this.getAlgorithm(formula);
@ -256,6 +263,17 @@ export default class LayersControlLayer extends React.Component {
</select>
</div>
</div> : ""}
{formula !== "" && algorithms ?
<div className="row form-group form-inline">
<label className="col-sm-3 control-label">Export: </label>
<div className="col-sm-9">
<button onClick={this.handleExport} disabled={exportLoading} type="button" className="btn btn-sm btn-default">
{exportLoading ? <i className="fa fa-spin fa-circle-notch"/> : <i className="far fa-image fa-fw" />} GeoTIFF
</button>
</div>
</div>
: ""}
</div> : ""}
</div>);

Wyświetl plik

@ -38,6 +38,10 @@
margin-bottom: 8px;
}
}
.btn{
padding: 0px 9px;
}
.toggle{
float: left;

Wyświetl plik

@ -1,15 +1,10 @@
import mimetypes
import os
from django.http import FileResponse
from django.http import HttpResponse
from wsgiref.util import FileWrapper
from rest_framework import status
from rest_framework.response import Response
from app.plugins.views import TaskView
from app.plugins.views import TaskView, CheckTask, GetTaskResult
from worker.tasks import execute_grass_script
from app.plugins.grass_engine import grass, GrassEngineException, cleanup_grass_context
from worker.celery import app as celery
class TaskContoursGenerate(TaskView):
def post(self, request, pk=None):
@ -47,55 +42,20 @@ class TaskContoursGenerate(TaskView):
celery_task_id = execute_grass_script.delay(os.path.join(
os.path.dirname(os.path.abspath(__file__)),
"calc_contours.grass"
), context.serialize()).task_id
), context.serialize(), 'file').task_id
return Response({'celery_task_id': celery_task_id}, status=status.HTTP_200_OK)
except GrassEngineException as e:
return Response({'error': str(e)}, status=status.HTTP_200_OK)
class TaskContoursCheck(TaskView):
def get(self, request, pk=None, celery_task_id=None):
res = celery.AsyncResult(celery_task_id)
if not res.ready():
return Response({'ready': False}, status=status.HTTP_200_OK)
else:
result = res.get()
if result.get('error', None) is not None:
cleanup_grass_context(result['context'])
return Response({'ready': True, 'error': result['error']})
class TaskContoursCheck(CheckTask):
def on_error(self, result):
cleanup_grass_context(result['context'])
contours_file = result.get('output')
if not contours_file or not os.path.exists(contours_file):
cleanup_grass_context(result['context'])
return Response({'ready': True, 'error': 'Contours file could not be generated. This might be a bug.'})
def error_check(self, result):
contours_file = result.get('file')
if not contours_file or not os.path.exists(contours_file):
return 'Contours file could not be generated. This might be a bug.'
request.session['contours_' + celery_task_id] = contours_file
return Response({'ready': True})
class TaskContoursDownload(TaskView):
def get(self, request, pk=None, celery_task_id=None):
contours_file = request.session.get('contours_' + celery_task_id, None)
if contours_file is not None:
filename = os.path.basename(contours_file)
filesize = os.stat(contours_file).st_size
f = open(contours_file, "rb")
# More than 100mb, normal http response, otherwise stream
# Django docs say to avoid streaming when possible
stream = filesize > 1e8
if stream:
response = FileResponse(f)
else:
response = HttpResponse(FileWrapper(f),
content_type=(mimetypes.guess_type(filename)[0] or "application/zip"))
response['Content-Type'] = mimetypes.guess_type(filename)[0] or "application/zip"
response['Content-Disposition'] = "attachment; filename={}".format(filename)
response['Content-Length'] = filesize
return response
else:
return Response({'error': 'Invalid contours download id'})
class TaskContoursDownload(GetTaskResult):
pass

Wyświetl plik

@ -15,6 +15,6 @@ class Plugin(PluginBase):
def api_mount_points(self):
return [
MountPoint('task/(?P<pk>[^/.]+)/contours/generate', TaskContoursGenerate.as_view()),
MountPoint('task/(?P<pk>[^/.]+)/contours/check/(?P<celery_task_id>.+)', TaskContoursCheck.as_view()),
MountPoint('task/(?P<pk>[^/.]+)/contours/download/(?P<celery_task_id>.+)', TaskContoursDownload.as_view()),
MountPoint('task/[^/.]+/contours/check/(?P<celery_task_id>.+)', TaskContoursCheck.as_view()),
MountPoint('task/[^/.]+/contours/download/(?P<celery_task_id>.+)', TaskContoursDownload.as_view()),
]

Wyświetl plik

@ -1,5 +1,6 @@
import os
import shutil
import tempfile
import traceback
import time
@ -16,6 +17,7 @@ from nodeodm import status_codes
from nodeodm.models import ProcessingNode
from webodm import settings
from .celery import app
from app.raster_utils import export_raster_index as export_raster_index_sync
import redis
logger = get_task_logger("app.logger")
@ -138,9 +140,19 @@ def process_pending_tasks():
@app.task
def execute_grass_script(script, serialized_context = {}):
def execute_grass_script(script, serialized_context = {}, out_key='output'):
try:
ctx = grass.create_context(serialized_context)
return {'output': ctx.execute(script), 'context': ctx.serialize()}
return {out_key: ctx.execute(script), 'context': ctx.serialize()}
except GrassEngineException as e:
return {'error': str(e), 'context': ctx.serialize()}
return {'error': str(e), 'context': ctx.serialize()}
@app.task
def export_raster_index(input, expression):
try:
tmpfile = tempfile.mktemp('_raster_index.tif', dir=settings.MEDIA_TMP)
export_raster_index_sync(input, expression, tmpfile)
return {'file': tmpfile}
except Exception as e:
return {'error': str(e)}