Fixed race condition when creating the node task limit semaphore in LocalRemoteExecutor

Former-commit-id: 4b59638e5f
pull/1161/head
Piero Toffanin 2019-05-16 17:02:32 -04:00
parent a342d23bc9
commit 068be4ea5f
2 changed files with 35 additions and 26 deletions

View file

@@ -106,24 +106,27 @@ class LocalRemoteExecutor:
if str(error) == "Child was terminated by signal 15":
system.exit_gracefully()
if isinstance(error, NodeTaskLimitReachedException) and not nonloc.semaphore:
created_semaphore = False
if isinstance(error, NodeTaskLimitReachedException):
# Estimate the maximum number of tasks based on how many tasks
# are currently running
with calculate_task_limit_lock:
node_task_limit = 0
for t in self.params['tasks']:
try:
info = t.info()
if info.status == TaskStatus.RUNNING and info.processing_time >= 0:
node_task_limit += 1
except exceptions.OdmError:
pass
if not nonloc.semaphore:
node_task_limit = 0
for t in self.params['tasks']:
try:
info = t.info()
if info.status == TaskStatus.RUNNING and info.processing_time >= 0:
node_task_limit += 1
except exceptions.OdmError:
pass
sem_value = max(1, node_task_limit)
nonloc.semaphore = threading.Semaphore(sem_value)
log.ODM_DEBUG("LRE: Node task limit reached. Setting semaphore to %s" % sem_value)
for i in range(sem_value):
nonloc.semaphore.acquire()
sem_value = max(1, node_task_limit)
nonloc.semaphore = threading.Semaphore(sem_value)
log.ODM_DEBUG("LRE: Node task limit reached. Setting semaphore to %s" % sem_value)
for i in range(sem_value):
nonloc.semaphore.acquire()
created_semaphore = True
# Retry, but only if the error is not related to a task failure
if task.retries < task.max_retries and not isinstance(error, exceptions.TaskFailedError):
@@ -137,6 +140,7 @@ class LocalRemoteExecutor:
q.task_done()
log.ODM_DEBUG("LRE: Re-queueing %s (retries: %s)" % (task, task.retries))
if not created_semaphore and nonloc.semaphore: nonloc.semaphore.release()
q.put(task)
return
else:

View file

@@ -1,6 +1,7 @@
import time
import unittest
import threading
import random
from opendm.remote import LocalRemoteExecutor, Task, NodeTaskLimitReachedException
from pyodm import Node, exceptions
from pyodm.types import TaskStatus
@@ -8,14 +9,11 @@ from pyodm.types import TaskStatus
class TestRemote(unittest.TestCase):
def setUp(self):
self.lre = LocalRemoteExecutor('http://invalid-host:3000')
self.lre.set_projects(['/submodels/submodel_0000',
'/submodels/submodel_0001',
'/submodels/submodel_0002',
'/submodels/submodel_0003',
'/submodels/submodel_0004',
'/submodels/submodel_0005',
'/submodels/submodel_0006',
])
projects = []
for i in range(9):
projects.append('/submodels/submodel_00' + str(i).rjust(2, '0'))
self.lre.set_projects(projects)
def test_lre_init(self):
self.assertFalse(self.lre.node_online)
@@ -29,6 +27,7 @@ class TestRemote(unittest.TestCase):
local_task_check = False
remote_queue = 1
should_fail = False
task_limit_reached = False
class OdmTaskMock:
def __init__(self, running, queue_num):
@@ -60,7 +59,7 @@ class TestRemote(unittest.TestCase):
if nonloc.should_fail:
if self.project_path.endswith("0006"):
raise exceptions.TaskFailedError("FAIL #6")
nonloc.remote_queue += 1
# Upload successful
@@ -68,11 +67,14 @@ class TestRemote(unittest.TestCase):
# Async processing
def monitor():
time.sleep(0.2)
try:
if self.remote_task.queue_num > MAX_QUEUE:
if nonloc.task_limit_reached and random.randint(0, 4) == 0:
nonloc.remote_queue -= 1
raise NodeTaskLimitReachedException("Random fail!")
if not nonloc.task_limit_reached and self.remote_task.queue_num > MAX_QUEUE:
nonloc.remote_queue -= 1
nonloc.task_limit_reached = True
raise NodeTaskLimitReachedException("Delayed task limit reached")
time.sleep(0.5)
nonloc.remote_queue -= 1
@@ -88,6 +90,9 @@ class TestRemote(unittest.TestCase):
self.assertTrue(nonloc.local_task_check)
nonloc.should_fail = True
nonloc.remote_queue = 1
nonloc.task_limit_reached = False
with self.assertRaises(exceptions.TaskFailedError):
self.lre.run(TaskMock)