Separate task count tracking

pull/979/head
Piero Toffanin 2019-05-09 14:14:31 -04:00
rodzic 196c62c9b4
commit f4c807e28f
1 zmienionych plików z 7 dodań i 5 usunięć

Wyświetl plik

@ -64,8 +64,9 @@ class LocalRemoteExecutor:
error = None error = None
local_is_processing = False local_is_processing = False
semaphore = None semaphore = None
handle_result_mutex = threading.Lock()
handle_result_mutex = threading.Lock()
unfinished_tasks = AtomicCounter(len(self.project_paths))
node_task_limit = AtomicCounter(0) node_task_limit = AtomicCounter(0)
# Create queue # Create queue
@ -86,7 +87,7 @@ class LocalRemoteExecutor:
def handle_result(task, local, error = None, partial=False): def handle_result(task, local, error = None, partial=False):
try: try:
nonloc.handle_result_mutex.acquire() handle_result_mutex.acquire()
release_semaphore = True release_semaphore = True
if error: if error:
@ -119,16 +120,17 @@ class LocalRemoteExecutor:
if not partial: if not partial:
log.ODM_INFO("LRE: %s finished successfully" % task) log.ODM_INFO("LRE: %s finished successfully" % task)
unfinished_tasks.increment(-1)
if local: if local:
nonloc.local_is_processing = False nonloc.local_is_processing = False
if not task.finished: if not task.finished:
if nonloc.semaphore and release_semaphore: nonloc.semaphore.release() if nonloc.semaphore and release_semaphore: nonloc.semaphore.release()
q.task_done()
task.finished = True task.finished = True
q.task_done()
finally: finally:
nonloc.handle_result_mutex.release() handle_result_mutex.release()
def worker(): def worker():
while True: while True:
@ -177,7 +179,7 @@ class LocalRemoteExecutor:
# block until all tasks are done (or CTRL+C) # block until all tasks are done (or CTRL+C)
try: try:
while q.unfinished_tasks > 0: while unfinished_tasks.value > 0:
time.sleep(0.5) time.sleep(0.5)
except KeyboardInterrupt: except KeyboardInterrupt:
log.ODM_WARNING("LRE: CTRL+C") log.ODM_WARNING("LRE: CTRL+C")