Toolchain LRE tasks, lock fixes

Former-commit-id: c736a817dd
pull/1161/head
Piero Toffanin 2019-05-09 13:11:02 -04:00
parent 89e1315191
commit b76108b083
3 changed files with 119 additions and 72 deletions

View file

@@ -171,10 +171,10 @@ class OSFMContext:
def name(self):
return os.path.basename(os.path.abspath(self.path("..")))
def get_submodel_argv(args = None, submodels_path = None, submodel_name = None):
def get_submodel_argv(project_name = None, submodels_path = None, submodel_name = None):
"""
Gets argv for a submodel starting from the argv passed to the application startup.
Additionally, if args, submodels_path and submodel_name are passed, the function
Additionally, if project_name, submodels_path and submodel_name are passed, the function
handles the <project name> value and --project-path detection / override.
When all arguments are set to None, --project-path and project name are always removed.
@@ -202,7 +202,7 @@ def get_submodel_argv(args = None, submodels_path = None, submodel_name = None):
# Last?
if i == len(argv) - 1:
# Project name?
if args and submodel_name and arg == args.name:
if project_name and submodel_name and arg == project_name:
result.append(submodel_name)
found_args['project_name'] = True
elif arg.startswith("--"):
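For reference, a minimal sketch of the substitution the reworked get_submodel_argv performs once it is handed a plain project name string: the trailing positional <project name> in the startup argv is swapped for the submodel name. The helper name swap_project_name and the sample argv are illustrative only, not part of ODM's API.

def swap_project_name(argv, project_name, submodel_name):
    # Hypothetical helper: ODM's CLI places the project name as the last
    # positional argument, so only the final element is compared and swapped.
    result = list(argv)
    if result and result[-1] == project_name:
        result[-1] = submodel_name
    return result

# Example (illustrative values):
print(swap_project_name(["run.py", "--project-path", "/datasets", "brighton"],
                        "brighton", "submodel_0000"))
# ['run.py', '--project-path', '/datasets', 'submodel_0000']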

View file

@@ -7,10 +7,12 @@ import signal
import zipfile
import glob
from opendm import log
from opendm import system
from pyodm import Node, exceptions
from pyodm.utils import AtomicCounter
from pyodm.types import TaskStatus
from osfm import OSFMContext, get_submodel_args_dict
from osfm import OSFMContext, get_submodel_args_dict, get_submodel_argv
from pipes import quote
try:
import queue
@@ -48,6 +50,12 @@ class LocalRemoteExecutor:
self.project_paths = paths
def run_reconstruction(self):
self.run(ReconstructionTask)
def run_toolchain(self):
self.run(ToolchainTask)
def run(self, taskClass):
if not self.project_paths:
return
@@ -56,6 +64,7 @@ class LocalRemoteExecutor:
error = None
local_is_processing = False
semaphore = None
handle_result_mutex = threading.Lock()
node_task_limit = AtomicCounter(0)
@@ -63,7 +72,7 @@ class LocalRemoteExecutor:
q = queue.Queue()
for pp in self.project_paths:
log.ODM_DEBUG("LRE: Adding to queue %s" % pp)
q.put(ReconstructionTask(pp, self.node, self.params))
q.put(taskClass(pp, self.node, self.params))
def cleanup_remote_tasks_and_exit():
if self.params['tasks']:
@@ -76,46 +85,50 @@ class LocalRemoteExecutor:
os._exit(1)
def handle_result(task, local, error = None, partial=False):
release_semaphore = True
try:
nonloc.handle_result_mutex.acquire()
release_semaphore = True
if error:
if isinstance(error, NodeTaskLimitReachedException) and not nonloc.semaphore and node_task_limit.value > 0:
nonloc.semaphore = threading.Semaphore(node_task_limit.value)
log.ODM_DEBUG("LRE: Node task limit reached. Setting semaphore to %s" % node_task_limit.value)
for i in range(node_task_limit.value):
nonloc.semaphore.acquire()
release_semaphore = False
log.ODM_WARNING("LRE: %s failed with: %s" % (task, str(error)))
# Special case in which the error is caused by a SIGTERM signal
# this means a local processing was terminated either by CTRL+C or
# by canceling the task.
if str(error) == "Child was terminated by signal 15":
cleanup_remote_tasks_and_exit()
if error:
if isinstance(error, NodeTaskLimitReachedException) and not nonloc.semaphore and node_task_limit.value > 0:
nonloc.semaphore = threading.Semaphore(node_task_limit.value)
log.ODM_DEBUG("LRE: Node task limit reached. Setting semaphore to %s" % node_task_limit.value)
for i in range(node_task_limit.value):
nonloc.semaphore.acquire()
release_semaphore = False
if task.retries < task.max_retries:
# Put task back in queue
task.retries += 1
task.wait_until = datetime.datetime.now() + datetime.timedelta(seconds=task.retries * task.retry_timeout)
log.ODM_DEBUG("LRE: Re-queueing %s (retries: %s)" % (task, task.retries))
q.put(task)
log.ODM_WARNING("LRE: %s failed with: %s" % (task, str(error)))
# Special case in which the error is caused by a SIGTERM signal
# this means a local processing was terminated either by CTRL+C or
# by canceling the task.
if str(error) == "Child was terminated by signal 15":
cleanup_remote_tasks_and_exit()
if task.retries < task.max_retries:
# Put task back in queue
task.retries += 1
task.wait_until = datetime.datetime.now() + datetime.timedelta(seconds=task.retries * task.retry_timeout)
log.ODM_DEBUG("LRE: Re-queueing %s (retries: %s)" % (task, task.retries))
q.put(task)
else:
nonloc.error = error
else:
nonloc.error = error
else:
if not local and not partial:
node_task_limit.increment(-1)
if not local and not partial:
node_task_limit.increment(-1)
if not partial:
log.ODM_INFO("LRE: %s finished successfully" % task)
if not partial:
log.ODM_INFO("LRE: %s finished successfully" % task)
if local:
nonloc.local_is_processing = False
if not task.finished:
if nonloc.semaphore and release_semaphore: nonloc.semaphore.release()
q.task_done()
task.finished = True
if local:
nonloc.local_is_processing = False
if not task.finished:
if nonloc.semaphore and release_semaphore: nonloc.semaphore.release()
q.task_done()
task.finished = True
finally:
nonloc.handle_result_mutex.release()
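The acquire()/release() pair added above serializes every handle_result call on a single lock. As a minimal sketch of the same pattern (simplified signature, print() in place of ODM's log module), the idiomatic equivalent is a with block:

import threading

handle_result_mutex = threading.Lock()  # one lock shared by all worker threads

def handle_result(task, error=None):
    # Only one thread at a time may touch the shared queue/semaphore state;
    # the lock is released automatically even if an exception is raised.
    with handle_result_mutex:
        if error is not None:
            print("LRE: %s failed with: %s" % (task, error))
        else:
            print("LRE: %s finished successfully" % task)

handle_result("submodel_0000")           # LRE: submodel_0000 finished successfully
handle_result("submodel_0001", "boom")   # LRE: submodel_0001 failed with: boom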
def worker():
while True:
@@ -189,13 +202,6 @@ class LocalRemoteExecutor:
raise exceptions.NodeConnectionError("A connection error happened. Check the connection to the processing node and try again.")
else:
raise nonloc.error
def run_toolchain(self):
if not self.project_paths:
return
print("TODO!")
exit(1)
class NodeTaskLimitReachedException(Exception):
@@ -270,26 +276,14 @@ class Task:
done(error=None, partial=True) # Upload is completed, but processing is not (partial)
except Exception as e:
done(e)
def process_local(self):
raise NotImplementedError()
def process_remote(self, done):
raise NotImplementedError()
def __str__(self):
return os.path.basename(self.project_path)
class ReconstructionTask(Task):
def process_local(self):
octx = OSFMContext(self.path("opensfm"))
octx.feature_matching()
octx.reconstruct()
def process_remote(self, done):
seed_file = self.create_seed_payload(["opensfm/exif",
"opensfm/camera_models.json",
"opensfm/reference_lla.json"], touch_files=["opensfm/split_merge_stop_at_reconstruction.txt"])
def execute_remote_task(self, seed_files = [], seed_touch_files = [], outputs = []):
"""
Run a task by creating a seed file with all files in seed_files, optionally
creating empty files (for flag checks) specified in seed_touch_files
and returning the results specified in outputs. Yeah it's pretty cool!
"""
seed_file = self.create_seed_payload(seed_files, touch_files=seed_touch_files)
# Find all images
images = glob.glob(self.path("images/**"))
@@ -310,10 +304,7 @@ class ReconstructionTask(Task):
get_submodel_args_dict(),
progress_callback=print_progress,
skip_post_processing=True,
outputs=["opensfm/matches", "opensfm/features",
"opensfm/reconstruction.json",
"opensfm/tracks.csv",
])
outputs=outputs)
# Cleanup seed file
os.remove(seed_file)
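The seed payload described in the docstring can be pictured as below. create_seed_payload here is a simplified stand-in for the Task method of the same name, assuming it zips the listed project files and adds zero-byte entries for each flag file the node should find; the real implementation may differ.

import os
import zipfile

def create_seed_payload(project_path, seed_files, touch_files, out_path="seed.zip"):
    # Simplified stand-in: bundle the listed files/directories relative to the
    # project, then add empty entries that act as flags on the remote node
    # (e.g. opensfm/split_merge_stop_at_reconstruction.txt).
    with zipfile.ZipFile(out_path, "w", zipfile.ZIP_DEFLATED) as zf:
        for rel in seed_files:
            full = os.path.join(project_path, rel)
            if os.path.isdir(full):
                for root, _, files in os.walk(full):
                    for f in files:
                        p = os.path.join(root, f)
                        zf.write(p, arcname=os.path.relpath(p, project_path))
            elif os.path.isfile(full):
                zf.write(full, arcname=rel)
        for rel in touch_files:
            zf.writestr(rel, "")  # zero-byte flag file
    return out_path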
@@ -329,7 +320,7 @@ class ReconstructionTask(Task):
# stop the process and re-add the task to the queue.
def status_callback(info):
if info.status == TaskStatus.QUEUED:
log.ODM_WARNING("%s (%s) turned from RUNNING to QUEUED. Re-adding to back of the queue." % (self, task.uuid))
log.ODM_WARNING("LRE: %s (%s) turned from RUNNING to QUEUED. Re-adding to back of the queue." % (self, task.uuid))
task.remove()
done(NodeTaskLimitReachedException("Delayed task limit reached"), partial=True)
@@ -364,4 +355,60 @@ class ReconstructionTask(Task):
else:
raise Exception("Could not send task to node, task status is %s" % str(info.status))
def process_local(self):
raise NotImplementedError()
def process_remote(self, done):
raise NotImplementedError()
def __str__(self):
return os.path.basename(self.project_path)
class ReconstructionTask(Task):
def process_local(self):
octx = OSFMContext(self.path("opensfm"))
log.ODM_INFO("==================================")
log.ODM_INFO("Local Reconstruction %s" % octx.name())
log.ODM_INFO("==================================")
octx.feature_matching()
octx.reconstruct()
def process_remote(self, done):
self.execute_remote_task(seed_files=["opensfm/exif",
"opensfm/camera_models.json",
"opensfm/reference_lla.json"],
seed_touch_files=["opensfm/split_merge_stop_at_reconstruction.txt"],
outputs=["opensfm/matches", "opensfm/features",
"opensfm/reconstruction.json",
"opensfm/tracks.csv"])
class ToolchainTask(Task):
def process_local(self):
log.ODM_INFO("=============================")
log.ODM_INFO("Local Toolchain %s" % self)
log.ODM_INFO("=============================")
submodel_name = os.path.basename(self.project_path)
submodels_path = os.path.abspath(self.path(".."))
project_name = os.path.basename(os.path.abspath(os.path.join(submodels_path, "..")))
argv = get_submodel_argv(project_name, submodels_path, submodel_name)
# Re-run the ODM toolchain on the submodel
system.run(" ".join(map(quote, argv)), env_vars=os.environ.copy())
def process_remote(self, done):
self.execute_remote_task(seed_files=["opensfm/exif",
"opensfm/camera_models.json",
"opensfm/reference_lla.json",
"opensfm/features",
"opensfm/matches",
"opensfm/reconstruction.json",
"opensfm/tracks.csv"],
seed_touch_files=[],
outputs=["odm_orthophoto/odm_orthophoto.tif",
"odm_orthophoto/cutline.gpkg",
"odm_dem",
"odm_georeferencing"])

View file

@@ -135,7 +135,7 @@ class ODMSplitStage(types.ODM_Stage):
log.ODM_INFO("Processing %s" % sp_octx.name())
log.ODM_INFO("========================")
argv = get_submodel_argv(args, tree.submodels_path, sp_octx.name())
argv = get_submodel_argv(args.name, tree.submodels_path, sp_octx.name())
# Re-run the ODM toolchain on the submodel
system.run(" ".join(map(quote, argv)), env_vars=os.environ.copy())
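For completeness, the quoted command string that the split stage (and ToolchainTask.process_local) hands to system.run can be sketched with the standard library alone; shlex.quote is the Python 3 counterpart of pipes.quote, and the argv values below are illustrative.

import shlex

argv = ["python", "run.py", "--project-path", "/datasets/submodels", "submodel_0000"]  # illustrative
cmd = " ".join(map(shlex.quote, argv))
print(cmd)  # the real code passes this string to opendm.system.run with env_vars=os.environ.copy()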