OpenDroneMap-ODM/tests/test_remote.py

106 wiersze
3.7 KiB
Python

import time
import unittest
import threading
import random
from opendm.remote import LocalRemoteExecutor, Task, NodeTaskLimitReachedException
from pyodm import Node, exceptions
from pyodm.types import TaskStatus
class TestRemote(unittest.TestCase):
def setUp(self):
self.lre = LocalRemoteExecutor('http://localhost:9001')
projects = []
for i in range(9):
projects.append('/submodels/submodel_00' + str(i).rjust(2, '0'))
self.lre.set_projects(projects)
def test_lre_init(self):
self.assertFalse(self.lre.node_online)
def test_processing_logic(self):
# Fake online status
self.lre.node_online = True
MAX_QUEUE = 2
class nonloc:
local_task_check = False
remote_queue = 1
should_fail = False
task_limit_reached = False
class OdmTaskMock:
def __init__(self, running, queue_num):
self.running = running
self.queue_num = queue_num
self.uuid = 'xxxxx-xxxxx-xxxxx-xxxxx-xxxx' + str(queue_num)
def info(self, with_output=None):
class StatusMock:
status = TaskStatus.RUNNING if self.running else TaskStatus.QUEUED
processing_time = 1
output = "test output"
return StatusMock()
def remove(self):
return True
class TaskMock(Task):
def process_local(self):
# First task should be 0000 or 0001
if not nonloc.local_task_check: nonloc.local_task_check = self.project_path.endswith("0000") or self.project_path.endswith("0001")
if nonloc.should_fail:
if self.project_path.endswith("0006"):
raise exceptions.TaskFailedError("FAIL #6")
time.sleep(1)
def process_remote(self, done):
time.sleep(0.05) # file upload
self.remote_task = OdmTaskMock(nonloc.remote_queue <= MAX_QUEUE, nonloc.remote_queue)
self.params['tasks'].append(self.remote_task)
if nonloc.should_fail:
if self.project_path.endswith("0006"):
raise exceptions.TaskFailedError("FAIL #6")
nonloc.remote_queue += 1
# Upload successful
done(error=None, partial=True)
# Async processing
def monitor():
try:
if nonloc.task_limit_reached and random.randint(0, 4) == 0:
nonloc.remote_queue -= 1
raise NodeTaskLimitReachedException("Random fail!")
if not nonloc.task_limit_reached and self.remote_task.queue_num > MAX_QUEUE:
nonloc.remote_queue -= 1
nonloc.task_limit_reached = True
raise NodeTaskLimitReachedException("Delayed task limit reached")
time.sleep(0.5)
nonloc.remote_queue -= 1
done()
except Exception as e:
done(e)
t = threading.Thread(target=monitor)
self.params['threads'].append(t)
t.start()
self.lre.run(TaskMock)
self.assertTrue(nonloc.local_task_check)
nonloc.should_fail = True
nonloc.remote_queue = 1
nonloc.task_limit_reached = False
with self.assertRaises(exceptions.TaskFailedError):
self.lre.run(TaskMock)
if __name__ == '__main__':
unittest.main()