kopia lustrzana https://github.com/OpenDroneMap/ODM
Zenmuse XT support
rodzic
a922aaecbc
commit
80e4b4d649
|
@ -178,6 +178,7 @@ set(custom_libs OpenSfM
|
||||||
PyPopsift
|
PyPopsift
|
||||||
Obj2Tiles
|
Obj2Tiles
|
||||||
OpenPointClass
|
OpenPointClass
|
||||||
|
ExifTool
|
||||||
)
|
)
|
||||||
|
|
||||||
externalproject_add(mve
|
externalproject_add(mve
|
||||||
|
|
|
@ -0,0 +1,94 @@
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
import tempfile
|
||||||
|
import base64
|
||||||
|
from rasterio.io import MemoryFile
|
||||||
|
from opendm.system import run
|
||||||
|
from opendm import log
|
||||||
|
from opendm.utils import double_quote
|
||||||
|
|
||||||
|
def extract_raw_thermal_image_data(image_path):
|
||||||
|
try:
|
||||||
|
f, tmp_file_path = tempfile.mkstemp(suffix='.json')
|
||||||
|
os.close(f)
|
||||||
|
|
||||||
|
try:
|
||||||
|
output = run("exiftool -b -x ThumbnailImage -x PreviewImage -j \"%s\" > \"%s\"" % (image_path, tmp_file_path), quiet=True)
|
||||||
|
|
||||||
|
with open(tmp_file_path) as f:
|
||||||
|
j = json.loads(f.read())
|
||||||
|
|
||||||
|
if isinstance(j, list):
|
||||||
|
j = j[0] # single file
|
||||||
|
|
||||||
|
if "RawThermalImage" in j:
|
||||||
|
imageBytes = base64.b64decode(j["RawThermalImage"][len("base64:"):])
|
||||||
|
|
||||||
|
with MemoryFile(imageBytes) as memfile:
|
||||||
|
with memfile.open() as dataset:
|
||||||
|
img = dataset.read()
|
||||||
|
bands, h, w = img.shape
|
||||||
|
|
||||||
|
if bands != 1:
|
||||||
|
raise Exception("Raw thermal image has more than one band? This is not supported")
|
||||||
|
|
||||||
|
# (1, 512, 640) --> (512, 640, 1)
|
||||||
|
img = img[0][:,:,None]
|
||||||
|
|
||||||
|
del j["RawThermalImage"]
|
||||||
|
|
||||||
|
return extract_temperature_params_from(j), img
|
||||||
|
else:
|
||||||
|
raise Exception("Invalid JSON (not a list)")
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
log.ODM_WARNING("Cannot extract tags using exiftool: %s" % str(e))
|
||||||
|
return {}, None
|
||||||
|
finally:
|
||||||
|
if os.path.isfile(tmp_file_path):
|
||||||
|
os.remove(tmp_file_path)
|
||||||
|
except Exception as e:
|
||||||
|
log.ODM_WARNING("Cannot create temporary file: %s" % str(e))
|
||||||
|
return {}, None
|
||||||
|
|
||||||
|
def unit(unit):
|
||||||
|
def _convert(v):
|
||||||
|
if isinstance(v, float):
|
||||||
|
return v
|
||||||
|
elif isinstance(v, str):
|
||||||
|
if not v[-1].isnumeric():
|
||||||
|
if v[-1].upper() != unit.upper():
|
||||||
|
log.ODM_WARNING("Assuming %s is in %s" % (v, unit))
|
||||||
|
return float(v[:-1])
|
||||||
|
else:
|
||||||
|
return float(v)
|
||||||
|
else:
|
||||||
|
return float(v)
|
||||||
|
return _convert
|
||||||
|
|
||||||
|
def extract_temperature_params_from(tags):
|
||||||
|
# Defaults
|
||||||
|
meta = {
|
||||||
|
"Emissivity": float,
|
||||||
|
"ObjectDistance": unit("m"),
|
||||||
|
"AtmosphericTemperature": unit("C"),
|
||||||
|
"ReflectedApparentTemperature": unit("C"),
|
||||||
|
"IRWindowTemperature": unit("C"),
|
||||||
|
"IRWindowTransmission": float,
|
||||||
|
"RelativeHumidity": unit("%"),
|
||||||
|
"PlanckR1": float,
|
||||||
|
"PlanckB": float,
|
||||||
|
"PlanckF": float,
|
||||||
|
"PlanckO": float,
|
||||||
|
"PlanckR2": float,
|
||||||
|
}
|
||||||
|
|
||||||
|
params = {}
|
||||||
|
|
||||||
|
for m in meta:
|
||||||
|
if m not in tags:
|
||||||
|
# All or nothing
|
||||||
|
raise Exception("Cannot find %s in tags" % m)
|
||||||
|
params[m] = (meta[m])(tags[m])
|
||||||
|
|
||||||
|
return params
|
|
@ -305,7 +305,7 @@ class ODM_Photo:
|
||||||
|
|
||||||
for xtags in xmp:
|
for xtags in xmp:
|
||||||
try:
|
try:
|
||||||
band_name = self.get_xmp_tag(xtags, ['Camera:BandName', '@Camera:BandName'])
|
band_name = self.get_xmp_tag(xtags, ['Camera:BandName', '@Camera:BandName', 'FLIR:BandName'])
|
||||||
if band_name is not None:
|
if band_name is not None:
|
||||||
self.band_name = band_name.replace(" ", "")
|
self.band_name = band_name.replace(" ", "")
|
||||||
|
|
||||||
|
|
|
@ -66,10 +66,11 @@ def sighandler(signum, frame):
|
||||||
signal.signal(signal.SIGINT, sighandler)
|
signal.signal(signal.SIGINT, sighandler)
|
||||||
signal.signal(signal.SIGTERM, sighandler)
|
signal.signal(signal.SIGTERM, sighandler)
|
||||||
|
|
||||||
def run(cmd, env_paths=[context.superbuild_bin_path], env_vars={}, packages_paths=context.python_packages_paths):
|
def run(cmd, env_paths=[context.superbuild_bin_path], env_vars={}, packages_paths=context.python_packages_paths, quiet=False):
|
||||||
"""Run a system command"""
|
"""Run a system command"""
|
||||||
global running_subprocesses
|
global running_subprocesses
|
||||||
|
|
||||||
|
if not quiet:
|
||||||
log.ODM_INFO('running %s' % cmd)
|
log.ODM_INFO('running %s' % cmd)
|
||||||
env = os.environ.copy()
|
env = os.environ.copy()
|
||||||
|
|
||||||
|
@ -101,6 +102,7 @@ def run(cmd, env_paths=[context.superbuild_bin_path], env_vars={}, packages_path
|
||||||
|
|
||||||
retcode = p.wait()
|
retcode = p.wait()
|
||||||
|
|
||||||
|
if not quiet:
|
||||||
log.logger.log_json_process(cmd, retcode, list(lines))
|
log.logger.log_json_process(cmd, retcode, list(lines))
|
||||||
|
|
||||||
running_subprocesses.remove(p)
|
running_subprocesses.remove(p)
|
||||||
|
|
|
@ -1,7 +1,9 @@
|
||||||
from opendm import log
|
|
||||||
from opendm.thermal_tools import dji_unpack
|
|
||||||
import cv2
|
import cv2
|
||||||
import os
|
import os
|
||||||
|
from opendm import log
|
||||||
|
from opendm.thermal_tools import dji_unpack
|
||||||
|
from opendm.exiftool import extract_raw_thermal_image_data
|
||||||
|
from opendm.thermal_tools.thermal_utils import sensor_vals_to_temp
|
||||||
|
|
||||||
def resize_to_match(image, match_photo = None):
|
def resize_to_match(image, match_photo = None):
|
||||||
"""
|
"""
|
||||||
|
@ -19,17 +21,15 @@ def resize_to_match(image, match_photo = None):
|
||||||
interpolation=cv2.INTER_LANCZOS4)
|
interpolation=cv2.INTER_LANCZOS4)
|
||||||
return image
|
return image
|
||||||
|
|
||||||
def dn_to_temperature(photo, image, dataset_tree):
|
def dn_to_temperature(photo, image, images_path):
|
||||||
"""
|
"""
|
||||||
Convert Digital Number values to temperature (C) values
|
Convert Digital Number values to temperature (C) values
|
||||||
:param photo ODM_Photo
|
:param photo ODM_Photo
|
||||||
:param image numpy array containing image data
|
:param image numpy array containing image data
|
||||||
:param dataset_tree path to original source image to read data using PIL for DJI thermal photos
|
:param images_path path to original source image to read data using PIL for DJI thermal photos
|
||||||
:return numpy array with temperature (C) image values
|
:return numpy array with temperature (C) image values
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
# Handle thermal bands
|
# Handle thermal bands
|
||||||
if photo.is_thermal():
|
if photo.is_thermal():
|
||||||
# Every camera stores thermal information differently
|
# Every camera stores thermal information differently
|
||||||
|
@ -51,11 +51,18 @@ def dn_to_temperature(photo, image, dataset_tree):
|
||||||
else:
|
else:
|
||||||
return image
|
return image
|
||||||
elif photo.camera_make == "DJI" and photo.camera_model == "MAVIC2-ENTERPRISE-ADVANCED":
|
elif photo.camera_make == "DJI" and photo.camera_model == "MAVIC2-ENTERPRISE-ADVANCED":
|
||||||
image = dji_unpack.extract_temperatures_dji(photo, image, dataset_tree)
|
image = dji_unpack.extract_temperatures_dji(photo, image, images_path)
|
||||||
image = image.astype("float32")
|
image = image.astype("float32")
|
||||||
return image
|
return image
|
||||||
else:
|
else:
|
||||||
log.ODM_WARNING("Unsupported camera [%s %s], thermal band will have digital numbers." % (photo.camera_make, photo.camera_model))
|
try:
|
||||||
|
params, image = extract_raw_thermal_image_data(os.path.join(images_path, photo.filename))
|
||||||
|
image = sensor_vals_to_temp(image, **params)
|
||||||
|
except Exception as e:
|
||||||
|
log.ODM_WARNING("Cannot radiometrically calibrate %s: %s" % (photo.filename, str(e)))
|
||||||
|
|
||||||
|
image = image.astype("float32")
|
||||||
|
return image
|
||||||
else:
|
else:
|
||||||
image = image.astype("float32")
|
image = image.astype("float32")
|
||||||
log.ODM_WARNING("Tried to radiometrically calibrate a non-thermal image with temperature values (%s)" % photo.filename)
|
log.ODM_WARNING("Tried to radiometrically calibrate a non-thermal image with temperature values (%s)" % photo.filename)
|
||||||
|
|
|
@ -1,271 +0,0 @@
|
||||||
"""
|
|
||||||
THIS IS WIP, DON'T USE THIS FILE, IT IS HERE FOR FURTHER IMPROVEMENT
|
|
||||||
Tools for extracting thermal data from FLIR images.
|
|
||||||
Derived from https://bitbucket.org/nimmerwoner/flyr/src/master/
|
|
||||||
"""
|
|
||||||
|
|
||||||
import os
|
|
||||||
from io import BufferedIOBase, BytesIO
|
|
||||||
from typing import BinaryIO, Dict, Optional, Tuple, Union
|
|
||||||
|
|
||||||
import numpy as np
|
|
||||||
from PIL import Image
|
|
||||||
|
|
||||||
# Constants
|
|
||||||
SEGMENT_SEP = b"\xff"
|
|
||||||
APP1_MARKER = b"\xe1"
|
|
||||||
MAGIC_FLIR_DEF = b"FLIR\x00"
|
|
||||||
|
|
||||||
CHUNK_APP1_BYTES_COUNT = len(APP1_MARKER)
|
|
||||||
CHUNK_LENGTH_BYTES_COUNT = 2
|
|
||||||
CHUNK_MAGIC_BYTES_COUNT = len(MAGIC_FLIR_DEF)
|
|
||||||
CHUNK_SKIP_BYTES_COUNT = 1
|
|
||||||
CHUNK_NUM_BYTES_COUNT = 1
|
|
||||||
CHUNK_TOT_BYTES_COUNT = 1
|
|
||||||
CHUNK_PARTIAL_METADATA_LENGTH = CHUNK_APP1_BYTES_COUNT + CHUNK_LENGTH_BYTES_COUNT + CHUNK_MAGIC_BYTES_COUNT
|
|
||||||
CHUNK_METADATA_LENGTH = (
|
|
||||||
CHUNK_PARTIAL_METADATA_LENGTH + CHUNK_SKIP_BYTES_COUNT + CHUNK_NUM_BYTES_COUNT + CHUNK_TOT_BYTES_COUNT
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def unpack(path_or_stream: Union[str, BinaryIO]) -> np.ndarray:
|
|
||||||
"""Unpacks the FLIR image, meaning that it will return the thermal data embedded in the image.
|
|
||||||
|
|
||||||
Parameters
|
|
||||||
----------
|
|
||||||
path_or_stream : Union[str, BinaryIO]
|
|
||||||
Either a path (string) to a FLIR file, or a byte stream such as
|
|
||||||
BytesIO or file opened as `open(file_path, "rb")`.
|
|
||||||
|
|
||||||
Returns
|
|
||||||
-------
|
|
||||||
FlyrThermogram
|
|
||||||
When successful, a FlyrThermogram object containing thermogram data.
|
|
||||||
"""
|
|
||||||
if isinstance(path_or_stream, str) and os.path.isfile(path_or_stream):
|
|
||||||
with open(path_or_stream, "rb") as flirh:
|
|
||||||
return unpack(flirh)
|
|
||||||
elif isinstance(path_or_stream, BufferedIOBase):
|
|
||||||
stream = path_or_stream
|
|
||||||
flir_app1_stream = extract_flir_app1(stream)
|
|
||||||
flir_records = parse_flir_app1(flir_app1_stream)
|
|
||||||
raw_np = parse_thermal(flir_app1_stream, flir_records)
|
|
||||||
|
|
||||||
return raw_np
|
|
||||||
else:
|
|
||||||
raise ValueError("Incorrect input")
|
|
||||||
|
|
||||||
|
|
||||||
def extract_flir_app1(stream: BinaryIO) -> BinaryIO:
|
|
||||||
"""Extracts the FLIR APP1 bytes.
|
|
||||||
|
|
||||||
Parameters
|
|
||||||
---------
|
|
||||||
stream : BinaryIO
|
|
||||||
A full bytes stream of a JPEG file, expected to be a FLIR file.
|
|
||||||
|
|
||||||
Raises
|
|
||||||
------
|
|
||||||
ValueError
|
|
||||||
When the file is invalid in one the next ways, a
|
|
||||||
ValueError is thrown.
|
|
||||||
|
|
||||||
* File is not a JPEG
|
|
||||||
* A FLIR chunk number occurs more than once
|
|
||||||
* The total chunks count is inconsistent over multiple chunks
|
|
||||||
* No APP1 segments are successfully parsed
|
|
||||||
|
|
||||||
Returns
|
|
||||||
-------
|
|
||||||
BinaryIO
|
|
||||||
A bytes stream of the APP1 FLIR segments
|
|
||||||
"""
|
|
||||||
# Check JPEG-ness
|
|
||||||
_ = stream.read(2)
|
|
||||||
|
|
||||||
chunks_count: Optional[int] = None
|
|
||||||
chunks: Dict[int, bytes] = {}
|
|
||||||
while True:
|
|
||||||
b = stream.read(1)
|
|
||||||
if b == b"":
|
|
||||||
break
|
|
||||||
|
|
||||||
if b != SEGMENT_SEP:
|
|
||||||
continue
|
|
||||||
|
|
||||||
parsed_chunk = parse_flir_chunk(stream, chunks_count)
|
|
||||||
if not parsed_chunk:
|
|
||||||
continue
|
|
||||||
|
|
||||||
chunks_count, chunk_num, chunk = parsed_chunk
|
|
||||||
chunk_exists = chunks.get(chunk_num, None) is not None
|
|
||||||
if chunk_exists:
|
|
||||||
raise ValueError("Invalid FLIR: duplicate chunk number")
|
|
||||||
chunks[chunk_num] = chunk
|
|
||||||
|
|
||||||
# Encountered all chunks, break out of loop to process found metadata
|
|
||||||
if chunk_num == chunks_count:
|
|
||||||
break
|
|
||||||
|
|
||||||
if chunks_count is None:
|
|
||||||
raise ValueError("Invalid FLIR: no metadata encountered")
|
|
||||||
|
|
||||||
flir_app1_bytes = b""
|
|
||||||
for chunk_num in range(chunks_count + 1):
|
|
||||||
flir_app1_bytes += chunks[chunk_num]
|
|
||||||
|
|
||||||
flir_app1_stream = BytesIO(flir_app1_bytes)
|
|
||||||
flir_app1_stream.seek(0)
|
|
||||||
return flir_app1_stream
|
|
||||||
|
|
||||||
|
|
||||||
def parse_flir_chunk(stream: BinaryIO, chunks_count: Optional[int]) -> Optional[Tuple[int, int, bytes]]:
|
|
||||||
"""Parse flir chunk."""
|
|
||||||
# Parse the chunk header. Headers are as follows (definition with example):
|
|
||||||
#
|
|
||||||
# \xff\xe1<length: 2 bytes>FLIR\x00\x01<chunk nr: 1 byte><chunk count: 1 byte>
|
|
||||||
# \xff\xe1\xff\xfeFLIR\x00\x01\x01\x0b
|
|
||||||
#
|
|
||||||
# Meaning: Exif APP1, 65534 long, FLIR chunk 1 out of 12
|
|
||||||
marker = stream.read(CHUNK_APP1_BYTES_COUNT)
|
|
||||||
|
|
||||||
length_bytes = stream.read(CHUNK_LENGTH_BYTES_COUNT)
|
|
||||||
length = int.from_bytes(length_bytes, "big")
|
|
||||||
length -= CHUNK_METADATA_LENGTH
|
|
||||||
magic_flir = stream.read(CHUNK_MAGIC_BYTES_COUNT)
|
|
||||||
|
|
||||||
if not (marker == APP1_MARKER and magic_flir == MAGIC_FLIR_DEF):
|
|
||||||
# Seek back to just after byte b and continue searching for chunks
|
|
||||||
stream.seek(-len(marker) - len(length_bytes) - len(magic_flir), 1)
|
|
||||||
return None
|
|
||||||
|
|
||||||
stream.seek(1, 1) # skip 1 byte, unsure what it is for
|
|
||||||
|
|
||||||
chunk_num = int.from_bytes(stream.read(CHUNK_NUM_BYTES_COUNT), "big")
|
|
||||||
chunks_tot = int.from_bytes(stream.read(CHUNK_TOT_BYTES_COUNT), "big")
|
|
||||||
|
|
||||||
# Remember total chunks to verify metadata consistency
|
|
||||||
if chunks_count is None:
|
|
||||||
chunks_count = chunks_tot
|
|
||||||
|
|
||||||
if ( # Check whether chunk metadata is consistent
|
|
||||||
chunks_tot is None or chunk_num < 0 or chunk_num > chunks_tot or chunks_tot != chunks_count
|
|
||||||
):
|
|
||||||
raise ValueError(f"Invalid FLIR: inconsistent total chunks, should be 0 or greater, but is {chunks_tot}")
|
|
||||||
|
|
||||||
return chunks_tot, chunk_num, stream.read(length + 1)
|
|
||||||
|
|
||||||
|
|
||||||
def parse_thermal(stream: BinaryIO, records: Dict[int, Tuple[int, int, int, int]]) -> np.ndarray:
|
|
||||||
"""Parse thermal."""
|
|
||||||
RECORD_IDX_RAW_DATA = 1
|
|
||||||
raw_data_md = records[RECORD_IDX_RAW_DATA]
|
|
||||||
_, _, raw_data = parse_raw_data(stream, raw_data_md)
|
|
||||||
return raw_data
|
|
||||||
|
|
||||||
|
|
||||||
def parse_flir_app1(stream: BinaryIO) -> Dict[int, Tuple[int, int, int, int]]:
|
|
||||||
"""Parse flir app1."""
|
|
||||||
# 0x00 - string[4] file format ID = "FFF\0"
|
|
||||||
# 0x04 - string[16] file creator: seen "\0","MTX IR\0","CAMCTRL\0"
|
|
||||||
# 0x14 - int32u file format version = 100
|
|
||||||
# 0x18 - int32u offset to record directory
|
|
||||||
# 0x1c - int32u number of entries in record directory
|
|
||||||
# 0x20 - int32u next free index ID = 2
|
|
||||||
# 0x24 - int16u swap pattern = 0 (?)
|
|
||||||
# 0x28 - int16u[7] spares
|
|
||||||
# 0x34 - int32u[2] reserved
|
|
||||||
# 0x3c - int32u checksum
|
|
||||||
|
|
||||||
# 1. Read 0x40 bytes and verify that its contents equals AFF\0 or FFF\0
|
|
||||||
_ = stream.read(4)
|
|
||||||
|
|
||||||
# 2. Read FLIR record directory metadata (ref 3)
|
|
||||||
stream.seek(16, 1)
|
|
||||||
_ = int.from_bytes(stream.read(4), "big")
|
|
||||||
record_dir_offset = int.from_bytes(stream.read(4), "big")
|
|
||||||
record_dir_entries_count = int.from_bytes(stream.read(4), "big")
|
|
||||||
stream.seek(28, 1)
|
|
||||||
_ = int.from_bytes(stream.read(4), "big")
|
|
||||||
|
|
||||||
# 3. Read record directory (which is a FLIR record entry repeated
|
|
||||||
# `record_dir_entries_count` times)
|
|
||||||
stream.seek(record_dir_offset)
|
|
||||||
record_dir_stream = BytesIO(stream.read(32 * record_dir_entries_count))
|
|
||||||
|
|
||||||
# First parse the record metadata
|
|
||||||
record_details: Dict[int, Tuple[int, int, int, int]] = {}
|
|
||||||
for record_nr in range(record_dir_entries_count):
|
|
||||||
record_dir_stream.seek(0)
|
|
||||||
details = parse_flir_record_metadata(stream, record_nr)
|
|
||||||
if details:
|
|
||||||
record_details[details[1]] = details
|
|
||||||
|
|
||||||
# Then parse the actual records
|
|
||||||
# for (entry_idx, type, offset, length) in record_details:
|
|
||||||
# parse_record = record_parsers[type]
|
|
||||||
# stream.seek(offset)
|
|
||||||
# record = BytesIO(stream.read(length + 36)) # + 36 needed to find end
|
|
||||||
# parse_record(record, offset, length)
|
|
||||||
|
|
||||||
return record_details
|
|
||||||
|
|
||||||
|
|
||||||
def parse_flir_record_metadata(stream: BinaryIO, record_nr: int) -> Optional[Tuple[int, int, int, int]]:
|
|
||||||
"""Parse flir record metadata."""
|
|
||||||
# FLIR record entry (ref 3):
|
|
||||||
# 0x00 - int16u record type
|
|
||||||
# 0x02 - int16u record subtype: RawData 1=BE, 2=LE, 3=PNG; 1 for other record types
|
|
||||||
# 0x04 - int32u record version: seen 0x64,0x66,0x67,0x68,0x6f,0x104
|
|
||||||
# 0x08 - int32u index id = 1
|
|
||||||
# 0x0c - int32u record offset from start of FLIR data
|
|
||||||
# 0x10 - int32u record length
|
|
||||||
# 0x14 - int32u parent = 0 (?)
|
|
||||||
# 0x18 - int32u object number = 0 (?)
|
|
||||||
# 0x1c - int32u checksum: 0 for no checksum
|
|
||||||
entry = 32 * record_nr
|
|
||||||
stream.seek(entry)
|
|
||||||
record_type = int.from_bytes(stream.read(2), "big")
|
|
||||||
if record_type < 1:
|
|
||||||
return None
|
|
||||||
|
|
||||||
_ = int.from_bytes(stream.read(2), "big")
|
|
||||||
_ = int.from_bytes(stream.read(4), "big")
|
|
||||||
_ = int.from_bytes(stream.read(4), "big")
|
|
||||||
record_offset = int.from_bytes(stream.read(4), "big")
|
|
||||||
record_length = int.from_bytes(stream.read(4), "big")
|
|
||||||
_ = int.from_bytes(stream.read(4), "big")
|
|
||||||
_ = int.from_bytes(stream.read(4), "big")
|
|
||||||
_ = int.from_bytes(stream.read(4), "big")
|
|
||||||
return (entry, record_type, record_offset, record_length)
|
|
||||||
|
|
||||||
|
|
||||||
def parse_raw_data(stream: BinaryIO, metadata: Tuple[int, int, int, int]):
|
|
||||||
"""Parse raw data."""
|
|
||||||
(_, _, offset, length) = metadata
|
|
||||||
stream.seek(offset)
|
|
||||||
|
|
||||||
stream.seek(2, 1)
|
|
||||||
width = int.from_bytes(stream.read(2), "little")
|
|
||||||
height = int.from_bytes(stream.read(2), "little")
|
|
||||||
|
|
||||||
stream.seek(offset + 32)
|
|
||||||
|
|
||||||
# Read the bytes with the raw thermal data and decode using PIL
|
|
||||||
thermal_bytes = stream.read(length)
|
|
||||||
thermal_stream = BytesIO(thermal_bytes)
|
|
||||||
thermal_img = Image.open(thermal_stream)
|
|
||||||
thermal_np = np.array(thermal_img)
|
|
||||||
|
|
||||||
# Check shape
|
|
||||||
if thermal_np.shape != (height, width):
|
|
||||||
msg = "Invalid FLIR: metadata's width and height don't match thermal data's actual width\
|
|
||||||
and height ({} vs ({}, {})"
|
|
||||||
msg = msg.format(thermal_np.shape, height, width)
|
|
||||||
raise ValueError(msg)
|
|
||||||
|
|
||||||
# FLIR PNG data is in the wrong byte order, fix that
|
|
||||||
fix_byte_order = np.vectorize(lambda x: (x >> 8) + ((x & 0x00FF) << 8))
|
|
||||||
thermal_np = fix_byte_order(thermal_np)
|
|
||||||
|
|
||||||
return width, height, thermal_np
|
|
Ładowanie…
Reference in New Issue