diff --git a/opendm/remote.py b/opendm/remote.py index 0b695a2b..d54a6312 100644 --- a/opendm/remote.py +++ b/opendm/remote.py @@ -4,9 +4,11 @@ import os import sys import threading import signal +import zipfile from opendm import log from pyodm import Node, exceptions from pyodm.utils import AtomicCounter +from osfm import OSFMContext try: import queue @@ -23,6 +25,7 @@ class LocalRemoteExecutor: """ def __init__(self, nodeUrl): self.node = Node.from_url(nodeUrl) + self.node.tasks = [] log.ODM_INFO("LRE: Initializing using cluster node %s:%s" % (self.node.host, self.node.port)) try: @@ -53,8 +56,15 @@ class LocalRemoteExecutor: q = queue.Queue() for pp in self.project_paths: log.ODM_DEBUG("LRE: Adding to queue %s" % pp) - q.put(ReconstructionTask(pp)) + q.put(ReconstructionTask(pp, self.node)) + def cleanup_remote_tasks_and_exit(): + log.ODM_WARNING("LRE: Attempting to cleanup remote tasks") + for task in self.node.tasks: + pass # TODO!!! + #task.cancel() + os._exit(1) + def handle_result(task, local, error = None): if error: if isinstance(error, NodeTaskLimitReachedException) and not nonloc.semaphore: @@ -65,6 +75,12 @@ class LocalRemoteExecutor: log.ODM_WARNING("LRE: %s failed with: %s" % (task, str(error))) + # Special case in which the error is caused by a SIGTERM signal + # this means a local processing was terminated either by CTRL+C or + # by canceling the task. + if str(error) == "Child was terminated by signal 15": + cleanup_remote_tasks_and_exit() + if task.retries < task.max_retries: # Put task back in queue task.retries += 1 @@ -72,7 +88,7 @@ class LocalRemoteExecutor: log.ODM_DEBUG("LRE: Re-queueing %s (retries: %s)" % (task, task.retries)) q.put(task) else: - nonloc.error = e + nonloc.error = error else: if not local: node_task_limit.increment(-1) @@ -115,18 +131,13 @@ class LocalRemoteExecutor: # Define thread t = threading.Thread(target=worker) - def cleanup_remote_tasks(): - log.ODM_WARNING("LRE: Attempting to cleanup remote tasks") - pass # TODO - # Capture SIGTERM so that we can # attempt to cleanup if the process is terminated original_sigterm_handler = signal.getsignal(signal.SIGTERM) def sigterm_handler(signum, frame): log.ODM_WARNING("LRE: Caught SIGTERM") - cleanup_remote_tasks() - os._exit(1) + cleanup_remote_tasks_and_exit() signal.signal(signal.SIGTERM, sigterm_handler) @@ -139,8 +150,7 @@ class LocalRemoteExecutor: time.sleep(0.5) except KeyboardInterrupt: log.ODM_WARNING("LRE: CTRL+C") - cleanup_remote_tasks() - os._exit(1) + cleanup_remote_tasks_and_exit() # stop workers q.put(None) @@ -164,8 +174,9 @@ class NodeTaskLimitReachedException(Exception): pass class Task: - def __init__(self, project_path, max_retries=10, retry_timeout=1): + def __init__(self, project_path, node, max_retries=10, retry_timeout=10): self.project_path = project_path + self.node = node self.wait_until = datetime.datetime.now() # Don't run this task until a certain time self.max_retries = max_retries self.retries = 0 @@ -179,20 +190,48 @@ class Task: log.ODM_INFO("LRE: About to process %s %s" % (self, 'locally' if local else 'remotely')) if local: - t = threading.Thread(target=self.process_local, args=(handle_result, )) + t = threading.Thread(target=self._process_local, args=(handle_result, )) t.start() else: now = datetime.datetime.now() if self.wait_until > now: - wait_for = (self.wait_until - now).seconds + wait_for = (self.wait_until - now).seconds + 1 log.ODM_DEBUG("LRE: Waiting %s seconds before processing %s" % (wait_for, self)) time.sleep(wait_for) # TODO: we could consider uploading multiple tasks # in parallel. But since we are using the same node # perhaps this wouldn't be a big speedup. - self.process_remote(handle_result) # Block until upload is complete + self._process_remote(handle_result) # Block until upload is complete + def create_seed_payload(self, paths): + paths = filter(os.path.exists, map(lambda p: os.path.join(self.project_path, p), paths)) + outfile = os.path.join(self.project_path, "seed.zip") + + with zipfile.ZipFile(outfile, "w", compression=zipfile.ZIP_DEFLATED) as zf: + for p in paths: + if os.path.isdir(p): + for root, _, filenames in os.walk(p): + for filename in filenames: + filename = os.path.join(root, filename) + filename = os.path.normpath(filename) + zf.write(filename, os.path.relpath(filename, self.project_path)) + else: + zf.write(p, os.path.relpath(p, self.project_path)) + return outfile + + def _process_local(self, done): + try: + self.process_local(done) + except Exception as e: + done(e) + + def _process_remote(self, done): + try: + self.process_remote(done) + except Exception as e: + done(e) + def process_local(self, done): raise NotImplementedError() @@ -204,8 +243,9 @@ class Task: class ReconstructionTask(Task): def process_local(self, done): - print("Process local: " + self.project_path) - time.sleep(10) + octx = OSFMContext(os.path.join(self.project_path, "opensfm")) + octx.feature_matching() + octx.reconstruct() done() def process_remote(self, done): @@ -214,6 +254,12 @@ class ReconstructionTask(Task): time.sleep(4) done() + self.create_seed_payload(["opensfm/exif", + "opensfm/matches", + "opensfm/features", + "opensfm/camera_models.json", + "opensfm/reference_lla.json"]) + if self.project_path == '/datasets/brighton/opensfm/submodels/submodel_0001': done(Exception("TEST EXCEPTION!" + self.project_path)) elif self.project_path == '/datasets/brighton/opensfm/submodels/submodel_0002': diff --git a/stages/run_opensfm.py b/stages/run_opensfm.py index 849795bb..65a9fbcf 100644 --- a/stages/run_opensfm.py +++ b/stages/run_opensfm.py @@ -26,6 +26,9 @@ class ODMOpenSfMStage(types.ODM_Stage): octx.feature_matching(self.rerun()) octx.reconstruct(self.rerun()) + # TODO: add special logic for distributed large workflow + # and terminate stage early + if args.fast_orthophoto: output_file = octx.path('reconstruction.ply') elif args.use_opensfm_dense: diff --git a/stages/splitmerge.py b/stages/splitmerge.py index 8ecdefb4..cb90b8fd 100644 --- a/stages/splitmerge.py +++ b/stages/splitmerge.py @@ -88,7 +88,7 @@ class ODMSplitStage(types.ODM_Stage): else: de = HybridDistributedExecutor(args.sm_cluster) de.set_projects([os.path.abspath(os.path.join(p, "..")) for p in submodel_paths]) - de.run_reconstruct() + de.run_reconstruct(self.rerun()) # Align alignment_file = octx.path('alignment_done.txt')