Add generic run_function_async method, settings changes, bump version

pull/892/head
Piero Toffanin 2020-07-26 16:08:04 -04:00
rodzic f28c0f3a0d
commit e278f728eb
4 zmienionych plików z 45 dodań i 19 usunięć

Wyświetl plik

@ -1,5 +1,29 @@
import inspect
from worker.celery import app from worker.celery import app
# noinspection PyUnresolvedReferences # noinspection PyUnresolvedReferences
from worker.tasks import execute_grass_script from worker.tasks import execute_grass_script
task = app.task task = app.task
def run_function_async(func, *args, **kwargs):
"""
Run a function asynchronously using Celery.
Plugins should use this function so that they don't
have to register new Celery tasks at startup. Functions
should import any required library at the top of the function body.
:param {Function} a function to execute
"""
source = inspect.getsource(func)
return eval_async.delay(source, func.__name__, *args, **kwargs)
@app.task
def eval_async(source, funcname, *args, **kwargs):
"""
Run Python code asynchronously using Celery.
It's recommended to use run_function_async instead.
"""
ns = {}
code = compile(source, 'file', 'exec')
eval(code, ns, ns)
return ns[funcname](*args, **kwargs)

Wyświetl plik

@ -1,6 +1,6 @@
{ {
"name": "WebODM", "name": "WebODM",
"version": "1.4.1", "version": "1.4.2",
"description": "User-friendly, extendable application and API for processing aerial imagery.", "description": "User-friendly, extendable application and API for processing aerial imagery.",
"main": "index.js", "main": "index.js",
"scripts": { "scripts": {

Wyświetl plik

@ -5,8 +5,8 @@ from os import path
from app import models, pending_actions from app import models, pending_actions
from app.plugins.views import TaskView from app.plugins.views import TaskView
from app.plugins.worker import task from app.plugins.worker import run_function_async
from app.plugins import logger, get_current_plugin from app.plugins import get_current_plugin
from worker.celery import app from worker.celery import app
from rest_framework.response import Response from rest_framework.response import Response
@ -52,7 +52,9 @@ class ImportFolderTaskView(TaskView):
# Start importing the files in the background # Start importing the files in the background
serialized = [file.serialize() for file in files] serialized = [file.serialize() for file in files]
import_files.delay(task.id, serialized) run_function_async(import_files, task.id, serialized)
#import_files.delay(task.id, serialized)
return Response({}, status=status.HTTP_200_OK) return Response({}, status=status.HTTP_200_OK)
@ -99,11 +101,21 @@ class PlatformsTaskView(TaskView):
return Response({'platforms': [platform.serialize(user = request.user) for platform in platforms]}, status=status.HTTP_200_OK) return Response({'platforms': [platform.serialize(user = request.user) for platform in platforms]}, status=status.HTTP_200_OK)
### ###
# CELERY TASK(S) #
### ###
@task
def import_files(task_id, files): def import_files(task_id, files):
import requests
from app import models
from app.plugins import logger
def download_file(task, file):
path = task.task_path(file['name'])
download_stream = requests.get(file['url'], stream=True, timeout=60)
with open(path, 'wb') as fd:
for chunk in download_stream.iter_content(4096):
fd.write(chunk)
models.ImageUpload.objects.create(task=task, image=path)
logger.info("Will import {} files".format(len(files))) logger.info("Will import {} files".format(len(files)))
task = models.Task.objects.get(pk=task_id) task = models.Task.objects.get(pk=task_id)
task.create_task_directories() task.create_task_directories()
@ -125,13 +137,3 @@ def import_files(task_id, files):
task.processing_time = 0 task.processing_time = 0
task.partial = False task.partial = False
task.save() task.save()
def download_file(task, file):
path = task.task_path(file['name'])
download_stream = requests.get(file['url'], stream=True, timeout=60)
with open(path, 'wb') as fd:
for chunk in download_stream.iter_content(4096):
fd.write(chunk)
models.ImageUpload.objects.create(task=task, image=path)

Wyświetl plik

@ -353,7 +353,7 @@ CELERY_RESULT_BACKEND = os.environ.get('WO_BROKER', 'redis://localhost')
CELERY_TASK_SERIALIZER = 'json' CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json' CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json'] CELERY_ACCEPT_CONTENT = ['json']
CELERY_INCLUDE=['worker.tasks', 'plugins.cloudimport.api_views'] CELERY_INCLUDE=['worker.tasks', 'app.plugins.worker']
CELERY_WORKER_REDIRECT_STDOUTS = False CELERY_WORKER_REDIRECT_STDOUTS = False
CELERY_WORKER_HIJACK_ROOT_LOGGER = False CELERY_WORKER_HIJACK_ROOT_LOGGER = False