kopia lustrzana https://github.com/OpenDroneMap/ODM
Cryptography package warn fix, tests
rodzic
cfb1cb362b
commit
14855d010c
|
@ -78,6 +78,8 @@ RUN pip install -U \
|
||||||
attrs==19.1.0 \
|
attrs==19.1.0 \
|
||||||
pyodm==1.5.0
|
pyodm==1.5.0
|
||||||
|
|
||||||
|
RUN pip install --upgrade cryptography && python -m easy_install --upgrade pyOpenSSL
|
||||||
|
|
||||||
ENV PYTHONPATH="$PYTHONPATH:/code/SuperBuild/install/lib/python2.7/dist-packages"
|
ENV PYTHONPATH="$PYTHONPATH:/code/SuperBuild/install/lib/python2.7/dist-packages"
|
||||||
ENV PYTHONPATH="$PYTHONPATH:/code/SuperBuild/src/opensfm"
|
ENV PYTHONPATH="$PYTHONPATH:/code/SuperBuild/src/opensfm"
|
||||||
ENV LD_LIBRARY_PATH="$LD_LIBRARY_PATH:/code/SuperBuild/install/lib"
|
ENV LD_LIBRARY_PATH="$LD_LIBRARY_PATH:/code/SuperBuild/install/lib"
|
||||||
|
|
|
@ -82,6 +82,11 @@ install() {
|
||||||
attrs==19.1.0 \
|
attrs==19.1.0 \
|
||||||
pyodm==1.5.0
|
pyodm==1.5.0
|
||||||
|
|
||||||
|
# Fix: /usr/local/lib/python2.7/dist-packages/requests/__init__.py:83: RequestsDependencyWarning: Old version of cryptography ([1, 2, 3]) may cause slowdown.
|
||||||
|
pip install --upgrade cryptography
|
||||||
|
python -m easy_install --upgrade pyOpenSSL
|
||||||
|
|
||||||
|
|
||||||
echo "Installing OpenDroneMap Dependencies"
|
echo "Installing OpenDroneMap Dependencies"
|
||||||
apt-get install -y -qq python-scipy \
|
apt-get install -y -qq python-scipy \
|
||||||
liblas-bin
|
liblas-bin
|
||||||
|
|
|
@ -91,6 +91,7 @@ class LocalRemoteExecutor:
|
||||||
def handle_result(task, local, error = None, partial=False):
|
def handle_result(task, local, error = None, partial=False):
|
||||||
try:
|
try:
|
||||||
handle_result_mutex.acquire()
|
handle_result_mutex.acquire()
|
||||||
|
acquire_semaphore_on_exit = False
|
||||||
|
|
||||||
if error:
|
if error:
|
||||||
log.ODM_WARNING("LRE: %s failed with: %s" % (task, str(error)))
|
log.ODM_WARNING("LRE: %s failed with: %s" % (task, str(error)))
|
||||||
|
@ -102,12 +103,13 @@ class LocalRemoteExecutor:
|
||||||
system.exit_gracefully()
|
system.exit_gracefully()
|
||||||
|
|
||||||
if isinstance(error, NodeTaskLimitReachedException) and not nonloc.semaphore and node_task_limit.value > 0:
|
if isinstance(error, NodeTaskLimitReachedException) and not nonloc.semaphore and node_task_limit.value > 0:
|
||||||
sem_value = max(1, node_task_limit.value - 1)
|
sem_value = max(1, node_task_limit.value)
|
||||||
nonloc.semaphore = threading.Semaphore(sem_value)
|
nonloc.semaphore = threading.Semaphore(sem_value)
|
||||||
log.ODM_DEBUG("LRE: Node task limit reached. Setting semaphore to %s and waiting..." % sem_value)
|
log.ODM_DEBUG("LRE: Node task limit reached. Setting semaphore to %s" % sem_value)
|
||||||
for i in range(sem_value + 1):
|
for i in range(sem_value):
|
||||||
nonloc.semaphore.acquire() # This will block until a task has finished
|
nonloc.semaphore.acquire()
|
||||||
|
acquire_semaphore_on_exit = True
|
||||||
|
|
||||||
# Retry, but only if the error is not related to a task failure
|
# Retry, but only if the error is not related to a task failure
|
||||||
if task.retries < task.max_retries and not isinstance(error, exceptions.TaskFailedError):
|
if task.retries < task.max_retries and not isinstance(error, exceptions.TaskFailedError):
|
||||||
# Put task back in queue
|
# Put task back in queue
|
||||||
|
@ -133,18 +135,21 @@ class LocalRemoteExecutor:
|
||||||
nonloc.local_is_processing = False
|
nonloc.local_is_processing = False
|
||||||
|
|
||||||
if not task.finished:
|
if not task.finished:
|
||||||
if nonloc.semaphore: nonloc.semaphore.release()
|
if not acquire_semaphore_on_exit and nonloc.semaphore: nonloc.semaphore.release()
|
||||||
task.finished = True
|
task.finished = True
|
||||||
q.task_done()
|
q.task_done()
|
||||||
finally:
|
finally:
|
||||||
handle_result_mutex.release()
|
handle_result_mutex.release()
|
||||||
|
if acquire_semaphore_on_exit and nonloc.semaphore:
|
||||||
|
log.ODM_INFO("LRE: Waiting...")
|
||||||
|
nonloc.semaphore.acquire()
|
||||||
|
|
||||||
def worker():
|
def worker():
|
||||||
while True:
|
while True:
|
||||||
# If we've found a limit on the maximum number of tasks
|
# If we've found a limit on the maximum number of tasks
|
||||||
# a node can process, we block until some tasks have completed
|
# a node can process, we block until some tasks have completed
|
||||||
if nonloc.semaphore: nonloc.semaphore.acquire()
|
if nonloc.semaphore: nonloc.semaphore.acquire()
|
||||||
|
|
||||||
# Block until a new queue item is available
|
# Block until a new queue item is available
|
||||||
task = q.get()
|
task = q.get()
|
||||||
|
|
||||||
|
@ -187,6 +192,7 @@ class LocalRemoteExecutor:
|
||||||
system.exit_gracefully()
|
system.exit_gracefully()
|
||||||
|
|
||||||
# stop workers
|
# stop workers
|
||||||
|
if nonloc.semaphore: nonloc.semaphore.release()
|
||||||
q.put(None)
|
q.put(None)
|
||||||
|
|
||||||
# Wait for queue thread
|
# Wait for queue thread
|
||||||
|
|
|
@ -78,6 +78,8 @@ RUN pip install -U \
|
||||||
attrs==19.1.0 \
|
attrs==19.1.0 \
|
||||||
pyodm==1.5.0
|
pyodm==1.5.0
|
||||||
|
|
||||||
|
RUN pip install --upgrade cryptography && python -m easy_install --upgrade pyOpenSSL
|
||||||
|
|
||||||
ENV PYTHONPATH="$PYTHONPATH:/code/SuperBuild/install/lib/python2.7/dist-packages"
|
ENV PYTHONPATH="$PYTHONPATH:/code/SuperBuild/install/lib/python2.7/dist-packages"
|
||||||
ENV PYTHONPATH="$PYTHONPATH:/code/SuperBuild/src/opensfm"
|
ENV PYTHONPATH="$PYTHONPATH:/code/SuperBuild/src/opensfm"
|
||||||
ENV LD_LIBRARY_PATH="$LD_LIBRARY_PATH:/code/SuperBuild/install/lib"
|
ENV LD_LIBRARY_PATH="$LD_LIBRARY_PATH:/code/SuperBuild/install/lib"
|
||||||
|
|
|
@ -0,0 +1 @@
|
||||||
|
python -m unittest discover tests "test_*.py"
|
|
@ -0,0 +1,62 @@
|
||||||
|
import time
|
||||||
|
import unittest
|
||||||
|
import threading
|
||||||
|
from opendm.remote import LocalRemoteExecutor, Task, NodeTaskLimitReachedException
|
||||||
|
from pyodm import Node
|
||||||
|
|
||||||
|
class TestRemote(unittest.TestCase):
|
||||||
|
def setUp(self):
|
||||||
|
self.lre = LocalRemoteExecutor('http://invalid-host:3000')
|
||||||
|
self.lre.set_projects(['/submodels/submodel_0000',
|
||||||
|
'/submodels/submodel_0001',
|
||||||
|
'/submodels/submodel_0002',
|
||||||
|
'/submodels/submodel_0003',
|
||||||
|
'/submodels/submodel_0004',
|
||||||
|
'/submodels/submodel_0005',
|
||||||
|
'/submodels/submodel_0006',
|
||||||
|
])
|
||||||
|
|
||||||
|
def test_lre_init(self):
|
||||||
|
self.assertFalse(self.lre.node_online)
|
||||||
|
|
||||||
|
def test_processing_logic(self):
|
||||||
|
# Fake online status
|
||||||
|
self.lre.node_online = True
|
||||||
|
|
||||||
|
MAX_QUEUE = 2
|
||||||
|
class nonloc:
|
||||||
|
local_task_check = False
|
||||||
|
remote_queue = 0
|
||||||
|
|
||||||
|
class TaskMock(Task):
|
||||||
|
def process_local(self):
|
||||||
|
# First task should be submodel_0000
|
||||||
|
if not nonloc.local_task_check: nonloc.local_task_check = self.project_path.endswith("0000")
|
||||||
|
time.sleep(3)
|
||||||
|
|
||||||
|
def process_remote(self, done):
|
||||||
|
time.sleep(0.2)
|
||||||
|
|
||||||
|
# Upload successful
|
||||||
|
done(error=None, partial=True)
|
||||||
|
|
||||||
|
# Async processing
|
||||||
|
def monitor():
|
||||||
|
nonloc.remote_queue += 1
|
||||||
|
time.sleep(0.3)
|
||||||
|
|
||||||
|
try:
|
||||||
|
if nonloc.remote_queue > MAX_QUEUE:
|
||||||
|
nonloc.remote_queue = 0
|
||||||
|
raise NodeTaskLimitReachedException("Delayed task limit reached")
|
||||||
|
done()
|
||||||
|
except Exception as e:
|
||||||
|
done(e)
|
||||||
|
|
||||||
|
t = threading.Thread(target=monitor)
|
||||||
|
self.params['threads'].append(t)
|
||||||
|
t.start()
|
||||||
|
|
||||||
|
self.lre.run(TaskMock)
|
||||||
|
self.assertTrue(nonloc.local_task_check)
|
||||||
|
|
Ładowanie…
Reference in New Issue