Fast PLY merge, parallel filtering

pull/1132/head
Piero Toffanin 2020-07-07 16:14:55 -04:00
parent c38628db6f
commit 2cd0f588ac
3 changed files with 149 additions and 7 deletions

Show file

@@ -1,5 +1,12 @@
from psutil import virtual_memory
import os
import sys  # needed for sys.exit() in parallel_map below
try:
    import Queue as queue  # Python 2
except ImportError:
    import queue  # Python 3
import threading
import time
from opendm import log
def get_max_memory(minimum = 5, use_at_most = 0.5):
    """
@@ -16,3 +23,75 @@ def get_max_memory_mb(minimum = 100, use_at_most = 0.5):
    :return value of memory to use in megabytes.
    """
    return max(minimum, (virtual_memory().available / 1024 / 1024) * use_at_most)
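For intuition, a worked example under an assumed figure (the 8 GB of available RAM below is made up): with 8192 MB available and the defaults, get_max_memory_mb() returns max(100, 8192 * 0.5) = 4096 MB.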
def parallel_map(func, items, max_workers=1):
    """
    Our own implementation for parallel processing
    which gracefully handles CTRL+C and reverts to
    single thread processing in case of errors
    :param items list of objects
    :param func function to execute on each object
    """
    global error
    error = None

    def process_one(q):
        func(q)

    def worker():
        global error

        while True:
            (num, q) = pq.get()
            if q is None or error is not None:
                pq.task_done()
                break

            try:
                process_one(q)
            except Exception as e:
                error = e
            finally:
                pq.task_done()

    if max_workers > 1:
        use_single_thread = False
        pq = queue.PriorityQueue()
        threads = []

        for i in range(max_workers):
            t = threading.Thread(target=worker)
            t.start()
            threads.append(t)

        # Enqueue a copy of each item; the index gives every entry a distinct priority
        for i, t in enumerate(items):
            pq.put((i, t.copy()))

        def stop_workers():
            # Sentinel entries (priority -1) jump ahead of pending work and stop the threads
            for i in range(len(threads)):
                pq.put((-1, None))
            for t in threads:
                t.join()

        # block until all tasks are done
        try:
            while pq.unfinished_tasks > 0:
                time.sleep(0.5)
        except KeyboardInterrupt:
            print("CTRL+C terminating...")
            stop_workers()
            sys.exit(1)

        stop_workers()

        if error is not None:
            # Try to reprocess using a single thread
            # in case this was a memory error
            log.ODM_WARNING("Failed to run process in parallel, retrying with a single thread...")
            use_single_thread = True
    else:
        use_single_thread = True

    if use_single_thread:
        # Boring, single thread processing
        for q in items:
            process_one(q)
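A minimal usage sketch of the new helper (the work function and file names are made up for illustration; items should be dict-like objects, since parallel_map enqueues a .copy() of each one):

# Hypothetical example, not part of the commit.
def compress_file(item):
    print("Compressing %s" % item['path'])

files = [{'path': 'a.ply'}, {'path': 'b.ply'}, {'path': 'c.ply'}]

# Runs compress_file over the items on up to 4 worker threads; if any call
# raises, it logs a warning and reprocesses the whole list on a single thread.
parallel_map(compress_file, files, max_workers=4)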

Show file

@@ -80,6 +80,9 @@ def create_dem(input_point_cloud, dem_type, output_type='max', radiuses=['0.56'],
               verbose=False, decimation=None, keep_unfilled_copy=False,
               apply_smoothing=True):
    """ Create DEM from multiple radii, and optionally gapfill """
    # TODO: refactor to use concurrency.parallel_map
    global error
    error = None
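The TODO above hints that the per-radius DEM work could eventually be dispatched through the new helper. A heavily hedged sketch of that idea (run_one_radius, the item dicts, and the worker count are hypothetical stand-ins, not code from this commit):

# Hypothetical sketch of the TODO, not part of the commit.
from opendm.concurrency import parallel_map

def run_one_radius(item):
    # would run the DEM generation step for item['radius'] here
    pass

parallel_map(run_one_radius, [{'radius': r} for r in radiuses], max_workers=4)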

Show file

@@ -5,6 +5,7 @@ from opendm import context
from opendm.system import run
from opendm import entwine
from opendm import io
from opendm.concurrency import parallel_map
from pipes import quote
def ply_info(input_ply):
@@ -18,7 +19,7 @@ def ply_info(input_ply):
    with open(input_ply, 'r') as f:
        line = f.readline().strip().lower()
        i = 0
        while line != "end_header" and i < 100:
        while line != "end_header":
            line = f.readline().strip().lower()
            props = line.split(" ")
            if len(props) == 3:
@@ -27,6 +28,9 @@ def ply_info(input_ply):
                elif props[0] == "element" and props[1] == "vertex":
                    vertex_count = int(props[2])
            i += 1
            if i > 100:
                raise IOError("Cannot find end_header field. Invalid PLY?")

    return {
        'has_normals': has_normals,
@@ -99,7 +103,7 @@ def filter(input_point_cloud, output_point_cloud, standard_deviation=2.5, meank=
    # Do we need to split this?
    VERTEX_THRESHOLD = 300000
    max_concurrency = min(max_concurrency, math.ceil(info['vertex_count'] / VERTEX_THRESHOLD))
    max_concurrency = min(max_concurrency, int(math.ceil(info['vertex_count'] / VERTEX_THRESHOLD)))
    vertices_per_submodel = int(math.ceil(info['vertex_count'] / max(1, max_concurrency)))
    should_split = max_concurrency > 1 and info['vertex_count'] > VERTEX_THRESHOLD
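For intuition, a worked example of the split math under assumed numbers (the vertex count and core count below are made up; Python 3 division):

# Hypothetical numbers: a 1,000,000-vertex cloud with max_concurrency initially 8.
import math
vertex_count = 1000000
VERTEX_THRESHOLD = 300000
max_concurrency = min(8, int(math.ceil(vertex_count / VERTEX_THRESHOLD)))        # min(8, 4) -> 4
vertices_per_submodel = int(math.ceil(vertex_count / max(1, max_concurrency)))   # 250000
should_split = max_concurrency > 1 and vertex_count > VERTEX_THRESHOLD           # True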
@@ -111,22 +115,25 @@ def filter(input_point_cloud, output_point_cloud, standard_deviation=2.5, meank=
        point_cloud_submodels = split(input_point_cloud, partsdir, "part.ply", capacity=vertices_per_submodel, dims=dims)

        # Filter
        for pcs in point_cloud_submodels:
        def run_filter(pcs):
            # Recurse
            filter(pcs, io.related_file_path(pcs, postfix="_filtered"),
            filter(pcs['path'], io.related_file_path(pcs['path'], postfix="_filtered"),
                    standard_deviation=standard_deviation,
                    meank=meank,
                    sample_radius=sample_radius,
                    verbose=verbose,
                    max_concurrency=1)

        # Filter
        parallel_map(run_filter, [{'path': p} for p in point_cloud_submodels], max_concurrency)

        # Merge
        log.ODM_INFO("Merging %s point cloud chunks to %s" % (len(point_cloud_submodels), output_point_cloud))
        filtered_pcs = [io.related_file_path(pcs, postfix="_filtered") for pcs in point_cloud_submodels]
        merge_ply(filtered_pcs, output_point_cloud, dims)
        #merge_ply(filtered_pcs, output_point_cloud, dims)
        fast_merge_ply(filtered_pcs, output_point_cloud)

        # TODO REMOVE parts
        if os.path.exists(partsdir):
            shutil.rmtree(partsdir)
    else:
        # Process point cloud (or a point cloud submodel) in a single step
        filterArgs = {
@@ -230,6 +237,59 @@ def merge(input_point_cloud_files, output_file, rerun=False):
    system.run('lasmerge -i {all_inputs} -o "{output}"'.format(**kwargs))

def fast_merge_ply(input_point_cloud_files, output_file):
    # Assumes that all input files share the same header/content format,
    # as the merge is a naive byte stream copy
    num_files = len(input_point_cloud_files)
    if num_files == 0:
        log.ODM_WARNING("No input point cloud files to process")
        return

    if io.file_exists(output_file):
        log.ODM_WARNING("Removing previous point cloud: %s" % output_file)
        os.remove(output_file)

    vertex_count = sum([ply_info(pcf)['vertex_count'] for pcf in input_point_cloud_files])
    master_file = input_point_cloud_files[0]
    with open(output_file, "wb") as out:
        with open(master_file, "r") as fhead:
            # Copy header (encode text lines since the output is opened in binary mode)
            line = fhead.readline()
            out.write(line.encode('utf8'))

            i = 0
            while line.strip().lower() != "end_header":
                line = fhead.readline()

                # Intercept element vertex field and patch in the combined vertex count
                if line.lower().startswith("element vertex "):
                    out.write(("element vertex %s\n" % vertex_count).encode('utf8'))
                else:
                    out.write(line.encode('utf8'))

                i += 1
                if i > 100:
                    raise IOError("Cannot find end_header field. Invalid PLY?")

        for ipc in input_point_cloud_files:
            i = 0
            with open(ipc, "rb") as fin:
                # Skip header (binary mode, so compare against bytes)
                line = fin.readline()
                while line.strip().lower() != b"end_header":
                    line = fin.readline()
                    i += 1
                    if i > 100:
                        raise IOError("Cannot find end_header field. Invalid PLY?")

                # Write fields (copy the remaining bytes verbatim)
                out.write(fin.read())

    return output_file
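A minimal usage sketch of the new merge path (file names are hypothetical); it only works when the parts share an identical header layout, which the recursive filter() calls above are expected to produce:

# Hypothetical file names for illustration only.
parts = ["part_1_filtered.ply", "part_2_filtered.ply", "part_3_filtered.ply"]
merged = fast_merge_ply(parts, "merged.ply")
# The merged header advertises the summed vertex count; the vertex payloads are
# concatenated verbatim, so all parts must use the same properties and byte order.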
def merge_ply(input_point_cloud_files, output_file, dims=None):
    num_files = len(input_point_cloud_files)
    if num_files == 0: