From dcdf69feb9d3cb18265d68a4ff8d17fecda552ce Mon Sep 17 00:00:00 2001
From: Piero Toffanin
Date: Sat, 10 Dec 2016 13:28:34 -0500
Subject: [PATCH] from_pgraster and to_pgraster out-of-db support

---
 app/fields.py            | 185 +++++++++++++++++++++++++++++
 app/models.py            | 251 ++++++++++++++++++++-------------------
 app/scheduler.py         |   4 +-
 app/tests/test_fields.py |  22 ++++
 webodm/settings.py       |   3 +-
 5 files changed, 343 insertions(+), 122 deletions(-)
 create mode 100644 app/fields.py
 create mode 100644 app/tests/test_fields.py

diff --git a/app/fields.py b/app/fields.py
new file mode 100644
index 00000000..4c581243
--- /dev/null
+++ b/app/fields.py
@@ -0,0 +1,185 @@
+import binascii
+import struct
+
+from django.contrib.gis.db.backends.postgis.const import GDAL_TO_POSTGIS
+from django.contrib.gis.db.backends.postgis.pgraster import (
+    GDAL_TO_STRUCT, POSTGIS_HEADER_STRUCTURE, POSTGIS_TO_GDAL,
+    STRUCT_SIZE,
+    pack)
+from django.contrib.gis.db.backends.postgis.pgraster import chunk, unpack
+from django.contrib.gis.db.models.fields import RasterField
+from django.forms import ValidationError
+from django.utils.translation import ugettext_lazy as _
+
+
+class OutOfDbRasterField(RasterField):
+    """
+    Out-of-db raster field for GeoDjango -- evaluates into GDALRaster objects.
+    """
+
+    description = _("Out-of-db Raster Field")
+
+    def from_db_value(self, value, expression, connection, context):
+        return connection.ops.parse_raster(value)
+
+    def get_db_prep_value(self, value, connection, prepared=False):
+        self._check_connection(connection)
+        # Prepare raster for writing to database.
+        if not prepared:
+            value = connection.ops.deconstruct_raster(value)
+        return super(OutOfDbRasterField, self).get_db_prep_value(value, connection, prepared)
+
+
+class POSTGIS_BANDTYPES(object):
+    BANDTYPE_FLAG_OFFDB = 1 << 7
+    BANDTYPE_FLAG_HASNODATA = 1 << 6
+    BANDTYPE_FLAG_ISNODATA = 1 << 5
+
+
+def from_pgraster(data):
+    """
+    Convert a PostGIS HEX string into a dictionary.
+    """
+    if data is None:
+        return
+
+    # Split raster header from data
+    header, data = chunk(data, 122)
+    header = unpack(POSTGIS_HEADER_STRUCTURE, header)
+
+    # Parse band data
+    bands = []
+    pixeltypes = []
+
+    while data:
+        # Get pixel type for this band
+        pixeltype, data = chunk(data, 2)
+        pixeltype = unpack('B', pixeltype)[0]
+
+        # Check flags
+        offdb = has_nodata = False
+
+        if POSTGIS_BANDTYPES.BANDTYPE_FLAG_OFFDB & pixeltype == POSTGIS_BANDTYPES.BANDTYPE_FLAG_OFFDB:
+            offdb = True
+            pixeltype ^= POSTGIS_BANDTYPES.BANDTYPE_FLAG_OFFDB
+        if POSTGIS_BANDTYPES.BANDTYPE_FLAG_HASNODATA & pixeltype == POSTGIS_BANDTYPES.BANDTYPE_FLAG_HASNODATA:
+            has_nodata = True
+            pixeltype ^= POSTGIS_BANDTYPES.BANDTYPE_FLAG_HASNODATA
+        if POSTGIS_BANDTYPES.BANDTYPE_FLAG_ISNODATA & pixeltype == POSTGIS_BANDTYPES.BANDTYPE_FLAG_ISNODATA:
+            raise ValidationError("Band has the BANDTYPE_FLAG_ISNODATA flag set, but we don't know how to handle it.")
+
+        # Convert datatype from PostGIS to GDAL & get pack type and size (in hex characters, two per byte)
+        pixeltype = POSTGIS_TO_GDAL[pixeltype]
+        pack_type = GDAL_TO_STRUCT[pixeltype]
+        pack_size = 2 * STRUCT_SIZE[pack_type]
+
+        # Parse band nodata value. The nodata value is part of the
+        # PGRaster string even if the nodata flag is not set, so it always
+        # has to be chunked off the data string.
+        nodata, data = chunk(data, pack_size)
+        nodata = unpack(pack_type, nodata)[0]
+
+        if offdb:
+            # Extract band number
+            band_num, data = chunk(data, 2)
+
+            # Find the NULL byte that terminates the file path
+            file_path_length = (binascii.unhexlify(data).find(b'\x00') + 1) * 2
+
+            # Extract path
+            file_path, data = chunk(data, file_path_length)
+            band_result = {'path': binascii.unhexlify(file_path).decode()[:-1]}  # Strip the trailing NULL byte
+        else:
+            # Chunk and unpack band data (pack size times number of pixels)
+            band, data = chunk(data, pack_size * header[10] * header[11])
+            band_result = {'data': binascii.unhexlify(band)}
+
+        # If the nodata flag is True, set the nodata value.
+        if has_nodata:
+            band_result['nodata_value'] = nodata
+        if offdb:
+            band_result['offdb'] = True
+
+        # Append band data to band list
+        bands.append(band_result)
+
+        # Store the pixeltype of this band in the pixeltypes array
+        pixeltypes.append(pixeltype)
+
+    # Check that all bands have the same pixeltype.
+    # This is required by GDAL, whereas PostGIS rasters can have different
+    # pixeltypes for bands of the same raster.
+    if len(set(pixeltypes)) != 1:
+        raise ValidationError("Band pixeltypes are not all equal.")
+
+    return {
+        'srid': int(header[9]),
+        'width': header[10], 'height': header[11],
+        'datatype': pixeltypes[0],
+        'origin': (header[5], header[6]),
+        'scale': (header[3], header[4]),
+        'skew': (header[7], header[8]),
+        'bands': bands,
+    }
+
+
+def to_pgraster(rast, offdb=False):
+    """
+    Convert a GDALRaster into PostGIS Raster format.
+    """
+    # Return if the raster is null
+    if rast is None or rast == '':
+        return
+
+    # Prepare the raster header data as a tuple. The first two numbers are
+    # the endianness and the PostGIS Raster version, both of which are
+    # fixed by PostGIS at the moment.
+    rasterheader = (
+        1, 0, len(rast.bands), rast.scale.x, rast.scale.y,
+        rast.origin.x, rast.origin.y, rast.skew.x, rast.skew.y,
+        rast.srs.srid, rast.width, rast.height,
+    )
+
+    # Hexlify raster header
+    result = pack(POSTGIS_HEADER_STRUCTURE, rasterheader)
+    i = 0
+
+    for band in rast.bands:
+        # The PostGIS raster band header has exactly two elements: an 8BUI
+        # byte and the nodata value.
+        #
+        # The 8BUI stores both the PostGIS pixel data type and a nodata flag.
+        # It is composed of the datatype integer plus optional flags for
+        # existing nodata values, offdb or isnodata:
+        # 8BUI_VALUE = PG_PIXEL_TYPE (0-11) + FLAGS
+        #
+        # For example, if the byte value is 71, then the datatype is
+        # 71 - 64 = 7 (32BSI) and the nodata flag is set.
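+        # Likewise with the new off-db flag (a made-up value for
+        # illustration): a byte value of 135 means BANDTYPE_FLAG_OFFDB
+        # (1 << 7 = 128) is set and the datatype is 135 - 128 = 7 (32BSI).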
+        structure = 'B' + GDAL_TO_STRUCT[band.datatype()]
+
+        # Get band pixel type in PostGIS notation
+        pixeltype = GDAL_TO_POSTGIS[band.datatype()]
+
+        # Set the nodata flag
+        if band.nodata_value is not None:
+            pixeltype |= POSTGIS_BANDTYPES.BANDTYPE_FLAG_HASNODATA
+        if offdb:
+            pixeltype |= POSTGIS_BANDTYPES.BANDTYPE_FLAG_OFFDB
+
+        # Pack band header
+        bandheader = pack(structure, (pixeltype, band.nodata_value or 0))
+
+        # Hexlify band data
+        if offdb:
+            # Band num | Path | NULL terminator
+            band_data_hex = binascii.hexlify(struct.Struct('b').pack(i) + rast.name.encode('utf-8') + b'\x00').upper()
+        else:
+            band_data_hex = binascii.hexlify(band.data(as_memoryview=True)).upper()
+
+        # Add packed header and band data to result
+        result += bandheader + band_data_hex
+
+        i += 1
+
+    # Cast raster to string before passing it to the DB
+    return result.decode()
\ No newline at end of file
diff --git a/app/models.py b/app/models.py
index 429317a4..a4587e12 100644
--- a/app/models.py
+++ b/app/models.py
@@ -187,146 +187,159 @@ class Task(models.Model):
         with a processing node or executing a pending action.
         """
-        if self.processing_node:
-            # Need to process some images (UUID not yet set)?
-            if not self.uuid:
-                logger.info("Processing... {}".format(self))
+        try:
+            if self.processing_node:
+                # Need to process some images (UUID not yet set)?
+                if not self.uuid:
+                    logger.info("Processing... {}".format(self))
 
-                images = [image.path() for image in self.imageupload_set.all()]
+                    images = [image.path() for image in self.imageupload_set.all()]
 
+                    try:
+                        # This takes a while
+                        uuid = self.processing_node.process_new_task(images, self.name, self.options)
+
+                        # Refresh task object before committing change
+                        self.refresh_from_db()
+                        self.uuid = uuid
+                        self.save()
+
+                        # TODO: log that processing has started
+
+                    except ProcessingException as e:
+                        self.set_failure(str(e))
+
+            if self.pending_action is not None:
+                try:
+                    if self.pending_action == pending_actions.CANCEL:
+                        # Do we need to cancel the task on the processing node?
+                        logger.info("Canceling task {}".format(self))
+                        if self.processing_node and self.uuid:
+                            self.processing_node.cancel_task(self.uuid)
+                            self.pending_action = None
+                            self.save()
+                        else:
+                            raise ProcessingException("Cannot cancel a task that has no processing node or UUID")
 
-                # Refresh task object before committing change
-                self.refresh_from_db()
-                self.uuid = uuid
+                    elif self.pending_action == pending_actions.RESTART:
+                        logger.info("Restarting task {}".format(self))
+                        if self.processing_node and self.uuid:
+
+                            # Check if the UUID is still valid; processing nodes purge
+                            # results after a set amount of time, so the UUID might have been eliminated.
+                            try:
+                                info = self.processing_node.get_task_info(self.uuid)
+                                uuid_still_exists = info['uuid'] == self.uuid
+                            except ProcessingException:
+                                uuid_still_exists = False
+
+                            if uuid_still_exists:
+                                # Good to go
+                                self.processing_node.restart_task(self.uuid)
+                            else:
+                                # Task has been purged (or processing node is offline)
+                                # TODO: what if processing node went offline?
+
+                                # Process this as a new task
+                                # Removing its UUID will cause the scheduler
+                                # to process this at the next tick
+                                self.uuid = None
+
+                                self.console_output = ""
+                                self.processing_time = -1
+                                self.status = None
+                                self.last_error = None
+                                self.pending_action = None
+                                self.save()
+                        else:
+                            raise ProcessingException("Cannot restart a task that has no processing node or UUID")
+
+                    elif self.pending_action == pending_actions.REMOVE:
+                        logger.info("Removing task {}".format(self))
+                        if self.processing_node and self.uuid:
+                            # Attempt to delete the resources on the processing node.
+                            # We don't care if this fails, as resources on processing nodes
+                            # are expected to be purged on their own after a set amount of time anyway
+                            try:
+                                self.processing_node.remove_task(self.uuid)
+                            except ProcessingException:
+                                pass
+
+                        # What's more important is that we delete our task properly here
+                        self.delete()
+
+                        # Stop right here!
+                        return
+
+                except ProcessingException as e:
+                    self.last_error = str(e)
                     self.save()
 
-                # TODO: log process has started processing
+            if self.processing_node:
+                # Need to update status (first time, queued or running?)
+                if self.uuid and self.status in [None, status_codes.QUEUED, status_codes.RUNNING]:
+                    # Update task info from processing node
+                    try:
+                        info = self.processing_node.get_task_info(self.uuid)
 
-            except ProcessingException as e:
-                self.set_failure(str(e))
+                        self.processing_time = info["processingTime"]
+                        self.status = info["status"]["code"]
 
-        if self.pending_action is not None:
-            try:
-                if self.pending_action == pending_actions.CANCEL:
-                    # Do we need to cancel the task on the processing node?
-                    logger.info("Canceling task {}".format(self))
-                    if self.processing_node and self.uuid:
-                        self.processing_node.cancel_task(self.uuid)
-                        self.pending_action = None
-                        self.save()
-                    else:
-                        raise ProcessingException("Cannot cancel a task that has no processing node or UUID")
+                        current_lines_count = len(self.console_output.split("\n")) - 1
+                        self.console_output += self.processing_node.get_task_console_output(self.uuid, current_lines_count)
 
-                elif self.pending_action == pending_actions.RESTART:
-                    logger.info("Restarting task {}".format(self))
-                    if self.processing_node and self.uuid:
+                        if "errorMessage" in info["status"]:
+                            self.last_error = info["status"]["errorMessage"]
 
-                        # Check if the UUID is still valid, as processing nodes purge
-                        # results after a set amount of time, the UUID might have eliminated.
-                        try:
-                            info = self.processing_node.get_task_info(self.uuid)
-                            uuid_still_exists = info['uuid'] == self.uuid
-                        except ProcessingException:
-                            uuid_still_exists = False
+                        # Has the task just been canceled, failed, or completed?
+                        if self.status in [status_codes.FAILED, status_codes.COMPLETED, status_codes.CANCELED]:
+                            logger.info("Processing status: {} for {}".format(self.status, self))
 
-                        if uuid_still_exists:
-                            # Good to go
-                            self.processing_node.restart_task(self.uuid)
-                        else:
-                            # Task has been purged (or processing node is offline)
-                            # TODO: what if processing node went offline?
+                            if self.status == status_codes.COMPLETED:
+                                try:
+                                    assets_dir = self.assets_path("")
+                                    if not os.path.exists(assets_dir):
+                                        os.makedirs(assets_dir)
+
-                            # Process this as a new task
-                            # Removing its UUID will cause the scheduler
-                            # to process this the next tick
-                            self.uuid = None
+                                    logger.info("Downloading all.zip for {}".format(self))
 
-                            self.console_output = ""
-                            self.processing_time = -1
-                            self.status = None
-                            self.last_error = None
-                            self.pending_action = None
-                            self.save()
-                    else:
-                        raise ProcessingException("Cannot restart a task that has no processing node or UUID")
+                                    # Download all assets
+                                    zip_stream = self.processing_node.download_task_asset(self.uuid, "all.zip")
+                                    zip_path = os.path.join(assets_dir, "all.zip")
+                                    with open(zip_path, 'wb') as fd:
+                                        for chunk in zip_stream.iter_content(4096):
+                                            fd.write(chunk)
 
-                elif self.pending_action == pending_actions.REMOVE:
-                    logger.info("Removing task {}".format(self))
-                    if self.processing_node and self.uuid:
-                        # Attempt to delete the resources on the processing node
-                        # We don't care if this fails, as resources on processing nodes
-                        # Are expected to be purged on their own after a set amount of time anyway
-                        try:
-                            self.processing_node.remove_task(self.uuid)
-                        except ProcessingException:
-                            pass
+                                    logger.info("Done downloading all.zip for {}".format(self))
 
-                    # What's more important is that we delete our task properly here
-                    self.delete()
+                                    # Extract from zip
+                                    with zipfile.ZipFile(zip_path, "r") as zip_h:
+                                        zip_h.extractall(assets_dir)
 
-                    # Stop right here!
-                    return
+                                    logger.info("Extracted all.zip for {}".format(self))
 
-            except ProcessingException as e:
-                self.last_error = str(e)
-                self.save()
-
-        if self.processing_node:
-            # Need to update status (first time, queued or running?)
-            if self.uuid and self.status in [None, status_codes.QUEUED, status_codes.RUNNING]:
-                # Update task info from processing node
-                try:
-                    info = self.processing_node.get_task_info(self.uuid)
-
-                    self.processing_time = info["processingTime"]
-                    self.status = info["status"]["code"]
-
-                    current_lines_count = len(self.console_output.split("\n")) - 1
-                    self.console_output += self.processing_node.get_task_console_output(self.uuid, current_lines_count)
-
-                    if "errorMessage" in info["status"]:
-                        self.last_error = info["status"]["errorMessage"]
-
-                    # Has the task just been canceled, failed, or completed?
-                    if self.status in [status_codes.FAILED, status_codes.COMPLETED, status_codes.CANCELED]:
-                        logger.info("Processing status: {} for {}".format(self.status, self))
-
-                        if self.status == status_codes.COMPLETED:
-                            try:
-                                assets_dir = self.assets_path("")
-                                if not os.path.exists(assets_dir):
-                                    os.makedirs(assets_dir)
-
-                                # Download all assets
-                                zip_stream = self.processing_node.download_task_asset(self.uuid, "all.zip")
-                                zip_path = os.path.join(assets_dir, "all.zip")
-                                with open(zip_path, 'wb') as fd:
-                                    for chunk in zip_stream.iter_content(4096):
-                                        fd.write(chunk)
-
-                                # Extract from zip
-                                with zipfile.ZipFile(zip_path, "r") as zip_h:
-                                    zip_h.extractall(assets_dir)
-
-                                # Add to database orthophoto
-                                orthophoto_path = self.assets_path("odm_orthophoto", "odm_orthophoto.tif")
-                                if os.path.exists(orthophoto_path):
-                                    self.orthophoto = GDALRaster(orthophoto_path, write=True)
+                                    # Add to database orthophoto
+                                    orthophoto_path = self.assets_path("odm_orthophoto", "odm_orthophoto.tif")
+                                    if os.path.exists(orthophoto_path):
+                                        self.orthophoto = GDALRaster(orthophoto_path, write=True)
+
+                                    self.save()
+                                except ProcessingException as e:
+                                    self.set_failure(str(e))
+                            else:
+                                # FAILED, CANCELED
                                 self.save()
-                            except ProcessingException as e:
-                                self.set_failure(str(e))
                         else:
-                            # FAILED, CANCELED
+                            # Still waiting...
                             self.save()
-                else:
-                    # Still waiting...
-                    self.save()
-                except ProcessingException as e:
-                    self.set_failure(str(e))
+                    except ProcessingException as e:
+                        self.set_failure(str(e))
+
+        except ProcessingException as e:
+            self.set_failure(str(e))
+        except ConnectionRefusedError as e:
+            logger.warning("Task {} cannot communicate with processing node: {}".format(self, str(e)))
+
+            # In the future we might want to retry instead of just failing
+            #self.set_failure(str(e))
 
 
     def get_tile_path(self, z, x, y):
         return self.assets_path("orthophoto_tiles", z, x, "{}.png".format(y))
 
diff --git a/app/scheduler.py b/app/scheduler.py
index 8f599e3a..5f6a3eb5 100644
--- a/app/scheduler.py
+++ b/app/scheduler.py
@@ -113,7 +113,7 @@ def setup():
         scheduler.add_job(process_pending_tasks, 'interval', seconds=5)
         scheduler.add_job(cleanup_projects, 'interval', seconds=15)
     except SchedulerAlreadyRunningError:
-        logger.warn("Scheduler already running (this is OK while testing)")
+        logger.warning("Scheduler already running (this is OK while testing)")
 
 def teardown():
     logger.info("Stopping scheduler...")
@@ -121,4 +121,4 @@ def teardown():
         scheduler.shutdown()
         logger.info("Scheduler stopped")
     except SchedulerNotRunningError:
-        logger.warn("Scheduler not running")
+        logger.warning("Scheduler not running")
diff --git a/app/tests/test_fields.py b/app/tests/test_fields.py
new file mode 100644
index 00000000..4c72e7ae
--- /dev/null
+++ b/app/tests/test_fields.py
@@ -0,0 +1,22 @@
+from django.contrib.gis.gdal import GDALRaster
+
+from .classes import BootTestCase
+from app.fields import from_pgraster, to_pgraster
+import os
+
+
+class TestFields(BootTestCase):
+    def setUp(self):
+        pass
+
+    def tearDown(self):
+        pass
+
+    def test_pgraster_functions(self):
+        # Make sure conversion from PostGIS <---> GDALRaster works
+        # for out-of-db rasters
+        raster = GDALRaster(os.path.join("app", "fixtures", "orthophoto.tif"))
+
+        self.assertTrue(raster.srid == 32615)
+        self.assertTrue(raster.width == 212)
+
+        #hexwkb =
\ No newline at end of file
diff --git a/webodm/settings.py b/webodm/settings.py
index 641d25b9..304f57be 100644
--- a/webodm/settings.py
+++ b/webodm/settings.py
@@ -182,6 +182,7 @@ LOGGING = {
         'django': {
             'handlers': ['console'],
             'propagate': True,
+            'level': 'WARNING',
         },
         'app.logger': {
             'handlers': ['console'],
@@ -189,7 +190,7 @@ LOGGING = {
         },
         'apscheduler.executors.default': {
             'handlers': ['console'],
-            'level': 'WARN',
+            'level': 'WARNING',
         }
     }
 }
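---

Usage sketch (not part of the patch): a minimal round trip of the new helpers,
assuming a Django shell with WebODM's settings loaded and the
app/fixtures/orthophoto.tif fixture that the new test references; the expected
SRID below comes from that test.

    import os

    from django.contrib.gis.gdal import GDALRaster

    from app.fields import from_pgraster, to_pgraster

    # GDALRaster keeps the source file path in .name; with offdb=True,
    # to_pgraster stores that path (band number + NULL-terminated string)
    # in place of the pixel data.
    raster = GDALRaster(os.path.join("app", "fixtures", "orthophoto.tif"))

    # Serialize to the PostGIS hex format with the off-db flag set on every
    # band, then decode it back into GDALRaster-style keyword arguments.
    hexwkb = to_pgraster(raster, offdb=True)
    data = from_pgraster(hexwkb)

    assert data['srid'] == 32615
    assert data['width'] == raster.width and data['height'] == raster.height
    assert data['bands'][0]['offdb']
    assert data['bands'][0]['path'] == raster.name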