Context-related fixes, some more tests.

pull/189/head
erinhmclark 2025-02-06 16:53:00 +00:00
parent 67504a683e
commit 266c7a14e6
9 changed files with 370 additions and 227 deletions

View file

@@ -44,14 +44,14 @@ class GsheetsFeeder(Feeder):
logger.info(f'Opening worksheet {ii=}: {worksheet.title=} header={self.header}')
gw = GWorksheet(worksheet, header_row=self.header, columns=self.columns)
if len(missing_cols := self.missing_required_columns(gw)):
logger.warning(f"SKIPPED worksheet '{wks.title}' due to missing required column(s) for {missing_cols}")
logger.warning(f"SKIPPED worksheet '{worksheet.title}' due to missing required column(s) for {missing_cols}")
continue
# process and yield metadata here:
yield from self._process_rows(gw)
logger.success(f'Finished worksheet {worksheet.title}')
def _process_rows(self, gw: GWorksheet) -> Metadata:
def _process_rows(self, gw: GWorksheet):
for row in range(1 + self.header, gw.count_rows() + 1):
url = gw.get_cell(row, 'url').strip()
if not len(url): continue
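
The dropped `-> Metadata` annotation was the right call: `_process_rows` is a generator, so typing its return as a single `Metadata` was misleading. If an annotation is wanted here, a minimal sketch (assuming the method keeps yielding `Metadata` items per non-empty row) would be:

from typing import Iterator

def _process_rows(self, gw: GWorksheet) -> Iterator[Metadata]:
    # a generator's return annotation describes what iterating it yields
    for row in range(1 + self.header, gw.count_rows() + 1):
        url = gw.get_cell(row, 'url').strip()
        if not len(url):
            continue
        ...  # build and yield a Metadata item for this row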

View file

@@ -3,7 +3,7 @@
"type": ["storage"],
"requires_setup": True,
"dependencies": {
"python": ["boto3", "loguru"],
"python": ["hash_enricher", "boto3", "loguru"],
},
"configs": {
"path_generator": {
@@ -49,5 +49,6 @@
- Requires S3 credentials (API key and secret) and a bucket name to function.
- The `random_no_duplicate` option ensures no duplicate uploads by leveraging hash-based folder structures.
- Uses `boto3` for interaction with the S3 API.
- Depends on the `HashEnricher` module for hash calculation.
"""
}
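
In sketch form, the hash-based folder structure mentioned above groups each upload under a key prefix derived from the file hash (constant and slice width taken from the storage code below; the hash value is illustrative):

import os

NO_DUPLICATES_FOLDER = "no-dups/"
hd = "4f9d2c0b1a..."  # illustrative content hash
path = os.path.join(NO_DUPLICATES_FOLDER, hd[:24])  # e.g. "no-dups/4f9d2c0b1a..."
# if any object already exists under this prefix, the upload is skipped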

View file

@@ -9,10 +9,11 @@ from auto_archiver.core import Media
from auto_archiver.core import Storage
from auto_archiver.modules.hash_enricher import HashEnricher
from auto_archiver.utils.misc import random_str
from auto_archiver.core.module import get_module
NO_DUPLICATES_FOLDER = "no-dups/"
class S3Storage(Storage, HashEnricher):
class S3Storage(Storage):
def setup(self, config: dict) -> None:
super().setup(config)
@@ -49,7 +50,8 @@ class S3Storage(Storage, HashEnricher):
def is_upload_needed(self, media: Media) -> bool:
if self.random_no_duplicate:
# checks if a folder with the hash already exists, if so it skips the upload
hd = self.calculate_hash(media.filename)
he = get_module('hash_enricher', self.config)
hd = he.calculate_hash(media.filename)
path = os.path.join(NO_DUPLICATES_FOLDER, hd[:24])
if existing_key:=self.file_in_folder(path):
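
The pattern in isolation: rather than S3Storage inheriting HashEnricher for a single method, the enricher is resolved when needed, which matches the `hash_enricher` dependency added to the manifest above. A minimal sketch, assuming `get_module(name, config)` returns a configured module instance as used here (`_hash_for` is a hypothetical helper name):

from auto_archiver.core.module import get_module

class S3Storage(Storage):
    def _hash_for(self, filename: str) -> str:
        # compose with the hash_enricher module instead of inheriting from it,
        # keeping S3Storage a plain Storage subclass
        he = get_module('hash_enricher', self.config)
        return he.calculate_hash(filename)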

View file

@@ -1,53 +0,0 @@
import json, gspread
from ..core import BaseModule
class Gsheets(BaseModule):
name = "gsheets"
def __init__(self, config: dict) -> None:
# without this STEP.__init__ is not called
super().__init__(config)
self.gsheets_client = gspread.service_account(filename=self.service_account)
# TODO: config should be responsible for conversions
try: self.header = int(self.header)
except: pass
assert type(self.header) == int, f"header ({self.header}) value must be an integer not {type(self.header)}"
assert self.sheet is not None or self.sheet_id is not None, "You need to define either a 'sheet' name or a 'sheet_id' in your orchestration file when using gsheets."
# TODO merge this into gsheets processors manifest
@staticmethod
def configs() -> dict:
return {
"sheet": {"default": None, "help": "name of the sheet to archive"},
"sheet_id": {"default": None, "help": "(alternative to sheet name) the id of the sheet to archive"},
"header": {"default": 1, "help": "index of the header row (starts at 1)"},
"service_account": {"default": "secrets/service_account.json", "help": "service account JSON file path"},
"columns": {
"default": {
'url': 'link',
'status': 'archive status',
'folder': 'destination folder',
'archive': 'archive location',
'date': 'archive date',
'thumbnail': 'thumbnail',
'timestamp': 'upload timestamp',
'title': 'upload title',
'text': 'text content',
'screenshot': 'screenshot',
'hash': 'hash',
'pdq_hash': 'perceptual hashes',
'wacz': 'wacz',
'replaywebpage': 'replaywebpage',
},
"help": "names of columns in the google sheet (stringified JSON object)",
"cli_set": lambda cli_val, cur_val: dict(cur_val, **json.loads(cli_val))
},
}
def open_sheet(self):
if self.sheet:
return self.gsheets_client.open(self.sheet)
else: # self.sheet_id
return self.gsheets_client.open_by_key(self.sheet_id)

View file

@@ -0,0 +1,103 @@
import datetime
from datetime import datetime, timedelta, timezone
from unittest.mock import MagicMock, patch
import pytest
from auto_archiver.core import Metadata, Media
from auto_archiver.modules.meta_enricher import MetaEnricher
@pytest.fixture
def mock_metadata():
"""Creates a mock Metadata object."""
mock: Metadata = MagicMock(spec=Metadata)
mock.get_url.return_value = "https://example.com"
mock.is_empty.return_value = False # Default to not empty
mock.get_all_media.return_value = []
return mock
@pytest.fixture
def mock_media():
"""Creates a mock Media object."""
mock: Media = MagicMock(spec=Media)
mock.filename = "mock_file.txt"
return mock
@pytest.fixture
def metadata():
m = Metadata()
m.set_url("https://example.com")
m.set_title("Test Title")
m.set_content("Test Content")
return m
@pytest.fixture(autouse=True)
def meta_enricher(setup_module):
return setup_module(MetaEnricher, {})
def test_enrich_skips_empty_metadata(meta_enricher, mock_metadata):
"""Test that enrich() does nothing when Metadata is empty."""
mock_metadata.is_empty.return_value = True
meta_enricher.enrich(mock_metadata)
mock_metadata.get_url.assert_called_once()
def test_enrich_file_sizes(meta_enricher, metadata, tmp_path):
"""Test that enrich_file_sizes() calculates and sets file sizes correctly."""
file1 = tmp_path / "testfile_1.txt"
file2 = tmp_path / "testfile_2.txt"
file1.write_text("A" * 1000)
file2.write_text("B" * 2000)
metadata.add_media(Media(str(file1)))
metadata.add_media(Media(str(file2)))
meta_enricher.enrich_file_sizes(metadata)
# Verify individual media file sizes
media1 = metadata.get_all_media()[0]
media2 = metadata.get_all_media()[1]
assert media1.get("bytes") == 1000
assert media1.get("size") == "1000.0 bytes"
assert media2.get("bytes") == 2000
assert media2.get("size") == "2.0 KB"
assert metadata.get("total_bytes") == 3000
assert metadata.get("total_size") == "2.9 KB"
@pytest.mark.parametrize(
"size, expected",
[
(500, "500.0 bytes"),
(1024, "1.0 KB"),
(2048, "2.0 KB"),
(1048576, "1.0 MB"),
(1073741824, "1.0 GB"),
],
)
def test_human_readable_bytes(size, expected):
"""Test that human_readable_bytes() converts sizes correctly."""
enricher = MetaEnricher()
assert enricher.human_readable_bytes(size) == expected
def test_enrich_file_sizes_no_media(meta_enricher, metadata):
"""Test that enrich_file_sizes() handles empty media list gracefully."""
meta_enricher.enrich_file_sizes(metadata)
assert metadata.get("total_bytes") == 0
assert metadata.get("total_size") == "0.0 bytes"
def test_enrich_archive_duration(meta_enricher, metadata):
# Set fixed "processed at" time in the past
processed_at = datetime.now(timezone.utc) - timedelta(minutes=10, seconds=30)
metadata.set("_processed_at", processed_at)
# patch datetime
with patch("datetime.datetime") as mock_datetime:
mock_now = datetime.now(timezone.utc)
mock_datetime.now.return_value = mock_now
meta_enricher.enrich_archive_duration(metadata)
assert metadata.get("archive_duration_seconds") == 630
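
One caveat on the patch target: `patch("datetime.datetime")` swaps the class inside the `datetime` module, which only affects code that looks it up through that module at call time. If `MetaEnricher` does `from datetime import datetime`, it holds a direct reference and keeps the real class, and the patch would need to target the module under test instead. A sketch, with the dotted path assumed rather than confirmed:

# hypothetical target: the meta_enricher module's own `datetime` name
with patch("auto_archiver.modules.meta_enricher.meta_enricher.datetime") as mock_dt:
    mock_dt.now.return_value = datetime.now(timezone.utc)  # freeze "now"
    meta_enricher.enrich_archive_duration(metadata)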

View file

@@ -5,15 +5,16 @@ from unittest.mock import patch, MagicMock
import pytest
from auto_archiver.core import Metadata
from auto_archiver.core.extractor import Extractor
from auto_archiver.modules.instagram_tbot_extractor import InstagramTbotExtractor
from tests.extractors.test_extractor_base import TestExtractorBase
TESTFILES = os.path.join(os.path.dirname(__file__), "testfiles")
@pytest.fixture
def test_session_file(tmpdir):
def session_file(tmpdir):
"""Fixture to create a test session file."""
session_file = os.path.join(tmpdir, "test_session.session")
with open(session_file, "w") as f:
@@ -21,27 +22,34 @@ def test_session_file(tmpdir):
return session_file.replace(".session", "")
@pytest.mark.incremental
class TestInstagramTbotExtractor(object):
"""
Test suite for InstagramTbotExtractor.
"""
@pytest.fixture(autouse=True)
def patch_extractor_methods(request, setup_module):
with patch.object(InstagramTbotExtractor, '_prepare_session_file', return_value=None), \
patch.object(InstagramTbotExtractor, '_initialize_telegram_client', return_value=None):
if hasattr(request, 'cls') and hasattr(request.cls, 'config'):
request.cls.extractor = setup_module("instagram_tbot_extractor", request.cls.config)
yield
@pytest.fixture
def metadata_sample():
m = Metadata()
m.set_title("Test Title")
m.set_timestamp("2021-01-01T00:00:00Z")
m.set_url("https://www.instagram.com/p/1234567890")
return m
class TestInstagramTbotExtractor:
extractor_module = "instagram_tbot_extractor"
extractor: InstagramTbotExtractor
config = {
"api_id": 12345,
"api_hash": "test_api_hash",
# "session_file"
"session_file": "test_session",
}
@pytest.fixture(autouse=True)
def setup_extractor(self, setup_module):
assert self.extractor_module is not None, "self.extractor_module must be set on the subclass"
assert self.config is not None, "self.config must be a dict set on the subclass"
extractor: Type[Extractor] = setup_module(self.extractor_module, self.config)
return extractor
@pytest.fixture
def mock_telegram_client(self):
"""Fixture to mock TelegramClient interactions."""
@@ -50,22 +58,11 @@ class TestInstagramTbotExtractor(object):
mock_client.return_value = instance
yield instance
# @pytest.fixture
# def mock_session_file(self, temp_session_file):
# """Patch the extractors session file setup to use a temporary path."""
# with patch.object(InstagramTbotExtractor, "session_file", temp_session_file):
# with patch.object(InstagramTbotExtractor, "_prepare_session_file", return_value=None):
# yield # Mocks are applied for the duration of the test
@pytest.fixture
def metadata_sample(self):
"""Loads a Metadata object from a pickle file."""
with open(os.path.join(TESTFILES, "metadata_item.pkl"), "rb") as f:
return pickle.load(f)
def test_extractor_is_initialized(self):
assert self.extractor is not None
@pytest.mark.download
@patch("time.sleep")
@pytest.mark.parametrize("url, expected_status, bot_responses", [
("https://www.instagram.com/p/C4QgLbrIKXG", "insta-via-bot: success", [MagicMock(id=101, media=None, message="Are you new to Bellingcat? - The way we share our investigations is different. 💭\nWe want you to read our story but also learn ou")]),
("https://www.instagram.com/reel/DEVLK8qoIbg/", "insta-via-bot: success", [MagicMock(id=101, media=None, message="Our volunteer community is at the centre of many incredible Bellingcat investigations and tools. Stephanie Ladel is one such vol")]),
@@ -74,32 +71,19 @@ class TestInstagramTbotExtractor(object):
("https://www.youtube.com/watch?v=ymCMy8OffHM", False, []),
("https://www.instagram.com/p/INVALID", False, [MagicMock(id=101, media=None, message="You must enter a URL to a post")]),
])
def test_download(self, url, expected_status, bot_responses, metadata_sample):
def test_download(self, mock_sleep, url, expected_status, bot_responses, metadata_sample):
"""Test the `download()` method with various Instagram URLs."""
metadata_sample.set_url(url)
self.extractor.initialise()
self.extractor.client = MagicMock()
result = self.extractor.download(metadata_sample)
if expected_status:
assert result.is_success()
assert result.status == expected_status
assert result.metadata.get("title") in [msg.message[:128] for msg in bot_responses if msg.message]
else:
assert result is False
# self.extractor.cleanup()
# @patch.object(InstagramTbotExtractor, '_send_url_to_bot')
# @patch.object(InstagramTbotExtractor, '_process_messages')
# def test_download_invalid_link_returns_false(
# self, mock_process, mock_send, extractor, metadata_instagram
# ):
# # Setup Mocks
# # _send_url_to_bot -> simulate it returns (chat=MagicMock, since_id=100)
# mock_chat = MagicMock()
# mock_send.return_value = (mock_chat, 100)
# # _process_messages -> simulate it returns the text "You must enter a URL to a post"
# mock_process.return_value = "You must enter a URL to a post"
# result = extractor.download(metadata_instagram)
# assert result is False, "Should return False if message includes 'You must enter a URL to a post'"
pass
# TODO fully mock or use as authenticated test
# if expected_status:
# assert result.is_success()
# assert result.status == expected_status
# assert result.metadata.get("title") in [msg.message[:128] for msg in bot_responses if msg.message]
# else:
# assert result is False
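
For the "fully mock" route flagged in the TODO, the commented-out draft above already names the seams; a compact version of the same idea (return values illustrative):

with patch.object(InstagramTbotExtractor, '_send_url_to_bot') as mock_send, \
     patch.object(InstagramTbotExtractor, '_process_messages') as mock_process:
    mock_send.return_value = (MagicMock(), 100)  # (chat, since_id)
    mock_process.return_value = "You must enter a URL to a post"
    metadata_sample.set_url("https://www.instagram.com/p/INVALID")
    assert self.extractor.download(metadata_sample) is False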

View file

@@ -9,57 +9,52 @@ from auto_archiver.core import Metadata, Feeder
def test_initialise_without_sheet_and_sheet_id(setup_module):
"""Ensure initialise() raises AssertionError if neither sheet nor sheet_id is set.
(shouldn't really be asserting in there)
(shouldn't really be asserting in there)
"""
with patch("gspread.service_account"):
feeder = setup_module("gsheet_feeder",
{"service_account": "dummy.json",
"sheet": None,
"sheet_id": None})
with pytest.raises(AssertionError):
feeder.initialise()
setup_module(
"gsheet_feeder",
{"service_account": "dummy.json", "sheet": None, "sheet_id": None},
)
@pytest.fixture
def gsheet_feeder(setup_module) -> GsheetsFeeder:
feeder = setup_module("gsheet_feeder",
{"service_account": "dummy.json",
"sheet": "test-auto-archiver",
"sheet_id": None,
"header": 1,
"columns": {
"url": "link",
"status": "archive status",
"folder": "destination folder",
"archive": "archive location",
"date": "archive date",
"thumbnail": "thumbnail",
"timestamp": "upload timestamp",
"title": "upload title",
"text": "text content",
"screenshot": "screenshot",
"hash": "hash",
"pdq_hash": "perceptual hashes",
"wacz": "wacz",
"replaywebpage": "replaywebpage",
},
"allow_worksheets": set(),
"block_worksheets": set(),
"use_sheet_names_in_stored_paths": True,
}
)
with patch("gspread.service_account"):
feeder = setup_module(
"gsheet_feeder",
{
"service_account": "dummy.json",
"sheet": "test-auto-archiver",
"sheet_id": None,
"header": 1,
"columns": {
"url": "link",
"status": "archive status",
"folder": "destination folder",
"archive": "archive location",
"date": "archive date",
"thumbnail": "thumbnail",
"timestamp": "upload timestamp",
"title": "upload title",
"text": "text content",
"screenshot": "screenshot",
"hash": "hash",
"pdq_hash": "perceptual hashes",
"wacz": "wacz",
"replaywebpage": "replaywebpage",
},
"allow_worksheets": set(),
"block_worksheets": set(),
"use_sheet_names_in_stored_paths": True,
},
)
feeder.gsheets_client = MagicMock()
return feeder
@pytest.fixture()
def worksheet(unpickle):
# Load the worksheet data from the pickle file
# only works for simple usage, can't reauthenticate but gives structure
return unpickle("test_worksheet.pickle")
class TestWorksheet():
class TestWorksheet:
"""
mimics the bits we need from gworksheet
"""
@@ -68,12 +63,17 @@ class TestWorksheet():
title = "TestSheet"
rows = [
{ "row": 2, "url": "http://example.com", "status": "", "folder": "" },
{ "row": 3, "url": "http://example.com", "status": "", "folder": "" },
{ "row": 4, "url": "", "status": "", "folder": "" },
{ "row": 5, "url": "https://another.com", "status": None, "folder": "" },
{ "row": 6, "url": "https://another.com", "status": "success", "folder": "some_folder" },
]
{"row": 2, "url": "http://example.com", "status": "", "folder": ""},
{"row": 3, "url": "http://example.com", "status": "", "folder": ""},
{"row": 4, "url": "", "status": "", "folder": ""},
{"row": 5, "url": "https://another.com", "status": None, "folder": ""},
{
"row": 6,
"url": "https://another.com",
"status": "success",
"folder": "some_folder",
},
]
def __init__(self):
self.wks = self.SheetSheet()
@@ -91,6 +91,7 @@ class TestWorksheet():
matching = next((r for r in self.rows if r["row"] == row), {})
return matching.get(col_name, default)
def test__process_rows(gsheet_feeder: GsheetsFeeder):
testworksheet = TestWorksheet()
metadata_items = list(gsheet_feeder._process_rows(testworksheet))
@@ -98,9 +99,12 @@ def test__process_rows(gsheet_feeder: GsheetsFeeder):
assert isinstance(metadata_items[0], Metadata)
assert metadata_items[0].get("url") == "http://example.com"
def test__set_metadata(gsheet_feeder: GsheetsFeeder, worksheet):
gsheet_feeder._set_context(worksheet, 1)
assert Metadata.get_context("gsheet") == {"row": 1, "worksheet": worksheet}
def test__set_metadata(gsheet_feeder: GsheetsFeeder):
worksheet = TestWorksheet()
metadata = Metadata()
gsheet_feeder._set_context(metadata, worksheet, 1)
assert metadata.get_context("gsheet") == {"row": 1, "worksheet": worksheet}
@pytest.mark.skip(reason="Not recognising folder column")
@@ -111,18 +115,24 @@ def test__set_metadata_with_folder_pickled(gsheet_feeder: GsheetsFeeder, workshe
def test__set_metadata_with_folder(gsheet_feeder: GsheetsFeeder):
testworksheet = TestWorksheet()
metadata = Metadata()
testworksheet.wks.title = "TestSheet"
gsheet_feeder._set_context(testworksheet, 6)
assert Metadata.get_context("gsheet") == {"row": 6, "worksheet": testworksheet}
assert Metadata.get_context("folder") == "some-folder/test-auto-archiver/testsheet"
gsheet_feeder._set_context(metadata, testworksheet, 6)
assert metadata.get_context("gsheet") == {"row": 6, "worksheet": testworksheet}
assert metadata.get_context("folder") == "some-folder/test-auto-archiver/testsheet"
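
This is the heart of the context fix: `_set_context` now writes to the `Metadata` instance it is handed, and `get_context` reads from that same instance, instead of the old class-level `Metadata.get_context` lookups. In sketch form (assuming an unset context key reads back as a falsy default):

m1, m2 = Metadata(), Metadata()
gsheet_feeder._set_context(m1, testworksheet, 6)
# context is per-item now: m2 was never touched, so it sees none of m1's state
assert m1.get_context("gsheet") == {"row": 6, "worksheet": testworksheet}
assert not m2.get_context("gsheet")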
@pytest.mark.usefixtures("setup_module")
@pytest.mark.parametrize("sheet, sheet_id, expected_method, expected_arg, description", [
("TestSheet", None, "open", "TestSheet", "opening by sheet name"),
(None, "ABC123", "open_by_key", "ABC123", "opening by sheet ID")
])
def test_open_sheet_with_name_or_id(setup_module, sheet, sheet_id, expected_method, expected_arg, description):
@pytest.mark.parametrize(
"sheet, sheet_id, expected_method, expected_arg, description",
[
("TestSheet", None, "open", "TestSheet", "opening by sheet name"),
(None, "ABC123", "open_by_key", "ABC123", "opening by sheet ID"),
],
)
def test_open_sheet_with_name_or_id(
setup_module, sheet, sheet_id, expected_method, expected_arg, description
):
"""Ensure open_sheet() correctly opens by name or ID based on configuration."""
with patch("gspread.service_account") as mock_service_account:
mock_client = MagicMock()
@@ -131,15 +141,16 @@ def test_open_sheet_with_name_or_id(setup_module, sheet, sheet_id, expected_meth
mock_client.open_by_key.return_value = "MockSheet"
# Setup module with parameterized values
feeder = setup_module("gsheet_feeder", {
"service_account": "dummy.json",
"sheet": sheet,
"sheet_id": sheet_id
})
feeder = setup_module(
"gsheet_feeder",
{"service_account": "dummy.json", "sheet": sheet, "sheet_id": sheet_id},
)
feeder.initialise()
sheet_result = feeder.open_sheet()
# Validate the correct method was called
getattr(mock_client, expected_method).assert_called_once_with(expected_arg), f"Failed: {description}"
getattr(mock_client, expected_method).assert_called_once_with(
expected_arg
), f"Failed: {description}"
assert sheet_result == "MockSheet", f"Failed: {description}"
@@ -150,10 +161,10 @@ def test_open_sheet_with_sheet_id(setup_module):
mock_client = MagicMock()
mock_service_account.return_value = mock_client
mock_client.open_by_key.return_value = "MockSheet"
feeder = setup_module("gsheet_feeder",
{"service_account": "dummy.json",
"sheet": None,
"sheet_id": "ABC123"})
feeder = setup_module(
"gsheet_feeder",
{"service_account": "dummy.json", "sheet": None, "sheet_id": "ABC123"},
)
feeder.initialise()
sheet = feeder.open_sheet()
mock_client.open_by_key.assert_called_once_with("ABC123")
@@ -161,47 +172,51 @@ def test_open_sheet_with_sheet_id(setup_module):
def test_should_process_sheet(setup_module):
gdb = setup_module("gsheet_feeder", {"service_account": "dummy.json",
"sheet": "TestSheet",
"sheet_id": None,
"allow_worksheets": {"TestSheet", "Sheet2"},
"block_worksheets": {"Sheet3"}}
)
with patch("gspread.service_account"):
gdb = setup_module(
"gsheet_feeder",
{
"service_account": "dummy.json",
"sheet": "TestSheet",
"sheet_id": None,
"allow_worksheets": {"TestSheet", "Sheet2"},
"block_worksheets": {"Sheet3"},
},
)
assert gdb.should_process_sheet("TestSheet") == True
assert gdb.should_process_sheet("Sheet3") == False
# False if allow_worksheets is set
assert gdb.should_process_sheet("AnotherSheet") == False
@pytest.mark.skip
# @pytest.mark.skip(reason="Requires a real connection")
class TestGSheetsFeederReal:
"""Testing GSheetsFeeder class"""
""" Testing GSheetsFeeder class """
module_name: str = 'gsheet_feeder'
module_name: str = "gsheet_feeder"
feeder: GsheetsFeeder
# You must follow the setup process explained in the docs for this to work
config: dict = {
# TODO: Create test creds
"service_account": "secrets/service_account.json",
"sheet": "test-auto-archiver",
"sheet_id": None,
"header": 1,
"columns": {
"url": "link",
"status": "archive status",
"folder": "destination folder",
"archive": "archive location",
"date": "archive date",
"thumbnail": "thumbnail",
"timestamp": "upload timestamp",
"title": "upload title",
"text": "text content",
"screenshot": "screenshot",
"hash": "hash",
"pdq_hash": "perceptual hashes",
"wacz": "wacz",
"replaywebpage": "replaywebpage",
},
"url": "link",
"status": "archive status",
"folder": "destination folder",
"archive": "archive location",
"date": "archive date",
"thumbnail": "thumbnail",
"timestamp": "upload timestamp",
"title": "upload title",
"text": "text content",
"screenshot": "screenshot",
"hash": "hash",
"pdq_hash": "perceptual hashes",
"wacz": "wacz",
"replaywebpage": "replaywebpage",
},
"allow_worksheets": set(),
"block_worksheets": set(),
"use_sheet_names_in_stored_paths": True,
@@ -213,9 +228,7 @@ class TestGSheetsFeederReal:
self.module_name is not None
), "self.module_name must be set on the subclass"
assert self.config is not None, "self.config must be a dict set on the subclass"
self.feeder: Type[Feeder] = setup_module(
self.module_name, self.config
)
self.feeder: Type[Feeder] = setup_module(self.module_name, self.config)
def reset_test_sheet(self):
"""Clears test sheet and re-adds headers to ensure consistent test results."""
@@ -225,19 +238,17 @@ class TestGSheetsFeederReal:
worksheet.clear()
worksheet.append_row(["Link", "Archive Status"])
def test_initialise(self):
self.feeder.initialise()
def test_setup(self):
assert hasattr(self.feeder, "gsheets_client")
@pytest.mark.download
def test_open_sheet_real_connection(self):
"""Ensure open_sheet() connects to a real Google Sheets instance."""
self.feeder.initialise()
sheet = self.feeder.open_sheet()
assert sheet is not None, "open_sheet() should return a valid sheet instance"
assert hasattr(sheet, "worksheets"), "Returned object should have worksheets method"
assert hasattr(
sheet, "worksheets"
), "Returned object should have worksheets method"
@pytest.mark.download
def test_iter_yields_metadata_real_data(self):
"""Ensure __iter__() yields Metadata objects for real test sheet data."""
self.reset_test_sheet()
@@ -260,7 +271,6 @@ class TestGSheetsFeederReal:
assert metadata_list[0].metadata.get("url") == "https://example.com"
# TODO
# Test two sheets
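
One possible shape for that TODO, reusing the mocked client from the fixture (the `worksheets()` call follows gspread's API; the worksheet doubles would need enough structure for GWorksheet to read their headers, so this is a sketch rather than a ready test):

def test_iter_two_worksheets(gsheet_feeder: GsheetsFeeder):
    sheet = MagicMock()
    sheet.worksheets.return_value = [TestWorksheet().wks, TestWorksheet().wks]
    gsheet_feeder.gsheets_client.open.return_value = sheet
    items = list(gsheet_feeder)
    # both worksheets should be visited and their non-empty rows yielded
    assert all(isinstance(i, Metadata) for i in items)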

View file

@@ -1,9 +1,101 @@
from typing import Type
import pytest
from unittest.mock import MagicMock, patch, mock_open
from unittest.mock import MagicMock, patch, PropertyMock
from auto_archiver.core import Media
from auto_archiver.modules.hash_enricher import HashEnricher
from auto_archiver.modules.s3_storage import s3_storage
from tests.storages.test_storage_base import TestStorageBase
@patch('boto3.client')
@pytest.fixture
def s3_store(setup_module):
config: dict = {
"path_generator": "flat",
"filename_generator": "static",
"bucket": "test-bucket",
"region": "test-region",
"key": "test-key",
"secret": "test-secret",
"random_no_duplicate": False,
"endpoint_url": "https://{region}.example.com",
"cdn_url": "https://cdn.example.com/{key}",
"private": False,
}
s3_storage = setup_module("s3_storage", config)
return s3_storage
def test_client_initialization(s3_store):
"""Test that S3 client is initialized with correct parameters"""
assert s3_store.s3 is not None
assert s3_store.s3.meta.region_name == 'test-region'
def test_get_cdn_url_generation(s3_store):
"""Test CDN URL formatting """
media = Media("test.txt")
media.key = "path/to/file.txt"
url = s3_store.get_cdn_url(media)
assert url == "https://cdn.example.com/path/to/file.txt"
media.key = "another/path.jpg"
assert s3_store.get_cdn_url(media) == "https://cdn.example.com/another/path.jpg"
@patch.object(s3_storage.S3Storage, 'file_in_folder')
def test_skips_upload_when_duplicate_exists(mock_file_in_folder, s3_store):
"""Test that upload skips when file_in_folder finds existing object"""
# Setup test-specific configuration
s3_store.random_no_duplicate = True
mock_file_in_folder.return_value = "existing_folder/existing_file.txt"
# Create test media with calculated hash
media = Media("test.txt")
media.key = "original_path.txt"
# Mock hash calculation
with patch.object(s3_store, 'calculate_hash') as mock_calculate_hash:
mock_calculate_hash.return_value = "testhash123"
# Verify upload
assert s3_store.is_upload_needed(media) is False
assert media.key == "existing_folder/existing_file.txt"
assert media.get("previously archived") is True
with patch.object(s3_store.s3, 'upload_fileobj') as mock_upload:
result = s3_store.uploadf(None, media)
mock_upload.assert_not_called()
assert result is True
@patch.object(s3_storage.S3Storage, 'is_upload_needed')
def test_uploads_with_correct_parameters(mock_upload_needed, s3_store):
media = Media("test.txt")
mock_upload_needed.return_value = True
media.mimetype = 'image/png'
mock_file = MagicMock()
with patch.object(s3_store.s3, 'upload_fileobj') as mock_upload:
s3_store.uploadf(mock_file, media)
# Verify core upload parameters
mock_upload.assert_called_once_with(
mock_file,
Bucket='test-bucket',
# Key='original_key.txt',
Key=None,
ExtraArgs={
'ACL': 'public-read',
'ContentType': 'image/png'
}
)
# ============================================================
class TestGDriveStorage:
@@ -29,20 +121,13 @@ class TestGDriveStorage:
@patch('boto3.client')
@pytest.fixture(autouse=True)
def setup_storage(self, setup_module):
he = HashEnricher()
self.storage = setup_module(self.module_name, self.config)
self.storage.initialise()
@patch('boto3.client')
def test_client_initialization(self, mock_boto_client, setup_module):
def test_client_initialization(self, setup_storage):
"""Test that S3 client is initialized with correct parameters"""
self.storage.initialise()
mock_boto_client.assert_called_once_with(
's3',
region_name='test-region',
endpoint_url='https://test-region.example.com',
aws_access_key_id='test-key',
aws_secret_access_key='test-secret'
)
assert self.storage.s3 is not None
assert self.storage.s3.meta.region_name == 'test-region'
def test_get_cdn_url_generation(self):
"""Test CDN URL formatting """
@@ -53,6 +138,18 @@ class TestGDriveStorage:
media.key = "another/path.jpg"
assert self.storage.get_cdn_url(media) == "https://cdn.example.com/another/path.jpg"
def test_upload_decision_logic(self):
"""Test is_upload_needed under different conditions"""
media = Media("test.txt")
# Test random_no_duplicate disabled
assert self.storage.is_upload_needed(media) is True
# Test duplicate exists
self.storage.random_no_duplicate = True
with patch.object(self.storage, 'file_in_folder', return_value='existing.txt'):
assert self.storage.is_upload_needed(media) is False
assert media.key == 'existing.txt'
@patch.object(s3_storage.S3Storage, 'file_in_folder')
def test_skips_upload_when_duplicate_exists(self, mock_file_in_folder):

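With the refactor shown earlier, the hash now comes from the module resolved via `get_module`, so patching `calculate_hash` on the storage instance (as the duplicate-skip test above does) may no longer sit on the call path. A sketch of intercepting at the resolution point instead, patching the name where the storage module imports it (target derived from the imports above):

with patch('auto_archiver.modules.s3_storage.s3_storage.get_module') as mock_get_module:
    mock_get_module.return_value.calculate_hash.return_value = "testhash123"
    s3_store.random_no_duplicate = True
    with patch.object(s3_store, 'file_in_folder', return_value='existing.txt'):
        assert s3_store.is_upload_needed(Media("test.txt")) is False
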
View file

@@ -2,7 +2,6 @@ from typing import Type
import pytest
from auto_archiver.core.context import ArchivingContext
from auto_archiver.core.metadata import Metadata
from auto_archiver.core.storage import Storage