Multithreaded processing engine working, replaced swagger with requests lib

pull/39/head
Piero Toffanin 2016-10-25 16:04:24 -04:00
rodzic d12513647c
commit 86855df1bb
17 zmienionych plików z 121 dodań i 41 usunięć

Wyświetl plik

@ -18,7 +18,6 @@ class TaskSerializer(serializers.ModelSerializer):
model = models.Task model = models.Task
fields = '__all__' fields = '__all__'
class TaskViewSet(viewsets.ViewSet): class TaskViewSet(viewsets.ViewSet):
""" """
A task represents a set of images and other input to be sent to a processing node. A task represents a set of images and other input to be sent to a processing node.

Wyświetl plik

@ -1,14 +1,7 @@
from __future__ import unicode_literals from __future__ import unicode_literals
from django.apps import AppConfig from django.apps import AppConfig
from .boot import boot
from webodm import settings
class MainConfig(AppConfig): class MainConfig(AppConfig):
name = 'app' name = 'app'
verbose_name = 'Application' verbose_name = 'Application'
def ready(self):
# Test cases call boot() independently
if not settings.TESTING:
boot()

Wyświetl plik

@ -1,11 +1,13 @@
def boot():
from django.contrib.contenttypes.models import ContentType from django.contrib.contenttypes.models import ContentType
from django.contrib.auth.models import Permission from django.contrib.auth.models import Permission
from django.contrib.auth.models import User, Group from django.contrib.auth.models import User, Group
from django.db.utils import ProgrammingError from django.db.utils import ProgrammingError
from . import signals, scheduler from . import signals, scheduler
import logging, os import logging, os
from .models import Task
from webodm import settings
def boot():
logger = logging.getLogger('app.logger') logger = logging.getLogger('app.logger')
# Check default group # Check default group
@ -27,8 +29,11 @@ def boot():
except ProgrammingError: except ProgrammingError:
logger.warn("Could not create default group/user. If running a migration, this is expected.") logger.warn("Could not create default group/user. If running a migration, this is expected.")
# Run only on the main runserver process # Unlock any Task that might have been locked
# (do not start again on the auto-reloader process) Task.objects.filter(processing_lock=True).update(processing_lock=False)
if os.environ.get('RUN_MAIN') != 'true':
if not settings.TESTING:
# Setup and start scheduler # Setup and start scheduler
scheduler.setup() scheduler.setup()
scheduler.update_nodes_info(background=True)

Wyświetl plik

@ -67,12 +67,14 @@ class Task(models.Model):
uuid = models.CharField(max_length=255, null=True, blank=True, help_text="Identifier of the task (as returned by OpenDroneMap's REST API)") uuid = models.CharField(max_length=255, null=True, blank=True, help_text="Identifier of the task (as returned by OpenDroneMap's REST API)")
project = models.ForeignKey(Project, on_delete=models.CASCADE, help_text="Project that this task belongs to") project = models.ForeignKey(Project, on_delete=models.CASCADE, help_text="Project that this task belongs to")
name = models.CharField(max_length=255, null=True, blank=True, help_text="A label for the task") name = models.CharField(max_length=255, null=True, blank=True, help_text="A label for the task")
processing_lock = models.BooleanField(default=False, help_text="A flag indicating whether this task is currently locked for processing. When this flag is turned on, the task is in the middle of a processing step.")
processing_time = models.IntegerField(default=-1, help_text="Number of milliseconds that elapsed since the beginning of this task (-1 indicates that no information is available)") processing_time = models.IntegerField(default=-1, help_text="Number of milliseconds that elapsed since the beginning of this task (-1 indicates that no information is available)")
processing_node = models.ForeignKey(ProcessingNode, null=True, blank=True, help_text="Processing node assigned to this task (or null if this task has not been associated yet)") processing_node = models.ForeignKey(ProcessingNode, null=True, blank=True, help_text="Processing node assigned to this task (or null if this task has not been associated yet)")
status = models.IntegerField(choices=STATUS_CODES, null=True, blank=True, help_text="Current status of the task") status = models.IntegerField(choices=STATUS_CODES, null=True, blank=True, help_text="Current status of the task")
options = fields.JSONField(default=dict(), blank=True, help_text="Options that are being used to process this task") options = fields.JSONField(default=dict(), blank=True, help_text="Options that are being used to process this task")
console_output = models.TextField(null=True, blank=True, help_text="Console output of the OpenDroneMap's process") console_output = models.TextField(null=True, blank=True, help_text="Console output of the OpenDroneMap's process")
ground_control_points = models.FileField(null=True, blank=True, upload_to=gcp_directory_path, help_text="Optional Ground Control Points file to use for processing") ground_control_points = models.FileField(null=True, blank=True, upload_to=gcp_directory_path, help_text="Optional Ground Control Points file to use for processing")
# georeferenced_model # georeferenced_model
# orthophoto # orthophoto
# textured_model # textured_model
@ -100,6 +102,13 @@ class Task(models.Model):
# In case of error # In case of error
return None return None
def process(self):
if not self.uuid and self.processing_node:
print("Processing... {}".format(self))
import time
time.sleep(30)
print("Done! {}".format(self))
class Meta: class Meta:
permissions = ( permissions = (
('view_task', 'Can view task'), ('view_task', 'Can view task'),

Wyświetl plik

@ -1,37 +1,67 @@
import logging import logging
from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.schedulers import SchedulerAlreadyRunningError, SchedulerNotRunningError from apscheduler.schedulers import SchedulerAlreadyRunningError, SchedulerNotRunningError
import threading from threading import Thread, Lock
from multiprocessing.dummy import Pool as ThreadPool
from nodeodm.models import ProcessingNode from nodeodm.models import ProcessingNode
from app.models import Task from app.models import Task
from django.db.models import Q
import random import random
logger = logging.getLogger('app.logger') logger = logging.getLogger('app.logger')
scheduler = None scheduler = None
# Adds background={True|False} param to any function
# So that we can call update_nodes_info(background=True) from the outside
def job(func): def job(func):
"""
Adds background={True|False} param to any function
so that we can call update_nodes_info(background=True) from the outside
"""
def wrapper(*args,**kwargs): def wrapper(*args,**kwargs):
if (kwargs.get('background', False)): if (kwargs.get('background', False)):
t = (threading.Thread(target=func)) t = Thread(target=func)
t.start() t.start()
return t return t
else: else:
return func(*args, **kwargs) return func(*args, **kwargs)
return wrapper return wrapper
@job @job
def update_nodes_info(): def update_nodes_info():
processing_nodes = ProcessingNode.objects.all() processing_nodes = ProcessingNode.objects.all()
for processing_node in processing_nodes: for processing_node in processing_nodes:
processing_node.update_node_info() processing_node.update_node_info()
tasks_mutex = Lock()
@job @job
def process_pending_tasks(): def process_pending_tasks():
tasks = Task.objects.filter(uuid=None).exclude(processing_node=None) tasks = []
try:
tasks_mutex.acquire()
# All tasks that have a processing node assigned
# but don't have a UUID
# and that are not locked (being processed by another thread)
tasks = Task.objects.filter(uuid=None).exclude(Q(processing_node=None) | Q(processing_lock=True))
for task in tasks: for task in tasks:
print("Need to process: {}".format(task)) logger.info("Acquiring lock: {}".format(task))
task.processing_lock = True
task.save()
finally:
tasks_mutex.release()
def process(task):
task.process()
task.processing_lock = False
task.save()
if tasks.count() > 0:
pool = ThreadPool(tasks.count())
pool.map(process, tasks)
pool.close()
pool.join()
def setup(): def setup():
global scheduler global scheduler
@ -41,7 +71,7 @@ def setup():
scheduler = BackgroundScheduler() scheduler = BackgroundScheduler()
scheduler.start() scheduler.start()
scheduler.add_job(update_nodes_info, 'interval', seconds=30) scheduler.add_job(update_nodes_info, 'interval', seconds=30)
scheduler.add_job(process_pending_tasks, 'interval', seconds=15) scheduler.add_job(process_pending_tasks, 'interval', seconds=3)
except SchedulerAlreadyRunningError: except SchedulerAlreadyRunningError:
logger.warn("Scheduler already running (this is OK while testing)") logger.warn("Scheduler already running (this is OK while testing)")
@ -49,6 +79,6 @@ def teardown():
if scheduler != None: if scheduler != None:
logger.info("Stopping scheduler...") logger.info("Stopping scheduler...")
try: try:
scheduler.shutdown(wait=False) scheduler.shutdown()
except SchedulerNotRunningError: except SchedulerNotRunningError:
logger.warn("Scheduler not running") logger.warn("Scheduler not running")

Wyświetl plik

@ -10,6 +10,7 @@ class EditTaskPanel extends React.Component {
this.state = { this.state = {
name: "", name: "",
error: "",
advancedOptions: false, advancedOptions: false,
loadedProcessingNodes: false, loadedProcessingNodes: false,
selectedNode: null, selectedNode: null,
@ -39,6 +40,12 @@ class EditTaskPanel extends React.Component {
this.nodesRequest = this.nodesRequest =
$.getJSON("/api/processingnodes/?online=True", json => { $.getJSON("/api/processingnodes/?online=True", json => {
if (Array.isArray(json)){ if (Array.isArray(json)){
// All nodes offline?
if (json.length === 0){
this.setState({error: "There are no processing nodes online. Make sure at least one of them is reachable."});
return;
}
let nodes = json.map(node => { let nodes = json.map(node => {
return { return {
id: node.id, id: node.id,
@ -138,6 +145,12 @@ class EditTaskPanel extends React.Component {
} }
render() { render() {
if (this.state.error){
return (<div className="alert alert-warning alert-dismissible">
{this.state.error}
</div>);
}
if (this.state.editing){ if (this.state.editing){
let processingNodesOptions = ""; let processingNodesOptions = "";
if (this.state.loadedProcessingNodes){ if (this.state.loadedProcessingNodes){

Wyświetl plik

@ -2,7 +2,6 @@ from django.test import TestCase
from django.contrib.auth.models import User, Group from django.contrib.auth.models import User, Group
from app.models import Project from app.models import Project
from app.boot import boot from app.boot import boot
from app import scheduler
class BootTestCase(TestCase): class BootTestCase(TestCase):
''' '''
@ -51,4 +50,3 @@ class BootTestCase(TestCase):
@classmethod @classmethod
def tearDownClass(cls): def tearDownClass(cls):
super(BootTestCase, cls).tearDownClass() super(BootTestCase, cls).tearDownClass()
scheduler.teardown()

Wyświetl plik

@ -1,5 +1,7 @@
from django.conf.urls import url, include from django.conf.urls import url, include
from . import views from . import views
from app.boot import boot
from webodm import settings
urlpatterns = [ urlpatterns = [
url(r'^$', views.index, name='index'), url(r'^$', views.index, name='index'),
@ -8,3 +10,7 @@ urlpatterns = [
url(r'^api/', include("app.api.urls")), url(r'^api/', include("app.api.urls")),
] ]
# Test cases call boot() independently
if not settings.TESTING:
boot()

Wyświetl plik

@ -1,7 +1,11 @@
""" """
An interface to An interface to node-OpenDroneMap's API
https://github.com/pierotofy/node-OpenDroneMap/blob/master/docs/index.adoc
""" """
import requests import requests
import mimetypes
import json
import os
class ApiClient: class ApiClient:
def __init__(self, host, port): def __init__(self, host, port):
@ -17,10 +21,17 @@ class ApiClient:
def options(self): def options(self):
return requests.get(self.url('/options')).json() return requests.get(self.url('/options')).json()
def new_task(self): def new_task(self, images, name=None, options=[]):
pass """
#print(dir(self.client.task.post_task_new)) Starts processing of a new task
#return self.client.task.post_task_new(images=dict(images="../Gruntfile.js")).result() :param images: list of path images
:param name: name of the task
#a = ApiClient("localhostaa", 3000) :param options: options to be used for processing ([{'name': optionName, 'value': optionValue}, ...])
#print(a.info()) :return: UUID or error
"""
files = [('images',
(os.path.basename(image), open(image, 'rb'), (mimetypes.guess_type(image)[0] or "image/jpg"))
) for image in images]
return requests.post(self.url("/task/new"),
files=files,
data={'name': name, 'options': json.dumps(options)}).json()

Wyświetl plik

@ -0,0 +1,2 @@
class NewTaskException(Exception):
pass

Plik binarny nie jest wyświetlany.

Po

Szerokość:  |  Wysokość:  |  Rozmiar: 4.9 MiB

Plik binarny nie jest wyświetlany.

Po

Szerokość:  |  Wysokość:  |  Rozmiar: 5.0 MiB

Plik binarny nie jest wyświetlany.

Po

Szerokość:  |  Wysokość:  |  Rozmiar: 4.9 MiB

Plik binarny nie jest wyświetlany.

Po

Szerokość:  |  Wysokość:  |  Rozmiar: 5.0 MiB

Plik binarny nie jest wyświetlany.

Po

Szerokość:  |  Wysokość:  |  Rozmiar: 4.9 MiB

Wyświetl plik

@ -8,6 +8,7 @@ from .api_client import ApiClient
import json import json
from django.db.models import signals from django.db.models import signals
from requests.exceptions import ConnectionError from requests.exceptions import ConnectionError
from .exceptions import NewTaskException
class ProcessingNode(models.Model): class ProcessingNode(models.Model):
hostname = models.CharField(max_length=255, help_text="Hostname where the node is located (can be an internal hostname as well)") hostname = models.CharField(max_length=255, help_text="Hostname where the node is located (can be an internal hostname as well)")
@ -50,14 +51,19 @@ class ProcessingNode(models.Model):
""" """
return json.dumps(self.available_options) return json.dumps(self.available_options)
def process_new_task(self): def process_new_task(self, images, name=None, options=[]):
""" """
Sends a set of images (and optional GCP file) via the API Sends a set of images (and optional GCP file) via the API
to start processing. to start processing.
:returns UUID of the newly created task or ... ? :returns UUID of the newly created task
""" """
api_client = self.api_client() api_client = self.api_client()
result = api_client.new_task(images, name, options)
if result['uuid']:
return result['uuid']
elif result['error']:
raise NewTaskException(result['error'])
# First time a processing node is created, automatically try to update # First time a processing node is created, automatically try to update

Wyświetl plik

@ -66,3 +66,11 @@ class TestClientApi(TestCase):
def test_auto_update_node_info(self): def test_auto_update_node_info(self):
online_node = ProcessingNode.objects.create(hostname="localhost", port=11223) online_node = ProcessingNode.objects.create(hostname="localhost", port=11223)
self.assertTrue(online_node.last_refreshed != None, "Last refreshed info is here (update_node_info() was called)") self.assertTrue(online_node.last_refreshed != None, "Last refreshed info is here (update_node_info() was called)")
def test_add_new_task(self):
pass #TODO
# import glob
# a = ApiClient("localhost", 3000)
# print(a.info())
# print(a.new_task(glob.glob("fixtures/test_images/*.JPG"), "test", [{'name': 'cmvs-maxImages', 'value': 5}]))