Semaphore improvement, better system cleanup callback system

pull/979/head
Piero Toffanin 2019-05-10 12:33:16 -04:00
rodzic 9c0cbf5c0d
commit 6f47e84a4d
4 zmienionych plików z 49 dodań i 34 usunięć

Wyświetl plik

@ -568,8 +568,8 @@ def config():
args.pc_classify = True
if args.skip_3dmodel and args.use_3dmesh:
log.ODM_WARNING('--skip-3dmodel is set, but so is --use-3dmesh. --use_3dmesh will be ignored.')
args.use_3dmesh = False
log.ODM_WARNING('--skip-3dmodel is set, but so is --use-3dmesh. --skip-3dmodel will be ignored.')
args.skip_3dmodel = False
if args.orthophoto_cutline and not args.crop:
log.ODM_WARNING("--orthophoto-cutline is set, but --crop is not. --crop will be set to 0.01")

Wyświetl plik

@ -187,7 +187,7 @@ def get_submodel_argv(project_name = None, submodels_path = None, submodel_name
removing --gcp (the GCP path if specified is always "gcp_list.txt")
"""
assure_always = ['--orthophoto-cutline', '--dem-euclidean-map', '--skip-3dmodel']
remove_always_2 = ['--split', '--rerun-from', '--rerun', '--gcp', '--end-with', '--sm-cluster']
remove_always_2 = ['--split', '--split-overlap', '--rerun-from', '--rerun', '--gcp', '--end-with', '--sm-cluster']
remove_always_1 = ['--rerun-all']
argv = sys.argv

Wyświetl plik

@ -82,30 +82,31 @@ class LocalRemoteExecutor:
log.ODM_WARNING("LRE: No remote tasks to cleanup")
for task in self.params['tasks']:
log.ODM_DEBUG("Removing remote task %s... %s" % (task.uuid, 'OK' if task.remove() else 'FAILED'))
os._exit(1)
try:
removed = task.remove()
except exceptions.OdmError:
removed = False
log.ODM_DEBUG("Removing remote task %s... %s" % (task.uuid, 'OK' if removed else 'NO'))
def handle_result(task, local, error = None, partial=False):
try:
handle_result_mutex.acquire()
release_semaphore = True
if error:
if isinstance(error, NodeTaskLimitReachedException) and not nonloc.semaphore and node_task_limit.value > 0:
sem_value = max(1, node_task_limit.value - 1)
nonloc.semaphore = threading.Semaphore(sem_value)
log.ODM_DEBUG("LRE: Node task limit reached. Setting semaphore to %s" % sem_value)
for i in range(sem_value):
nonloc.semaphore.acquire()
release_semaphore = False
log.ODM_WARNING("LRE: %s failed with: %s" % (task, str(error)))
# Special case in which the error is caused by a SIGTERM signal
# this means a local processing was terminated either by CTRL+C or
# by canceling the task.
if str(error) == "Child was terminated by signal 15":
cleanup_remote_tasks_and_exit()
if isinstance(error, NodeTaskLimitReachedException) and not nonloc.semaphore and node_task_limit.value > 0:
sem_value = max(1, node_task_limit.value - 1)
nonloc.semaphore = threading.Semaphore(sem_value)
log.ODM_DEBUG("LRE: Node task limit reached. Setting semaphore to %s and waiting..." % sem_value)
for i in range(sem_value + 1):
nonloc.semaphore.acquire() # This will block until a task has finished
# Retry, but only if the error is not related to a task failure
if task.retries < task.max_retries and not isinstance(error, exceptions.TaskFailedError):
@ -132,7 +133,7 @@ class LocalRemoteExecutor:
nonloc.local_is_processing = False
if not task.finished:
if nonloc.semaphore and release_semaphore: nonloc.semaphore.release()
if nonloc.semaphore: nonloc.semaphore.release()
task.finished = True
q.task_done()
finally:
@ -143,8 +144,10 @@ class LocalRemoteExecutor:
# If we've found a limit on the maximum number of tasks
# a node can process, we block until some tasks have completed
if nonloc.semaphore: nonloc.semaphore.acquire()
# Block until a new queue item is available
task = q.get()
if task is None or nonloc.error is not None:
q.task_done()
if nonloc.semaphore: nonloc.semaphore.release()
@ -170,15 +173,7 @@ class LocalRemoteExecutor:
# Create queue thread
t = threading.Thread(target=worker)
# Capture SIGTERM so that we can
# attempt to cleanup if the process is terminated
original_sigterm_handler = signal.getsignal(signal.SIGTERM)
def sigterm_handler(signum, frame):
log.ODM_WARNING("LRE: Caught SIGTERM")
cleanup_remote_tasks_and_exit()
signal.signal(signal.SIGTERM, sigterm_handler)
system.add_cleanup_callback(cleanup_remote_tasks_and_exit)
# Start worker process
t.start()
@ -189,7 +184,7 @@ class LocalRemoteExecutor:
time.sleep(0.5)
except KeyboardInterrupt:
log.ODM_WARNING("LRE: CTRL+C")
cleanup_remote_tasks_and_exit()
system.exit_gracefully()
# stop workers
q.put(None)
@ -200,9 +195,8 @@ class LocalRemoteExecutor:
# Wait for all remains threads
for thrds in self.params['threads']:
thrds.join()
# restore SIGTERM handler
signal.signal(signal.SIGTERM, original_sigterm_handler)
system.remove_cleanup_callback(cleanup_remote_tasks_and_exit)
if nonloc.error is not None:
# Try not to leak access token

Wyświetl plik

@ -18,19 +18,40 @@ def get_ccd_widths():
return dict(zip(map(string.lower, sensor_data.keys()), sensor_data.values()))
running_subprocesses = []
def exit_gracefully(signum, frame):
cleanup_callbacks = []
def add_cleanup_callback(func):
global cleanup_callbacks
cleanup_callbacks.append(func)
def remove_cleanup_callback(func):
global cleanup_callbacks
try:
cleanup_callbacks.remove(func)
except ValueError as e:
log.ODM_EXCEPTION("Tried to remove %s from cleanup_callbacks but got: %s" % (str(func), str(e)))
def exit_gracefully():
global running_subprocesses
global cleanup_callbacks
log.ODM_WARNING("Caught TERM/INT signal, attempting to exit gracefully...")
for cb in cleanup_callbacks:
cb()
for sp in running_subprocesses:
log.ODM_WARNING("Sending TERM signal to PID %s..." % sp.pid)
os.killpg(os.getpgid(sp.pid), signal.SIGTERM)
exit(1)
os._exit(1)
signal.signal(signal.SIGINT, exit_gracefully)
signal.signal(signal.SIGTERM, exit_gracefully)
def sighandler(signum, frame):
exit_gracefully()
signal.signal(signal.SIGINT, sighandler)
signal.signal(signal.SIGTERM, sighandler)
def run(cmd, env_paths=[context.superbuild_bin_path], env_vars={}):
"""Run a system command"""