Refactoring, skeleton hybrid distributed executor

Former-commit-id: 83edf2d45b
pull/1161/head
Piero Toffanin 2019-05-06 11:35:23 -04:00
rodzic 3ef7a3a8bd
commit dd16a8913c
5 zmienionych plików z 84 dodań i 62 usunięć

Wyświetl plik

@ -123,62 +123,55 @@ class OSFMContext:
else: else:
log.ODM_WARNING("%s already exists, not rerunning OpenSfM setup" % list_path) log.ODM_WARNING("%s already exists, not rerunning OpenSfM setup" % list_path)
def extract_metadata(self, rerun=False):
def feature_matching(self, rerun=False): metadata_dir = self.path("exif")
if not io.file_exists(self.feature_matching_done_file()) or rerun: if not io.dir_exists(metadata_dir) or rerun:
# TODO: put extract metadata into its own function
self.run('extract_metadata') self.run('extract_metadata')
# TODO: distributed workflow should do these two steps independently def feature_matching(self, rerun=False):
features_dir = self.path("features")
matches_dir = self.path("matches")
if not io.dir_exists(features_dir) or rerun:
self.run('detect_features') self.run('detect_features')
self.run('match_features')
self.mark_feature_matching_done()
else: else:
log.ODM_WARNING('Found a feature matching done progress file in: %s' % self.feature_matching_done_file()) log.ODM_WARNING('Detect features already done: %s exists' % features_dir)
def feature_matching_done_file(self): if not io.dir_exists(matches_dir) or rerun:
return io.join_paths(self.opensfm_project_path, 'matching_done.txt') self.run('match_features')
else:
log.ODM_WARNING('Match features already done: %s exists' % matches_dir)
def mark_feature_matching_done(self):
with open(self.feature_matching_done_file(), 'w') as fout:
fout.write("Matching done!\n")
def path(self, *paths): def path(self, *paths):
return os.path.join(self.opensfm_project_path, *paths) return os.path.join(self.opensfm_project_path, *paths)
def set_image_list_absolute(self): def save_absolute_image_list_to(self, file):
""" """
Checks the image_list.txt file and makes sure that all paths Writes a copy of the image_list.txt file and makes sure that all paths
written in it are absolute paths and not relative paths. written in it are absolute paths and not relative paths.
If there are relative paths, they are changed to absolute paths.
""" """
image_list_file = self.path("image_list.txt") image_list_file = self.path("image_list.txt")
tmp_list_file = self.path("image_list.txt.tmp")
if io.file_exists(image_list_file): if io.file_exists(image_list_file):
changed = False
with open(image_list_file, 'r') as f: with open(image_list_file, 'r') as f:
content = f.read() content = f.read()
lines = [] lines = []
for line in map(str.strip, content.split('\n')): for line in map(str.strip, content.split('\n')):
if line and not line.startswith("/"): if line and not line.startswith("/"):
changed = True
line = os.path.abspath(os.path.join(self.opensfm_project_path, line)) line = os.path.abspath(os.path.join(self.opensfm_project_path, line))
lines.append(line) lines.append(line)
if changed: with open(file, 'w') as f:
with open(tmp_list_file, 'w') as f: f.write("\n".join(lines))
f.write("\n".join(lines))
os.remove(image_list_file) log.ODM_DEBUG("Wrote %s with absolute paths" % file)
os.rename(tmp_list_file, image_list_file)
log.ODM_DEBUG("%s now contains absolute paths" % image_list_file)
else: else:
log.ODM_WARNING("No %s found, cannot check for absolute paths." % image_list_file) log.ODM_WARNING("No %s found, cannot create %s" % (image_list_file, file))
def name(self):
return os.path.basename(os.path.abspath(self.path("..")))
def get_submodel_argv(args, submodels_path, submodel_name): def get_submodel_argv(args, submodels_path, submodel_name):
""" """

17
opendm/remote.py 100644
Wyświetl plik

@ -0,0 +1,17 @@
from opendm import log
class HybridDistributedExecutor:
def __init__(self, nodeUrl):
self.nodeUrl = nodeUrl
log.ODM_INFO("Initializing hybrid distributed executor with cluster node: %s" % nodeUrl)
def set_projects(self, paths):
self.project_paths = paths
def run_reconstruct(self):
print(self.project_paths)
exit(1)
def run_toolchain(self):
pass

Wyświetl plik

@ -6,6 +6,7 @@ from opendm import system
from opendm import context from opendm import context
from opendm import point_cloud from opendm import point_cloud
from opendm import types from opendm import types
from opendm.osfm import OSFMContext
class ODMMveStage(types.ODM_Stage): class ODMMveStage(types.ODM_Stage):
def process(self, args, outputs): def process(self, args, outputs):
@ -28,7 +29,9 @@ class ODMMveStage(types.ODM_Stage):
if not io.file_exists(tree.mve_bundle): if not io.file_exists(tree.mve_bundle):
system.mkdir_p(tree.mve_path) system.mkdir_p(tree.mve_path)
system.mkdir_p(io.join_paths(tree.mve_path, 'bundle')) system.mkdir_p(io.join_paths(tree.mve_path, 'bundle'))
io.copy(tree.opensfm_image_list, tree.mve_image_list)
octx = OSFMContext(tree.opensfm)
octx.save_absolute_image_list_to(tree.mve_image_list)
io.copy(tree.opensfm_bundle, tree.mve_bundle) io.copy(tree.opensfm_bundle, tree.mve_bundle)
# mve makescene wants the output directory # mve makescene wants the output directory

Wyświetl plik

@ -22,6 +22,7 @@ class ODMOpenSfMStage(types.ODM_Stage):
octx = OSFMContext(tree.opensfm) octx = OSFMContext(tree.opensfm)
octx.setup(args, tree.dataset_raw, photos, gcp_path=tree.odm_georeferencing_gcp, rerun=self.rerun()) octx.setup(args, tree.dataset_raw, photos, gcp_path=tree.odm_georeferencing_gcp, rerun=self.rerun())
octx.extract_metadata(self.rerun())
octx.feature_matching(self.rerun()) octx.feature_matching(self.rerun())
octx.reconstruct(self.rerun()) octx.reconstruct(self.rerun())

Wyświetl plik

@ -12,6 +12,7 @@ from opendm.dem.merge import euclidean_merge_dems
from opensfm.large import metadataset from opensfm.large import metadataset
from opendm.cropper import Cropper from opendm.cropper import Cropper
from opendm.concurrency import get_max_memory from opendm.concurrency import get_max_memory
from opendm.remote import HybridDistributedExecutor
from pipes import quote from pipes import quote
class ODMSplitStage(types.ODM_Stage): class ODMSplitStage(types.ODM_Stage):
@ -23,6 +24,9 @@ class ODMSplitStage(types.ODM_Stage):
outputs['large'] = len(photos) > args.split outputs['large'] = len(photos) > args.split
if outputs['large']: if outputs['large']:
# If we have a cluster address, we'll use a distributed workflow
local_workflow = not bool(args.sm_cluster)
octx = OSFMContext(tree.opensfm) octx = OSFMContext(tree.opensfm)
split_done_file = octx.path("split_done.txt") split_done_file = octx.path("split_done.txt")
@ -38,8 +42,10 @@ class ODMSplitStage(types.ODM_Stage):
] ]
octx.setup(args, tree.dataset_raw, photos, gcp_path=tree.odm_georeferencing_gcp, append_config=config, rerun=self.rerun()) octx.setup(args, tree.dataset_raw, photos, gcp_path=tree.odm_georeferencing_gcp, append_config=config, rerun=self.rerun())
octx.extract_metadata(self.rerun())
octx.feature_matching(self.rerun())
if local_workflow:
octx.feature_matching(self.rerun())
# Create submodels # Create submodels
if not io.dir_exists(tree.submodels_path) or self.rerun(): if not io.dir_exists(tree.submodels_path) or self.rerun():
@ -57,34 +63,32 @@ class ODMSplitStage(types.ODM_Stage):
gcp_file = GCPFile(tree.odm_georeferencing_gcp) gcp_file = GCPFile(tree.odm_georeferencing_gcp)
# Make sure the image list file has absolute paths
for sp in submodel_paths: for sp in submodel_paths:
sp_octx = OSFMContext(sp) sp_octx = OSFMContext(sp)
sp_octx.set_image_list_absolute()
# Copy filtered GCP file if needed # Copy filtered GCP file if needed
# One in OpenSfM's directory, one in the submodel project directory # One in OpenSfM's directory, one in the submodel project directory
if gcp_file.exists(): if gcp_file.exists():
submodel_gcp_file = os.path.abspath(sp_octx.path("..", "gcp_list.txt")) submodel_gcp_file = os.path.abspath(sp_octx.path("..", "gcp_list.txt"))
submodel_images_dir = os.path.abspath(sp_octx.path("..", "images")) submodel_images_dir = os.path.abspath(sp_octx.path("..", "images"))
submodel_name = os.path.basename(os.path.abspath(sp_octx.path("..")))
if gcp_file.make_filtered_copy(submodel_gcp_file, submodel_images_dir): if gcp_file.make_filtered_copy(submodel_gcp_file, submodel_images_dir):
log.ODM_DEBUG("Copied filtered GCP file to %s" % submodel_gcp_file) log.ODM_DEBUG("Copied filtered GCP file to %s" % submodel_gcp_file)
io.copy(submodel_gcp_file, os.path.abspath(sp_octx.path("gcp_list.txt"))) io.copy(submodel_gcp_file, os.path.abspath(sp_octx.path("gcp_list.txt")))
else: else:
log.ODM_DEBUG("No GCP will be copied for %s, not enough images in the submodel are referenced by the GCP" % submodel_name) log.ODM_DEBUG("No GCP will be copied for %s, not enough images in the submodel are referenced by the GCP" % sp_octx.name())
# Reconstruct each submodel # Reconstruct each submodel
log.ODM_INFO("Dataset has been split into %s submodels. Reconstructing each submodel..." % len(submodel_paths)) log.ODM_INFO("Dataset has been split into %s submodels. Reconstructing each submodel..." % len(submodel_paths))
# TODO: on a network workflow we probably stop here if local_workflow:
# and let NodeODM take over for sp in submodel_paths:
# exit(0) log.ODM_INFO("Reconstructing %s" % sp)
OSFMContext(sp).reconstruct(self.rerun())
for sp in submodel_paths: else:
log.ODM_INFO("Reconstructing %s" % sp) de = HybridDistributedExecutor(args.sm_cluster)
OSFMContext(sp).reconstruct(self.rerun()) de.set_projects([os.path.abspath(os.path.join(p, "..")) for p in submodel_paths])
de.run_reconstruct()
# Align # Align
alignment_file = octx.path('alignment_done.txt') alignment_file = octx.path('alignment_done.txt')
@ -97,19 +101,11 @@ class ODMSplitStage(types.ODM_Stage):
else: else:
log.ODM_WARNING('Found a alignment matching done progress file in: %s' % alignment_file) log.ODM_WARNING('Found a alignment matching done progress file in: %s' % alignment_file)
# Dense reconstruction for each submodel # Aligned reconstruction is in reconstruction.aligned.json
# We need to rename it to reconstruction.json
remove_paths = []
for sp in submodel_paths: for sp in submodel_paths:
# TODO: network workflow
# We have already done matching
sp_octx = OSFMContext(sp) sp_octx = OSFMContext(sp)
sp_octx.mark_feature_matching_done()
submodel_name = os.path.basename(os.path.abspath(sp_octx.path("..")))
# Aligned reconstruction is in reconstruction.aligned.json
# We need to rename it to reconstruction.json
aligned_recon = sp_octx.path('reconstruction.aligned.json') aligned_recon = sp_octx.path('reconstruction.aligned.json')
main_recon = sp_octx.path('reconstruction.json') main_recon = sp_octx.path('reconstruction.json')
@ -117,7 +113,8 @@ class ODMSplitStage(types.ODM_Stage):
if not io.file_exists(aligned_recon): if not io.file_exists(aligned_recon):
log.ODM_WARNING("Submodel %s does not have an aligned reconstruction (%s). " log.ODM_WARNING("Submodel %s does not have an aligned reconstruction (%s). "
"This could mean that the submodel could not be reconstructed " "This could mean that the submodel could not be reconstructed "
" (are there enough features to reconstruct it?). Skipping." % (submodel_name, aligned_recon)) " (are there enough features to reconstruct it?). Skipping." % (sp_octx.name(), aligned_recon))
remove_paths.append(sp)
continue continue
if io.file_exists(main_recon): if io.file_exists(main_recon):
@ -126,14 +123,25 @@ class ODMSplitStage(types.ODM_Stage):
shutil.move(aligned_recon, main_recon) shutil.move(aligned_recon, main_recon)
log.ODM_DEBUG("%s is now %s" % (aligned_recon, main_recon)) log.ODM_DEBUG("%s is now %s" % (aligned_recon, main_recon))
log.ODM_INFO("========================") # Remove invalid submodels
log.ODM_INFO("Processing %s" % submodel_name) submodel_paths = [p for p in submodel_paths if not p in remove_paths]
log.ODM_INFO("========================")
argv = get_submodel_argv(args, tree.submodels_path, submodel_name) # Run ODM toolchain for each submodel
if local_workflow:
for sp in submodel_paths:
sp_octx = OSFMContext(sp)
# Re-run the ODM toolchain on the submodel log.ODM_INFO("========================")
system.run(" ".join(map(quote, argv)), env_vars=os.environ.copy()) log.ODM_INFO("Processing %s" % sp_octx.name())
log.ODM_INFO("========================")
argv = get_submodel_argv(args, tree.submodels_path, sp_octx.name())
# Re-run the ODM toolchain on the submodel
system.run(" ".join(map(quote, argv)), env_vars=os.environ.copy())
else:
de.set_projects([os.path.abspath(os.path.join(p, "..")) for p in submodel_paths])
de.run_toolchain()
with open(split_done_file, 'w') as fout: with open(split_done_file, 'w') as fout:
fout.write("Split done!\n") fout.write("Split done!\n")