Various LRE fixes

Former-commit-id: 67afd4f9a3
pull/1161/head
Piero Toffanin 2019-05-08 20:32:02 -04:00
rodzic 766efbf8b5
commit 4f28484e50
4 zmienionych plików z 42 dodań i 16 usunięć

Wyświetl plik

@@ -180,14 +180,14 @@ def get_submodel_argv(args = None, submodels_path = None, submodel_name = None):
:return the same as argv, but removing references to --split,
setting/replacing --project-path and name
removing --rerun-from, --rerun, --rerun-all
removing --rerun-from, --rerun, --rerun-all, --sm-cluster
adding --orthophoto-cutline
adding --dem-euclidean-map
adding --skip-3dmodel (split-merge does not support 3D model merging)
removing --gcp (the GCP path if specified is always "gcp_list.txt")
"""
assure_always = ['--orthophoto-cutline', '--dem-euclidean-map', '--skip-3dmodel']
remove_always_2 = ['--split', '--rerun-from', '--rerun', '--gcp', '--end-with']
remove_always_2 = ['--split', '--rerun-from', '--rerun', '--gcp', '--end-with', '--sm-cluster']
remove_always_1 = ['--rerun-all']
argv = sys.argv

Wyświetl plik

@@ -28,7 +28,8 @@ class LocalRemoteExecutor:
def __init__(self, nodeUrl):
self.node = Node.from_url(nodeUrl)
self.params = {
'tasks': []
'tasks': [],
'threads': []
}
self.node_online = True
@@ -65,9 +66,13 @@ class LocalRemoteExecutor:
q.put(ReconstructionTask(pp, self.node, self.params))
def cleanup_remote_tasks_and_exit():
log.ODM_WARNING("LRE: Attempting to cleanup remote tasks")
if self.params['tasks']:
log.ODM_WARNING("LRE: Attempting to cleanup remote tasks")
else:
log.ODM_WARNING("LRE: No remote tasks to cleanup")
for task in self.params['tasks']:
log.ODM_DEBUG("Removing remote task %s... %s" % (task.uuid, task.remove()))
log.ODM_DEBUG("Removing remote task %s... %s" % (task.uuid, 'OK' if task.remove() else 'FAILED'))
os._exit(1)
def handle_result(task, local, error = None, partial=False):
@@ -102,10 +107,12 @@ class LocalRemoteExecutor:
log.ODM_INFO("LRE: %s finished successfully" % task)
if local:
nonloc.local_is_processing = False
if nonloc.semaphore: nonloc.semaphore.release()
q.task_done()
nonloc.local_is_processing = False
if not task.finished:
if nonloc.semaphore: nonloc.semaphore.release()
q.task_done()
task.finished = True
def worker():
while True:
@@ -134,7 +141,7 @@ class LocalRemoteExecutor:
except Exception as e:
handle_result(task, False, e)
# Define thread
# Create queue thread
t = threading.Thread(target=worker)
# Capture SIGTERM so that we can
@@ -161,9 +168,13 @@ class LocalRemoteExecutor:
# stop workers
q.put(None)
# Wait for thread
# Wait for queue thread
t.join()
# Wait for all remains threads
for thrds in self.params['threads']:
thrds.join()
# restore SIGTERM handler
signal.signal(signal.SIGTERM, original_sigterm_handler)
@@ -195,6 +206,7 @@ class Task:
self.retries = 0
self.retry_timeout = retry_timeout
self.local = None
self.finished = False
def process(self, local, done):
def handle_result(error = None, partial=False):
@@ -204,6 +216,7 @@ class Task:
if local:
t = threading.Thread(target=self._process_local, args=(handle_result, ))
self.params['threads'].append(t)
t.start()
else:
now = datetime.datetime.now()
@@ -293,7 +306,7 @@ class ReconstructionTask(Task):
get_submodel_args_dict(),
progress_callback=print_progress,
skip_post_processing=True,
outputs=["opensfm/matches", "opensfm/features"])
outputs=["opensfm/matches", "opensfm/features", "opensfm/reconstruction.aligned.json"])
# Keep track of tasks for cleanup
self.params['tasks'].append(task)
@@ -318,12 +331,24 @@ class ReconstructionTask(Task):
task.wait_for_completion(status_callback=status_callback)
log.ODM_DEBUG("LRE: Downloading assets for %s" % self)
task.download_assets(self.project_path, progress_callback=print_progress)
log.ODM_DEBUG("LRE: Downloaded and extracted assets for %s" % self)
done()
except exceptions.TaskFailedError as e:
# Try to get output
try:
log.ODM_WARNING("LRE: %s failed with task output:" % self)
log.ODM_WARNING("\n".join(task.output()[-10:]))
except:
log.ODM_WARNING("LRE: Could not retrieve task output for %s" % self)
pass
done(e)
except Exception as e:
done(e)
# Launch monitor thread and return
threading.Thread(target=monitor).start()
t = threading.Thread(target=monitor)
self.params['threads'].append(t)
t.start()
elif info.status == TaskStatus.QUEUED:
raise NodeTaskLimitReachedException("Task limit reached")
else:

Wyświetl plik

@@ -29,6 +29,7 @@ class ODMOpenSfMStage(types.ODM_Stage):
# If we find a special flag file for split/merge we stop right here
if os.path.exists(octx.path("split_merge_stop_at_reconstruction.txt")):
log.ODM_INFO("Stopping OpenSfM early because we found: %s" % octx.path("split_merge_stop_at_reconstruction.txt"))
self.next_stage = None
return
if args.fast_orthophoto:

Wyświetl plik

@@ -88,7 +88,7 @@ class ODMSplitStage(types.ODM_Stage):
else:
lre = LocalRemoteExecutor(args.sm_cluster)
lre.set_projects([os.path.abspath(os.path.join(p, "..")) for p in submodel_paths])
lre.run_reconstruct()
lre.run_reconstruction()
# Align
alignment_file = octx.path('alignment_done.txt')
@@ -140,8 +140,8 @@ class ODMSplitStage(types.ODM_Stage):
# Re-run the ODM toolchain on the submodel
system.run(" ".join(map(quote, argv)), env_vars=os.environ.copy())
else:
de.set_projects([os.path.abspath(os.path.join(p, "..")) for p in submodel_paths])
de.run_toolchain()
lre.set_projects([os.path.abspath(os.path.join(p, "..")) for p in submodel_paths])
lre.run_toolchain()
with open(split_done_file, 'w') as fout:
fout.write("Split done!\n")