kopia lustrzana https://github.com/OpenDroneMap/WebODM
from_pgraster and to_pgraster out-of-db support
rodzic
03baa57dbd
commit
dcdf69feb9
|
@ -0,0 +1,185 @@
|
|||
import binascii
|
||||
import struct
|
||||
|
||||
from django.contrib.gis.db.backends.postgis.const import GDAL_TO_POSTGIS
|
||||
from django.contrib.gis.db.backends.postgis.pgraster import (
|
||||
GDAL_TO_STRUCT, POSTGIS_HEADER_STRUCTURE, POSTGIS_TO_GDAL,
|
||||
STRUCT_SIZE,
|
||||
pack)
|
||||
from django.contrib.gis.db.backends.postgis.pgraster import chunk, unpack
|
||||
from django.contrib.gis.db.models.fields import RasterField
|
||||
from django.forms import ValidationError
|
||||
from django.utils.translation import ugettext_lazy as _
|
||||
|
||||
class OutOfDbRasterField(RasterField):
|
||||
"""
|
||||
Out-of-db Raster field for GeoDjango -- evaluates into GDALRaster objects.
|
||||
"""
|
||||
|
||||
description = _("Out-of-db Raster Field")
|
||||
|
||||
def from_db_value(self, value, expression, connection, context):
|
||||
return connection.ops.parse_raster(value)
|
||||
|
||||
def get_db_prep_value(self, value, connection, prepared=False):
|
||||
self._check_connection(connection)
|
||||
# Prepare raster for writing to database.
|
||||
if not prepared:
|
||||
value = connection.ops.deconstruct_raster(value)
|
||||
return super(OutOfDbRasterField, self).get_db_prep_value(value, connection, prepared)
|
||||
|
||||
|
||||
class POSTGIS_BANDTYPES(object):
|
||||
BANDTYPE_FLAG_OFFDB = 1 << 7
|
||||
BANDTYPE_FLAG_HASNODATA = 1 << 6
|
||||
BANDTYPE_FLAG_ISNODATA = 1 << 5
|
||||
|
||||
|
||||
def from_pgraster(data):
|
||||
"""
|
||||
Convert a PostGIS HEX String into a dictionary.
|
||||
"""
|
||||
if data is None:
|
||||
return
|
||||
|
||||
# Split raster header from data
|
||||
header, data = chunk(data, 122)
|
||||
header = unpack(POSTGIS_HEADER_STRUCTURE, header)
|
||||
|
||||
# Parse band data
|
||||
bands = []
|
||||
pixeltypes = []
|
||||
|
||||
while data:
|
||||
# Get pixel type for this band
|
||||
pixeltype, data = chunk(data, 2)
|
||||
pixeltype = unpack('B', pixeltype)[0]
|
||||
|
||||
# Check flags
|
||||
offdb = has_nodata = False
|
||||
|
||||
if POSTGIS_BANDTYPES.BANDTYPE_FLAG_OFFDB & pixeltype == POSTGIS_BANDTYPES.BANDTYPE_FLAG_OFFDB:
|
||||
offdb = True
|
||||
pixeltype ^= POSTGIS_BANDTYPES.BANDTYPE_FLAG_OFFDB
|
||||
if POSTGIS_BANDTYPES.BANDTYPE_FLAG_HASNODATA & pixeltype == POSTGIS_BANDTYPES.BANDTYPE_FLAG_HASNODATA:
|
||||
has_nodata = True
|
||||
pixeltype ^= POSTGIS_BANDTYPES.BANDTYPE_FLAG_HASNODATA
|
||||
if POSTGIS_BANDTYPES.BANDTYPE_FLAG_ISNODATA & pixeltype == POSTGIS_BANDTYPES.BANDTYPE_FLAG_ISNODATA:
|
||||
raise ValidationError("Band has pixeltype BANDTYPE_FLAG_ISNODATA flag set, but we don't know how to handle it.")
|
||||
|
||||
# Convert datatype from PostGIS to GDAL & get pack type and size
|
||||
pixeltype = POSTGIS_TO_GDAL[pixeltype]
|
||||
pack_type = GDAL_TO_STRUCT[pixeltype]
|
||||
pack_size = 2 * STRUCT_SIZE[pack_type]
|
||||
|
||||
# Parse band nodata value. The nodata value is part of the
|
||||
# PGRaster string even if the nodata flag is True, so it always
|
||||
# has to be chunked off the data string.
|
||||
nodata, data = chunk(data, pack_size)
|
||||
nodata = unpack(pack_type, nodata)[0]
|
||||
|
||||
if offdb:
|
||||
# Extract band number
|
||||
band_num, data = chunk(data, 2)
|
||||
|
||||
# Find NULL byte for end of file path
|
||||
file_path_length = (binascii.unhexlify(data).find(b'\x00') + 1) * 2
|
||||
|
||||
# Extract path
|
||||
file_path, data = chunk(data, file_path_length)
|
||||
band_result = {'path' : binascii.unhexlify(file_path).decode()[:-1]} # Remove last NULL byte
|
||||
else:
|
||||
# Chunk and unpack band data (pack size times nr of pixels)
|
||||
band, data = chunk(data, pack_size * header[10] * header[11])
|
||||
band_result = {'data': binascii.unhexlify(band)}
|
||||
|
||||
# If the nodata flag is True, set the nodata value.
|
||||
if has_nodata:
|
||||
band_result['nodata_value'] = nodata
|
||||
if offdb:
|
||||
band_result['offdb'] = True
|
||||
|
||||
# Append band data to band list
|
||||
bands.append(band_result)
|
||||
|
||||
# Store pixeltype of this band in pixeltypes array
|
||||
pixeltypes.append(pixeltype)
|
||||
|
||||
# Check that all bands have the same pixeltype.
|
||||
# This is required by GDAL. PostGIS rasters could have different pixeltypes
|
||||
# for bands of the same raster.
|
||||
if len(set(pixeltypes)) != 1:
|
||||
raise ValidationError("Band pixeltypes are not all equal.")
|
||||
|
||||
|
||||
return {
|
||||
'srid': int(header[9]),
|
||||
'width': header[10], 'height': header[11],
|
||||
'datatype': pixeltypes[0],
|
||||
'origin': (header[5], header[6]),
|
||||
'scale': (header[3], header[4]),
|
||||
'skew': (header[7], header[8]),
|
||||
'bands': bands,
|
||||
}
|
||||
|
||||
|
||||
def to_pgraster(rast, offdb = False):
|
||||
"""
|
||||
Convert a GDALRaster into PostGIS Raster format.
|
||||
"""
|
||||
# Return if the raster is null
|
||||
if rast is None or rast == '':
|
||||
return
|
||||
|
||||
# Prepare the raster header data as a tuple. The first two numbers are
|
||||
# the endianness and the PostGIS Raster Version, both are fixed by
|
||||
# PostGIS at the moment.
|
||||
rasterheader = (
|
||||
1, 0, len(rast.bands), rast.scale.x, rast.scale.y,
|
||||
rast.origin.x, rast.origin.y, rast.skew.x, rast.skew.y,
|
||||
rast.srs.srid, rast.width, rast.height,
|
||||
)
|
||||
|
||||
# Hexlify raster header
|
||||
result = pack(POSTGIS_HEADER_STRUCTURE, rasterheader)
|
||||
i = 0
|
||||
|
||||
for band in rast.bands:
|
||||
# The PostGIS raster band header has exactly two elements, a 8BUI byte
|
||||
# and the nodata value.
|
||||
#
|
||||
# The 8BUI stores both the PostGIS pixel data type and a nodata flag.
|
||||
# It is composed as the datatype integer plus optional flags for existing
|
||||
# nodata values, offdb or isnodata:
|
||||
# 8BUI_VALUE = PG_PIXEL_TYPE (0-11) + FLAGS
|
||||
#
|
||||
# For example, if the byte value is 71, then the datatype is
|
||||
# 71-64 = 7 (32BSI) and the nodata value is True.
|
||||
structure = 'B' + GDAL_TO_STRUCT[band.datatype()]
|
||||
|
||||
# Get band pixel type in PostGIS notation
|
||||
pixeltype = GDAL_TO_POSTGIS[band.datatype()]
|
||||
|
||||
# Set the nodata flag
|
||||
if band.nodata_value is not None:
|
||||
pixeltype |= POSTGIS_BANDTYPES.BANDTYPE_FLAG_HASNODATA
|
||||
if offdb:
|
||||
pixeltype |= POSTGIS_BANDTYPES.BANDTYPE_FLAG_OFFDB
|
||||
|
||||
# Pack band header
|
||||
bandheader = pack(structure, (pixeltype, band.nodata_value or 0))
|
||||
|
||||
# Hexlify band data
|
||||
if offdb:
|
||||
# Band num | Path | NULL terminator
|
||||
band_data_hex = binascii.hexlify(struct.Struct('b').pack(i) + rast.name.encode('utf-8') + b'\x00').upper()
|
||||
else:
|
||||
band_data_hex = binascii.hexlify(band.data(as_memoryview=True)).upper()
|
||||
|
||||
# Add packed header and band data to result
|
||||
result += bandheader + band_data_hex
|
||||
|
||||
i += 1
|
||||
|
||||
# Cast raster to string before passing it to the DB
|
||||
return result.decode()
|
251
app/models.py
251
app/models.py
|
@ -187,146 +187,159 @@ class Task(models.Model):
|
|||
with a processing node or executing a pending action.
|
||||
"""
|
||||
|
||||
if self.processing_node:
|
||||
# Need to process some images (UUID not yet set)?
|
||||
if not self.uuid:
|
||||
logger.info("Processing... {}".format(self))
|
||||
try:
|
||||
if self.processing_node:
|
||||
# Need to process some images (UUID not yet set)?
|
||||
if not self.uuid:
|
||||
logger.info("Processing... {}".format(self))
|
||||
|
||||
images = [image.path() for image in self.imageupload_set.all()]
|
||||
images = [image.path() for image in self.imageupload_set.all()]
|
||||
|
||||
try:
|
||||
# This takes a while
|
||||
uuid = self.processing_node.process_new_task(images, self.name, self.options)
|
||||
|
||||
# Refresh task object before committing change
|
||||
self.refresh_from_db()
|
||||
self.uuid = uuid
|
||||
self.save()
|
||||
|
||||
# TODO: log process has started processing
|
||||
|
||||
except ProcessingException as e:
|
||||
self.set_failure(str(e))
|
||||
|
||||
if self.pending_action is not None:
|
||||
try:
|
||||
# This takes a while
|
||||
uuid = self.processing_node.process_new_task(images, self.name, self.options)
|
||||
if self.pending_action == pending_actions.CANCEL:
|
||||
# Do we need to cancel the task on the processing node?
|
||||
logger.info("Canceling task {}".format(self))
|
||||
if self.processing_node and self.uuid:
|
||||
self.processing_node.cancel_task(self.uuid)
|
||||
self.pending_action = None
|
||||
self.save()
|
||||
else:
|
||||
raise ProcessingException("Cannot cancel a task that has no processing node or UUID")
|
||||
|
||||
# Refresh task object before committing change
|
||||
self.refresh_from_db()
|
||||
self.uuid = uuid
|
||||
elif self.pending_action == pending_actions.RESTART:
|
||||
logger.info("Restarting task {}".format(self))
|
||||
if self.processing_node and self.uuid:
|
||||
|
||||
# Check if the UUID is still valid, as processing nodes purge
|
||||
# results after a set amount of time, the UUID might have eliminated.
|
||||
try:
|
||||
info = self.processing_node.get_task_info(self.uuid)
|
||||
uuid_still_exists = info['uuid'] == self.uuid
|
||||
except ProcessingException:
|
||||
uuid_still_exists = False
|
||||
|
||||
if uuid_still_exists:
|
||||
# Good to go
|
||||
self.processing_node.restart_task(self.uuid)
|
||||
else:
|
||||
# Task has been purged (or processing node is offline)
|
||||
# TODO: what if processing node went offline?
|
||||
|
||||
# Process this as a new task
|
||||
# Removing its UUID will cause the scheduler
|
||||
# to process this the next tick
|
||||
self.uuid = None
|
||||
|
||||
self.console_output = ""
|
||||
self.processing_time = -1
|
||||
self.status = None
|
||||
self.last_error = None
|
||||
self.pending_action = None
|
||||
self.save()
|
||||
else:
|
||||
raise ProcessingException("Cannot restart a task that has no processing node or UUID")
|
||||
|
||||
elif self.pending_action == pending_actions.REMOVE:
|
||||
logger.info("Removing task {}".format(self))
|
||||
if self.processing_node and self.uuid:
|
||||
# Attempt to delete the resources on the processing node
|
||||
# We don't care if this fails, as resources on processing nodes
|
||||
# Are expected to be purged on their own after a set amount of time anyway
|
||||
try:
|
||||
self.processing_node.remove_task(self.uuid)
|
||||
except ProcessingException:
|
||||
pass
|
||||
|
||||
# What's more important is that we delete our task properly here
|
||||
self.delete()
|
||||
|
||||
# Stop right here!
|
||||
return
|
||||
|
||||
except ProcessingException as e:
|
||||
self.last_error = str(e)
|
||||
self.save()
|
||||
|
||||
# TODO: log process has started processing
|
||||
if self.processing_node:
|
||||
# Need to update status (first time, queued or running?)
|
||||
if self.uuid and self.status in [None, status_codes.QUEUED, status_codes.RUNNING]:
|
||||
# Update task info from processing node
|
||||
try:
|
||||
info = self.processing_node.get_task_info(self.uuid)
|
||||
|
||||
except ProcessingException as e:
|
||||
self.set_failure(str(e))
|
||||
self.processing_time = info["processingTime"]
|
||||
self.status = info["status"]["code"]
|
||||
|
||||
if self.pending_action is not None:
|
||||
try:
|
||||
if self.pending_action == pending_actions.CANCEL:
|
||||
# Do we need to cancel the task on the processing node?
|
||||
logger.info("Canceling task {}".format(self))
|
||||
if self.processing_node and self.uuid:
|
||||
self.processing_node.cancel_task(self.uuid)
|
||||
self.pending_action = None
|
||||
self.save()
|
||||
else:
|
||||
raise ProcessingException("Cannot cancel a task that has no processing node or UUID")
|
||||
current_lines_count = len(self.console_output.split("\n")) - 1
|
||||
self.console_output += self.processing_node.get_task_console_output(self.uuid, current_lines_count)
|
||||
|
||||
elif self.pending_action == pending_actions.RESTART:
|
||||
logger.info("Restarting task {}".format(self))
|
||||
if self.processing_node and self.uuid:
|
||||
if "errorMessage" in info["status"]:
|
||||
self.last_error = info["status"]["errorMessage"]
|
||||
|
||||
# Check if the UUID is still valid, as processing nodes purge
|
||||
# results after a set amount of time, the UUID might have eliminated.
|
||||
try:
|
||||
info = self.processing_node.get_task_info(self.uuid)
|
||||
uuid_still_exists = info['uuid'] == self.uuid
|
||||
except ProcessingException:
|
||||
uuid_still_exists = False
|
||||
# Has the task just been canceled, failed, or completed?
|
||||
if self.status in [status_codes.FAILED, status_codes.COMPLETED, status_codes.CANCELED]:
|
||||
logger.info("Processing status: {} for {}".format(self.status, self))
|
||||
|
||||
if uuid_still_exists:
|
||||
# Good to go
|
||||
self.processing_node.restart_task(self.uuid)
|
||||
else:
|
||||
# Task has been purged (or processing node is offline)
|
||||
# TODO: what if processing node went offline?
|
||||
if self.status == status_codes.COMPLETED:
|
||||
try:
|
||||
assets_dir = self.assets_path("")
|
||||
if not os.path.exists(assets_dir):
|
||||
os.makedirs(assets_dir)
|
||||
|
||||
# Process this as a new task
|
||||
# Removing its UUID will cause the scheduler
|
||||
# to process this the next tick
|
||||
self.uuid = None
|
||||
logger.info("Downloading all.zip for {}".format(self))
|
||||
|
||||
self.console_output = ""
|
||||
self.processing_time = -1
|
||||
self.status = None
|
||||
self.last_error = None
|
||||
self.pending_action = None
|
||||
self.save()
|
||||
else:
|
||||
raise ProcessingException("Cannot restart a task that has no processing node or UUID")
|
||||
# Download all assets
|
||||
#zip_stream = self.processing_node.download_task_asset(self.uuid, "all.zip")
|
||||
zip_path = os.path.join(assets_dir, "all.zip")
|
||||
#with open(zip_path, 'wb') as fd:
|
||||
# for chunk in zip_stream.iter_content(4096):
|
||||
# fd.write(chunk)
|
||||
|
||||
elif self.pending_action == pending_actions.REMOVE:
|
||||
logger.info("Removing task {}".format(self))
|
||||
if self.processing_node and self.uuid:
|
||||
# Attempt to delete the resources on the processing node
|
||||
# We don't care if this fails, as resources on processing nodes
|
||||
# Are expected to be purged on their own after a set amount of time anyway
|
||||
try:
|
||||
self.processing_node.remove_task(self.uuid)
|
||||
except ProcessingException:
|
||||
pass
|
||||
logger.info("Done downloading all.zip for {}".format(self))
|
||||
|
||||
# What's more important is that we delete our task properly here
|
||||
self.delete()
|
||||
# Extract from zip
|
||||
with zipfile.ZipFile(zip_path, "r") as zip_h:
|
||||
zip_h.extractall(assets_dir)
|
||||
|
||||
# Stop right here!
|
||||
return
|
||||
logger.info("Extracted all.zip for {}".format(self))
|
||||
|
||||
except ProcessingException as e:
|
||||
self.last_error = str(e)
|
||||
self.save()
|
||||
|
||||
if self.processing_node:
|
||||
# Need to update status (first time, queued or running?)
|
||||
if self.uuid and self.status in [None, status_codes.QUEUED, status_codes.RUNNING]:
|
||||
# Update task info from processing node
|
||||
try:
|
||||
info = self.processing_node.get_task_info(self.uuid)
|
||||
|
||||
self.processing_time = info["processingTime"]
|
||||
self.status = info["status"]["code"]
|
||||
|
||||
current_lines_count = len(self.console_output.split("\n")) - 1
|
||||
self.console_output += self.processing_node.get_task_console_output(self.uuid, current_lines_count)
|
||||
|
||||
if "errorMessage" in info["status"]:
|
||||
self.last_error = info["status"]["errorMessage"]
|
||||
|
||||
# Has the task just been canceled, failed, or completed?
|
||||
if self.status in [status_codes.FAILED, status_codes.COMPLETED, status_codes.CANCELED]:
|
||||
logger.info("Processing status: {} for {}".format(self.status, self))
|
||||
|
||||
if self.status == status_codes.COMPLETED:
|
||||
try:
|
||||
assets_dir = self.assets_path("")
|
||||
if not os.path.exists(assets_dir):
|
||||
os.makedirs(assets_dir)
|
||||
|
||||
# Download all assets
|
||||
zip_stream = self.processing_node.download_task_asset(self.uuid, "all.zip")
|
||||
zip_path = os.path.join(assets_dir, "all.zip")
|
||||
with open(zip_path, 'wb') as fd:
|
||||
for chunk in zip_stream.iter_content(4096):
|
||||
fd.write(chunk)
|
||||
|
||||
# Extract from zip
|
||||
with zipfile.ZipFile(zip_path, "r") as zip_h:
|
||||
zip_h.extractall(assets_dir)
|
||||
|
||||
# Add to database orthophoto
|
||||
orthophoto_path = self.assets_path("odm_orthophoto", "odm_orthophoto.tif")
|
||||
if os.path.exists(orthophoto_path):
|
||||
self.orthophoto = GDALRaster(orthophoto_path, write=True)
|
||||
# Add to database orthophoto
|
||||
orthophoto_path = self.assets_path("odm_orthophoto", "odm_orthophoto.tif")
|
||||
if os.path.exists(orthophoto_path):
|
||||
self.orthophoto = GDALRaster(orthophoto_path, write=True)
|
||||
|
||||
self.save()
|
||||
except ProcessingException as e:
|
||||
self.set_failure(str(e))
|
||||
else:
|
||||
# FAILED, CANCELED
|
||||
self.save()
|
||||
except ProcessingException as e:
|
||||
self.set_failure(str(e))
|
||||
else:
|
||||
# FAILED, CANCELED
|
||||
# Still waiting...
|
||||
self.save()
|
||||
else:
|
||||
# Still waiting...
|
||||
self.save()
|
||||
except ProcessingException as e:
|
||||
self.set_failure(str(e))
|
||||
except ProcessingException as e:
|
||||
self.set_failure(str(e))
|
||||
except ConnectionRefusedError as e:
|
||||
logger.warning("Task {} cannot communicate with processing node: {}".format(self, str(e)))
|
||||
|
||||
# In the future we might want to retry instead of just failing
|
||||
#self.set_failure(str(e))
|
||||
|
||||
|
||||
def get_tile_path(self, z, x, y):
|
||||
return self.assets_path("orthophoto_tiles", z, x, "{}.png".format(y))
|
||||
|
|
|
@ -113,7 +113,7 @@ def setup():
|
|||
scheduler.add_job(process_pending_tasks, 'interval', seconds=5)
|
||||
scheduler.add_job(cleanup_projects, 'interval', seconds=15)
|
||||
except SchedulerAlreadyRunningError:
|
||||
logger.warn("Scheduler already running (this is OK while testing)")
|
||||
logger.warning("Scheduler already running (this is OK while testing)")
|
||||
|
||||
def teardown():
|
||||
logger.info("Stopping scheduler...")
|
||||
|
@ -121,4 +121,4 @@ def teardown():
|
|||
scheduler.shutdown()
|
||||
logger.info("Scheduler stopped")
|
||||
except SchedulerNotRunningError:
|
||||
logger.warn("Scheduler not running")
|
||||
logger.warning("Scheduler not running")
|
||||
|
|
|
@ -0,0 +1,22 @@
|
|||
from django.contrib.gis.gdal import GDALRaster
|
||||
|
||||
from .classes import BootTestCase
|
||||
from app.fields import from_pgraster, to_pgraster
|
||||
import os
|
||||
|
||||
class TestApi(BootTestCase):
|
||||
def setUp(self):
|
||||
pass
|
||||
|
||||
def tearDown(self):
|
||||
pass
|
||||
|
||||
def test_pgraster_functions(self):
|
||||
# Make sure conversion from PostGIS <---> GDALRaster works
|
||||
# for out-of-db
|
||||
raster = GDALRaster(os.path.join("app", "fixtures", "orthophoto.tif"))
|
||||
|
||||
self.assertTrue(raster.srid == 32615)
|
||||
self.assertTrue(raster.width == 212)
|
||||
|
||||
#hexwkb =
|
|
@ -182,6 +182,7 @@ LOGGING = {
|
|||
'django': {
|
||||
'handlers': ['console'],
|
||||
'propagate': True,
|
||||
'level': 'WARNING',
|
||||
},
|
||||
'app.logger': {
|
||||
'handlers': ['console'],
|
||||
|
@ -189,7 +190,7 @@ LOGGING = {
|
|||
},
|
||||
'apscheduler.executors.default': {
|
||||
'handlers': ['console'],
|
||||
'level': 'WARN',
|
||||
'level': 'WARNING',
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Ładowanie…
Reference in New Issue