kopia lustrzana https://github.com/OpenDroneMap/ODM
Custom logging class, split task upload, partial results
Former-commit-id: f6afade752
pull/1161/head
rodzic
ecea20213d
commit
ecfb1b5fc5
|
@ -8,8 +8,6 @@ from opendm import log
|
||||||
from opendm import io
|
from opendm import io
|
||||||
import os
|
import os
|
||||||
|
|
||||||
from rasterio import logging
|
|
||||||
|
|
||||||
def euclidean_merge_dems(input_dems, output_dem, creation_options={}):
|
def euclidean_merge_dems(input_dems, output_dem, creation_options={}):
|
||||||
"""
|
"""
|
||||||
Based on https://github.com/mapbox/rio-merge-rgba
|
Based on https://github.com/mapbox/rio-merge-rgba
|
||||||
|
@ -35,12 +33,6 @@ def euclidean_merge_dems(input_dems, output_dem, creation_options={}):
|
||||||
log.ODM_WARNING("No input DEMs, skipping euclidean merge.")
|
log.ODM_WARNING("No input DEMs, skipping euclidean merge.")
|
||||||
return
|
return
|
||||||
|
|
||||||
# Suppress DEBUG logging for rasterio operations
|
|
||||||
log_ = logging.getLogger()
|
|
||||||
debug_enabled = log_.isEnabledFor(logging.DEBUG)
|
|
||||||
if debug_enabled:
|
|
||||||
logging.disable(logging.DEBUG)
|
|
||||||
|
|
||||||
with rasterio.open(existing_dems[0]) as first:
|
with rasterio.open(existing_dems[0]) as first:
|
||||||
src_nodata = first.nodatavals[0]
|
src_nodata = first.nodatavals[0]
|
||||||
res = first.res
|
res = first.res
|
||||||
|
@ -169,8 +161,4 @@ def euclidean_merge_dems(input_dems, output_dem, creation_options={}):
|
||||||
|
|
||||||
dstrast.write(dstarr, window=dst_window)
|
dstrast.write(dstarr, window=dst_window)
|
||||||
|
|
||||||
# Restore logging
|
|
||||||
if debug_enabled:
|
|
||||||
logging.disable(logging.NOTSET)
|
|
||||||
|
|
||||||
return output_dem
|
return output_dem
|
|
@ -1,5 +1,3 @@
|
||||||
import logging
|
|
||||||
|
|
||||||
HEADER = '\033[95m'
|
HEADER = '\033[95m'
|
||||||
OKBLUE = '\033[94m'
|
OKBLUE = '\033[94m'
|
||||||
OKGREEN = '\033[92m'
|
OKGREEN = '\033[92m'
|
||||||
|
@ -7,19 +5,31 @@ WARNING = '\033[93m'
|
||||||
FAIL = '\033[91m'
|
FAIL = '\033[91m'
|
||||||
ENDC = '\033[0m'
|
ENDC = '\033[0m'
|
||||||
|
|
||||||
def init():
|
# logging has too many quirks...
|
||||||
# TODO add file handling
|
class ODMLogger:
|
||||||
logging.addLevelName(logging.INFO, '%s[%s]' % (OKBLUE, logging.getLevelName(logging.INFO)))
|
def log(self, startc, msg, level_name):
|
||||||
logging.addLevelName(logging.WARNING, '%s[%s]' % (WARNING, logging.getLevelName(logging.WARNING)))
|
level = ("[" + level_name + "]").ljust(9)
|
||||||
logging.addLevelName(logging.ERROR, '%s[%s]' % (FAIL, logging.getLevelName(logging.ERROR)))
|
print("%s%s %s%s" % (startc, level, msg, ENDC))
|
||||||
logging.addLevelName(logging.DEBUG, '%s[%s]' % (OKGREEN, logging.getLevelName(logging.DEBUG)))
|
|
||||||
|
|
||||||
logging.basicConfig(level=logging.DEBUG,
|
def info(self, msg):
|
||||||
format='%(levelname)-14s %(message)s' + ENDC)
|
self.log(OKBLUE, msg, "INFO")
|
||||||
|
|
||||||
|
def warning(self, msg):
|
||||||
|
self.log(WARNING, msg, "WARNING")
|
||||||
|
|
||||||
ODM_INFO = logging.info
|
def error(self, msg):
|
||||||
ODM_WARNING = logging.warning
|
self.log(FAIL, msg, "ERROR")
|
||||||
ODM_ERROR = logging.error
|
|
||||||
ODM_EXCEPTION = logging.exception
|
def exception(self, msg):
|
||||||
ODM_DEBUG = logging.debug
|
self.log(FAIL, msg, "EXCEPTION")
|
||||||
|
|
||||||
|
def debug(self, msg):
|
||||||
|
self.log(OKGREEN, msg, "DEBUG")
|
||||||
|
|
||||||
|
logger = ODMLogger()
|
||||||
|
|
||||||
|
ODM_INFO = logger.info
|
||||||
|
ODM_WARNING = logger.warning
|
||||||
|
ODM_ERROR = logger.error
|
||||||
|
ODM_EXCEPTION = logger.exception
|
||||||
|
ODM_DEBUG = logger.debug
|
||||||
|
|
|
@ -70,8 +70,6 @@ class OSFMContext:
|
||||||
has_alt = False
|
has_alt = False
|
||||||
fout.write('%s\n' % io.join_paths(images_path, photo.filename))
|
fout.write('%s\n' % io.join_paths(images_path, photo.filename))
|
||||||
|
|
||||||
# TODO: does this need to be a relative path?
|
|
||||||
|
|
||||||
# create config file for OpenSfM
|
# create config file for OpenSfM
|
||||||
config = [
|
config = [
|
||||||
"use_exif_size: no",
|
"use_exif_size: no",
|
||||||
|
@ -105,7 +103,7 @@ class OSFMContext:
|
||||||
|
|
||||||
if gcp_path:
|
if gcp_path:
|
||||||
config.append("bundle_use_gcp: yes")
|
config.append("bundle_use_gcp: yes")
|
||||||
io.copy(gcp_path, self.opensfm_project_path)
|
io.copy(gcp_path, self.path("gcp_list.txt"))
|
||||||
|
|
||||||
config = config + append_config
|
config = config + append_config
|
||||||
|
|
||||||
|
|
133
opendm/remote.py
133
opendm/remote.py
|
@ -5,9 +5,11 @@ import sys
|
||||||
import threading
|
import threading
|
||||||
import signal
|
import signal
|
||||||
import zipfile
|
import zipfile
|
||||||
|
import glob
|
||||||
from opendm import log
|
from opendm import log
|
||||||
from pyodm import Node, exceptions
|
from pyodm import Node, exceptions
|
||||||
from pyodm.utils import AtomicCounter
|
from pyodm.utils import AtomicCounter
|
||||||
|
from pyodm.types import TaskStatus
|
||||||
from osfm import OSFMContext
|
from osfm import OSFMContext
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
@ -26,6 +28,7 @@ class LocalRemoteExecutor:
|
||||||
def __init__(self, nodeUrl):
|
def __init__(self, nodeUrl):
|
||||||
self.node = Node.from_url(nodeUrl)
|
self.node = Node.from_url(nodeUrl)
|
||||||
self.node.tasks = []
|
self.node.tasks = []
|
||||||
|
self.node_online = True
|
||||||
|
|
||||||
log.ODM_INFO("LRE: Initializing using cluster node %s:%s" % (self.node.host, self.node.port))
|
log.ODM_INFO("LRE: Initializing using cluster node %s:%s" % (self.node.host, self.node.port))
|
||||||
try:
|
try:
|
||||||
|
@ -33,6 +36,7 @@ class LocalRemoteExecutor:
|
||||||
log.ODM_INFO("LRE: Node is online and running ODM version: %s" % odm_version)
|
log.ODM_INFO("LRE: Node is online and running ODM version: %s" % odm_version)
|
||||||
except exceptions.NodeConnectionError:
|
except exceptions.NodeConnectionError:
|
||||||
log.ODM_WARNING("LRE: The node seems to be offline! We'll still process the dataset, but it's going to run entirely locally.")
|
log.ODM_WARNING("LRE: The node seems to be offline! We'll still process the dataset, but it's going to run entirely locally.")
|
||||||
|
self.node_online = False
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
log.ODM_ERROR("LRE: An unexpected problem happened while opening the node connection: %s" % str(e))
|
log.ODM_ERROR("LRE: An unexpected problem happened while opening the node connection: %s" % str(e))
|
||||||
exit(1)
|
exit(1)
|
||||||
|
@ -61,11 +65,10 @@ class LocalRemoteExecutor:
|
||||||
def cleanup_remote_tasks_and_exit():
|
def cleanup_remote_tasks_and_exit():
|
||||||
log.ODM_WARNING("LRE: Attempting to cleanup remote tasks")
|
log.ODM_WARNING("LRE: Attempting to cleanup remote tasks")
|
||||||
for task in self.node.tasks:
|
for task in self.node.tasks:
|
||||||
pass # TODO!!!
|
log.ODM_DEBUG("Removing remote task %s... %s" % (task.uuid, task.remove()))
|
||||||
#task.cancel()
|
|
||||||
os._exit(1)
|
os._exit(1)
|
||||||
|
|
||||||
def handle_result(task, local, error = None):
|
def handle_result(task, local, error = None, partial=False):
|
||||||
if error:
|
if error:
|
||||||
if isinstance(error, NodeTaskLimitReachedException) and not nonloc.semaphore:
|
if isinstance(error, NodeTaskLimitReachedException) and not nonloc.semaphore:
|
||||||
nonloc.semaphore = threading.Semaphore(node_task_limit.value)
|
nonloc.semaphore = threading.Semaphore(node_task_limit.value)
|
||||||
|
@ -90,10 +93,11 @@ class LocalRemoteExecutor:
|
||||||
else:
|
else:
|
||||||
nonloc.error = error
|
nonloc.error = error
|
||||||
else:
|
else:
|
||||||
if not local:
|
if not local and not partial:
|
||||||
node_task_limit.increment(-1)
|
node_task_limit.increment(-1)
|
||||||
|
|
||||||
log.ODM_INFO("LRE: %s finished successfully" % task)
|
if not partial:
|
||||||
|
log.ODM_INFO("LRE: %s finished successfully" % task)
|
||||||
|
|
||||||
if local:
|
if local:
|
||||||
nonloc.local_is_processing = False
|
nonloc.local_is_processing = False
|
||||||
|
@ -113,7 +117,7 @@ class LocalRemoteExecutor:
|
||||||
if nonloc.semaphore: nonloc.semaphore.release()
|
if nonloc.semaphore: nonloc.semaphore.release()
|
||||||
break
|
break
|
||||||
|
|
||||||
if not nonloc.local_is_processing:
|
if not nonloc.local_is_processing or not self.node_online:
|
||||||
# Process local
|
# Process local
|
||||||
try:
|
try:
|
||||||
nonloc.local_is_processing = True
|
nonloc.local_is_processing = True
|
||||||
|
@ -184,8 +188,8 @@ class Task:
|
||||||
self.local = None
|
self.local = None
|
||||||
|
|
||||||
def process(self, local, done):
|
def process(self, local, done):
|
||||||
def handle_result(error = None):
|
def handle_result(error = None, partial=False):
|
||||||
done(self, local, error)
|
done(self, local, error, partial)
|
||||||
|
|
||||||
log.ODM_INFO("LRE: About to process %s %s" % (self, 'locally' if local else 'remotely'))
|
log.ODM_INFO("LRE: About to process %s %s" % (self, 'locally' if local else 'remotely'))
|
||||||
|
|
||||||
|
@ -204,9 +208,12 @@ class Task:
|
||||||
# perhaps this wouldn't be a big speedup.
|
# perhaps this wouldn't be a big speedup.
|
||||||
self._process_remote(handle_result) # Block until upload is complete
|
self._process_remote(handle_result) # Block until upload is complete
|
||||||
|
|
||||||
def create_seed_payload(self, paths):
|
def path(self, *paths):
|
||||||
paths = filter(os.path.exists, map(lambda p: os.path.join(self.project_path, p), paths))
|
return os.path.join(self.project_path, *paths)
|
||||||
outfile = os.path.join(self.project_path, "seed.zip")
|
|
||||||
|
def create_seed_payload(self, paths, touch_files=[]):
|
||||||
|
paths = filter(os.path.exists, map(lambda p: self.path(p), paths))
|
||||||
|
outfile = self.path("seed.zip")
|
||||||
|
|
||||||
with zipfile.ZipFile(outfile, "w", compression=zipfile.ZIP_DEFLATED) as zf:
|
with zipfile.ZipFile(outfile, "w", compression=zipfile.ZIP_DEFLATED) as zf:
|
||||||
for p in paths:
|
for p in paths:
|
||||||
|
@ -218,21 +225,27 @@ class Task:
|
||||||
zf.write(filename, os.path.relpath(filename, self.project_path))
|
zf.write(filename, os.path.relpath(filename, self.project_path))
|
||||||
else:
|
else:
|
||||||
zf.write(p, os.path.relpath(p, self.project_path))
|
zf.write(p, os.path.relpath(p, self.project_path))
|
||||||
|
|
||||||
|
for tf in touch_files:
|
||||||
|
zf.writestr(tf, "")
|
||||||
|
|
||||||
return outfile
|
return outfile
|
||||||
|
|
||||||
def _process_local(self, done):
|
def _process_local(self, done):
|
||||||
try:
|
try:
|
||||||
self.process_local(done)
|
self.process_local()
|
||||||
|
done()
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
done(e)
|
done(e)
|
||||||
|
|
||||||
def _process_remote(self, done):
|
def _process_remote(self, done):
|
||||||
try:
|
try:
|
||||||
self.process_remote(done)
|
self.process_remote(done)
|
||||||
|
done(error=None, partial=True) # Upload is completed, but processing is not (partial)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
done(e)
|
done(e)
|
||||||
|
|
||||||
def process_local(self, done):
|
def process_local(self):
|
||||||
raise NotImplementedError()
|
raise NotImplementedError()
|
||||||
|
|
||||||
def process_remote(self, done):
|
def process_remote(self, done):
|
||||||
|
@ -242,33 +255,85 @@ class Task:
|
||||||
return os.path.basename(self.project_path)
|
return os.path.basename(self.project_path)
|
||||||
|
|
||||||
class ReconstructionTask(Task):
|
class ReconstructionTask(Task):
|
||||||
def process_local(self, done):
|
def process_local(self):
|
||||||
octx = OSFMContext(os.path.join(self.project_path, "opensfm"))
|
octx = OSFMContext(self.path("opensfm"))
|
||||||
octx.feature_matching()
|
octx.feature_matching()
|
||||||
octx.reconstruct()
|
octx.reconstruct()
|
||||||
done()
|
|
||||||
|
|
||||||
def process_remote(self, done):
|
def process_remote(self, done):
|
||||||
|
seed_file = self.create_seed_payload(["opensfm/exif",
|
||||||
|
"opensfm/camera_models.json",
|
||||||
|
"opensfm/reference_lla.json"], touch_files=["opensfm/split_merge_stop_at_reconstruction.txt"])
|
||||||
|
|
||||||
def test():
|
# Find all images
|
||||||
time.sleep(4)
|
images = glob.glob(self.path("images/**"))
|
||||||
done()
|
|
||||||
|
|
||||||
self.create_seed_payload(["opensfm/exif",
|
# Add GCP (optional)
|
||||||
"opensfm/matches",
|
if os.path.exists(self.path("gcp_list.txt")):
|
||||||
"opensfm/features",
|
images.append(self.path("gcp_list.txt"))
|
||||||
"opensfm/camera_models.json",
|
|
||||||
"opensfm/reference_lla.json"])
|
|
||||||
|
|
||||||
if self.project_path == '/datasets/brighton/opensfm/submodels/submodel_0001':
|
# Add seed file
|
||||||
done(Exception("TEST EXCEPTION!" + self.project_path))
|
images.append(seed_file)
|
||||||
elif self.project_path == '/datasets/brighton/opensfm/submodels/submodel_0002':
|
|
||||||
done(NodeTaskLimitReachedException("Limit reached"))
|
def print_progress(percentage):
|
||||||
elif self.project_path == '/datasets/brighton/opensfm/submodels/submodel_0003':
|
# if percentage % 10 == 0:
|
||||||
threading.Thread(target=test).start()
|
log.ODM_DEBUG("LRE: Upload of %s at [%s%]" % (self, percentage))
|
||||||
elif self.project_path == '/datasets/brighton/opensfm/submodels/submodel_0004':
|
|
||||||
threading.Thread(target=test).start()
|
# Upload task
|
||||||
|
task = self.node.create_task(images,
|
||||||
|
{'dsm': True, 'orthophoto-resolution': 4}, # TODO
|
||||||
|
progress_callback=print_progress,
|
||||||
|
skip_post_processing=True,
|
||||||
|
outputs=["opensfm/matches",
|
||||||
|
"opensfm/features",])
|
||||||
|
|
||||||
|
# Keep track of tasks for cleanup
|
||||||
|
self.node.tasks.append(task)
|
||||||
|
|
||||||
|
# Check status
|
||||||
|
info = task.info()
|
||||||
|
if info.status == TaskStatus.RUNNING:
|
||||||
|
def monitor():
|
||||||
|
# If a task switches from RUNNING to QUEUED, then we need to
|
||||||
|
# stop the process and re-add the task to the queue.
|
||||||
|
def status_callback(info):
|
||||||
|
if info.status == TaskStatus.QUEUED:
|
||||||
|
log.ODM_WARNING("%s (%s) turned from RUNNING to QUEUED. Re-adding to back of the queue." % (self, task.uuid))
|
||||||
|
task.remove()
|
||||||
|
done(NodeTaskLimitReachedException("Delayed task limit reached"), partial=True)
|
||||||
|
|
||||||
|
try:
|
||||||
|
def print_progress(progress):
|
||||||
|
log.ODM_DEBUG("LRE: Download of %s at [%s%]" % (self, percentage))
|
||||||
|
|
||||||
|
task.wait_for_completion(status_callback=status_callback)
|
||||||
|
log.ODM_DEBUG("Downloading assets for %s" % self)
|
||||||
|
task.download_assets(self.project_path, progress_callback=print_progress)
|
||||||
|
done()
|
||||||
|
except Exception as e:
|
||||||
|
done(e)
|
||||||
|
|
||||||
|
# Launch monitor thread and return
|
||||||
|
threading.Thread(target=monitor).start()
|
||||||
|
elif info.status == TaskStatus.QUEUED:
|
||||||
|
raise NodeTaskLimitReachedException("Task limit reached")
|
||||||
else:
|
else:
|
||||||
print("Process remote: " + self.project_path)
|
raise Exception("Could not send task to node, task status set to %s" % str(info.status))
|
||||||
done()
|
|
||||||
|
|
||||||
|
# def test():
|
||||||
|
# time.sleep(4)
|
||||||
|
# done()
|
||||||
|
|
||||||
|
# if self.project_path == '/datasets/brighton/opensfm/submodels/submodel_0001':
|
||||||
|
# done(Exception("TEST EXCEPTION!" + self.project_path))
|
||||||
|
# elif self.project_path == '/datasets/brighton/opensfm/submodels/submodel_0002':
|
||||||
|
# done(NodeTaskLimitReachedException("Limit reached"))
|
||||||
|
# elif self.project_path == '/datasets/brighton/opensfm/submodels/submodel_0003':
|
||||||
|
# threading.Thread(target=test).start()
|
||||||
|
# elif self.project_path == '/datasets/brighton/opensfm/submodels/submodel_0004':
|
||||||
|
# threading.Thread(target=test).start()
|
||||||
|
# else:
|
||||||
|
# print("Process remote: " + self.project_path)
|
||||||
|
# done()
|
||||||
|
|
||||||
|
|
1
run.py
1
run.py
|
@ -11,7 +11,6 @@ from pipes import quote
|
||||||
from stages.odm_app import ODMApp
|
from stages.odm_app import ODMApp
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
log.init()
|
|
||||||
args = config.config()
|
args = config.config()
|
||||||
|
|
||||||
log.ODM_INFO('Initializing OpenDroneMap app - %s' % system.now())
|
log.ODM_INFO('Initializing OpenDroneMap app - %s' % system.now())
|
||||||
|
|
|
@ -26,8 +26,10 @@ class ODMOpenSfMStage(types.ODM_Stage):
|
||||||
octx.feature_matching(self.rerun())
|
octx.feature_matching(self.rerun())
|
||||||
octx.reconstruct(self.rerun())
|
octx.reconstruct(self.rerun())
|
||||||
|
|
||||||
# TODO: add special logic for distributed large workflow
|
# If we find a special flag file for split/merge we stop right here
|
||||||
# and terminate stage early
|
if os.path.exists(octx.path("split_merge_stop_at_reconstruction.txt")):
|
||||||
|
log.ODM_INFO("Stopping OpenSfM early because we found: %s" % octx.path("split_merge_stop_at_reconstruction.txt"))
|
||||||
|
return
|
||||||
|
|
||||||
if args.fast_orthophoto:
|
if args.fast_orthophoto:
|
||||||
output_file = octx.path('reconstruction.ply')
|
output_file = octx.path('reconstruction.ply')
|
||||||
|
|
|
@ -12,7 +12,7 @@ from opendm.dem.merge import euclidean_merge_dems
|
||||||
from opensfm.large import metadataset
|
from opensfm.large import metadataset
|
||||||
from opendm.cropper import Cropper
|
from opendm.cropper import Cropper
|
||||||
from opendm.concurrency import get_max_memory
|
from opendm.concurrency import get_max_memory
|
||||||
from opendm.remote import HybridDistributedExecutor
|
from opendm.remote import LocalRemoteExecutor
|
||||||
from pipes import quote
|
from pipes import quote
|
||||||
|
|
||||||
class ODMSplitStage(types.ODM_Stage):
|
class ODMSplitStage(types.ODM_Stage):
|
||||||
|
@ -86,9 +86,9 @@ class ODMSplitStage(types.ODM_Stage):
|
||||||
log.ODM_INFO("Reconstructing %s" % sp)
|
log.ODM_INFO("Reconstructing %s" % sp)
|
||||||
OSFMContext(sp).reconstruct(self.rerun())
|
OSFMContext(sp).reconstruct(self.rerun())
|
||||||
else:
|
else:
|
||||||
de = HybridDistributedExecutor(args.sm_cluster)
|
lre = LocalRemoteExecutor(args.sm_cluster)
|
||||||
de.set_projects([os.path.abspath(os.path.join(p, "..")) for p in submodel_paths])
|
lre.set_projects([os.path.abspath(os.path.join(p, "..")) for p in submodel_paths])
|
||||||
de.run_reconstruct(self.rerun())
|
lre.run_reconstruct()
|
||||||
|
|
||||||
# Align
|
# Align
|
||||||
alignment_file = octx.path('alignment_done.txt')
|
alignment_file = octx.path('alignment_done.txt')
|
||||||
|
|
Ładowanie…
Reference in New Issue