kopia lustrzana https://github.com/bellingcat/auto-archiver
109 wiersze
4.1 KiB
Python
109 wiersze
4.1 KiB
Python
|
|
from __future__ import annotations
|
|
import os
|
|
import traceback
|
|
from typing import Any, List
|
|
from dataclasses import dataclass, field
|
|
from dataclasses_json import dataclass_json, config
|
|
import mimetypes
|
|
|
|
import ffmpeg
|
|
from ffmpeg._run import Error
|
|
|
|
from .context import ArchivingContext
|
|
|
|
from loguru import logger
|
|
|
|
|
|
@dataclass_json  # annotation order matters
@dataclass
class Media:
    """A single archived file plus the URLs and properties attached to it.

    ``properties`` may itself hold other ``Media`` instances (directly or
    inside lists); see ``all_inner_media``.
    """

    filename: str
    key: str = None
    urls: List[str] = field(default_factory=list)
    properties: dict = field(default_factory=dict)
    _mimetype: str = None  # eg: image/jpeg
    _stored: bool = field(default=False, repr=False, metadata=config(exclude=lambda _: True))  # always exclude

    def store(self: Media, override_storages: List = None, url: str = "url-not-available", metadata: Any = None):
        """Store this media and every inner media in each available storage.

        Storages come from ``override_storages`` when given (and non-empty),
        otherwise from ``ArchivingContext.get("storages")``. If no storage is
        available a warning is logged and nothing is stored.

        ``metadata`` is typed ``Any`` to avoid circular imports; it is passed
        through untouched to each storage's ``store`` call, as is ``url``.
        """
        storages = override_storages or ArchivingContext.get("storages")
        # Truthiness check (rather than len()) so a missing/None storages
        # value produces the warning below instead of a TypeError.
        if not storages:
            logger.warning(f"No storages found in local context or provided directly for {self.filename}.")
            return

        for s in storages:
            for any_media in self.all_inner_media(include_self=True):
                s.store(any_media, url, metadata=metadata)

    def all_inner_media(self, include_self=False):
        """Yield every Media nested inside this media's properties.

        Media can be inside media properties, examples include
        transformations on original media. Property values that are Media,
        or lists containing Media, are traversed recursively. When
        ``include_self`` is true this instance is yielded first.
        """
        if include_self:
            yield self
        for prop in self.properties.values():
            if isinstance(prop, Media):
                yield from prop.all_inner_media(include_self=True)
            elif isinstance(prop, list):
                for prop_media in prop:
                    if isinstance(prop_media, Media):
                        yield from prop_media.all_inner_media(include_self=True)

    def is_stored(self) -> bool:
        """Return True when there is at least one URL and exactly one URL per context storage."""
        # A missing "storages" entry is treated as "no storages" instead of
        # raising TypeError on len(None).
        storages = ArchivingContext.get("storages") or []
        return len(self.urls) > 0 and len(self.urls) == len(storages)

    def set(self, key: str, value: Any) -> Media:
        """Set property ``key`` to ``value`` and return self for chaining."""
        self.properties[key] = value
        return self

    def get(self, key: str, default: Any = None) -> Any:
        """Return property ``key``, or ``default`` when it is not set."""
        return self.properties.get(key, default)

    def add_url(self, url: str) -> None:
        """Append ``url`` to this media's URLs."""
        # url can be remote, local, ...
        self.urls.append(url)

    @property  # getter .mimetype
    def mimetype(self) -> str:
        """Mimetype of the media, guessed from the filename on first access.

        Returns an empty string when there is no filename or when the type
        cannot be guessed.
        """
        if not self.filename:
            logger.warning(f"cannot get mimetype from media without filename: {self}")
            return ""
        if not self._mimetype:
            self._mimetype = mimetypes.guess_type(self.filename)[0]
        return self._mimetype or ""

    @mimetype.setter  # setter .mimetype
    def mimetype(self, v: str) -> None:
        self._mimetype = v

    def is_video(self) -> bool:
        """Return True when the mimetype starts with "video"."""
        return self.mimetype.startswith("video")

    def is_audio(self) -> bool:
        """Return True when the mimetype starts with "audio"."""
        return self.mimetype.startswith("audio")

    def is_image(self) -> bool:
        """Return True when the mimetype starts with "image"."""
        return self.mimetype.startswith("image")

    def is_valid_video(self) -> bool:
        """Check for video streams with ffmpeg, or a minimum file size.

        ``self.is_video()`` should be used together with this method.

        Returns True when ffmpeg reports at least one video stream with a
        positive ``duration_ts``, and False when ffmpeg raises its own
        ``Error``. On any other exception the file size is used instead
        (valid when larger than 20_000 bytes); if the size cannot be read
        either, the media is assumed valid.
        """
        try:
            streams = ffmpeg.probe(self.filename, select_streams='v')['streams']
            # NOTE(review): logged at warning level on every call; this looks
            # like leftover debug output - confirm the intended level.
            logger.warning(f"STREAMS FOR {self.filename} {streams}")
            return any(s.get("duration_ts", 0) > 0 for s in streams)
        except Error:
            return False  # ffmpeg errors when reading bad files
        except Exception as e:
            logger.error(e)
            logger.error(traceback.format_exc())
            try:
                fsize = os.path.getsize(self.filename)
                return fsize > 20_000
            except (OSError, TypeError, ValueError) as size_error:
                # Best-effort fallback: only the failures getsize can raise
                # for a bad path are swallowed, so KeyboardInterrupt and
                # SystemExit still propagate.
                logger.debug(f"could not read file size of {self.filename}: {size_error}")
        return True
|