Task processing working (for happy path cases, still need to handle failures, edge cases, errors)

pull/39/head
Piero Toffanin 2016-10-26 16:56:32 -04:00
rodzic f7d519e52c
commit 51aabe177d
4 zmienionych plików z 33 dodań i 12 usunięć

Wyświetl plik

@ -79,13 +79,14 @@ class Task(models.Model):
(50, 'CANCELED')
)
uuid = models.CharField(max_length=255, null=True, blank=True, help_text="Identifier of the task (as returned by OpenDroneMap's REST API)")
uuid = models.CharField(max_length=255, db_index=True, null=True, blank=True, help_text="Identifier of the task (as returned by OpenDroneMap's REST API)")
project = models.ForeignKey(Project, on_delete=models.CASCADE, help_text="Project that this task belongs to")
name = models.CharField(max_length=255, null=True, blank=True, help_text="A label for the task")
processing_lock = models.BooleanField(default=False, help_text="A flag indicating whether this task is currently locked for processing. When this flag is turned on, the task is in the middle of a processing step.")
processing_time = models.IntegerField(default=-1, help_text="Number of milliseconds that elapsed since the beginning of this task (-1 indicates that no information is available)")
processing_node = models.ForeignKey(ProcessingNode, null=True, blank=True, help_text="Processing node assigned to this task (or null if this task has not been associated yet)")
status = models.IntegerField(choices=STATUS_CODES, null=True, blank=True, help_text="Current status of the task")
status = models.IntegerField(choices=STATUS_CODES, db_index=True, null=True, blank=True, help_text="Current status of the task")
last_error = models.TextField(null=True, blank=True, help_text="The last processing error received")
options = fields.JSONField(default=dict(), blank=True, help_text="Options that are being used to process this task", validators=[validate_task_options])
console_output = models.TextField(null=True, blank=True, help_text="Console output of the OpenDroneMap's process")
ground_control_points = models.FileField(null=True, blank=True, upload_to=gcp_directory_path, help_text="Optional Ground Control Points file to use for processing")
@ -144,14 +145,34 @@ class Task(models.Model):
# Need to update status (first time, queued or running?)
if self.uuid and self.status in [None, 10, 20]:
print("Have UUID: {}".format(self.uuid))
# Update task info from processing node
try:
info = self.processing_node.get_task_info(self.uuid)
self.processing_time = info["processingTime"]
self.status = info["status"]["code"]
# TODO!
if "errorMessage" in info["status"]:
self.last_error = info["status"]["errorMessage"]
# Canceled, failed, or completed
if self.uuid and self.status in [30, 40, 50]:
print("DONE: " + str(self.status))
# Has the task just been canceled, failed, or completed?
# Note that we don't save the status code right away,
# if the assets retrieval fails we want to retry again.
if self.status in [30, 40, 50]:
print("ALMOST DONE: " + str(self.status))
# Completed?
if self.status == 40:
# TODO: retrieve assets
pass
else:
self.save()
else:
# Still waiting...
self.save()
except ProcessingException, e:
print("TASK ERROR 2: " + e.message)
class Meta:

Wyświetl plik

@ -44,7 +44,7 @@ def process_pending_tasks():
# All tasks that have a processing node assigned
# but don't have a UUID
# and that are not locked (being processed by another thread)
tasks = Task.objects.filter(uuid=None).exclude(Q(processing_node=None) | Q(processing_lock=True))
tasks = Task.objects.filter(Q(uuid=None) | Q(status=10) | Q(status=20)).exclude(Q(processing_node=None) | Q(processing_lock=True))
for task in tasks:
logger.info("Acquiring lock: {}".format(task))
task.processing_lock = True
@ -71,7 +71,7 @@ def setup():
scheduler = BackgroundScheduler()
scheduler.start()
scheduler.add_job(update_nodes_info, 'interval', seconds=30)
scheduler.add_job(process_pending_tasks, 'interval', seconds=15)
scheduler.add_job(process_pending_tasks, 'interval', seconds=5)
except SchedulerAlreadyRunningError:
logger.warn("Scheduler already running (this is OK while testing)")

Wyświetl plik

@ -32,7 +32,7 @@ class ApiClient:
:param options: options to be used for processing ([{'name': optionName, 'value': optionValue}, ...])
:return: UUID or error
"""
print(options)
files = [('images',
(os.path.basename(image), open(image, 'rb'), (mimetypes.guess_type(image)[0] or "image/jpg"))
) for image in images]

Wyświetl plik

@ -85,5 +85,5 @@ class TestClientApi(TestCase):
# Can call task_info()
task_info = api.task_info(uuid)
self.assertTrue(type(task_info['dateCreated']) == long)
self.assertTrue(type(task_info['uuid']) in [str, unicode])
self.assertTrue(isinstance(task_info['dateCreated'], (int, long)))
self.assertTrue(isinstance(task_info['uuid'], (str, unicode)))