OpenDroneMap-ODM/tests/test_remote.py

106 wiersze
3.7 KiB
Python
Czysty Zwykły widok Historia

import time
import unittest
import threading
import random
from opendm.remote import LocalRemoteExecutor, Task, NodeTaskLimitReachedException
from pyodm import Node, exceptions
from pyodm.types import TaskStatus
class TestRemote(unittest.TestCase):
def setUp(self):
self.lre = LocalRemoteExecutor('http://localhost:9001')
projects = []
for i in range(9):
projects.append('/submodels/submodel_00' + str(i).rjust(2, '0'))
self.lre.set_projects(projects)
def test_lre_init(self):
self.assertFalse(self.lre.node_online)
def test_processing_logic(self):
# Fake online status
self.lre.node_online = True
MAX_QUEUE = 2
class nonloc:
local_task_check = False
remote_queue = 1
should_fail = False
task_limit_reached = False
class OdmTaskMock:
def __init__(self, running, queue_num):
self.running = running
self.queue_num = queue_num
self.uuid = 'xxxxx-xxxxx-xxxxx-xxxxx-xxxx' + str(queue_num)
def info(self, with_output=None):
class StatusMock:
status = TaskStatus.RUNNING if self.running else TaskStatus.QUEUED
processing_time = 1
output = "test output"
return StatusMock()
def remove(self):
return True
class TaskMock(Task):
def process_local(self):
# First task should be 0000 or 0001
if not nonloc.local_task_check: nonloc.local_task_check = self.project_path.endswith("0000") or self.project_path.endswith("0001")
if nonloc.should_fail:
if self.project_path.endswith("0006"):
raise exceptions.TaskFailedError("FAIL #6")
time.sleep(1)
def process_remote(self, done):
time.sleep(0.05) # file upload
self.remote_task = OdmTaskMock(nonloc.remote_queue <= MAX_QUEUE, nonloc.remote_queue)
self.params['tasks'].append(self.remote_task)
if nonloc.should_fail:
if self.project_path.endswith("0006"):
raise exceptions.TaskFailedError("FAIL #6")
nonloc.remote_queue += 1
# Upload successful
done(error=None, partial=True)
# Async processing
def monitor():
try:
if nonloc.task_limit_reached and random.randint(0, 4) == 0:
nonloc.remote_queue -= 1
raise NodeTaskLimitReachedException("Random fail!")
if not nonloc.task_limit_reached and self.remote_task.queue_num > MAX_QUEUE:
nonloc.remote_queue -= 1
nonloc.task_limit_reached = True
raise NodeTaskLimitReachedException("Delayed task limit reached")
time.sleep(0.5)
nonloc.remote_queue -= 1
done()
except Exception as e:
done(e)
t = threading.Thread(target=monitor)
self.params['threads'].append(t)
t.start()
self.lre.run(TaskMock)
self.assertTrue(nonloc.local_task_check)
nonloc.should_fail = True
nonloc.remote_queue = 1
nonloc.task_limit_reached = False
with self.assertRaises(exceptions.TaskFailedError):
self.lre.run(TaskMock)
if __name__ == '__main__':
unittest.main()