From 744260af7e6358bb1bbf925fe77337d22f20a9f4 Mon Sep 17 00:00:00 2001 From: Piero Toffanin Date: Sat, 26 Jul 2025 13:30:07 -0400 Subject: [PATCH 1/2] Improve cache hits during raster export --- app/raster_utils.py | 38 +++++++++++++++++++++++++++++++++----- worker/tasks.py | 7 +++++-- 2 files changed, 38 insertions(+), 7 deletions(-) diff --git a/app/raster_utils.py b/app/raster_utils.py index 4e0af61a..66c989ac 100644 --- a/app/raster_utils.py +++ b/app/raster_utils.py @@ -53,8 +53,8 @@ def compute_subwindows(window, max_window_size, overlap_pixels=0): # Generate the list of windows windows = [] - for x_offset in x_offsets: - for y_offset in y_offsets: + for y_offset in y_offsets: + for x_offset in x_offsets: w = Window( x_offset, y_offset, @@ -71,18 +71,39 @@ def compute_subwindows(window, max_window_size, overlap_pixels=0): return windows +def compute_block_aligned_subwindows(src, win): + subwins = [] + for _, w in src.block_windows(1): + dst_w = Window( + w.col_off - win.col_off, + w.row_off - win.row_off, + w.width, + w.height + ) + subwins.append([w, dst_w]) + + return subwins + def padded_window(w, pad): return Window(w.col_off - pad, w.row_off - pad, w.width + pad * 2, w.height + pad * 2) -def export_raster(input, output, progress_callback=None, **opts): +def export_raster(input, output, progress_callback=None, is_aborted=None, **opts): now = time.time() current_progress = 0 + last_update = 0 + def p(text, perc=0): nonlocal current_progress + nonlocal last_update + + t = time.time() current_progress += perc - if progress_callback is not None: - progress_callback(text, current_progress) + + if t - last_update >= 1: + if progress_callback is not None: + progress_callback(text, current_progress) + last_update = t epsg = opts.get('epsg') expression = opts.get('expression') @@ -204,6 +225,7 @@ def export_raster(input, output, progress_callback=None, **opts): alpha_index = src.colorinterp.index(ColorInterp.alpha) + 1 subwins = compute_subwindows(win, window_size) + # subwins = compute_block_aligned_subwindows(src, win) if rgb and expression is None: # More than 4 bands? @@ -291,6 +313,8 @@ def export_raster(input, output, progress_callback=None, **opts): with rasterio.open(output_raster, 'w', **profile) as dst: for idx, (w, dst_w) in enumerate(subwins): + if is_aborted is not None and is_aborted(): + return p(f"Processing tile {idx}/{num_wins}", progress_per_win) data = src.read(indexes=indexes, window=w, out_dtype=np.float32) @@ -328,6 +352,8 @@ def export_raster(input, output, progress_callback=None, **opts): # Apply hillshading, colormaps to elevation with rasterio.open(output_raster, 'w', **profile) as dst: for idx, (w, dst_w) in enumerate(subwins): + if is_aborted is not None and is_aborted(): + return p(f"Processing tile {idx}/{num_wins}", progress_per_win) # Apply colormap? @@ -380,6 +406,8 @@ def export_raster(input, output, progress_callback=None, **opts): # Copy bands as-is with rasterio.open(output_raster, 'w', **profile) as dst: for idx, (w, dst_w) in enumerate(subwins): + if is_aborted is not None and is_aborted(): + return p(f"Processing tile {idx}/{num_wins}", progress_per_win) arr = src.read(indexes=indexes, window=w) diff --git a/worker/tasks.py b/worker/tasks.py index cd7c5a35..eae52e95 100644 --- a/worker/tasks.py +++ b/worker/tasks.py @@ -20,6 +20,7 @@ from nodeodm.models import ProcessingNode from webodm import settings import worker from .celery import app +from celery.contrib.abortable import AbortableTask from app.raster_utils import export_raster as export_raster_sync, extension_for_export_format from app.pointcloud_utils import export_pointcloud as export_pointcloud_sync from django.utils import timezone @@ -190,15 +191,17 @@ def process_pending_tasks(): process_task.delay(task.id) -@app.task(bind=True, time_limit=settings.WORKERS_MAX_TIME_LIMIT) +@app.task(bind=True, time_limit=settings.WORKERS_MAX_TIME_LIMIT, base=AbortableTask) def export_raster(self, input, **opts): try: logger.info("Exporting raster {} with options: {}".format(input, json.dumps(opts))) tmpfile = tempfile.mktemp('_raster.{}'.format(extension_for_export_format(opts.get('format', 'gtiff'))), dir=settings.MEDIA_TMP) def progress_callback(status, perc): self.update_state(state="PROGRESS", meta={"status": status, "progress": perc}) + def is_aborted(): + return self.is_aborted() - export_raster_sync(input, tmpfile, progress_callback=progress_callback, **opts) + export_raster_sync(input, tmpfile, progress_callback=progress_callback, is_aborted=is_aborted, **opts) result = {'file': tmpfile} if settings.TESTING: From dc3e1cc037c9914bbdc791770061178d1c684d41 Mon Sep 17 00:00:00 2001 From: Piero Toffanin Date: Sat, 26 Jul 2025 13:57:59 -0400 Subject: [PATCH 2/2] Remove abort checks --- app/raster_utils.py | 8 +------- worker/tasks.py | 7 ++----- 2 files changed, 3 insertions(+), 12 deletions(-) diff --git a/app/raster_utils.py b/app/raster_utils.py index 66c989ac..ba02c3c1 100644 --- a/app/raster_utils.py +++ b/app/raster_utils.py @@ -87,7 +87,7 @@ def compute_block_aligned_subwindows(src, win): def padded_window(w, pad): return Window(w.col_off - pad, w.row_off - pad, w.width + pad * 2, w.height + pad * 2) -def export_raster(input, output, progress_callback=None, is_aborted=None, **opts): +def export_raster(input, output, progress_callback=None, **opts): now = time.time() current_progress = 0 @@ -313,8 +313,6 @@ def export_raster(input, output, progress_callback=None, is_aborted=None, **opts with rasterio.open(output_raster, 'w', **profile) as dst: for idx, (w, dst_w) in enumerate(subwins): - if is_aborted is not None and is_aborted(): - return p(f"Processing tile {idx}/{num_wins}", progress_per_win) data = src.read(indexes=indexes, window=w, out_dtype=np.float32) @@ -352,8 +350,6 @@ def export_raster(input, output, progress_callback=None, is_aborted=None, **opts # Apply hillshading, colormaps to elevation with rasterio.open(output_raster, 'w', **profile) as dst: for idx, (w, dst_w) in enumerate(subwins): - if is_aborted is not None and is_aborted(): - return p(f"Processing tile {idx}/{num_wins}", progress_per_win) # Apply colormap? @@ -406,8 +402,6 @@ def export_raster(input, output, progress_callback=None, is_aborted=None, **opts # Copy bands as-is with rasterio.open(output_raster, 'w', **profile) as dst: for idx, (w, dst_w) in enumerate(subwins): - if is_aborted is not None and is_aborted(): - return p(f"Processing tile {idx}/{num_wins}", progress_per_win) arr = src.read(indexes=indexes, window=w) diff --git a/worker/tasks.py b/worker/tasks.py index eae52e95..cd7c5a35 100644 --- a/worker/tasks.py +++ b/worker/tasks.py @@ -20,7 +20,6 @@ from nodeodm.models import ProcessingNode from webodm import settings import worker from .celery import app -from celery.contrib.abortable import AbortableTask from app.raster_utils import export_raster as export_raster_sync, extension_for_export_format from app.pointcloud_utils import export_pointcloud as export_pointcloud_sync from django.utils import timezone @@ -191,17 +190,15 @@ def process_pending_tasks(): process_task.delay(task.id) -@app.task(bind=True, time_limit=settings.WORKERS_MAX_TIME_LIMIT, base=AbortableTask) +@app.task(bind=True, time_limit=settings.WORKERS_MAX_TIME_LIMIT) def export_raster(self, input, **opts): try: logger.info("Exporting raster {} with options: {}".format(input, json.dumps(opts))) tmpfile = tempfile.mktemp('_raster.{}'.format(extension_for_export_format(opts.get('format', 'gtiff'))), dir=settings.MEDIA_TMP) def progress_callback(status, perc): self.update_state(state="PROGRESS", meta={"status": status, "progress": perc}) - def is_aborted(): - return self.is_aborted() - export_raster_sync(input, tmpfile, progress_callback=progress_callback, is_aborted=is_aborted, **opts) + export_raster_sync(input, tmpfile, progress_callback=progress_callback, **opts) result = {'file': tmpfile} if settings.TESTING: