Mirror of https://github.com/bellingcat/auto-archiver
Unit tests for storage types + fix storage too long issues for local storage
parent 4c21795d5f
commit e89a8da3b4
@@ -6,7 +6,7 @@ nested media retrieval, and type validation.
 from __future__ import annotations
 import os
 import traceback
-from typing import Any, List
+from typing import Any, List, Iterator
 from dataclasses import dataclass, field
 from dataclasses_json import dataclass_json, config
 import mimetypes
@@ -47,7 +47,7 @@ class Media:
         for any_media in self.all_inner_media(include_self=True):
             s.store(any_media, url, metadata=metadata)

-    def all_inner_media(self, include_self=False):
+    def all_inner_media(self, include_self=False) -> Iterator[Media]:
         """Retrieves all media, including nested media within properties or transformations on original media.
         This function returns a generator for all the inner media.

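For context, a minimal usage sketch of the generator API typed above. Only `all_inner_media` and its `include_self` flag appear in this diff; the nested-media setup, in particular `Media.set()` as the way to attach an inner media, is an assumption for illustration:

    from auto_archiver.core.metadata import Media

    # Hypothetical setup: an HTML capture with a screenshot attached as a nested property
    # (attaching via set() is assumed, not shown in this diff).
    outer = Media(filename="page.html")
    outer.set("screenshot", Media(filename="page.png"))

    # all_inner_media returns a lazy Iterator[Media]; include_self=True also yields `outer`.
    for m in outer.all_inner_media(include_self=True):
        print(m.filename)  # prints both page.html and page.png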
@@ -27,6 +27,7 @@ class Storage(BaseModule):
         if media.is_stored(in_storage=self):
+            logger.debug(f"{media.key} already stored, skipping")
             return

         self.set_key(media, url, metadata)
         self.upload(media, metadata=metadata)
         media.add_url(self.get_cdn_url(media))
@@ -50,34 +51,55 @@ class Storage(BaseModule):
         with open(media.filename, 'rb') as f:
             return self.uploadf(f, media, **kwargs)

-    def set_key(self, media: Media, url, metadata: Metadata) -> None:
+    def set_key(self, media: Media, url: str, metadata: Metadata) -> None:
         """takes the media and optionally item info and generates a key"""
         if media.key is not None and len(media.key) > 0: return
         folder = metadata.get_context('folder', '')
         filename, ext = os.path.splitext(media.filename)

         # Handle path_generator logic
-        path_generator = self.config.get("path_generator", "url")
+        path_generator = self.path_generator
         if path_generator == "flat":
             path = ""
+            # TODO: this is never used
             filename = slugify(filename)  # Ensure filename is slugified
         elif path_generator == "url":
             path = slugify(url)
         elif path_generator == "random":
-            path = self.config.get("random_path", random_str(24), True)
+            path = random_str(24)
         else:
             raise ValueError(f"Invalid path_generator: {path_generator}")

         # Handle filename_generator logic
-        filename_generator = self.config.get("filename_generator", "random")
+        filename_generator = self.filename_generator
         if filename_generator == "random":
             filename = random_str(24)
         elif filename_generator == "static":
             # load the hash_enricher module
-            he = self.module_factory.get_module(HashEnricher, self.config)
+            he = self.module_factory.get_module("hash_enricher", self.config)
             hd = he.calculate_hash(media.filename)
             filename = hd[:24]
         else:
             raise ValueError(f"Invalid filename_generator: {filename_generator}")

-        media.key = os.path.join(folder, path, f"{filename}{ext}")
+        key = os.path.join(folder, path, f"{filename}{ext}")
+        if len(key) > self.max_file_length():
+            # truncate the path
+            max_path_length = self.max_file_length() - len(filename) - len(ext) - len(folder) - 1
+            path = path[:max_path_length]
+            logger.warning(f'Filename too long ({len(key)} characters), truncating to {self.max_file_length()} characters')
+            key = os.path.join(folder, path, f"{filename}{ext}")
+
+        media.key = key
+
+    def max_file_length(self) -> int:
+        """
+        Returns the maximum length of a file name that can be stored in the storage service.
+
+        Files are truncated if they exceed this length.
+        Override this method in subclasses if the storage service has a different maximum file length.
+        """
+        return 255  # safe max file length for most filesystems (macOS, Windows, Linux)
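A standalone sketch of the truncation arithmetic that `set_key` applies above, with an empty folder as in the test further down. Names mirror the diff, but this is a simplified re-implementation for illustration, not the module itself:

    import os

    # Simplified sketch: only the path component is shortened; folder, filename and
    # extension are kept, and 1 character is reserved for the joining separator.
    def build_key(folder: str, path: str, filename: str, ext: str, max_len: int = 255) -> str:
        key = os.path.join(folder, path, f"{filename}{ext}")
        if len(key) > max_len:
            max_path_length = max_len - len(filename) - len(ext) - len(folder) - 1
            path = path[:max_path_length]
            key = os.path.join(folder, path, f"{filename}{ext}")
        return key

    # A 400-character slugified path collapses so the final key lands exactly on the limit
    # (with an empty folder, as in the diff's own test).
    print(len(build_key("", "a" * 400, "6ae8a75555209fd6c44157c0", ".txt")))  # 255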
@@ -10,6 +10,8 @@ from auto_archiver.core import Storage

 class LocalStorage(Storage):

+    MAX_FILE_LENGTH = 255
+
     def get_cdn_url(self, media: Media) -> str:
         # TODO: is this viable with Storage.configs on path/filename?
         dest = os.path.join(self.save_to, media.key)
@@ -20,11 +22,31 @@ class LocalStorage(Storage):
     def upload(self, media: Media, **kwargs) -> bool:
         # override parent so that we can use shutil.copy2 and keep metadata
         dest = os.path.join(self.save_to, media.key)
+
+        if len(dest) > self.max_file_length():
+            old_dest_length = len(dest)
+            filename, ext = os.path.splitext(media.key)
+            dir, filename = os.path.split(filename)
+            # see whether we should truncate filename or dir
+            if len(dir) > len(filename):
+                dir = dir[:self.MAX_FILE_LENGTH - len(self.save_to) - len(ext) - len(filename) - 1]
+            else:
+                filename = filename[:self.MAX_FILE_LENGTH - len(self.save_to) - len(ext) - len(filename) - 1]
+
+            # override media.key
+            media.key = os.path.join(dir, f"{filename}{ext}")
+            dest = os.path.join(self.save_to, dir, f"{filename}{ext}")
+            logger.warning(f'Filename too long ({old_dest_length} characters), truncating to {len(dest)} characters')
+
         os.makedirs(os.path.dirname(dest), exist_ok=True)
         logger.debug(f'[{self.__class__.__name__}] storing file {media.filename} with key {media.key} to {dest}')

         res = shutil.copy2(media.filename, dest)
         logger.info(res)
         return True

     # must be implemented even if unused
     def uploadf(self, file: IO[bytes], key: str, **kwargs: dict) -> bool: pass
+
+    def max_file_length(self):
+        return self.MAX_FILE_LENGTH
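A simplified standalone sketch of the decision `LocalStorage.upload` makes above: when the full destination exceeds the filesystem limit, shorten whichever component (directory or filename) is longer. The helper name is illustrative and the arithmetic is deliberately simpler than the module's, shortening by exactly the overshoot:

    import os

    def shorten_key(save_to: str, key: str, limit: int = 255) -> str:
        """Shorten `key` so that os.path.join(save_to, key) fits within `limit` characters."""
        dest = os.path.join(save_to, key)
        if len(dest) <= limit:
            return key
        stem, ext = os.path.splitext(key)
        dir_part, name = os.path.split(stem)
        overshoot = len(dest) - limit
        # shorten whichever component is longer, by the amount the destination is over
        if len(dir_part) > len(name):
            dir_part = dir_part[: max(0, len(dir_part) - overshoot)]
        else:
            name = name[: max(0, len(name) - overshoot)]
        return os.path.join(dir_part, f"{name}{ext}")

    key = shorten_key("/tmp/archive", "file" * 100 + ".txt")
    assert len(os.path.join("/tmp/archive", key)) <= 255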
@@ -67,4 +67,8 @@ class S3Storage(Storage):
         if 'Contents' in resp:
             return resp['Contents'][0]['Key']
         return False
+
+    def max_file_length(self):
+        # Amazon AWS max file length is 1024, but we will use 1000 to be safe
+        return 1000
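The override above reflects that Amazon S3 allows object key names of up to 1,024 bytes of UTF-8, with 1000 used as a safety margin. Because that limit is measured in bytes, a byte-length check is the stricter guard for non-ASCII keys; the helper below is hypothetical and not part of the diff:

    # Hypothetical helper: S3 limits object keys to 1024 bytes of UTF-8, so checking
    # the encoded length is stricter than len() when keys contain non-ASCII characters.
    def fits_s3_key_limit(key: str, limit: int = 1000) -> bool:
        return len(key.encode("utf-8")) <= limit

    assert fits_s3_key_limit("folder/" + "a" * 100)
    assert not fits_s3_key_limit("ключ/" + "я" * 600)  # Cyrillic chars take 2 bytes each in UTF-8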
@@ -11,6 +11,7 @@ from auto_archiver.modules.local_storage import LocalStorage
 @pytest.fixture
 def local_storage(setup_module, tmp_path) -> LocalStorage:
     save_to = tmp_path / "local_archive"
+    save_to.mkdir()
     configs: dict = {
         "path_generator": "flat",
         "filename_generator": "static",
@@ -19,7 +20,6 @@ def local_storage(setup_module, tmp_path) -> LocalStorage:
     }
     return setup_module("local_storage", configs)

-
 @pytest.fixture
 def sample_media(tmp_path) -> Media:
     """Fixture creating a Media object with temporary source file"""
@@ -27,6 +27,13 @@ def sample_media(tmp_path) -> Media:
     src_file.write_text("test content")
     return Media(key="subdir/test.txt", filename=str(src_file))

+def test_really_long_website_url_save(local_storage, tmp_path):
+    long_filename = os.path.join(local_storage.save_to, "file"*100 + ".txt")
+    src_file = tmp_path / "source.txt"
+    src_file.write_text("test content")
+    media = Media(key=long_filename, filename=str(src_file))
+    assert local_storage.upload(media) is True
+    assert src_file.read_text() == Path(local_storage.get_cdn_url(media)).read_text()
+
 def test_get_cdn_url_relative(local_storage):
     media = Media(key="test.txt", filename="dummy.txt")
@@ -2,9 +2,9 @@ from typing import Type

 import pytest

-from auto_archiver.core.metadata import Metadata
+from auto_archiver.core.metadata import Metadata, Media
 from auto_archiver.core.storage import Storage

+from auto_archiver.core.module import ModuleFactory

 class TestStorageBase(object):
@@ -20,3 +20,79 @@ class TestStorageBase(object):
         self.storage: Type[Storage] = setup_module(
             self.module_name, self.config
         )
+
+
+class TestBaseStorage(Storage):
+
+    name = "test_storage"
+
+    def get_cdn_url(self, media):
+        return "cdn_url"
+
+    def uploadf(self, file, key, **kwargs):
+        return True
+
+
+@pytest.fixture
+def storage_base():
+    def _storage_base(config):
+        storage_base = TestBaseStorage()
+        storage_base.config_setup({TestBaseStorage.name: config})
+        storage_base.module_factory = ModuleFactory()
+        return storage_base
+
+    return _storage_base
+
+
+@pytest.mark.parametrize(
+    "path_generator, filename_generator, url, expected_key",
+    [
+        ("flat", "static", "https://example.com/file/", "folder/6ae8a75555209fd6c44157c0.txt"),
+        ("flat", "random", "https://example.com/file/", "folder/pretend-random.txt"),
+        ("url", "static", "https://example.com/file/", "folder/https-example-com-file/6ae8a75555209fd6c44157c0.txt"),
+        ("url", "random", "https://example.com/file/", "folder/https-example-com-file/pretend-random.txt"),
+        ("random", "static", "https://example.com/file/", "folder/pretend-random/6ae8a75555209fd6c44157c0.txt"),
+        ("random", "random", "https://example.com/file/", "folder/pretend-random/pretend-random.txt"),
+    ],
+)
+def test_storage_setup(storage_base, path_generator, filename_generator, url, expected_key, mocker):
+    mock_random = mocker.patch("auto_archiver.core.storage.random_str")
+    mock_random.return_value = "pretend-random"
+
+    # create dummy.txt file
+    with open("dummy.txt", "w") as f:
+        f.write("test content")
+
+    config: dict = {
+        "path_generator": path_generator,
+        "filename_generator": filename_generator,
+    }
+    storage: Storage = storage_base(config)
+    assert storage.path_generator == path_generator
+    assert storage.filename_generator == filename_generator
+
+    metadata = Metadata()
+    metadata.set_context("folder", "folder")
+    media = Media(filename="dummy.txt")
+    storage.set_key(media, url, metadata)
+    print(media.key)
+    assert media.key == expected_key
+
+
+def test_really_long_name(storage_base):
+    config: dict = {
+        "path_generator": "url",
+        "filename_generator": "static",
+    }
+    storage: Storage = storage_base(config)
+
+    # create dummy.txt file
+    with open("dummy.txt", "w") as f:
+        f.write("test content")
+
+    url = f"https://example.com/{'file'*100}"
+    media = Media(filename="dummy.txt")
+    storage.set_key(media, url, Metadata())
+    assert len(media.key) <= storage.max_file_length()
+    assert media.key == "https-example-com-filefilefilefilefilefilefilefilefilefilefilefile\
+filefilefilefilefilefilefilefilefilefilefilefilefilefilefilefilefilefilefilefilefilefilefile\
+filefilefilefilefilefilefilefilefilefilefilefilefilefilefilefilefile/6ae8a75555209fd6c44157c0.txt"