kopia lustrzana https://github.com/OpenDroneMap/ODM
351 wiersze
14 KiB
Python
351 wiersze
14 KiB
Python
import datetime
|
|
from fractions import Fraction
|
|
import io
|
|
from math import ceil, floor
|
|
import time
|
|
import cv2
|
|
import os
|
|
import collections
|
|
from PIL import Image
|
|
import numpy as np
|
|
import piexif
|
|
from opendm import log
|
|
from opendm.video.srtparser import SrtFileParser
|
|
from opendm.video.parameters import Parameters
|
|
from opendm.video.checkers import BlackFrameChecker, SimilarityChecker, ThresholdBlurChecker
|
|
|
|
class Video2Dataset:
|
|
|
|
def __init__(self, parameters : Parameters):
|
|
self.parameters = parameters
|
|
|
|
self.blur_checker = ThresholdBlurChecker(parameters.blur_threshold) if parameters.blur_threshold is not None else None
|
|
self.similarity_checker = SimilarityChecker(parameters.distance_threshold) if parameters.distance_threshold is not None else None
|
|
self.black_checker = BlackFrameChecker(parameters.black_ratio_threshold, parameters.pixel_black_threshold) if parameters.black_ratio_threshold is not None or parameters.pixel_black_threshold is not None else None
|
|
|
|
self.frame_index = parameters.start
|
|
self.f = None
|
|
|
|
|
|
def ProcessVideo(self):
|
|
self.date_now = None
|
|
start = time.time()
|
|
|
|
if (self.parameters.stats_file is not None):
|
|
self.f = open(self.parameters.stats_file, "w")
|
|
self.f.write("global_idx;file_name;frame_index;blur_score;is_blurry;is_black;last_frame_index;similarity_score;is_similar;written\n")
|
|
|
|
self.global_idx = 0
|
|
|
|
output_file_paths = []
|
|
|
|
# foreach input file
|
|
for input_file in self.parameters.input:
|
|
# get file name
|
|
file_name = os.path.basename(input_file)
|
|
log.ODM_INFO("Processing video: {}".format(input_file))
|
|
|
|
# get video info
|
|
video_info = get_video_info(input_file)
|
|
log.ODM_INFO(video_info)
|
|
|
|
# Set pseudo start time
|
|
if self.date_now is None:
|
|
try:
|
|
self.date_now = datetime.datetime.fromtimestamp(os.path.getmtime(input_file))
|
|
except:
|
|
self.date_now = datetime.datetime.now()
|
|
else:
|
|
self.date_now += datetime.timedelta(seconds=video_info.total_frames / video_info.frame_rate)
|
|
|
|
log.ODM_INFO("Use pseudo start time: %s" % self.date_now)
|
|
|
|
if self.parameters.use_srt:
|
|
|
|
name = os.path.splitext(input_file)[0]
|
|
|
|
srt_files = [name + ".srt", name + ".SRT"]
|
|
srt_parser = None
|
|
|
|
for srt_file in srt_files:
|
|
if os.path.exists(srt_file):
|
|
log.ODM_INFO("Loading SRT file: {}".format(srt_file))
|
|
try:
|
|
srt_parser = SrtFileParser(srt_file)
|
|
srt_parser.parse()
|
|
break
|
|
except Exception as e:
|
|
log.ODM_INFO("Error parsing SRT file: {}".format(e))
|
|
srt_parser = None
|
|
else:
|
|
srt_parser = None
|
|
|
|
if (self.black_checker is not None and self.black_checker.NeedPreProcess()):
|
|
start2 = time.time()
|
|
log.ODM_INFO("Preprocessing for black frame checker... this might take a bit")
|
|
self.black_checker.PreProcess(input_file, self.parameters.start, self.parameters.end)
|
|
end = time.time()
|
|
log.ODM_INFO("Preprocessing time: {:.2f}s".format(end - start2))
|
|
log.ODM_INFO("Calculated luminance_range_size is {}".format(self.black_checker.luminance_range_size))
|
|
log.ODM_INFO("Calculated luminance_minimum_value is {}".format(self.black_checker.luminance_minimum_value))
|
|
log.ODM_INFO("Calculated absolute_threshold is {}".format(self.black_checker.absolute_threshold))
|
|
|
|
# open video file
|
|
cap = cv2.VideoCapture(input_file)
|
|
if (not cap.isOpened()):
|
|
log.ODM_INFO("Error opening video stream or file")
|
|
return
|
|
|
|
if (self.parameters.start is not None):
|
|
cap.set(cv2.CAP_PROP_POS_FRAMES, self.parameters.start)
|
|
self.frame_index = self.parameters.start
|
|
start_frame = self.parameters.start
|
|
else:
|
|
start_frame = 0
|
|
|
|
frames_to_process = self.parameters.end - start_frame + 1 if (self.parameters.end is not None) else video_info.total_frames - start_frame
|
|
|
|
progress = 0
|
|
while (cap.isOpened()):
|
|
ret, frame = cap.read()
|
|
|
|
if not ret:
|
|
break
|
|
|
|
if (self.parameters.end is not None and self.frame_index > self.parameters.end):
|
|
break
|
|
|
|
# Calculate progress percentage
|
|
prev_progress = progress
|
|
progress = floor((self.frame_index - start_frame + 1) / frames_to_process * 100)
|
|
if progress != prev_progress:
|
|
print("[{}][{:3d}%] Processing frame {}/{}: ".format(file_name, progress, self.frame_index - start_frame + 1, frames_to_process), end="\r")
|
|
|
|
stats = self.ProcessFrame(frame, video_info, srt_parser)
|
|
|
|
if stats is not None and self.parameters.stats_file is not None:
|
|
self.WriteStats(input_file, stats)
|
|
|
|
# Add element to array
|
|
if stats is not None and "written" in stats.keys():
|
|
output_file_paths.append(stats["path"])
|
|
|
|
cap.release()
|
|
|
|
if self.f is not None:
|
|
self.f.close()
|
|
|
|
if self.parameters.limit is not None and self.parameters.limit > 0 and self.global_idx >= self.parameters.limit:
|
|
log.ODM_INFO("Limit of {} frames reached, trimming dataset".format(self.parameters.limit))
|
|
output_file_paths = limit_files(output_file_paths, self.parameters.limit)
|
|
|
|
end = time.time()
|
|
log.ODM_INFO("Total processing time: {:.2f}s".format(end - start))
|
|
return output_file_paths
|
|
|
|
|
|
def ProcessFrame(self, frame, video_info, srt_parser):
|
|
|
|
res = {"frame_index": self.frame_index, "global_idx": self.global_idx}
|
|
|
|
frame_bw = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
|
|
|
|
h, w = frame_bw.shape
|
|
resolution = self.parameters.internal_resolution
|
|
if resolution < w or resolution < h:
|
|
m = max(w, h)
|
|
factor = resolution / m
|
|
frame_bw = cv2.resize(frame_bw, (int(ceil(w * factor)), int(ceil(h * factor))), interpolation=cv2.INTER_NEAREST)
|
|
|
|
if (self.blur_checker is not None):
|
|
blur_score, is_blurry = self.blur_checker.IsBlur(frame_bw, self.frame_index)
|
|
res["blur_score"] = blur_score
|
|
res["is_blurry"] = is_blurry
|
|
|
|
if is_blurry:
|
|
# print ("blurry, skipping")
|
|
self.frame_index += 1
|
|
return res
|
|
|
|
if (self.black_checker is not None):
|
|
is_black = self.black_checker.IsBlack(frame_bw, self.frame_index)
|
|
res["is_black"] = is_black
|
|
|
|
if is_black:
|
|
# print ("black, skipping")
|
|
self.frame_index += 1
|
|
return res
|
|
|
|
if (self.similarity_checker is not None):
|
|
similarity_score, is_similar, last_frame_index = self.similarity_checker.IsSimilar(frame_bw, self.frame_index)
|
|
res["similarity_score"] = similarity_score
|
|
res["is_similar"] = is_similar
|
|
res["last_frame_index"] = last_frame_index
|
|
|
|
if is_similar:
|
|
# print ("similar to {}, skipping".format(self.similarity_checker.last_image_id))
|
|
self.frame_index += 1
|
|
return res
|
|
|
|
path = self.SaveFrame(frame, video_info, srt_parser)
|
|
res["written"] = True
|
|
res["path"] = path
|
|
self.frame_index += 1
|
|
self.global_idx += 1
|
|
|
|
return res
|
|
|
|
def SaveFrame(self, frame, video_info, srt_parser: SrtFileParser):
|
|
max_dim = self.parameters.max_dimension
|
|
if max_dim is not None:
|
|
h, w, _ = frame.shape
|
|
if max_dim < w or max_dim < h:
|
|
m = max(w, h)
|
|
factor = max_dim / m
|
|
frame = cv2.resize(frame, (int(ceil(w * factor)), int(ceil(h * factor))), interpolation=cv2.INTER_AREA)
|
|
|
|
path = os.path.join(self.parameters.output,
|
|
"{}_{}_{}.{}".format(video_info.basename, self.global_idx, self.frame_index, self.parameters.frame_format))
|
|
|
|
_, buf = cv2.imencode('.' + self.parameters.frame_format, frame)
|
|
|
|
delta = datetime.timedelta(seconds=(self.frame_index / video_info.frame_rate))
|
|
elapsed_time = datetime.datetime(1900, 1, 1) + delta
|
|
|
|
img = Image.open(io.BytesIO(buf))
|
|
|
|
entry = gps_coords = None
|
|
if srt_parser is not None:
|
|
entry = srt_parser.get_entry(elapsed_time)
|
|
gps_coords = srt_parser.get_gps(elapsed_time)
|
|
|
|
exif_time = (elapsed_time + (self.date_now - datetime.datetime(1900, 1, 1)))
|
|
elapsed_time_str = exif_time.strftime("%Y:%m:%d %H:%M:%S")
|
|
subsec_time_str = exif_time.strftime("%f")
|
|
|
|
# Exif dict contains the following keys: '0th', 'Exif', 'GPS', '1st', 'thumbnail'
|
|
# Set the EXIF metadata
|
|
exif_dict = {
|
|
"0th": {
|
|
piexif.ImageIFD.Software: "ODM",
|
|
piexif.ImageIFD.DateTime: elapsed_time_str,
|
|
piexif.ImageIFD.XResolution: (frame.shape[1], 1),
|
|
piexif.ImageIFD.YResolution: (frame.shape[0], 1),
|
|
piexif.ImageIFD.Make: "DJI" if video_info.basename.lower().startswith("dji") else "Unknown",
|
|
piexif.ImageIFD.Model: "Unknown"
|
|
},
|
|
"Exif": {
|
|
piexif.ExifIFD.DateTimeOriginal: elapsed_time_str,
|
|
piexif.ExifIFD.DateTimeDigitized: elapsed_time_str,
|
|
piexif.ExifIFD.SubSecTime: subsec_time_str,
|
|
piexif.ExifIFD.PixelXDimension: frame.shape[1],
|
|
piexif.ExifIFD.PixelYDimension: frame.shape[0],
|
|
}}
|
|
|
|
if entry is not None:
|
|
if entry["shutter"] is not None:
|
|
exif_dict["Exif"][piexif.ExifIFD.ExposureTime] = (1, int(entry["shutter"]))
|
|
if entry["focal_len"] is not None:
|
|
exif_dict["Exif"][piexif.ExifIFD.FocalLength] = (entry["focal_len"], 100)
|
|
if entry["fnum"] is not None:
|
|
exif_dict["Exif"][piexif.ExifIFD.FNumber] = float_to_rational(entry["fnum"])
|
|
if entry["iso"] is not None:
|
|
exif_dict["Exif"][piexif.ExifIFD.ISOSpeedRatings] = entry["iso"]
|
|
|
|
if gps_coords is not None:
|
|
exif_dict["GPS"] = get_gps_location(elapsed_time, gps_coords[1], gps_coords[0], gps_coords[2])
|
|
|
|
exif_bytes = piexif.dump(exif_dict)
|
|
img.save(path, exif=exif_bytes, quality=95)
|
|
|
|
return path
|
|
|
|
|
|
def WriteStats(self, input_file, stats):
|
|
self.f.write("{};{};{};{};{};{};{};{};{};{}\n".format(
|
|
stats["global_idx"],
|
|
input_file,
|
|
stats["frame_index"],
|
|
stats["blur_score"] if "blur_score" in stats else "",
|
|
stats["is_blurry"] if "is_blurry" in stats else "",
|
|
stats["is_black"] if "is_black" in stats else "",
|
|
stats["last_frame_index"] if "last_frame_index" in stats else "",
|
|
stats["similarity_score"] if "similarity_score" in stats else "",
|
|
stats["is_similar"] if "is_similar" in stats else "",
|
|
stats["written"] if "written" in stats else "").replace(".", ","))
|
|
|
|
|
|
def get_video_info(input_file):
|
|
|
|
video = cv2.VideoCapture(input_file)
|
|
basename = os.path.splitext(os.path.basename(input_file))[0]
|
|
|
|
total_frames = int(video.get(cv2.CAP_PROP_FRAME_COUNT))
|
|
frame_rate = video.get(cv2.CAP_PROP_FPS)
|
|
|
|
video.release()
|
|
|
|
return collections.namedtuple("VideoInfo", ["total_frames", "frame_rate", "basename"])(total_frames, frame_rate, basename)
|
|
|
|
def float_to_rational(f):
|
|
f = Fraction(f).limit_denominator()
|
|
return (f.numerator, f.denominator)
|
|
|
|
def limit_files(paths, limit):
|
|
if len(paths) <= limit:
|
|
return paths
|
|
|
|
to_keep = []
|
|
all_idxes = np.arange(0, len(paths))
|
|
keep_idxes = np.linspace(0, len(paths) - 1, limit, dtype=int)
|
|
remove_idxes = set(all_idxes) - set(keep_idxes)
|
|
|
|
p = np.array(paths)
|
|
to_keep = list(p[keep_idxes])
|
|
|
|
for idx in remove_idxes:
|
|
os.remove(paths[idx])
|
|
|
|
return to_keep
|
|
|
|
def to_deg(value, loc):
|
|
"""convert decimal coordinates into degrees, munutes and seconds tuple
|
|
Keyword arguments: value is float gps-value, loc is direction list ["S", "N"] or ["W", "E"]
|
|
return: tuple like (25, 13, 48.343 ,'N')
|
|
"""
|
|
if value < 0:
|
|
loc_value = loc[0]
|
|
elif value > 0:
|
|
loc_value = loc[1]
|
|
else:
|
|
loc_value = ""
|
|
abs_value = abs(value)
|
|
deg = int(abs_value)
|
|
t1 = (abs_value-deg)*60
|
|
min = int(t1)
|
|
sec = round((t1 - min)* 60, 5)
|
|
return (deg, min, sec, loc_value)
|
|
|
|
def get_gps_location(elapsed_time, lat, lng, altitude):
|
|
|
|
lat_deg = to_deg(lat, ["S", "N"])
|
|
lng_deg = to_deg(lng, ["W", "E"])
|
|
|
|
exiv_lat = (float_to_rational(lat_deg[0]), float_to_rational(lat_deg[1]), float_to_rational(lat_deg[2]))
|
|
exiv_lng = (float_to_rational(lng_deg[0]), float_to_rational(lng_deg[1]), float_to_rational(lng_deg[2]))
|
|
|
|
gps_ifd = {
|
|
piexif.GPSIFD.GPSVersionID: (2, 0, 0, 0),
|
|
piexif.GPSIFD.GPSDateStamp: elapsed_time.strftime('%Y:%m:%d')
|
|
}
|
|
|
|
if lat is not None and lng is not None:
|
|
gps_ifd[piexif.GPSIFD.GPSLatitudeRef] = lat_deg[3]
|
|
gps_ifd[piexif.GPSIFD.GPSLatitude] = exiv_lat
|
|
gps_ifd[piexif.GPSIFD.GPSLongitudeRef] = lng_deg[3]
|
|
gps_ifd[piexif.GPSIFD.GPSLongitude] = exiv_lng
|
|
if altitude is not None:
|
|
gps_ifd[piexif.GPSIFD.GPSAltitudeRef] = 0
|
|
gps_ifd[piexif.GPSIFD.GPSAltitude] = float_to_rational(round(altitude))
|
|
|
|
return gps_ifd |