From f6afade7524ced271df46afaa241033c93bf06b1 Mon Sep 17 00:00:00 2001 From: Piero Toffanin Date: Wed, 8 May 2019 16:19:05 -0400 Subject: [PATCH] Custom logging class, split task upload, partial results --- opendm/dem/merge.py | 12 ---- opendm/log.py | 40 ++++++++----- opendm/osfm.py | 4 +- opendm/remote.py | 133 +++++++++++++++++++++++++++++++----------- run.py | 1 - stages/run_opensfm.py | 6 +- stages/splitmerge.py | 8 +-- 7 files changed, 133 insertions(+), 71 deletions(-) diff --git a/opendm/dem/merge.py b/opendm/dem/merge.py index 02ec2cd8..89d0528e 100644 --- a/opendm/dem/merge.py +++ b/opendm/dem/merge.py @@ -8,8 +8,6 @@ from opendm import log from opendm import io import os -from rasterio import logging - def euclidean_merge_dems(input_dems, output_dem, creation_options={}): """ Based on https://github.com/mapbox/rio-merge-rgba @@ -35,12 +33,6 @@ def euclidean_merge_dems(input_dems, output_dem, creation_options={}): log.ODM_WARNING("No input DEMs, skipping euclidean merge.") return - # Suppress DEBUG logging for rasterio operations - log_ = logging.getLogger() - debug_enabled = log_.isEnabledFor(logging.DEBUG) - if debug_enabled: - logging.disable(logging.DEBUG) - with rasterio.open(existing_dems[0]) as first: src_nodata = first.nodatavals[0] res = first.res @@ -169,8 +161,4 @@ def euclidean_merge_dems(input_dems, output_dem, creation_options={}): dstrast.write(dstarr, window=dst_window) - # Restore logging - if debug_enabled: - logging.disable(logging.NOTSET) - return output_dem \ No newline at end of file diff --git a/opendm/log.py b/opendm/log.py index 10e6d0ae..335553d7 100644 --- a/opendm/log.py +++ b/opendm/log.py @@ -1,5 +1,3 @@ -import logging - HEADER = '\033[95m' OKBLUE = '\033[94m' OKGREEN = '\033[92m' @@ -7,19 +5,31 @@ WARNING = '\033[93m' FAIL = '\033[91m' ENDC = '\033[0m' -def init(): - # TODO add file handling - logging.addLevelName(logging.INFO, '%s[%s]' % (OKBLUE, logging.getLevelName(logging.INFO))) - logging.addLevelName(logging.WARNING, '%s[%s]' % (WARNING, logging.getLevelName(logging.WARNING))) - logging.addLevelName(logging.ERROR, '%s[%s]' % (FAIL, logging.getLevelName(logging.ERROR))) - logging.addLevelName(logging.DEBUG, '%s[%s]' % (OKGREEN, logging.getLevelName(logging.DEBUG))) +# logging has too many quirks... +class ODMLogger: + def log(self, startc, msg, level_name): + level = ("[" + level_name + "]").ljust(9) + print("%s%s %s%s" % (startc, level, msg, ENDC)) - logging.basicConfig(level=logging.DEBUG, - format='%(levelname)-14s %(message)s' + ENDC) + def info(self, msg): + self.log(OKBLUE, msg, "INFO") + def warning(self, msg): + self.log(WARNING, msg, "WARNING") -ODM_INFO = logging.info -ODM_WARNING = logging.warning -ODM_ERROR = logging.error -ODM_EXCEPTION = logging.exception -ODM_DEBUG = logging.debug + def error(self, msg): + self.log(FAIL, msg, "ERROR") + + def exception(self, msg): + self.log(FAIL, msg, "EXCEPTION") + + def debug(self, msg): + self.log(OKGREEN, msg, "DEBUG") + +logger = ODMLogger() + +ODM_INFO = logger.info +ODM_WARNING = logger.warning +ODM_ERROR = logger.error +ODM_EXCEPTION = logger.exception +ODM_DEBUG = logger.debug diff --git a/opendm/osfm.py b/opendm/osfm.py index 0a490cda..a957e8fb 100644 --- a/opendm/osfm.py +++ b/opendm/osfm.py @@ -70,8 +70,6 @@ class OSFMContext: has_alt = False fout.write('%s\n' % io.join_paths(images_path, photo.filename)) - # TODO: does this need to be a relative path? - # create config file for OpenSfM config = [ "use_exif_size: no", @@ -105,7 +103,7 @@ class OSFMContext: if gcp_path: config.append("bundle_use_gcp: yes") - io.copy(gcp_path, self.opensfm_project_path) + io.copy(gcp_path, self.path("gcp_list.txt")) config = config + append_config diff --git a/opendm/remote.py b/opendm/remote.py index d54a6312..cd412235 100644 --- a/opendm/remote.py +++ b/opendm/remote.py @@ -5,9 +5,11 @@ import sys import threading import signal import zipfile +import glob from opendm import log from pyodm import Node, exceptions from pyodm.utils import AtomicCounter +from pyodm.types import TaskStatus from osfm import OSFMContext try: @@ -26,6 +28,7 @@ class LocalRemoteExecutor: def __init__(self, nodeUrl): self.node = Node.from_url(nodeUrl) self.node.tasks = [] + self.node_online = True log.ODM_INFO("LRE: Initializing using cluster node %s:%s" % (self.node.host, self.node.port)) try: @@ -33,6 +36,7 @@ class LocalRemoteExecutor: log.ODM_INFO("LRE: Node is online and running ODM version: %s" % odm_version) except exceptions.NodeConnectionError: log.ODM_WARNING("LRE: The node seems to be offline! We'll still process the dataset, but it's going to run entirely locally.") + self.node_online = False except Exception as e: log.ODM_ERROR("LRE: An unexpected problem happened while opening the node connection: %s" % str(e)) exit(1) @@ -61,11 +65,10 @@ class LocalRemoteExecutor: def cleanup_remote_tasks_and_exit(): log.ODM_WARNING("LRE: Attempting to cleanup remote tasks") for task in self.node.tasks: - pass # TODO!!! - #task.cancel() + log.ODM_DEBUG("Removing remote task %s... %s" % (task.uuid, task.remove())) os._exit(1) - def handle_result(task, local, error = None): + def handle_result(task, local, error = None, partial=False): if error: if isinstance(error, NodeTaskLimitReachedException) and not nonloc.semaphore: nonloc.semaphore = threading.Semaphore(node_task_limit.value) @@ -90,10 +93,11 @@ class LocalRemoteExecutor: else: nonloc.error = error else: - if not local: + if not local and not partial: node_task_limit.increment(-1) - log.ODM_INFO("LRE: %s finished successfully" % task) + if not partial: + log.ODM_INFO("LRE: %s finished successfully" % task) if local: nonloc.local_is_processing = False @@ -113,7 +117,7 @@ class LocalRemoteExecutor: if nonloc.semaphore: nonloc.semaphore.release() break - if not nonloc.local_is_processing: + if not nonloc.local_is_processing or not self.node_online: # Process local try: nonloc.local_is_processing = True @@ -184,8 +188,8 @@ class Task: self.local = None def process(self, local, done): - def handle_result(error = None): - done(self, local, error) + def handle_result(error = None, partial=False): + done(self, local, error, partial) log.ODM_INFO("LRE: About to process %s %s" % (self, 'locally' if local else 'remotely')) @@ -204,9 +208,12 @@ class Task: # perhaps this wouldn't be a big speedup. self._process_remote(handle_result) # Block until upload is complete - def create_seed_payload(self, paths): - paths = filter(os.path.exists, map(lambda p: os.path.join(self.project_path, p), paths)) - outfile = os.path.join(self.project_path, "seed.zip") + def path(self, *paths): + return os.path.join(self.project_path, *paths) + + def create_seed_payload(self, paths, touch_files=[]): + paths = filter(os.path.exists, map(lambda p: self.path(p), paths)) + outfile = self.path("seed.zip") with zipfile.ZipFile(outfile, "w", compression=zipfile.ZIP_DEFLATED) as zf: for p in paths: @@ -218,21 +225,27 @@ class Task: zf.write(filename, os.path.relpath(filename, self.project_path)) else: zf.write(p, os.path.relpath(p, self.project_path)) + + for tf in touch_files: + zf.writestr(tf, "") + return outfile def _process_local(self, done): try: - self.process_local(done) + self.process_local() + done() except Exception as e: done(e) def _process_remote(self, done): try: self.process_remote(done) + done(error=None, partial=True) # Upload is completed, but processing is not (partial) except Exception as e: done(e) - def process_local(self, done): + def process_local(self): raise NotImplementedError() def process_remote(self, done): @@ -242,33 +255,85 @@ class Task: return os.path.basename(self.project_path) class ReconstructionTask(Task): - def process_local(self, done): - octx = OSFMContext(os.path.join(self.project_path, "opensfm")) + def process_local(self): + octx = OSFMContext(self.path("opensfm")) octx.feature_matching() octx.reconstruct() - done() def process_remote(self, done): + seed_file = self.create_seed_payload(["opensfm/exif", + "opensfm/camera_models.json", + "opensfm/reference_lla.json"], touch_files=["opensfm/split_merge_stop_at_reconstruction.txt"]) + + # Find all images + images = glob.glob(self.path("images/**")) - def test(): - time.sleep(4) - done() + # Add GCP (optional) + if os.path.exists(self.path("gcp_list.txt")): + images.append(self.path("gcp_list.txt")) + + # Add seed file + images.append(seed_file) - self.create_seed_payload(["opensfm/exif", - "opensfm/matches", - "opensfm/features", - "opensfm/camera_models.json", - "opensfm/reference_lla.json"]) + def print_progress(percentage): + # if percentage % 10 == 0: + log.ODM_DEBUG("LRE: Upload of %s at [%s%]" % (self, percentage)) + + # Upload task + task = self.node.create_task(images, + {'dsm': True, 'orthophoto-resolution': 4}, # TODO + progress_callback=print_progress, + skip_post_processing=True, + outputs=["opensfm/matches", + "opensfm/features",]) - if self.project_path == '/datasets/brighton/opensfm/submodels/submodel_0001': - done(Exception("TEST EXCEPTION!" + self.project_path)) - elif self.project_path == '/datasets/brighton/opensfm/submodels/submodel_0002': - done(NodeTaskLimitReachedException("Limit reached")) - elif self.project_path == '/datasets/brighton/opensfm/submodels/submodel_0003': - threading.Thread(target=test).start() - elif self.project_path == '/datasets/brighton/opensfm/submodels/submodel_0004': - threading.Thread(target=test).start() + # Keep track of tasks for cleanup + self.node.tasks.append(task) + + # Check status + info = task.info() + if info.status == TaskStatus.RUNNING: + def monitor(): + # If a task switches from RUNNING to QUEUED, then we need to + # stop the process and re-add the task to the queue. + def status_callback(info): + if info.status == TaskStatus.QUEUED: + log.ODM_WARNING("%s (%s) turned from RUNNING to QUEUED. Re-adding to back of the queue." % (self, task.uuid)) + task.remove() + done(NodeTaskLimitReachedException("Delayed task limit reached"), partial=True) + + try: + def print_progress(progress): + log.ODM_DEBUG("LRE: Download of %s at [%s%]" % (self, percentage)) + + task.wait_for_completion(status_callback=status_callback) + log.ODM_DEBUG("Downloading assets for %s" % self) + task.download_assets(self.project_path, progress_callback=print_progress) + done() + except Exception as e: + done(e) + + # Launch monitor thread and return + threading.Thread(target=monitor).start() + elif info.status == TaskStatus.QUEUED: + raise NodeTaskLimitReachedException("Task limit reached") else: - print("Process remote: " + self.project_path) - done() + raise Exception("Could not send task to node, task status set to %s" % str(info.status)) + + + # def test(): + # time.sleep(4) + # done() + + # if self.project_path == '/datasets/brighton/opensfm/submodels/submodel_0001': + # done(Exception("TEST EXCEPTION!" + self.project_path)) + # elif self.project_path == '/datasets/brighton/opensfm/submodels/submodel_0002': + # done(NodeTaskLimitReachedException("Limit reached")) + # elif self.project_path == '/datasets/brighton/opensfm/submodels/submodel_0003': + # threading.Thread(target=test).start() + # elif self.project_path == '/datasets/brighton/opensfm/submodels/submodel_0004': + # threading.Thread(target=test).start() + # else: + # print("Process remote: " + self.project_path) + # done() diff --git a/run.py b/run.py index 00e5df59..17aea0d4 100755 --- a/run.py +++ b/run.py @@ -11,7 +11,6 @@ from pipes import quote from stages.odm_app import ODMApp if __name__ == '__main__': - log.init() args = config.config() log.ODM_INFO('Initializing OpenDroneMap app - %s' % system.now()) diff --git a/stages/run_opensfm.py b/stages/run_opensfm.py index 65a9fbcf..206f5ec9 100644 --- a/stages/run_opensfm.py +++ b/stages/run_opensfm.py @@ -26,8 +26,10 @@ class ODMOpenSfMStage(types.ODM_Stage): octx.feature_matching(self.rerun()) octx.reconstruct(self.rerun()) - # TODO: add special logic for distributed large workflow - # and terminate stage early + # If we find a special flag file for split/merge we stop right here + if os.path.exists(octx.path("split_merge_stop_at_reconstruction.txt")): + log.ODM_INFO("Stopping OpenSfM early because we found: %s" % octx.path("split_merge_stop_at_reconstruction.txt")) + return if args.fast_orthophoto: output_file = octx.path('reconstruction.ply') diff --git a/stages/splitmerge.py b/stages/splitmerge.py index cb90b8fd..e324ac57 100644 --- a/stages/splitmerge.py +++ b/stages/splitmerge.py @@ -12,7 +12,7 @@ from opendm.dem.merge import euclidean_merge_dems from opensfm.large import metadataset from opendm.cropper import Cropper from opendm.concurrency import get_max_memory -from opendm.remote import HybridDistributedExecutor +from opendm.remote import LocalRemoteExecutor from pipes import quote class ODMSplitStage(types.ODM_Stage): @@ -86,9 +86,9 @@ class ODMSplitStage(types.ODM_Stage): log.ODM_INFO("Reconstructing %s" % sp) OSFMContext(sp).reconstruct(self.rerun()) else: - de = HybridDistributedExecutor(args.sm_cluster) - de.set_projects([os.path.abspath(os.path.join(p, "..")) for p in submodel_paths]) - de.run_reconstruct(self.rerun()) + lre = LocalRemoteExecutor(args.sm_cluster) + lre.set_projects([os.path.abspath(os.path.join(p, "..")) for p in submodel_paths]) + lre.run_reconstruct() # Align alignment_file = octx.path('alignment_done.txt')