diff --git a/opendm/dem/commands.py b/opendm/dem/commands.py index 56c65d9d..cb253c72 100755 --- a/opendm/dem/commands.py +++ b/opendm/dem/commands.py @@ -6,6 +6,7 @@ import math import time import shutil import functools +import threading from joblib import delayed, Parallel from opendm.system import run from opendm import point_cloud @@ -311,13 +312,19 @@ def median_smoothing(geotiff_path, output_path, smoothing_iterations=1, window_s if not os.path.exists(geotiff_path): raise Exception('File %s does not exist!' % geotiff_path) + # Prepare temporary files + folder_path, output_filename = os.path.split(output_path) + basename, ext = os.path.splitext(output_filename) + + output_dirty_in = os.path.join(folder_path, "{}.dirty_1{}".format(basename, ext)) + output_dirty_out = os.path.join(folder_path, "{}.dirty_2{}".format(basename, ext)) + log.ODM_INFO('Starting smoothing...') - with rasterio.open(geotiff_path) as img: + with rasterio.open(geotiff_path, num_threads=num_workers) as img, rasterio.open(output_dirty_in, "w+", BIGTIFF="IF_SAFER", num_threads=num_workers, **img.profile) as imgout, rasterio.open(output_dirty_out, "w+", BIGTIFF="IF_SAFER", num_threads=num_workers, **img.profile) as imgout2: nodata = img.nodatavals[0] dtype = img.dtypes[0] shape = img.shape - arr = img.read()[0] for i in range(smoothing_iterations): log.ODM_INFO("Smoothing iteration %s" % str(i + 1)) rows, cols = numpy.meshgrid(numpy.arange(0, shape[0], window_size), numpy.arange(0, shape[1], window_size)) @@ -329,41 +336,73 @@ def median_smoothing(geotiff_path, output_path, smoothing_iterations=1, window_s filter = functools.partial(ndimage.median_filter, size=9, output=dtype, mode='nearest') - # threading backend and GIL released filter are important for memory efficiency and multi-core performance - window_arrays = Parallel(n_jobs=num_workers, backend='threading')(delayed(window_filter_2d)(arr, nodata , window, 9, filter) for window in windows) + # We cannot read/write to the same file from multiple threads without causing race conditions. + # To safely read/write from multiple threads, we use a lock to protect the DatasetReader/Writer. + read_lock = threading.Lock() + write_lock = threading.Lock() - for window, win_arr in zip(windows, window_arrays): - arr[window[0]:window[2], window[1]:window[3]] = win_arr - log.ODM_INFO("Smoothing completed in %s" % str(datetime.now() - start)) - # write output - with rasterio.open(output_path, 'w', BIGTIFF="IF_SAFER", **img.profile) as imgout: - imgout.write(arr, 1) + # threading backend and GIL released filter are important for memory efficiency and multi-core performance + Parallel(n_jobs=num_workers, backend='threading')(delayed(window_filter_2d)(img, imgout, nodata , window, 9, filter, read_lock, write_lock) for window in windows) + + # Between each iteration we swap the input and output temporary files + #img_in, img_out = img_out, img_in + if (i == 0): + img = imgout + imgout = imgout2 + else: + img, imgout = imgout, img + + # If the number of iterations was even, we need to swap temporary files + if (smoothing_iterations % 2 != 0): + output_dirty_in, output_dirty_out = output_dirty_out, output_dirty_in + + # Cleaning temporary files + if os.path.exists(output_dirty_out): + os.replace(output_dirty_out, output_path) + if os.path.exists(output_dirty_in): + os.remove(output_dirty_in) log.ODM_INFO('Completed smoothing to create %s in %s' % (output_path, datetime.now() - start)) return output_path -def window_filter_2d(arr, nodata, window, kernel_size, filter): +def window_filter_2d(img, imgout, nodata, window, kernel_size, filter, read_lock, write_lock): """ Apply a filter to dem within a window, expects to work with kernal based filters - :param geotiff_path: path to the geotiff to filter + :param img: path to the geotiff to filter + :param imgout: path to write the giltered geotiff to. It can be the same as img to do the modification in place. :param window: the window to apply the filter, should be a list contains row start, col_start, row_end, col_end :param kernel_size: the size of the kernel for the filter, works with odd numbers, need to test if it works with even numbers :param filter: the filter function which takes a 2d array as input and filter results as output. + :param read_lock: threading lock for the read operation + :param write_lock: threading lock for the write operation """ - shape = arr.shape[:2] + shape = img.shape[:2] if window[0] < 0 or window[1] < 0 or window[2] > shape[0] or window[3] > shape[1]: raise Exception('Window is out of bounds') expanded_window = [ max(0, window[0] - kernel_size // 2), max(0, window[1] - kernel_size // 2), min(shape[0], window[2] + kernel_size // 2), min(shape[1], window[3] + kernel_size // 2) ] - win_arr = arr[expanded_window[0]:expanded_window[2], expanded_window[1]:expanded_window[3]] + + # Read input window + width = expanded_window[3] - expanded_window[1] + height = expanded_window[2] - expanded_window[0] + rasterio_window = rasterio.windows.Window(col_off=expanded_window[1], row_off=expanded_window[0], width=width, height=height) + with read_lock: + win_arr = img.read(indexes=1, window=rasterio_window) + # Should have a better way to handle nodata, similar to the way the filter algorithms handle the border (reflection, nearest, interpolation, etc). # For now will follow the old approach to guarantee identical outputs nodata_locs = win_arr == nodata win_arr = filter(win_arr) win_arr[nodata_locs] = nodata win_arr = win_arr[window[0] - expanded_window[0] : window[2] - expanded_window[0], window[1] - expanded_window[1] : window[3] - expanded_window[1]] - return win_arr + + # Write output window + width = window[3] - window[1] + height = window[2] - window[0] + rasterio_window = rasterio.windows.Window(col_off=window[1], row_off=window[0], width=width, height=height) + with write_lock: + imgout.write(win_arr, indexes=1, window=rasterio_window) def get_dem_radius_steps(stats_file, steps, resolution, multiplier = 1.0):