def parallel_map(func, items, max_workers=1):
    """
    Our own implementation for parallel processing which handles
    CTRL+C gracefully and reverts to single-thread processing in
    case of errors (e.g. a worker ran out of memory).

    :param func: function to execute on each object
    :param items: list of dict-like objects; each is handed to a worker
                  as a shallow copy (``item.copy()``)
    :param max_workers: number of worker threads to use (<= 1 means
                        plain sequential processing)
    """
    # Mutable cell shared with the worker closures; avoids a module-level
    # ``global error`` that would leak state between calls.
    state = {'error': None}

    def process_one(q):
        func(q)

    def worker():
        while True:
            (num, q) = pq.get()
            # ``None`` payload is the shutdown sentinel; also drain and
            # stop early once any worker has recorded an error.
            if q is None or state['error'] is not None:
                pq.task_done()
                break

            try:
                process_one(q)
            except Exception as e:
                state['error'] = e
            finally:
                pq.task_done()

    use_single_thread = True

    if max_workers > 1:
        pq = queue.PriorityQueue()
        threads = []
        for _ in range(max_workers):
            t = threading.Thread(target=worker)
            t.start()
            threads.append(t)

        # Give each item a unique, increasing priority. With equal
        # priorities, PriorityQueue would fall back to comparing the
        # dict payloads themselves, which raises TypeError on Python 3.
        for num, item in enumerate(items):
            pq.put((num, item.copy()))

        def stop_workers():
            # Priority -1 sorts the sentinels ahead of any pending work.
            for _ in range(len(threads)):
                pq.put((-1, None))
            for t in threads:
                t.join()

        # Block until all tasks are done
        try:
            while pq.unfinished_tasks > 0:
                time.sleep(0.5)
        except KeyboardInterrupt:
            print("CTRL+C terminating...")
            stop_workers()
            import sys  # local import: sys is not imported at module level
            sys.exit(1)

        stop_workers()

        if state['error'] is not None:
            # Try to reprocess using a single thread
            # in case this was a memory error
            log.ODM_WARNING("Failed to run process in parallel, retrying with a single thread...")
        else:
            use_single_thread = False

    if use_single_thread:
        # Boring, single thread processing
        for q in items:
            process_one(q)
def fast_merge_ply(input_point_cloud_files, output_file):
    """
    Merge PLY files via a naive byte-stream copy: the header of the first
    file is reused (with its vertex count patched to the combined total)
    and the post-header bytes of every input are appended.

    Assumes all input files share the same header/content format.

    :param input_point_cloud_files: list of paths to the PLY files to merge
    :param output_file: path of the merged PLY to write (replaced if present)
    :return: output_file, or None if there was nothing to merge
    """
    num_files = len(input_point_cloud_files)
    if num_files == 0:
        log.ODM_WARNING("No input point cloud files to process")
        return

    if io.file_exists(output_file):
        log.ODM_WARNING("Removing previous point cloud: %s" % output_file)
        os.remove(output_file)

    # Combined vertex count for the patched header.
    vertex_count = sum([ply_info(pcf)['vertex_count'] for pcf in input_point_cloud_files])
    master_file = input_point_cloud_files[0]

    with open(output_file, "wb") as out:
        # Read the header in binary mode so the bytes written to the
        # binary output are preserved exactly (PLY payloads are binary).
        with open(master_file, "rb") as fhead:
            # Copy header, patching the vertex count
            line = fhead.readline()
            out.write(line)

            i = 0
            while line.strip().lower() != b"end_header":
                line = fhead.readline()

                # Intercept element vertex field
                if line.lower().startswith(b"element vertex "):
                    out.write(("element vertex %s\n" % vertex_count).encode("utf-8"))
                else:
                    out.write(line)

                i += 1
                if i > 100:
                    raise IOError("Cannot find end_header field. Invalid PLY?")

        for ipc in input_point_cloud_files:
            i = 0
            with open(ipc, "rb") as fin:
                # Skip header (bytes comparison: the file is opened "rb")
                line = fin.readline()
                while line.strip().lower() != b"end_header":
                    line = fin.readline()

                    i += 1
                    if i > 100:
                        raise IOError("Cannot find end_header field. Invalid PLY?")

                # Write fields
                out.write(fin.read())

    return output_file