OpenDroneMap-ODM/opendm/video/video2dataset.py

351 wiersze
14 KiB
Python

import datetime
from fractions import Fraction
import io
from math import ceil, floor
import time
import cv2
import os
import collections
from PIL import Image
import numpy as np
import piexif
from opendm import log
from opendm.video.srtparser import SrtFileParser
from opendm.video.parameters import Parameters
from opendm.video.checkers import BlackFrameChecker, SimilarityChecker, ThresholdBlurChecker
class Video2Dataset:
def __init__(self, parameters : Parameters):
self.parameters = parameters
self.blur_checker = ThresholdBlurChecker(parameters.blur_threshold) if parameters.blur_threshold is not None else None
self.similarity_checker = SimilarityChecker(parameters.distance_threshold) if parameters.distance_threshold is not None else None
self.black_checker = BlackFrameChecker(parameters.black_ratio_threshold, parameters.pixel_black_threshold) if parameters.black_ratio_threshold is not None or parameters.pixel_black_threshold is not None else None
self.frame_index = parameters.start
self.f = None
def ProcessVideo(self):
self.date_now = None
start = time.time()
if (self.parameters.stats_file is not None):
self.f = open(self.parameters.stats_file, "w")
self.f.write("global_idx;file_name;frame_index;blur_score;is_blurry;is_black;last_frame_index;similarity_score;is_similar;written\n")
self.global_idx = 0
output_file_paths = []
# foreach input file
for input_file in self.parameters.input:
# get file name
file_name = os.path.basename(input_file)
log.ODM_INFO("Processing video: {}".format(input_file))
# get video info
video_info = get_video_info(input_file)
log.ODM_INFO(video_info)
# Set pseudo start time
if self.date_now is None:
try:
self.date_now = datetime.datetime.fromtimestamp(os.path.getmtime(input_file))
except:
self.date_now = datetime.datetime.now()
else:
self.date_now += datetime.timedelta(seconds=video_info.total_frames / video_info.frame_rate)
log.ODM_INFO("Use pseudo start time: %s" % self.date_now)
if self.parameters.use_srt:
name = os.path.splitext(input_file)[0]
srt_files = [name + ".srt", name + ".SRT"]
srt_parser = None
for srt_file in srt_files:
if os.path.exists(srt_file):
log.ODM_INFO("Loading SRT file: {}".format(srt_file))
try:
srt_parser = SrtFileParser(srt_file)
srt_parser.parse()
break
except Exception as e:
log.ODM_INFO("Error parsing SRT file: {}".format(e))
srt_parser = None
else:
srt_parser = None
if (self.black_checker is not None and self.black_checker.NeedPreProcess()):
start2 = time.time()
log.ODM_INFO("Preprocessing for black frame checker... this might take a bit")
self.black_checker.PreProcess(input_file, self.parameters.start, self.parameters.end)
end = time.time()
log.ODM_INFO("Preprocessing time: {:.2f}s".format(end - start2))
log.ODM_INFO("Calculated luminance_range_size is {}".format(self.black_checker.luminance_range_size))
log.ODM_INFO("Calculated luminance_minimum_value is {}".format(self.black_checker.luminance_minimum_value))
log.ODM_INFO("Calculated absolute_threshold is {}".format(self.black_checker.absolute_threshold))
# open video file
cap = cv2.VideoCapture(input_file)
if (not cap.isOpened()):
log.ODM_INFO("Error opening video stream or file")
return
if (self.parameters.start is not None):
cap.set(cv2.CAP_PROP_POS_FRAMES, self.parameters.start)
self.frame_index = self.parameters.start
start_frame = self.parameters.start
else:
start_frame = 0
frames_to_process = self.parameters.end - start_frame + 1 if (self.parameters.end is not None) else video_info.total_frames - start_frame
progress = 0
while (cap.isOpened()):
ret, frame = cap.read()
if not ret:
break
if (self.parameters.end is not None and self.frame_index > self.parameters.end):
break
# Calculate progress percentage
prev_progress = progress
progress = floor((self.frame_index - start_frame + 1) / frames_to_process * 100)
if progress != prev_progress:
print("[{}][{:3d}%] Processing frame {}/{}: ".format(file_name, progress, self.frame_index - start_frame + 1, frames_to_process), end="\r")
stats = self.ProcessFrame(frame, video_info, srt_parser)
if stats is not None and self.parameters.stats_file is not None:
self.WriteStats(input_file, stats)
# Add element to array
if stats is not None and "written" in stats.keys():
output_file_paths.append(stats["path"])
cap.release()
if self.f is not None:
self.f.close()
if self.parameters.limit is not None and self.parameters.limit > 0 and self.global_idx >= self.parameters.limit:
log.ODM_INFO("Limit of {} frames reached, trimming dataset".format(self.parameters.limit))
output_file_paths = limit_files(output_file_paths, self.parameters.limit)
end = time.time()
log.ODM_INFO("Total processing time: {:.2f}s".format(end - start))
return output_file_paths
def ProcessFrame(self, frame, video_info, srt_parser):
res = {"frame_index": self.frame_index, "global_idx": self.global_idx}
frame_bw = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
h, w = frame_bw.shape
resolution = self.parameters.internal_resolution
if resolution < w or resolution < h:
m = max(w, h)
factor = resolution / m
frame_bw = cv2.resize(frame_bw, (int(ceil(w * factor)), int(ceil(h * factor))), interpolation=cv2.INTER_NEAREST)
if (self.blur_checker is not None):
blur_score, is_blurry = self.blur_checker.IsBlur(frame_bw, self.frame_index)
res["blur_score"] = blur_score
res["is_blurry"] = is_blurry
if is_blurry:
# print ("blurry, skipping")
self.frame_index += 1
return res
if (self.black_checker is not None):
is_black = self.black_checker.IsBlack(frame_bw, self.frame_index)
res["is_black"] = is_black
if is_black:
# print ("black, skipping")
self.frame_index += 1
return res
if (self.similarity_checker is not None):
similarity_score, is_similar, last_frame_index = self.similarity_checker.IsSimilar(frame_bw, self.frame_index)
res["similarity_score"] = similarity_score
res["is_similar"] = is_similar
res["last_frame_index"] = last_frame_index
if is_similar:
# print ("similar to {}, skipping".format(self.similarity_checker.last_image_id))
self.frame_index += 1
return res
path = self.SaveFrame(frame, video_info, srt_parser)
res["written"] = True
res["path"] = path
self.frame_index += 1
self.global_idx += 1
return res
def SaveFrame(self, frame, video_info, srt_parser: SrtFileParser):
max_dim = self.parameters.max_dimension
if max_dim is not None:
h, w, _ = frame.shape
if max_dim < w or max_dim < h:
m = max(w, h)
factor = max_dim / m
frame = cv2.resize(frame, (int(ceil(w * factor)), int(ceil(h * factor))), interpolation=cv2.INTER_AREA)
path = os.path.join(self.parameters.output,
"{}_{}_{}.{}".format(video_info.basename, self.global_idx, self.frame_index, self.parameters.frame_format))
_, buf = cv2.imencode('.' + self.parameters.frame_format, frame)
delta = datetime.timedelta(seconds=(self.frame_index / video_info.frame_rate))
elapsed_time = datetime.datetime(1900, 1, 1) + delta
img = Image.open(io.BytesIO(buf))
entry = gps_coords = None
if srt_parser is not None:
entry = srt_parser.get_entry(elapsed_time)
gps_coords = srt_parser.get_gps(elapsed_time)
exif_time = (elapsed_time + (self.date_now - datetime.datetime(1900, 1, 1)))
elapsed_time_str = exif_time.strftime("%Y:%m:%d %H:%M:%S")
subsec_time_str = exif_time.strftime("%f")
# Exif dict contains the following keys: '0th', 'Exif', 'GPS', '1st', 'thumbnail'
# Set the EXIF metadata
exif_dict = {
"0th": {
piexif.ImageIFD.Software: "ODM",
piexif.ImageIFD.DateTime: elapsed_time_str,
piexif.ImageIFD.XResolution: (frame.shape[1], 1),
piexif.ImageIFD.YResolution: (frame.shape[0], 1),
piexif.ImageIFD.Make: "DJI" if video_info.basename.lower().startswith("dji") else "Unknown",
piexif.ImageIFD.Model: "Unknown"
},
"Exif": {
piexif.ExifIFD.DateTimeOriginal: elapsed_time_str,
piexif.ExifIFD.DateTimeDigitized: elapsed_time_str,
piexif.ExifIFD.SubSecTime: subsec_time_str,
piexif.ExifIFD.PixelXDimension: frame.shape[1],
piexif.ExifIFD.PixelYDimension: frame.shape[0],
}}
if entry is not None:
if entry["shutter"] is not None:
exif_dict["Exif"][piexif.ExifIFD.ExposureTime] = (1, int(entry["shutter"]))
if entry["focal_len"] is not None:
exif_dict["Exif"][piexif.ExifIFD.FocalLength] = (entry["focal_len"], 100)
if entry["fnum"] is not None:
exif_dict["Exif"][piexif.ExifIFD.FNumber] = float_to_rational(entry["fnum"])
if entry["iso"] is not None:
exif_dict["Exif"][piexif.ExifIFD.ISOSpeedRatings] = entry["iso"]
if gps_coords is not None:
exif_dict["GPS"] = get_gps_location(elapsed_time, gps_coords[1], gps_coords[0], gps_coords[2])
exif_bytes = piexif.dump(exif_dict)
img.save(path, exif=exif_bytes, quality=95)
return path
def WriteStats(self, input_file, stats):
self.f.write("{};{};{};{};{};{};{};{};{};{}\n".format(
stats["global_idx"],
input_file,
stats["frame_index"],
stats["blur_score"] if "blur_score" in stats else "",
stats["is_blurry"] if "is_blurry" in stats else "",
stats["is_black"] if "is_black" in stats else "",
stats["last_frame_index"] if "last_frame_index" in stats else "",
stats["similarity_score"] if "similarity_score" in stats else "",
stats["is_similar"] if "is_similar" in stats else "",
stats["written"] if "written" in stats else "").replace(".", ","))
def get_video_info(input_file):
video = cv2.VideoCapture(input_file)
basename = os.path.splitext(os.path.basename(input_file))[0]
total_frames = int(video.get(cv2.CAP_PROP_FRAME_COUNT))
frame_rate = video.get(cv2.CAP_PROP_FPS)
video.release()
return collections.namedtuple("VideoInfo", ["total_frames", "frame_rate", "basename"])(total_frames, frame_rate, basename)
def float_to_rational(f):
f = Fraction(f).limit_denominator()
return (f.numerator, f.denominator)
def limit_files(paths, limit):
if len(paths) <= limit:
return paths
to_keep = []
all_idxes = np.arange(0, len(paths))
keep_idxes = np.linspace(0, len(paths) - 1, limit, dtype=int)
remove_idxes = set(all_idxes) - set(keep_idxes)
p = np.array(paths)
to_keep = list(p[keep_idxes])
for idx in remove_idxes:
os.remove(paths[idx])
return to_keep
def to_deg(value, loc):
"""convert decimal coordinates into degrees, munutes and seconds tuple
Keyword arguments: value is float gps-value, loc is direction list ["S", "N"] or ["W", "E"]
return: tuple like (25, 13, 48.343 ,'N')
"""
if value < 0:
loc_value = loc[0]
elif value > 0:
loc_value = loc[1]
else:
loc_value = ""
abs_value = abs(value)
deg = int(abs_value)
t1 = (abs_value-deg)*60
min = int(t1)
sec = round((t1 - min)* 60, 5)
return (deg, min, sec, loc_value)
def get_gps_location(elapsed_time, lat, lng, altitude):
lat_deg = to_deg(lat, ["S", "N"])
lng_deg = to_deg(lng, ["W", "E"])
exiv_lat = (float_to_rational(lat_deg[0]), float_to_rational(lat_deg[1]), float_to_rational(lat_deg[2]))
exiv_lng = (float_to_rational(lng_deg[0]), float_to_rational(lng_deg[1]), float_to_rational(lng_deg[2]))
gps_ifd = {
piexif.GPSIFD.GPSVersionID: (2, 0, 0, 0),
piexif.GPSIFD.GPSDateStamp: elapsed_time.strftime('%Y:%m:%d')
}
if lat is not None and lng is not None:
gps_ifd[piexif.GPSIFD.GPSLatitudeRef] = lat_deg[3]
gps_ifd[piexif.GPSIFD.GPSLatitude] = exiv_lat
gps_ifd[piexif.GPSIFD.GPSLongitudeRef] = lng_deg[3]
gps_ifd[piexif.GPSIFD.GPSLongitude] = exiv_lng
if altitude is not None:
gps_ifd[piexif.GPSIFD.GPSAltitudeRef] = 0
gps_ifd[piexif.GPSIFD.GPSAltitude] = float_to_rational(round(altitude))
return gps_ifd