Fast PLY merge, parallel filtering

Former-commit-id: 2cd0f588ac
pull/1161/head
Piero Toffanin 2020-07-07 16:14:55 -04:00
parent 055cf26e61
commit d764d6576a
3 changed files with 149 additions and 7 deletions

View file

@@ -1,5 +1,12 @@
 from psutil import virtual_memory
 import os
+try:
+    import Queue as queue
+except:
+    import queue
+import threading
+import time
+from opendm import log
 
 def get_max_memory(minimum = 5, use_at_most = 0.5):
     """
@@ -16,3 +23,75 @@ def get_max_memory_mb(minimum = 100, use_at_most = 0.5):
     :return value of memory to use in megabytes.
     """
     return max(minimum, (virtual_memory().available / 1024 / 1024) * use_at_most)
+
+def parallel_map(func, items, max_workers=1):
+    """
+    Our own implementation for parallel processing
+    which handles gracefully CTRL+C and reverts to
+    single thread processing in case of errors
+    :param items list of objects
+    :param func function to execute on each object
+    """
+    global error
+    error = None
+
+    def process_one(q):
+        func(q)
+
+    def worker():
+        global error
+        while True:
+            (num, q) = pq.get()
+            if q is None or error is not None:
+                pq.task_done()
+                break
+
+            try:
+                process_one(q)
+            except Exception as e:
+                error = e
+            finally:
+                pq.task_done()
+
+    if max_workers > 1:
+        use_single_thread = False
+        pq = queue.PriorityQueue()
+        threads = []
+
+        for i in range(max_workers):
+            t = threading.Thread(target=worker)
+            t.start()
+            threads.append(t)
+
+        for t in items:
+            pq.put((i, t.copy()))
+
+        def stop_workers():
+            for i in range(len(threads)):
+                pq.put((-1, None))
+            for t in threads:
+                t.join()
+
+        # block until all tasks are done
+        try:
+            while pq.unfinished_tasks > 0:
+                time.sleep(0.5)
+        except KeyboardInterrupt:
+            print("CTRL+C terminating...")
+            stop_workers()
+            sys.exit(1)
+
+        stop_workers()
+
+        if error is not None:
+            # Try to reprocess using a single thread
+            # in case this was a memory error
+            log.ODM_WARNING("Failed to run process in parallel, retrying with a single thread...")
+            use_single_thread = True
+    else:
+        use_single_thread = True
+
+    if use_single_thread:
+        # Boring, single thread processing
+        for q in items:
+            process_one(q)

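For context, a minimal sketch of how the new helper is meant to be called (hypothetical work function and file names; the pattern mirrors the filter() call site further down). Items are passed as dicts because the worker enqueues t.copy() for each one, and processing falls back to a single thread when max_workers is 1 or when any worker raises:

    from opendm.concurrency import parallel_map

    def process_part(item):
        # Hypothetical per-item work; receives one dict from the list below
        print("Processing %s" % item['path'])

    parts = [{'path': 'part_%d.ply' % i} for i in range(4)]
    parallel_map(process_part, parts, max_workers=2)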
View file

@@ -80,6 +80,9 @@ def create_dem(input_point_cloud, dem_type, output_type='max', radiuses=['0.56']
                verbose=False, decimation=None, keep_unfilled_copy=False,
                apply_smoothing=True):
     """ Create DEM from multiple radii, and optionally gapfill """
+    # TODO: refactor to use concurrency.parallel_map
+
     global error
     error = None

View file

@@ -5,6 +5,7 @@ from opendm import context
 from opendm.system import run
 from opendm import entwine
 from opendm import io
+from opendm.concurrency import parallel_map
 from pipes import quote
 
 def ply_info(input_ply):
@@ -18,7 +19,7 @@ def ply_info(input_ply):
     with open(input_ply, 'r') as f:
         line = f.readline().strip().lower()
         i = 0
-        while line != "end_header" and i < 100:
+        while line != "end_header":
             line = f.readline().strip().lower()
             props = line.split(" ")
             if len(props) == 3:
@@ -27,6 +28,9 @@ def ply_info(input_ply):
             elif props[0] == "element" and props[1] == "vertex":
                 vertex_count = int(props[2])
             i += 1
+            if i > 100:
+                raise IOError("Cannot find end_header field. Invalid PLY?")
+
     return {
         'has_normals': has_normals,
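For reference, the header this loop scans looks like the following minimal example (not taken from the commit; PLY headers are plain text even when the vertex data that follows is binary). ply_info reads it line by line until end_header, picking up the element vertex count and, from the property lines, whether normals are present:

    ply
    format binary_little_endian 1.0
    element vertex 145278
    property float x
    property float y
    property float z
    property float nx
    property float ny
    property float nz
    end_header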
@@ -99,7 +103,7 @@ def filter(input_point_cloud, output_point_cloud, standard_deviation=2.5, meank=
     # Do we need to split this?
     VERTEX_THRESHOLD = 300000
-    max_concurrency = min(max_concurrency, math.ceil(info['vertex_count'] / VERTEX_THRESHOLD))
+    max_concurrency = min(max_concurrency, int(math.ceil(info['vertex_count'] / VERTEX_THRESHOLD)))
 
     vertices_per_submodel = int(math.ceil(info['vertex_count'] / max(1, max_concurrency)))
     should_split = max_concurrency > 1 and info['vertex_count'] > VERTEX_THRESHOLD
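To make the clamping concrete with hypothetical numbers: for a 900,000-vertex cloud and the 300,000-vertex threshold above, a requested max_concurrency of 8 is clamped to ceil(900000 / 300000) = 3, each submodel is sized at ceil(900000 / 3) = 300,000 vertices, and should_split is true. A 200,000-vertex cloud stays under the threshold, so max_concurrency drops to 1 and the cloud is filtered in a single pass.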
@@ -111,22 +115,25 @@ def filter(input_point_cloud, output_point_cloud, standard_deviation=2.5, meank=
         point_cloud_submodels = split(input_point_cloud, partsdir, "part.ply", capacity=vertices_per_submodel, dims=dims)
 
-        # Filter
-        for pcs in point_cloud_submodels:
+        def run_filter(pcs):
             # Recurse
-            filter(pcs, io.related_file_path(pcs, postfix="_filtered"),
+            filter(pcs['path'], io.related_file_path(pcs['path'], postfix="_filtered"),
                 standard_deviation=standard_deviation,
                 meank=meank,
                 sample_radius=sample_radius,
                 verbose=verbose,
                 max_concurrency=1)
 
+        # Filter
+        parallel_map(run_filter, [{'path': p} for p in point_cloud_submodels], max_concurrency)
+
         # Merge
         log.ODM_INFO("Merging %s point cloud chunks to %s" % (len(point_cloud_submodels), output_point_cloud))
         filtered_pcs = [io.related_file_path(pcs, postfix="_filtered") for pcs in point_cloud_submodels]
-        merge_ply(filtered_pcs, output_point_cloud, dims)
+        #merge_ply(filtered_pcs, output_point_cloud, dims)
+        fast_merge_ply(filtered_pcs, output_point_cloud)
 
-        # TODO REMOVE parts
+        if os.path.exists(partsdir):
+            shutil.rmtree(partsdir)
     else:
         # Process point cloud (or a point cloud submodel) in a single step
         filterArgs = {
@@ -230,6 +237,59 @@ def merge(input_point_cloud_files, output_file, rerun=False):
         system.run('lasmerge -i {all_inputs} -o "{output}"'.format(**kwargs))
 
+
+def fast_merge_ply(input_point_cloud_files, output_file):
+    # Assumes that all input files share the same header/content format
+    # As the merge is a naive byte stream copy
+
+    num_files = len(input_point_cloud_files)
+    if num_files == 0:
+        log.ODM_WARNING("No input point cloud files to process")
+        return
+
+    if io.file_exists(output_file):
+        log.ODM_WARNING("Removing previous point cloud: %s" % output_file)
+        os.remove(output_file)
+
+    vertex_count = sum([ply_info(pcf)['vertex_count'] for pcf in input_point_cloud_files])
+    master_file = input_point_cloud_files[0]
+    with open(output_file, "wb") as out:
+        with open(master_file, "r") as fhead:
+            # Copy header
+            line = fhead.readline()
+            out.write(line)
+
+            i = 0
+            while line.strip().lower() != "end_header":
+                line = fhead.readline()
+
+                # Intercept element vertex field
+                if line.lower().startswith("element vertex "):
+                    out.write("element vertex %s\n" % vertex_count)
+                else:
+                    out.write(line)
+
+                i += 1
+                if i > 100:
+                    raise IOError("Cannot find end_header field. Invalid PLY?")
+
+        for ipc in input_point_cloud_files:
+            i = 0
+            with open(ipc, "rb") as fin:
+                # Skip header
+                line = fin.readline()
+                while line.strip().lower() != "end_header":
+                    line = fin.readline()
+                    i += 1
+                    if i > 100:
+                        raise IOError("Cannot find end_header field. Invalid PLY?")
+
+                # Write fields
+                out.write(fin.read())
+
+    return output_file
+
+
 def merge_ply(input_point_cloud_files, output_file, dims=None):
     num_files = len(input_point_cloud_files)
     if num_files == 0:
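A minimal usage sketch of the new fast path (hypothetical file names; assuming this module is opendm/point_cloud.py, as elsewhere in the codebase). It is only safe when every part shares the same header layout, e.g. chunks produced by split() and filtered by the same pipeline, since the vertex data is copied verbatim and only the element vertex count in the first file's header is rewritten:

    from opendm.point_cloud import fast_merge_ply

    # Chunks previously written by split() and filtered individually (hypothetical names)
    parts = ['part_1_filtered.ply', 'part_2_filtered.ply', 'part_3_filtered.ply']
    merged = fast_merge_ply(parts, 'point_cloud_filtered.ply')  # returns the output path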