kopia lustrzana https://github.com/OpenDroneMap/ODM
LRE hang fix, medfilt fallback, favor local processing
rodzic
d2c11ccd9b
commit
217f9f7844
opendm
tests
|
@ -275,8 +275,12 @@ def post_process(geotiff_path, output_path, smoothing_iterations=1):
|
|||
# these edge cases, but it's slower.
|
||||
for i in range(smoothing_iterations):
|
||||
log.ODM_INFO("Smoothing iteration %s" % str(i + 1))
|
||||
arr = signal.medfilt(arr, 5)
|
||||
|
||||
try:
|
||||
arr = signal.medfilt(arr, 5)
|
||||
except MemoryError:
|
||||
log.ODM_WARNING("medfilt ran out of memory, switching to slower median_filter")
|
||||
arr = ndimage.median_filter(arr, size=5)
|
||||
|
||||
# Fill corner points with nearest value
|
||||
if arr.shape >= (4, 4):
|
||||
arr[0][:2] = arr[1][0] = arr[1][1]
|
||||
|
|
|
@ -63,6 +63,7 @@ class LocalRemoteExecutor:
|
|||
class nonloc:
|
||||
error = None
|
||||
semaphore = None
|
||||
local_processing = False
|
||||
|
||||
calculate_task_limit_lock = threading.Lock()
|
||||
finished_tasks = AtomicCounter(0)
|
||||
|
@ -87,12 +88,12 @@ class LocalRemoteExecutor:
|
|||
log.ODM_INFO("LRE: No remote tasks to cleanup")
|
||||
|
||||
for task in self.params['tasks']:
|
||||
log.ODM_DEBUG("Removing remote task %s... %s" % (task.uuid, 'OK' if remove_task_safe(task) else 'NO'))
|
||||
log.ODM_DEBUG("LRE: Removing remote task %s... %s" % (task.uuid, 'OK' if remove_task_safe(task) else 'NO'))
|
||||
|
||||
def handle_result(task, local, error = None, partial=False):
|
||||
def cleanup_remote():
|
||||
if not partial and task.remote_task:
|
||||
log.ODM_DEBUG("Cleaning up remote task (%s)... %s" % (task.remote_task.uuid, 'OK' if remove_task_safe(task.remote_task) else 'NO'))
|
||||
log.ODM_DEBUG("LRE: Cleaning up remote task (%s)... %s" % (task.remote_task.uuid, 'OK' if remove_task_safe(task.remote_task) else 'NO'))
|
||||
self.params['tasks'].remove(task.remote_task)
|
||||
task.remote_task = None
|
||||
|
||||
|
@ -113,7 +114,7 @@ class LocalRemoteExecutor:
|
|||
for t in self.params['tasks']:
|
||||
try:
|
||||
info = t.info()
|
||||
if info.status == TaskStatus.RUNNING:
|
||||
if info.status == TaskStatus.RUNNING and info.processing_time >= 0:
|
||||
node_task_limit += 1
|
||||
except exceptions.OdmError:
|
||||
pass
|
||||
|
@ -150,7 +151,7 @@ class LocalRemoteExecutor:
|
|||
|
||||
if not local and not partial and nonloc.semaphore: nonloc.semaphore.release()
|
||||
if not partial: q.task_done()
|
||||
|
||||
|
||||
def local_worker():
|
||||
while True:
|
||||
# Block until a new queue item is available
|
||||
|
@ -162,9 +163,12 @@ class LocalRemoteExecutor:
|
|||
|
||||
# Process local
|
||||
try:
|
||||
nonloc.local_processing = True
|
||||
task.process(True, handle_result)
|
||||
except Exception as e:
|
||||
handle_result(task, True, e)
|
||||
finally:
|
||||
nonloc.local_processing = False
|
||||
|
||||
|
||||
def remote_worker():
|
||||
|
@ -182,14 +186,23 @@ class LocalRemoteExecutor:
|
|||
q.task_done()
|
||||
if nonloc.semaphore: nonloc.semaphore.release()
|
||||
break
|
||||
|
||||
|
||||
# Special case in which we've just created a semaphore
|
||||
if not had_semaphore and nonloc.semaphore:
|
||||
log.ODM_INFO("Just found semaphore, sending %s back to the queue" % task)
|
||||
log.ODM_INFO("LRE: Just found semaphore, sending %s back to the queue" % task)
|
||||
q.put(task)
|
||||
q.task_done()
|
||||
continue
|
||||
|
||||
# Yield to local processing
|
||||
if not nonloc.local_processing:
|
||||
log.ODM_DEBUG("LRE: Yielding to local processing, sending %s back to the queue" % task)
|
||||
q.put(task)
|
||||
q.task_done()
|
||||
if nonloc.semaphore: nonloc.semaphore.release()
|
||||
time.sleep(0.05)
|
||||
continue
|
||||
|
||||
# Process remote
|
||||
try:
|
||||
task.process(False, handle_result)
|
||||
|
@ -210,7 +223,7 @@ class LocalRemoteExecutor:
|
|||
|
||||
# block until all tasks are done (or CTRL+C)
|
||||
try:
|
||||
while finished_tasks.value < len(self.project_paths):
|
||||
while finished_tasks.value < len(self.project_paths) and nonloc.error is None:
|
||||
time.sleep(0.5)
|
||||
except KeyboardInterrupt:
|
||||
log.ODM_WARNING("LRE: CTRL+C")
|
||||
|
|
|
@ -2,7 +2,7 @@ import time
|
|||
import unittest
|
||||
import threading
|
||||
from opendm.remote import LocalRemoteExecutor, Task, NodeTaskLimitReachedException
|
||||
from pyodm import Node
|
||||
from pyodm import Node, exceptions
|
||||
from pyodm.types import TaskStatus
|
||||
|
||||
class TestRemote(unittest.TestCase):
|
||||
|
@ -28,6 +28,7 @@ class TestRemote(unittest.TestCase):
|
|||
class nonloc:
|
||||
local_task_check = False
|
||||
remote_queue = 1
|
||||
should_fail = False
|
||||
|
||||
class OdmTaskMock:
|
||||
def __init__(self, running, queue_num):
|
||||
|
@ -38,6 +39,7 @@ class TestRemote(unittest.TestCase):
|
|||
def info(self):
|
||||
class StatusMock:
|
||||
status = TaskStatus.RUNNING if self.running else TaskStatus.QUEUED
|
||||
processing_time = 1
|
||||
return StatusMock()
|
||||
|
||||
def remove(self):
|
||||
|
@ -54,6 +56,11 @@ class TestRemote(unittest.TestCase):
|
|||
|
||||
self.remote_task = OdmTaskMock(nonloc.remote_queue <= MAX_QUEUE, nonloc.remote_queue)
|
||||
self.params['tasks'].append(self.remote_task)
|
||||
|
||||
if nonloc.should_fail:
|
||||
if self.project_path.endswith("0006"):
|
||||
raise exceptions.TaskFailedError("FAIL #6")
|
||||
|
||||
nonloc.remote_queue += 1
|
||||
|
||||
# Upload successful
|
||||
|
@ -80,3 +87,9 @@ class TestRemote(unittest.TestCase):
|
|||
self.lre.run(TaskMock)
|
||||
self.assertTrue(nonloc.local_task_check)
|
||||
|
||||
nonloc.should_fail = True
|
||||
with self.assertRaises(exceptions.TaskFailedError):
|
||||
self.lre.run(TaskMock)
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
Ładowanie…
Reference in New Issue