kopia lustrzana https://github.com/OpenDroneMap/ODM
LRE improvements (semaphore --> atomic counter with retries)
rodzic
0998872ba5
commit
ab9a94723c
|
@ -62,11 +62,12 @@ class LocalRemoteExecutor:
|
|||
# Shared variables across threads
|
||||
class nonloc:
|
||||
error = None
|
||||
semaphore = None
|
||||
local_processing = False
|
||||
max_remote_tasks = None
|
||||
|
||||
calculate_task_limit_lock = threading.Lock()
|
||||
finished_tasks = AtomicCounter(0)
|
||||
remote_running_tasks = AtomicCounter(0)
|
||||
|
||||
# Create queue
|
||||
q = queue.Queue()
|
||||
|
@ -85,7 +86,7 @@ class LocalRemoteExecutor:
|
|||
if self.params['tasks']:
|
||||
log.ODM_WARNING("LRE: Attempting to cleanup remote tasks")
|
||||
else:
|
||||
log.ODM_INFO("LRE: No remote tasks to cleanup")
|
||||
log.ODM_INFO("LRE: No remote tasks left to cleanup")
|
||||
|
||||
for task in self.params['tasks']:
|
||||
log.ODM_DEBUG("LRE: Removing remote task %s... %s" % (task.uuid, 'OK' if remove_task_safe(task) else 'NO'))
|
||||
|
@ -106,12 +107,12 @@ class LocalRemoteExecutor:
|
|||
if str(error) == "Child was terminated by signal 15":
|
||||
system.exit_gracefully()
|
||||
|
||||
created_semaphore = False
|
||||
if isinstance(error, NodeTaskLimitReachedException):
|
||||
task_limit_reached = isinstance(error, NodeTaskLimitReachedException)
|
||||
if task_limit_reached:
|
||||
# Estimate the maximum number of tasks based on how many tasks
|
||||
# are currently running
|
||||
with calculate_task_limit_lock:
|
||||
if not nonloc.semaphore:
|
||||
if nonloc.max_remote_tasks is None:
|
||||
node_task_limit = 0
|
||||
for t in self.params['tasks']:
|
||||
try:
|
||||
|
@ -121,39 +122,36 @@ class LocalRemoteExecutor:
|
|||
except exceptions.OdmError:
|
||||
pass
|
||||
|
||||
sem_value = max(1, node_task_limit)
|
||||
nonloc.semaphore = threading.Semaphore(sem_value)
|
||||
log.ODM_DEBUG("LRE: Node task limit reached. Setting semaphore to %s" % sem_value)
|
||||
for i in range(sem_value):
|
||||
nonloc.semaphore.acquire()
|
||||
created_semaphore = True
|
||||
nonloc.max_remote_tasks = max(1, node_task_limit)
|
||||
log.ODM_DEBUG("LRE: Node task limit reached. Setting max remote tasks to %s" % node_task_limit)
|
||||
|
||||
|
||||
# Retry, but only if the error is not related to a task failure
|
||||
if task.retries < task.max_retries and not isinstance(error, exceptions.TaskFailedError):
|
||||
# Put task back in queue
|
||||
# Don't increment the retry counter if this task simply reached the task
|
||||
# limit count.
|
||||
if not isinstance(error, NodeTaskLimitReachedException):
|
||||
if not task_limit_reached:
|
||||
task.retries += 1
|
||||
task.wait_until = datetime.datetime.now() + datetime.timedelta(seconds=task.retries * task.retry_timeout)
|
||||
cleanup_remote()
|
||||
q.task_done()
|
||||
|
||||
log.ODM_DEBUG("LRE: Re-queueing %s (retries: %s)" % (task, task.retries))
|
||||
if not created_semaphore and nonloc.semaphore: nonloc.semaphore.release()
|
||||
q.put(task)
|
||||
if not local: remote_running_tasks.increment(-1)
|
||||
return
|
||||
else:
|
||||
nonloc.error = error
|
||||
finished_tasks.increment()
|
||||
if not local: remote_running_tasks.increment(-1)
|
||||
else:
|
||||
if not partial:
|
||||
log.ODM_INFO("LRE: %s finished successfully" % task)
|
||||
finished_tasks.increment()
|
||||
if not local: remote_running_tasks.increment(-1)
|
||||
|
||||
cleanup_remote()
|
||||
|
||||
if not local and not partial and nonloc.semaphore: nonloc.semaphore.release()
|
||||
if not partial: q.task_done()
|
||||
|
||||
def local_worker():
|
||||
|
@ -177,38 +175,32 @@ class LocalRemoteExecutor:
|
|||
|
||||
def remote_worker():
|
||||
while True:
|
||||
had_semaphore = bool(nonloc.semaphore)
|
||||
|
||||
# If we've found an estimate of the limit on the maximum number of tasks
|
||||
# a node can process, we block until some tasks have completed
|
||||
if nonloc.semaphore: nonloc.semaphore.acquire()
|
||||
|
||||
# Block until a new queue item is available
|
||||
task = q.get()
|
||||
|
||||
if task is None or nonloc.error is not None:
|
||||
q.task_done()
|
||||
if nonloc.semaphore: nonloc.semaphore.release()
|
||||
break
|
||||
|
||||
# Special case in which we've just created a semaphore
|
||||
if not had_semaphore and nonloc.semaphore:
|
||||
log.ODM_INFO("LRE: Just found semaphore, sending %s back to the queue" % task)
|
||||
q.put(task)
|
||||
q.task_done()
|
||||
continue
|
||||
|
||||
# Yield to local processing
|
||||
if not nonloc.local_processing:
|
||||
log.ODM_DEBUG("LRE: Yielding to local processing, sending %s back to the queue" % task)
|
||||
q.put(task)
|
||||
q.task_done()
|
||||
if nonloc.semaphore: nonloc.semaphore.release()
|
||||
time.sleep(0.05)
|
||||
continue
|
||||
|
||||
# If we've found an estimate of the limit on the maximum number of tasks
|
||||
# a node can process, we block until some tasks have completed
|
||||
if nonloc.max_remote_tasks is not None and remote_running_tasks.value >= nonloc.max_remote_tasks:
|
||||
q.put(task)
|
||||
q.task_done()
|
||||
time.sleep(2)
|
||||
continue
|
||||
|
||||
# Process remote
|
||||
try:
|
||||
remote_running_tasks.increment()
|
||||
task.process(False, handle_result)
|
||||
except Exception as e:
|
||||
handle_result(task, False, e)
|
||||
|
@ -234,7 +226,6 @@ class LocalRemoteExecutor:
|
|||
system.exit_gracefully()
|
||||
|
||||
# stop workers
|
||||
if nonloc.semaphore: nonloc.semaphore.release()
|
||||
q.put(None)
|
||||
if self.node_online:
|
||||
q.put(None)
|
||||
|
|
|
@ -8,7 +8,7 @@ from pyodm.types import TaskStatus
|
|||
|
||||
class TestRemote(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.lre = LocalRemoteExecutor('http://invalid-host:3000')
|
||||
self.lre = LocalRemoteExecutor('http://localhost:9001')
|
||||
|
||||
projects = []
|
||||
for i in range(9):
|
||||
|
@ -48,6 +48,11 @@ class TestRemote(unittest.TestCase):
|
|||
def process_local(self):
|
||||
# First task should be 0000 or 0001
|
||||
if not nonloc.local_task_check: nonloc.local_task_check = self.project_path.endswith("0000") or self.project_path.endswith("0001")
|
||||
|
||||
if nonloc.should_fail:
|
||||
if self.project_path.endswith("0006"):
|
||||
raise exceptions.TaskFailedError("FAIL #6")
|
||||
|
||||
time.sleep(1)
|
||||
|
||||
def process_remote(self, done):
|
||||
|
|
Ładowanie…
Reference in New Issue