diff --git a/app/plugins/worker.py b/app/plugins/worker.py index 56620d77..3c43b735 100644 --- a/app/plugins/worker.py +++ b/app/plugins/worker.py @@ -1,5 +1,29 @@ +import inspect from worker.celery import app # noinspection PyUnresolvedReferences from worker.tasks import execute_grass_script task = app.task + +def run_function_async(func, *args, **kwargs): + """ + Run a function asynchronously using Celery. + Plugins should use this function so that they don't + have to register new Celery tasks at startup. Functions + should import any required library at the top of the function body. + :param {Function} a function to execute + """ + source = inspect.getsource(func) + return eval_async.delay(source, func.__name__, *args, **kwargs) + + +@app.task +def eval_async(source, funcname, *args, **kwargs): + """ + Run Python code asynchronously using Celery. + It's recommended to use run_function_async instead. + """ + ns = {} + code = compile(source, 'file', 'exec') + eval(code, ns, ns) + return ns[funcname](*args, **kwargs) \ No newline at end of file diff --git a/package.json b/package.json index 3a30c4e8..81c45137 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "WebODM", - "version": "1.4.1", + "version": "1.4.2", "description": "User-friendly, extendable application and API for processing aerial imagery.", "main": "index.js", "scripts": { diff --git a/plugins/cloudimport/api_views.py b/plugins/cloudimport/api_views.py index a911db26..24100ab0 100644 --- a/plugins/cloudimport/api_views.py +++ b/plugins/cloudimport/api_views.py @@ -5,8 +5,8 @@ from os import path from app import models, pending_actions from app.plugins.views import TaskView -from app.plugins.worker import task -from app.plugins import logger, get_current_plugin +from app.plugins.worker import run_function_async +from app.plugins import get_current_plugin from worker.celery import app from rest_framework.response import Response @@ -52,7 +52,9 @@ class ImportFolderTaskView(TaskView): # Start importing the files in the background serialized = [file.serialize() for file in files] - import_files.delay(task.id, serialized) + run_function_async(import_files, task.id, serialized) + + #import_files.delay(task.id, serialized) return Response({}, status=status.HTTP_200_OK) @@ -99,11 +101,21 @@ class PlatformsTaskView(TaskView): return Response({'platforms': [platform.serialize(user = request.user) for platform in platforms]}, status=status.HTTP_200_OK) -### ### -# CELERY TASK(S) # -### ### -@task def import_files(task_id, files): + import requests + from app import models + from app.plugins import logger + + def download_file(task, file): + path = task.task_path(file['name']) + download_stream = requests.get(file['url'], stream=True, timeout=60) + + with open(path, 'wb') as fd: + for chunk in download_stream.iter_content(4096): + fd.write(chunk) + + models.ImageUpload.objects.create(task=task, image=path) + logger.info("Will import {} files".format(len(files))) task = models.Task.objects.get(pk=task_id) task.create_task_directories() @@ -125,13 +137,3 @@ def import_files(task_id, files): task.processing_time = 0 task.partial = False task.save() - -def download_file(task, file): - path = task.task_path(file['name']) - download_stream = requests.get(file['url'], stream=True, timeout=60) - - with open(path, 'wb') as fd: - for chunk in download_stream.iter_content(4096): - fd.write(chunk) - - models.ImageUpload.objects.create(task=task, image=path) diff --git a/webodm/settings.py b/webodm/settings.py index ea7d6a4e..4beff37f 100644 --- a/webodm/settings.py +++ b/webodm/settings.py @@ -353,7 +353,7 @@ CELERY_RESULT_BACKEND = os.environ.get('WO_BROKER', 'redis://localhost') CELERY_TASK_SERIALIZER = 'json' CELERY_RESULT_SERIALIZER = 'json' CELERY_ACCEPT_CONTENT = ['json'] -CELERY_INCLUDE=['worker.tasks', 'plugins.cloudimport.api_views'] +CELERY_INCLUDE=['worker.tasks', 'app.plugins.worker'] CELERY_WORKER_REDIRECT_STDOUTS = False CELERY_WORKER_HIJACK_ROOT_LOGGER = False