From d712c16571654ce70fcdda2369d8c0d1b3af1d2a Mon Sep 17 00:00:00 2001 From: Piero Toffanin Date: Tue, 7 May 2019 15:08:07 -0400 Subject: [PATCH] Remote/local task coordination, node limit logic Former-commit-id: 7c458b8eaad76c14cba051212a469dd74787e3a8 --- opendm/remote.py | 104 ++++++++++++++++++++++++++++++++++++----------- 1 file changed, 81 insertions(+), 23 deletions(-) diff --git a/opendm/remote.py b/opendm/remote.py index 8e144552..2d241259 100644 --- a/opendm/remote.py +++ b/opendm/remote.py @@ -1,14 +1,17 @@ import time import datetime +import os import threading from opendm import log from pyodm import Node, exceptions +from pyodm.utils import AtomicCounter + try: import queue except ImportError: import Queue as queue -class HybridDistributedExecutor: +class LocalRemoteExecutor: """ A class for performing OpenSfM reconstructions and full ODM pipeline executions using a mix of local and remote processing. Tasks are executed locally one at a time @@ -19,14 +22,14 @@ class HybridDistributedExecutor: def __init__(self, nodeUrl): self.node = Node.from_url(nodeUrl) - log.ODM_INFO("Initializing hybrid distributed executor using cluster node %s" % nodeUrl) + log.ODM_INFO("LRE: Initializing using cluster node %s" % nodeUrl) try: odm_version = self.node.info().odm_version - log.ODM_INFO("Node is online and running ODM version: %s" % odm_version) + log.ODM_INFO("LRE: Node is online and running ODM version: %s" % odm_version) except exceptions.NodeConnectionError: - log.ODM_WARNING("The node seems to be offline! We'll still process the dataset, but it's going to run entirely locally.") + log.ODM_WARNING("LRE: The node seems to be offline! We'll still process the dataset, but it's going to run entirely locally.") except Exception as e: - log.ODM_ERROR("An unexpected problem happened while opening the node connection: %s" % str(e)) + log.ODM_ERROR("LRE: An unexpected problem happened while opening the node connection: %s" % str(e)) exit(1) def set_projects(self, paths): @@ -40,46 +43,73 @@ class HybridDistributedExecutor: class nonloc: error = None local_is_processing = False + semaphore = None + + node_task_limit = AtomicCounter(0) # Create queue q = queue.Queue() for pp in self.project_paths: + log.ODM_DEBUG("LRE: Adding to queue %s" % pp) q.put(ReconstructionTask(pp)) def handle_result(task, local, error = None): if error: - print("ERROR!!! " + str(error)) + if isinstance(error, NodeTaskLimitReachedException) and not nonloc.semaphore: + nonloc.semaphore = threading.Semaphore(node_task_limit.value) + log.ODM_DEBUG("LRE: Node task limit reached. Setting semaphore to %s" % node_task_limit.value) + for i in range(node_task_limit.value): + nonloc.semaphore.acquire() + + log.ODM_WARNING("LRE: %s failed with: %s" % (task, str(error))) + if task.retries < task.max_retries: # Put task back in queue task.retries += 1 task.wait_until = datetime.datetime.now() + datetime.timedelta(seconds=task.retries * task.retry_timeout) + log.ODM_DEBUG("LRE: Re-queueing %s (retries: %s)" % (task, task.retries)) q.put(task) else: nonloc.error = e - - if local: - nonloc.local_is_processing = False + else: + if not local: + node_task_limit.increment(-1) + log.ODM_INFO("LRE: %s finished successfully" % task) + + if local: + nonloc.local_is_processing = False + + if nonloc.semaphore: nonloc.semaphore.release() q.task_done() def worker(): while True: + # If we've found a limit on the maximum number of tasks + # a node can process, we block until some tasks have completed + if nonloc.semaphore: nonloc.semaphore.acquire() + task = q.get() if task is None or nonloc.error is not None: q.task_done() + if nonloc.semaphore: nonloc.semaphore.release() break - + if not nonloc.local_is_processing: # Process local - nonloc.local_is_processing = True - task.process(True, handle_result) + try: + nonloc.local_is_processing = True + task.process(True, handle_result) + except Exception as e: + handle_result(task, True, e) else: # Process remote - now = datetime.datetime.now() - if task.wait_until > now: - time.sleep((task.wait_until - now).seconds) + try: + task.process(False, handle_result) + node_task_limit.increment() # Called after upload, but before processing is started + except Exception as e: + handle_result(task, False, e) - task.process(False, handle_result) t = threading.Thread(target=worker) t.start() @@ -101,8 +131,11 @@ class HybridDistributedExecutor: return +class NodeTaskLimitReachedException(Exception): + pass + class Task: - def __init__(self, project_path, max_retries=10, retry_timeout=10): + def __init__(self, project_path, max_retries=10, retry_timeout=1): self.project_path = project_path self.wait_until = datetime.datetime.now() # Don't run this task until a certain time self.max_retries = max_retries @@ -114,27 +147,52 @@ class Task: def handle_result(error = None): done(self, local, error) - t = threading.Thread(target=getattr(self, 'process_local' if local else 'process_remote'), args=(handle_result, )) - t.start() + log.ODM_INFO("LRE: About to process %s %s" % (self, 'locally' if local else 'remotely')) + + if local: + t = threading.Thread(target=self.process_local, args=(handle_result, )) + t.start() + else: + now = datetime.datetime.now() + if self.wait_until > now: + wait_for = (self.wait_until - now).seconds + log.ODM_DEBUG("LRE: Waiting %s seconds before processing %s" % (wait_for, self)) + time.sleep(wait_for) + + # TODO: we could consider uploading multiple tasks + # in parallel. But since we are using the same node + # perhaps this wouldn't be a big speedup. + self.process_remote(handle_result) # Block until upload is complete def process_local(self, done): raise NotImplementedError() def process_remote(self, done): raise NotImplementedError() + + def __str__(self): + return os.path.basename(self.project_path) class ReconstructionTask(Task): - def process_local(self, done): print("Process local: " + self.project_path) - time.sleep(0.1) + time.sleep(10) done() def process_remote(self, done): - time.sleep(0.3) + + def test(): + time.sleep(4) + done() if self.project_path == '/datasets/brighton/opensfm/submodels/submodel_0001': done(Exception("TEST EXCEPTION!" + self.project_path)) + elif self.project_path == '/datasets/brighton/opensfm/submodels/submodel_0002': + done(NodeTaskLimitReachedException("Limit reached")) + elif self.project_path == '/datasets/brighton/opensfm/submodels/submodel_0003': + threading.Thread(target=test).start() + elif self.project_path == '/datasets/brighton/opensfm/submodels/submodel_0004': + threading.Thread(target=test).start() else: print("Process remote: " + self.project_path) - done() \ No newline at end of file + done()