Mirror of https://github.com/OpenDroneMap/WebODM
Consolidated try/except statements from processing code, handled a circular lock condition with restarting tasks
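The task-processing logic in Task.process() previously wrapped each step (submitting images, handling pending cancel/restart/remove actions, polling status) in its own try/except; those handlers are now consolidated into a single set of except clauses at the end of the method. Restarting is also made safer: if the processing node no longer recognizes the task UUID, or the restart call itself fails, the task is requeued as a new one rather than getting stuck on a pending restart. In outline (a simplified sketch with hypothetical helper names, not the actual code):

    class ProcessingError(Exception):
        pass

    def submit(task): ...         # hypothetical stand-in for process_new_task
    def update_status(task): ...  # hypothetical stand-in for the status polling

    # Before: each step wrapped in its own handler, failure logic duplicated.
    def process_before(task):
        try:
            submit(task)
        except ProcessingError as e:
            task.set_failure(str(e))
        try:
            update_status(task)
        except ProcessingError as e:
            task.set_failure(str(e))

    # After: one handler at the end of the method covers every step.
    def process_after(task):
        try:
            submit(task)
            update_status(task)
        except ProcessingError as e:
            task.set_failure(str(e))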
parent
a992fea83d
commit
7ab34ba7f3
app/models.py: 224 changed lines
@@ -208,154 +208,147 @@ class Task(models.Model):
                    images = [image.path() for image in self.imageupload_set.all()]

                    # This takes a while
                    uuid = self.processing_node.process_new_task(images, self.name, self.options)

                    # Refresh task object before committing change
                    self.refresh_from_db()
                    self.uuid = uuid
                    self.save()

                    # TODO: log process has started processing

            if self.pending_action is not None:
                if self.pending_action == pending_actions.CANCEL:
                    # Do we need to cancel the task on the processing node?
                    logger.info("Canceling {}".format(self))
                    if self.processing_node and self.uuid:
                        self.processing_node.cancel_task(self.uuid)
                        self.pending_action = None
                        self.status = None
                        self.save()
                    else:
                        raise ProcessingError("Cannot cancel a task that has no processing node or UUID")

                elif self.pending_action == pending_actions.RESTART:
                    logger.info("Restarting {}".format(self))
                    if self.processing_node:

                        # Check if the UUID is still valid, as processing nodes purge
                        # results after a set amount of time; the UUID might have been eliminated.
                        uuid_still_exists = False

                        if self.uuid:
                            try:
                                info = self.processing_node.get_task_info(self.uuid)
                                uuid_still_exists = info['uuid'] == self.uuid
                            except ProcessingException:
                                pass

                        if uuid_still_exists:
                            # Good to go
                            try:
                                self.processing_node.restart_task(self.uuid)
                            except ProcessingError as e:
                                # Something went wrong
                                logger.warning("Could not restart {}, will start a new one".format(self))
                                self.uuid = ''
                        else:
                            # Task has been purged (or processing node is offline)
                            # Process this as a new task
                            # Removing its UUID will cause the scheduler
                            # to process this the next tick
                            self.uuid = ''

                        self.console_output = ""
                        self.processing_time = -1
                        self.status = None
                        self.last_error = None
                        self.pending_action = None
                        self.save()
                    else:
                        raise ProcessingError("Cannot restart a task that has no processing node")

                elif self.pending_action == pending_actions.REMOVE:
                    logger.info("Removing {}".format(self))
                    if self.processing_node and self.uuid:
                        # Attempt to delete the resources on the processing node
                        # We don't care if this fails, as resources on processing nodes
                        # are expected to be purged on their own after a set amount of time anyway
                        try:
                            self.processing_node.remove_task(self.uuid)
                        except ProcessingException:
                            pass

                    # What's more important is that we delete our task properly here
                    self.delete()

                    # Stop right here!
                    return

            if self.processing_node:
                # Need to update status (first time, queued or running?)
                if self.uuid and self.status in [None, status_codes.QUEUED, status_codes.RUNNING]:
                    # Update task info from processing node
                    info = self.processing_node.get_task_info(self.uuid)

                    self.processing_time = info["processingTime"]
                    self.status = info["status"]["code"]

                    current_lines_count = len(self.console_output.split("\n")) - 1
                    self.console_output += self.processing_node.get_task_console_output(self.uuid, current_lines_count)

                    if "errorMessage" in info["status"]:
                        self.last_error = info["status"]["errorMessage"]

                    # Has the task just been canceled, failed, or completed?
                    if self.status in [status_codes.FAILED, status_codes.COMPLETED, status_codes.CANCELED]:
                        logger.info("Processing status: {} for {}".format(self.status, self))

                        if self.status == status_codes.COMPLETED:
                            assets_dir = self.assets_path("")
                            if not os.path.exists(assets_dir):
                                os.makedirs(assets_dir)

                            logger.info("Downloading all.zip for {}".format(self))

                            # Download all assets
                            zip_stream = self.processing_node.download_task_asset(self.uuid, "all.zip")
                            zip_path = os.path.join(assets_dir, "all.zip")
                            with open(zip_path, 'wb') as fd:
                                for chunk in zip_stream.iter_content(4096):
                                    fd.write(chunk)

                            logger.info("Done downloading all.zip for {}".format(self))

                            # Extract from zip
                            with zipfile.ZipFile(zip_path, "r") as zip_h:
                                zip_h.extractall(assets_dir)

                            logger.info("Extracted all.zip for {}".format(self))

                            # Add to database orthophoto
                            orthophoto_path = os.path.realpath(self.assets_path("odm_orthophoto", "odm_orthophoto.tif"))
                            if os.path.exists(orthophoto_path):
                                orthophoto = GDALRaster(orthophoto_path, write=True)

                                # We need to transform to 4326 before we can store it
                                # as an offdb raster field
                                orthophoto_4326_path = os.path.realpath(self.assets_path("odm_orthophoto", "odm_orthophoto_4326.tif"))
                                self.orthophoto = orthophoto.transform(4326, 'GTiff', orthophoto_4326_path)

                                logger.info("Imported orthophoto {} for {}".format(orthophoto_4326_path, self))

                            self.save()
                        else:
                            # FAILED, CANCELED
                            self.save()
                    else:
                        # Still waiting...
                        self.save()

        except ProcessingError as e:
            self.set_failure(str(e))
        except (ConnectionRefusedError, ConnectionError) as e:
            logger.warning("{} cannot communicate with processing node: {}".format(self, str(e)))
        except ProcessingTimeout as e:
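The restart branch above is where the circular lock condition is handled: the task now falls back to clearing its UUID, so the scheduler starts a fresh task on the next tick, both when the node has purged the UUID and when the restart call itself raises. A minimal sketch of that decision, with a generic node client and task record standing in for the real models, and a broad except Exception standing in for ProcessingException/ProcessingError:

    import logging

    logger = logging.getLogger(__name__)

    def restart_or_requeue(node, task):
        uuid_still_exists = False
        if task.uuid:
            try:
                info = node.get_task_info(task.uuid)
                uuid_still_exists = info['uuid'] == task.uuid
            except Exception:
                # Node purged the result or is offline; fall through to requeue
                pass

        if uuid_still_exists:
            try:
                node.restart_task(task.uuid)
            except Exception:
                logger.warning("Could not restart {}, will start a new one".format(task))
                task.uuid = ''  # cleared UUID makes the scheduler start a fresh task
        else:
            # Task was purged on the node: clear the UUID so the scheduler
            # processes it as a new task on the next tick
            task.uuid = ''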
@@ -394,6 +387,7 @@ class Task(models.Model):
        logger.error("FAILURE FOR {}: {}".format(self, error_message))
        self.last_error = error_message
        self.status = status_codes.FAILED
        self.pending_action = None
        self.save()

    class Meta:
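The added self.pending_action = None in set_failure is the other half of the fix: a task that fails no longer keeps its pending action set, so the scheduler cannot keep re-entering the restart path for it. A toy illustration with a hypothetical stand-in class, not the Django model:

    # Toy stand-in for the Task model, illustrating only the new behavior.
    class Task:
        def __init__(self):
            self.pending_action = "RESTART"  # a restart was requested
            self.status = None
            self.last_error = None

        def set_failure(self, error_message):
            self.last_error = error_message
            self.status = "FAILED"
            self.pending_action = None  # the added line: drop the pending action

    task = Task()
    task.set_failure("processing node unreachable")
    assert task.pending_action is None  # the scheduler won't retry the restart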