Console output retrieval working

pull/40/head
Piero Toffanin 2016-11-01 17:12:13 -04:00
rodzic c75c35a21e
commit 18ecd94bc6
8 zmienionych plików z 54 dodań i 14 usunięć

Wyświetl plik

@ -88,7 +88,7 @@ class Task(models.Model):
status = models.IntegerField(choices=STATUS_CODES, db_index=True, null=True, blank=True, help_text="Current status of the task")
last_error = models.TextField(null=True, blank=True, help_text="The last processing error received")
options = fields.JSONField(default=dict(), blank=True, help_text="Options that are being used to process this task", validators=[validate_task_options])
console_output = models.TextField(null=True, blank=True, help_text="Console output of the OpenDroneMap's process")
console_output = models.TextField(null=False, default="", blank=True, help_text="Console output of the OpenDroneMap's process")
ground_control_points = models.FileField(null=True, blank=True, upload_to=gcp_directory_path, help_text="Optional Ground Control Points file to use for processing")
# georeferenced_model
@ -140,7 +140,7 @@ class Task(models.Model):
# TODO: log process has started processing
except ProcessingException as e:
print("TASK ERROR: " + e.message)
self.set_failure(e.message)
# Need to update status (first time, queued or running?)
if self.uuid and self.status in [None, 10, 20]:
@ -149,10 +149,13 @@ class Task(models.Model):
# Update task info from processing node
try:
info = self.processing_node.get_task_info(self.uuid)
self.processing_time = info["processingTime"]
self.status = info["status"]["code"]
current_lines_count = len(self.console_output.split("\n")) - 1
self.console_output += self.processing_node.get_task_console_output(self.uuid, current_lines_count)
if "errorMessage" in info["status"]:
self.last_error = info["status"]["errorMessage"]
@ -171,9 +174,14 @@ class Task(models.Model):
else:
# Still waiting...
self.save()
except ProcessingException, e:
print("TASK ERROR 2: " + e.message)
except ProcessingException as e:
self.set_failure(e.message)
def set_failure(self, error_message):
print("{} ERROR: {}".format(self, error_message))
self.last_error = error_message
self.status = 30 # failed
self.save()
class Meta:
permissions = (

Wyświetl plik

@ -60,7 +60,7 @@ def process_pending_tasks():
# All tasks that have a processing node assigned
# but don't have a UUID
# and that are not locked (being processed by another thread)
tasks = Task.objects.filter(Q(uuid='') | Q(status__in=[10, 20])).exclude(Q(processing_node=None) | Q(processing_lock=True))
tasks = Task.objects.filter(Q(uuid='') | Q(status__in=[10, 20]) | Q(status=None)).exclude(Q(processing_node=None) | Q(processing_lock=True) | Q(last_error__isnull=False))
for task in tasks:
logger.info("Acquiring lock: {}".format(task))
task.processing_lock = True
@ -84,7 +84,7 @@ def setup():
try:
scheduler.start()
scheduler.add_job(update_nodes_info, 'interval', seconds=30)
scheduler.add_job(process_pending_tasks, 'interval', seconds=15)
scheduler.add_job(process_pending_tasks, 'interval', seconds=5)
except SchedulerAlreadyRunningError:
logger.warn("Scheduler already running (this is OK while testing)")

Wyświetl plik

@ -38,12 +38,14 @@ class Console extends React.Component {
// Fetch
this.sourceRequest = $.get(sourceUrl, text => {
let lines = text.split("\n");
lines.forEach(line => this.addLine(line));
currentLineNumber += (lines.length - 1);
if (text !== ""){
let lines = text.split("\n");
lines.forEach(line => this.addLine(line));
currentLineNumber += (lines.length - 1);
}
})
.always(() => {
if (this.props.refreshInterval !== undefined){
.always((_, textStatus) => {
if (textStatus !== "abort" && this.props.refreshInterval !== undefined){
this.sourceTimeout = setTimeout(updateFromSource, this.props.refreshInterval);
}
});

Wyświetl plik

@ -179,6 +179,9 @@ class ProjectListItem extends React.Component {
handleUpload(){
this.resetUploadState();
// Hide task list
if (this.state.showTaskList) this.toggleTaskList();
}
handleTaskSaved(taskInfo){

Wyświetl plik

@ -6,4 +6,5 @@
word-wrap: break-word; /* IE 5.5+ */
background: #fbfbfb;
color: black;
}

Wyświetl plik

@ -28,6 +28,10 @@ class ApiClient:
def task_info(self, uuid):
return requests.get(self.url('/task/{}/info').format(uuid)).json()
def task_output(self, uuid, line = 0):
return requests.get(self.url('/task/{}/output?line={}').format(uuid, line)).json()
def new_task(self, images, name=None, options=[]):
"""
Starts processing of a new task

Wyświetl plik

@ -85,6 +85,20 @@ class ProcessingNode(models.Model):
elif result['error']:
raise ProcessingException(result['error'])
def get_task_console_output(self, uuid, line):
"""
Retrieves the console output of the OpenDroneMap's process.
Useful for monitoring execution and to provide updates to the user.
"""
api_client = self.api_client()
result = api_client.task_output(uuid, line)
if isinstance(result, dict) and 'error' in result:
raise ProcessingException(result['error'])
elif isinstance(result, list):
return "".join(result)
else:
raise ProcessingException("Unknown response for console output: {}".format(result))
# First time a processing node is created, automatically try to update
@receiver(signals.post_save, sender=ProcessingNode, dispatch_uid="update_processing_node_info")
def auto_update_node_info(sender, instance, created, **kwargs):

Wyświetl plik

@ -5,7 +5,7 @@ from os import path
from .models import ProcessingNode
from .api_client import ApiClient
from requests.exceptions import ConnectionError
from .exceptions import ProcessingException
current_dir = path.dirname(path.realpath(__file__))
@ -59,6 +59,7 @@ class TestClientApi(TestCase):
self.assertTrue(isinstance(online_node.get_available_options_json(), six.string_types), "Available options json works")
self.assertTrue(isinstance(online_node.get_available_options_json(pretty=True), six.string_types), "Available options json works with pretty")
def test_offline_processing_node(self):
offline_node = ProcessingNode.objects.get(pk=2)
self.assertFalse(offline_node.update_node_info(), "Could not update info (offline)")
@ -68,8 +69,9 @@ class TestClientApi(TestCase):
online_node = ProcessingNode.objects.create(hostname="localhost", port=11223)
self.assertTrue(online_node.last_refreshed != None, "Last refreshed info is here (update_node_info() was called)")
def test_client_api(self):
def test_client_api_and_task_methods(self):
api = ApiClient("localhost", 11223)
online_node = ProcessingNode.objects.get(pk=1)
# Can call info(), options()
self.assertTrue(type(api.info()['version']) in [str, unicode])
@ -88,3 +90,9 @@ class TestClientApi(TestCase):
task_info = api.task_info(uuid)
self.assertTrue(isinstance(task_info['dateCreated'], (int, long)))
self.assertTrue(isinstance(task_info['uuid'], (str, unicode)))
# task_output
self.assertTrue(isinstance(api.task_output(uuid, 0), list))
self.assertTrue(isinstance(online_node.get_task_console_output(uuid, 0), (str, unicode)))
self.assertRaises(ProcessingException, online_node.get_task_console_output, "wrong-uuid", 0)