diff --git a/Dockerfile b/Dockerfile index 52b471dc..6b2959dc 100644 --- a/Dockerfile +++ b/Dockerfile @@ -78,6 +78,8 @@ RUN pip install -U \ attrs==19.1.0 \ pyodm==1.5.0 +RUN pip install --upgrade cryptography && python -m easy_install --upgrade pyOpenSSL + ENV PYTHONPATH="$PYTHONPATH:/code/SuperBuild/install/lib/python2.7/dist-packages" ENV PYTHONPATH="$PYTHONPATH:/code/SuperBuild/src/opensfm" ENV LD_LIBRARY_PATH="$LD_LIBRARY_PATH:/code/SuperBuild/install/lib" diff --git a/configure.sh b/configure.sh index e320eaf2..deb90563 100755 --- a/configure.sh +++ b/configure.sh @@ -82,6 +82,11 @@ install() { attrs==19.1.0 \ pyodm==1.5.0 + # Fix: /usr/local/lib/python2.7/dist-packages/requests/__init__.py:83: RequestsDependencyWarning: Old version of cryptography ([1, 2, 3]) may cause slowdown. + pip install --upgrade cryptography + python -m easy_install --upgrade pyOpenSSL + + echo "Installing OpenDroneMap Dependencies" apt-get install -y -qq python-scipy \ liblas-bin diff --git a/opendm/remote.py b/opendm/remote.py index b57171c1..0a58c639 100644 --- a/opendm/remote.py +++ b/opendm/remote.py @@ -91,6 +91,7 @@ class LocalRemoteExecutor: def handle_result(task, local, error = None, partial=False): try: handle_result_mutex.acquire() + acquire_semaphore_on_exit = False if error: log.ODM_WARNING("LRE: %s failed with: %s" % (task, str(error))) @@ -102,12 +103,13 @@ class LocalRemoteExecutor: system.exit_gracefully() if isinstance(error, NodeTaskLimitReachedException) and not nonloc.semaphore and node_task_limit.value > 0: - sem_value = max(1, node_task_limit.value - 1) + sem_value = max(1, node_task_limit.value) nonloc.semaphore = threading.Semaphore(sem_value) - log.ODM_DEBUG("LRE: Node task limit reached. Setting semaphore to %s and waiting..." % sem_value) - for i in range(sem_value + 1): - nonloc.semaphore.acquire() # This will block until a task has finished - + log.ODM_DEBUG("LRE: Node task limit reached. Setting semaphore to %s" % sem_value) + for i in range(sem_value): + nonloc.semaphore.acquire() + acquire_semaphore_on_exit = True + # Retry, but only if the error is not related to a task failure if task.retries < task.max_retries and not isinstance(error, exceptions.TaskFailedError): # Put task back in queue @@ -133,18 +135,21 @@ class LocalRemoteExecutor: nonloc.local_is_processing = False if not task.finished: - if nonloc.semaphore: nonloc.semaphore.release() + if not acquire_semaphore_on_exit and nonloc.semaphore: nonloc.semaphore.release() task.finished = True q.task_done() finally: handle_result_mutex.release() + if acquire_semaphore_on_exit and nonloc.semaphore: + log.ODM_INFO("LRE: Waiting...") + nonloc.semaphore.acquire() def worker(): while True: # If we've found a limit on the maximum number of tasks # a node can process, we block until some tasks have completed if nonloc.semaphore: nonloc.semaphore.acquire() - + # Block until a new queue item is available task = q.get() @@ -187,6 +192,7 @@ class LocalRemoteExecutor: system.exit_gracefully() # stop workers + if nonloc.semaphore: nonloc.semaphore.release() q.put(None) # Wait for queue thread diff --git a/portable.Dockerfile b/portable.Dockerfile index 362c3956..6f2b728c 100644 --- a/portable.Dockerfile +++ b/portable.Dockerfile @@ -78,6 +78,8 @@ RUN pip install -U \ attrs==19.1.0 \ pyodm==1.5.0 +RUN pip install --upgrade cryptography && python -m easy_install --upgrade pyOpenSSL + ENV PYTHONPATH="$PYTHONPATH:/code/SuperBuild/install/lib/python2.7/dist-packages" ENV PYTHONPATH="$PYTHONPATH:/code/SuperBuild/src/opensfm" ENV LD_LIBRARY_PATH="$LD_LIBRARY_PATH:/code/SuperBuild/install/lib" diff --git a/test.sh b/test.sh new file mode 100755 index 00000000..1ebe2f67 --- /dev/null +++ b/test.sh @@ -0,0 +1 @@ +python -m unittest discover tests "test_*.py" diff --git a/tests/test_remote.py b/tests/test_remote.py new file mode 100644 index 00000000..cc32d5d1 --- /dev/null +++ b/tests/test_remote.py @@ -0,0 +1,62 @@ +import time +import unittest +import threading +from opendm.remote import LocalRemoteExecutor, Task, NodeTaskLimitReachedException +from pyodm import Node + +class TestRemote(unittest.TestCase): + def setUp(self): + self.lre = LocalRemoteExecutor('http://invalid-host:3000') + self.lre.set_projects(['/submodels/submodel_0000', + '/submodels/submodel_0001', + '/submodels/submodel_0002', + '/submodels/submodel_0003', + '/submodels/submodel_0004', + '/submodels/submodel_0005', + '/submodels/submodel_0006', + ]) + + def test_lre_init(self): + self.assertFalse(self.lre.node_online) + + def test_processing_logic(self): + # Fake online status + self.lre.node_online = True + + MAX_QUEUE = 2 + class nonloc: + local_task_check = False + remote_queue = 0 + + class TaskMock(Task): + def process_local(self): + # First task should be submodel_0000 + if not nonloc.local_task_check: nonloc.local_task_check = self.project_path.endswith("0000") + time.sleep(3) + + def process_remote(self, done): + time.sleep(0.2) + + # Upload successful + done(error=None, partial=True) + + # Async processing + def monitor(): + nonloc.remote_queue += 1 + time.sleep(0.3) + + try: + if nonloc.remote_queue > MAX_QUEUE: + nonloc.remote_queue = 0 + raise NodeTaskLimitReachedException("Delayed task limit reached") + done() + except Exception as e: + done(e) + + t = threading.Thread(target=monitor) + self.params['threads'].append(t) + t.start() + + self.lre.run(TaskMock) + self.assertTrue(nonloc.local_task_check) +