From 217f9f7844409e2ff28df67892dfb8de00db3673 Mon Sep 17 00:00:00 2001 From: Piero Toffanin Date: Mon, 13 May 2019 07:18:03 -0400 Subject: [PATCH] LRE hang fix, medfilt fallback, favor local processing --- opendm/dem/commands.py | 8 ++++++-- opendm/remote.py | 27 ++++++++++++++++++++------- tests/test_remote.py | 15 ++++++++++++++- 3 files changed, 40 insertions(+), 10 deletions(-) diff --git a/opendm/dem/commands.py b/opendm/dem/commands.py index bbcfe52e..1f0cdcd7 100644 --- a/opendm/dem/commands.py +++ b/opendm/dem/commands.py @@ -275,8 +275,12 @@ def post_process(geotiff_path, output_path, smoothing_iterations=1): # these edge cases, but it's slower. for i in range(smoothing_iterations): log.ODM_INFO("Smoothing iteration %s" % str(i + 1)) - arr = signal.medfilt(arr, 5) - + try: + arr = signal.medfilt(arr, 5) + except MemoryError: + log.ODM_WARNING("medfilt ran out of memory, switching to slower median_filter") + arr = ndimage.median_filter(arr, size=5) + # Fill corner points with nearest value if arr.shape >= (4, 4): arr[0][:2] = arr[1][0] = arr[1][1] diff --git a/opendm/remote.py b/opendm/remote.py index 36ae99e4..9be2425b 100644 --- a/opendm/remote.py +++ b/opendm/remote.py @@ -63,6 +63,7 @@ class LocalRemoteExecutor: class nonloc: error = None semaphore = None + local_processing = False calculate_task_limit_lock = threading.Lock() finished_tasks = AtomicCounter(0) @@ -87,12 +88,12 @@ class LocalRemoteExecutor: log.ODM_INFO("LRE: No remote tasks to cleanup") for task in self.params['tasks']: - log.ODM_DEBUG("Removing remote task %s... %s" % (task.uuid, 'OK' if remove_task_safe(task) else 'NO')) + log.ODM_DEBUG("LRE: Removing remote task %s... %s" % (task.uuid, 'OK' if remove_task_safe(task) else 'NO')) def handle_result(task, local, error = None, partial=False): def cleanup_remote(): if not partial and task.remote_task: - log.ODM_DEBUG("Cleaning up remote task (%s)... %s" % (task.remote_task.uuid, 'OK' if remove_task_safe(task.remote_task) else 'NO')) + log.ODM_DEBUG("LRE: Cleaning up remote task (%s)... %s" % (task.remote_task.uuid, 'OK' if remove_task_safe(task.remote_task) else 'NO')) self.params['tasks'].remove(task.remote_task) task.remote_task = None @@ -113,7 +114,7 @@ class LocalRemoteExecutor: for t in self.params['tasks']: try: info = t.info() - if info.status == TaskStatus.RUNNING: + if info.status == TaskStatus.RUNNING and info.processing_time >= 0: node_task_limit += 1 except exceptions.OdmError: pass @@ -150,7 +151,7 @@ class LocalRemoteExecutor: if not local and not partial and nonloc.semaphore: nonloc.semaphore.release() if not partial: q.task_done() - + def local_worker(): while True: # Block until a new queue item is available @@ -162,9 +163,12 @@ class LocalRemoteExecutor: # Process local try: + nonloc.local_processing = True task.process(True, handle_result) except Exception as e: handle_result(task, True, e) + finally: + nonloc.local_processing = False def remote_worker(): @@ -182,14 +186,23 @@ class LocalRemoteExecutor: q.task_done() if nonloc.semaphore: nonloc.semaphore.release() break - + # Special case in which we've just created a semaphore if not had_semaphore and nonloc.semaphore: - log.ODM_INFO("Just found semaphore, sending %s back to the queue" % task) + log.ODM_INFO("LRE: Just found semaphore, sending %s back to the queue" % task) q.put(task) q.task_done() continue + # Yield to local processing + if not nonloc.local_processing: + log.ODM_DEBUG("LRE: Yielding to local processing, sending %s back to the queue" % task) + q.put(task) + q.task_done() + if nonloc.semaphore: nonloc.semaphore.release() + time.sleep(0.05) + continue + # Process remote try: task.process(False, handle_result) @@ -210,7 +223,7 @@ class LocalRemoteExecutor: # block until all tasks are done (or CTRL+C) try: - while finished_tasks.value < len(self.project_paths): + while finished_tasks.value < len(self.project_paths) and nonloc.error is None: time.sleep(0.5) except KeyboardInterrupt: log.ODM_WARNING("LRE: CTRL+C") diff --git a/tests/test_remote.py b/tests/test_remote.py index 69b5c323..c59a4bb0 100644 --- a/tests/test_remote.py +++ b/tests/test_remote.py @@ -2,7 +2,7 @@ import time import unittest import threading from opendm.remote import LocalRemoteExecutor, Task, NodeTaskLimitReachedException -from pyodm import Node +from pyodm import Node, exceptions from pyodm.types import TaskStatus class TestRemote(unittest.TestCase): @@ -28,6 +28,7 @@ class TestRemote(unittest.TestCase): class nonloc: local_task_check = False remote_queue = 1 + should_fail = False class OdmTaskMock: def __init__(self, running, queue_num): @@ -38,6 +39,7 @@ class TestRemote(unittest.TestCase): def info(self): class StatusMock: status = TaskStatus.RUNNING if self.running else TaskStatus.QUEUED + processing_time = 1 return StatusMock() def remove(self): @@ -54,6 +56,11 @@ class TestRemote(unittest.TestCase): self.remote_task = OdmTaskMock(nonloc.remote_queue <= MAX_QUEUE, nonloc.remote_queue) self.params['tasks'].append(self.remote_task) + + if nonloc.should_fail: + if self.project_path.endswith("0006"): + raise exceptions.TaskFailedError("FAIL #6") + nonloc.remote_queue += 1 # Upload successful @@ -80,3 +87,9 @@ class TestRemote(unittest.TestCase): self.lre.run(TaskMock) self.assertTrue(nonloc.local_task_check) + nonloc.should_fail = True + with self.assertRaises(exceptions.TaskFailedError): + self.lre.run(TaskMock) + +if __name__ == '__main__': + unittest.main() \ No newline at end of file