# Mirror of https://github.com/OpenDroneMap/ODM (100 lines, 3.5 KiB, Python)
import time
|
|
import unittest
|
|
import threading
|
|
import random
|
|
from opendm.remote import LocalRemoteExecutor, Task, NodeTaskLimitReachedException
|
|
from pyodm import Node, exceptions
|
|
from pyodm.types import TaskStatus
|
|
|
|
class TestRemote(unittest.TestCase):
    """Tests for LocalRemoteExecutor driven by mocked local and remote tasks."""

    def setUp(self):
        """Build an executor for a placeholder host and register nine submodels."""
        self.lre = LocalRemoteExecutor('http://invalid-host:3000')

        # Paths run from /submodels/submodel_0000 to /submodels/submodel_0008.
        projects = ['/submodels/submodel_00' + str(i).rjust(2, '0')
                    for i in range(9)]
        self.lre.set_projects(projects)

    def test_lre_init(self):
        """The executor built in setUp must not report its node as online."""
        self.assertFalse(self.lre.node_online)

    def test_processing_logic(self):
        """Run the executor twice with a mocked Task: once clean, once failing.

        The first run must complete and must have processed submodel 0000 or
        0001 locally. The second run makes the remote upload of submodel 0006
        raise TaskFailedError, which is expected to surface from ``run``.
        """
        # Fake online status
        self.lre.node_online = True

        # Remote tasks whose queue number exceeds this are reported as QUEUED
        # rather than RUNNING, and trigger the delayed task-limit error below.
        MAX_QUEUE = 2

        class nonloc:
            """Mutable state shared between the test body and the mocks."""
            local_task_check = False    # set once a 0000/0001 task ran locally
            remote_queue = 1            # next queue number handed to a remote task
            should_fail = False         # when True, submodel 0006 fails on upload
            task_limit_reached = False  # set after the first delayed limit error

        class OdmTaskMock:
            """Minimal stand-in for a remote task handle."""

            def __init__(self, running, queue_num):
                self.running = running
                self.queue_num = queue_num
                self.uuid = 'xxxxx-xxxxx-xxxxx-xxxxx-xxxx' + str(queue_num)

            def info(self):
                """Return an object exposing ``status`` and ``processing_time``."""
                class StatusMock:
                    # ``self`` is read from the enclosing ``info`` call.
                    status = TaskStatus.RUNNING if self.running else TaskStatus.QUEUED
                    processing_time = 1
                return StatusMock()

            def remove(self):
                """Pretend the removal always succeeds."""
                return True

        class TaskMock(Task):
            """Task whose local and remote processing are simulated with sleeps."""

            def process_local(self):
                # First task should be 0000 or 0001. The flag latches to True
                # as soon as a locally processed path matches either suffix.
                if not nonloc.local_task_check:
                    nonloc.local_task_check = self.project_path.endswith(
                        ("0000", "0001"))
                time.sleep(1)

            def process_remote(self, done):
                time.sleep(0.05)  # file upload

                self.remote_task = OdmTaskMock(nonloc.remote_queue <= MAX_QUEUE,
                                               nonloc.remote_queue)
                self.params['tasks'].append(self.remote_task)

                # Injected failure, raised synchronously from the upload step.
                if nonloc.should_fail and self.project_path.endswith("0006"):
                    raise exceptions.TaskFailedError("FAIL #6")

                nonloc.remote_queue += 1

                # Upload successful
                done(error=None, partial=True)

                # Async processing
                def monitor():
                    try:
                        # Once the limit has been hit, fail one time in five.
                        if nonloc.task_limit_reached and random.randint(0, 4) == 0:
                            nonloc.remote_queue -= 1
                            raise NodeTaskLimitReachedException("Random fail!")

                        # First task past the queue limit reports it (once).
                        if (not nonloc.task_limit_reached
                                and self.remote_task.queue_num > MAX_QUEUE):
                            nonloc.remote_queue -= 1
                            nonloc.task_limit_reached = True
                            raise NodeTaskLimitReachedException(
                                "Delayed task limit reached")

                        time.sleep(0.5)
                        nonloc.remote_queue -= 1
                        done()
                    except Exception as e:
                        # Hand any error to the callback instead of letting it
                        # escape the monitor thread.
                        done(e)

                t = threading.Thread(target=monitor)
                self.params['threads'].append(t)
                t.start()

        self.lre.run(TaskMock)
        self.assertTrue(nonloc.local_task_check)

        # Second pass: reset the shared state and enable the injected failure.
        nonloc.should_fail = True
        nonloc.remote_queue = 1
        nonloc.task_limit_reached = False

        with self.assertRaises(exceptions.TaskFailedError):
            self.lre.run(TaskMock)
|
|
|
|
if __name__ == '__main__':
    # Run the test suite when this file is executed directly.
    unittest.main()