SIGTERM handling

Former-commit-id: bbff4a1246
pull/1161/head
Piero Toffanin 2019-05-07 16:07:39 -04:00
rodzic d712c16571
commit f80163d331
1 zmienionych plików z 37 dodań i 7 usunięć

Wyświetl plik

@ -1,7 +1,9 @@
import time import time
import datetime import datetime
import os import os
import sys
import threading import threading
import signal
from opendm import log from opendm import log
from pyodm import Node, exceptions from pyodm import Node, exceptions
from pyodm.utils import AtomicCounter from pyodm.utils import AtomicCounter
@ -22,7 +24,7 @@ class LocalRemoteExecutor:
def __init__(self, nodeUrl): def __init__(self, nodeUrl):
self.node = Node.from_url(nodeUrl) self.node = Node.from_url(nodeUrl)
log.ODM_INFO("LRE: Initializing using cluster node %s" % nodeUrl) log.ODM_INFO("LRE: Initializing using cluster node %s:%s" % (self.node.host, self.node.port))
try: try:
odm_version = self.node.info().odm_version odm_version = self.node.info().odm_version
log.ODM_INFO("LRE: Node is online and running ODM version: %s" % odm_version) log.ODM_INFO("LRE: Node is online and running ODM version: %s" % odm_version)
@ -109,19 +111,46 @@ class LocalRemoteExecutor:
node_task_limit.increment() # Called after upload, but before processing is started node_task_limit.increment() # Called after upload, but before processing is started
except Exception as e: except Exception as e:
handle_result(task, False, e) handle_result(task, False, e)
# Define thread
t = threading.Thread(target=worker) t = threading.Thread(target=worker)
def cleanup_remote_tasks():
log.ODM_WARNING("LRE: Attempting to cleanup remote tasks")
pass # TODO
# Capture SIGTERM so that we can
# attempt to cleanup if the process is terminated
original_sigterm_handler = signal.getsignal(signal.SIGTERM)
def sigterm_handler(signum, frame):
log.ODM_WARNING("LRE: Caught SIGTERM")
cleanup_remote_tasks()
os._exit(1)
signal.signal(signal.SIGTERM, sigterm_handler)
# Start worker process
t.start() t.start()
# block until all tasks are done # block until all tasks are done (or CTRL+C)
q.join() try:
while q.unfinished_tasks > 0:
time.sleep(0.5)
except KeyboardInterrupt:
log.ODM_WARNING("LRE: CTRL+C")
cleanup_remote_tasks()
os._exit(1)
# stop workers # stop workers
q.put(None) q.put(None)
# Wait for thread
t.join() t.join()
# restore SIGTERM handler
signal.signal(signal.SIGTERM, original_sigterm_handler)
if nonloc.error is not None: if nonloc.error is not None:
raise nonloc.error raise nonloc.error
@ -169,7 +198,7 @@ class Task:
def process_remote(self, done): def process_remote(self, done):
raise NotImplementedError() raise NotImplementedError()
def __str__(self): def __str__(self):
return os.path.basename(self.project_path) return os.path.basename(self.project_path)
@ -196,3 +225,4 @@ class ReconstructionTask(Task):
else: else:
print("Process remote: " + self.project_path) print("Process remote: " + self.project_path)
done() done()