kopia lustrzana https://github.com/bellingcat/auto-archiver
Updates tests to use pytest-mock.
rodzic
657fbd357d
commit
f0fd9bf445
|
@ -1815,6 +1815,24 @@ loguru = "*"
|
|||
[package.extras]
|
||||
test = ["pytest", "pytest-cov"]
|
||||
|
||||
[[package]]
|
||||
name = "pytest-mock"
|
||||
version = "3.14.0"
|
||||
description = "Thin-wrapper around the mock package for easier use with pytest"
|
||||
optional = false
|
||||
python-versions = ">=3.8"
|
||||
groups = ["dev"]
|
||||
files = [
|
||||
{file = "pytest-mock-3.14.0.tar.gz", hash = "sha256:2719255a1efeceadbc056d6bf3df3d1c5015530fb40cf347c0f9afac88410bd0"},
|
||||
{file = "pytest_mock-3.14.0-py3-none-any.whl", hash = "sha256:0b72c38033392a5f4621342fe11e9219ac11ec9d375f8e2a0c164539e0d70f6f"},
|
||||
]
|
||||
|
||||
[package.dependencies]
|
||||
pytest = ">=6.2.5"
|
||||
|
||||
[package.extras]
|
||||
dev = ["pre-commit", "pytest-asyncio", "tox"]
|
||||
|
||||
[[package]]
|
||||
name = "python-dateutil"
|
||||
version = "2.9.0.post0"
|
||||
|
@ -3166,4 +3184,4 @@ test = ["pytest (>=8.1,<9.0)", "pytest-rerunfailures (>=14.0,<15.0)"]
|
|||
[metadata]
|
||||
lock-version = "2.1"
|
||||
python-versions = ">=3.10,<3.13"
|
||||
content-hash = "b3a6142d6495bc4c8741e9411d29352af219851e4b84b263f991e1bb6db1614e"
|
||||
content-hash = "2d0a953383901fe12e97f6f56a76a9d8008788695425792eedbf739a18585188"
|
||||
|
|
|
@ -63,6 +63,7 @@ dependencies = [
|
|||
pytest = "^8.3.4"
|
||||
autopep8 = "^2.3.1"
|
||||
pytest-loguru = "^0.4.0"
|
||||
pytest-mock = "^3.14.0"
|
||||
|
||||
[tool.poetry.group.docs.dependencies]
|
||||
sphinx = "^8.1.3"
|
||||
|
|
|
@ -7,7 +7,6 @@ from datetime import datetime, timezone
|
|||
from tempfile import TemporaryDirectory
|
||||
from typing import Dict, Tuple
|
||||
import hashlib
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
from auto_archiver.core.metadata import Metadata
|
||||
|
@ -134,14 +133,29 @@ def unpickle():
|
|||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_binary_dependencies():
|
||||
with patch("shutil.which") as mock_shutil_which:
|
||||
# Mock all binary dependencies as available
|
||||
mock_shutil_which.return_value = "/usr/bin/fake_binary"
|
||||
yield mock_shutil_which
|
||||
def mock_binary_dependencies(mocker):
|
||||
mock_shutil_which = mocker.patch("shutil.which")
|
||||
# Mock all binary dependencies as available
|
||||
mock_shutil_which.return_value = "/usr/bin/fake_binary"
|
||||
return mock_shutil_which
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def sample_datetime():
|
||||
return datetime(2023, 1, 1, 12, 0, tzinfo=timezone.utc)
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def mock_sleep(mocker):
|
||||
"""Globally mock time.sleep to avoid delays."""
|
||||
return mocker.patch("time.sleep")
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def metadata():
|
||||
metadata = Metadata()
|
||||
metadata.set("_processed_at", "2021-01-01T00:00:00")
|
||||
metadata.set_title("Example Title")
|
||||
metadata.set_content("Example Content")
|
||||
metadata.set_url("https://example.com")
|
||||
return metadata
|
|
@ -1,5 +1,3 @@
|
|||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
|
||||
from auto_archiver.core import Metadata
|
||||
|
@ -35,35 +33,35 @@ def test_fetch_no_cache(api_db, metadata):
|
|||
assert api_db.fetch(metadata) is None
|
||||
|
||||
|
||||
def test_fetch_fail_status(api_db, metadata):
|
||||
def test_fetch_fail_status(api_db, metadata, mocker):
|
||||
# Test response fail in fetch method
|
||||
with patch("auto_archiver.modules.api_db.api_db.requests.get") as mock_get:
|
||||
mock_get.return_value.status_code = 400
|
||||
mock_get.return_value.json.return_value = {}
|
||||
with patch("loguru.logger.error") as mock_error:
|
||||
assert api_db.fetch(metadata) is False
|
||||
mock_error.assert_called_once_with("AA API FAIL (400): {}")
|
||||
mock_get = mocker.patch("auto_archiver.modules.api_db.api_db.requests.get")
|
||||
mock_get.return_value.status_code = 400
|
||||
mock_get.return_value.json.return_value = {}
|
||||
mock_error = mocker.patch("loguru.logger.error")
|
||||
assert api_db.fetch(metadata) is False
|
||||
mock_error.assert_called_once_with("AA API FAIL (400): {}")
|
||||
|
||||
|
||||
def test_fetch(api_db, metadata):
|
||||
def test_fetch(api_db, metadata, mocker):
|
||||
# Test successful fetch method
|
||||
with patch("auto_archiver.modules.api_db.api_db.requests.get") as mock_get,\
|
||||
patch("auto_archiver.core.metadata.datetime.datetime") as mock_datetime:
|
||||
mock_datetime.now.return_value = "2021-01-01T00:00:00"
|
||||
mock_get.return_value.status_code = 200
|
||||
mock_get.return_value.json.return_value = [{"result": {}}, {"result":
|
||||
{'media': [], 'metadata': {'_processed_at': '2021-01-01T00:00:00', 'url': 'https://example.com'},
|
||||
'status': 'no archiver'}}]
|
||||
assert api_db.fetch(metadata) == metadata
|
||||
mock_get = mocker.patch("auto_archiver.modules.api_db.api_db.requests.get")
|
||||
mock_datetime = mocker.patch("auto_archiver.core.metadata.datetime.datetime")
|
||||
mock_datetime.now.return_value = "2021-01-01T00:00:00"
|
||||
mock_get.return_value.status_code = 200
|
||||
mock_get.return_value.json.return_value = [{"result": {}}, {"result":
|
||||
{'media': [], 'metadata': {'_processed_at': '2021-01-01T00:00:00', 'url': 'https://example.com'},
|
||||
'status': 'no archiver'}}]
|
||||
assert api_db.fetch(metadata) == metadata
|
||||
|
||||
|
||||
def test_done_success(api_db, metadata):
|
||||
with patch("auto_archiver.modules.api_db.api_db.requests.post") as mock_post:
|
||||
mock_post.return_value.status_code = 201
|
||||
api_db.done(metadata)
|
||||
mock_post.assert_called_once()
|
||||
mock_post.assert_called_once_with("https://api.example.com/interop/submit-archive",
|
||||
json={'author_id': 'Someone', 'url': 'https://example.com',
|
||||
'public': False, 'group_id': '123', 'tags': ['[', ']'], 'result': '{"status": "no archiver", "metadata": {"_processed_at": "2021-01-01T00:00:00", "url": "https://example.com"}, "media": []}'},
|
||||
headers={'Authorization': 'Bearer test-token'})
|
||||
def test_done_success(api_db, metadata, mocker):
|
||||
mock_post = mocker.patch("auto_archiver.modules.api_db.api_db.requests.post")
|
||||
mock_post.return_value.status_code = 201
|
||||
api_db.done(metadata)
|
||||
mock_post.assert_called_once()
|
||||
mock_post.assert_called_once_with("https://api.example.com/interop/submit-archive",
|
||||
json={'author_id': 'Someone', 'url': 'https://example.com',
|
||||
'public': False, 'group_id': '123', 'tags': ['[', ']'], 'result': '{"status": "no archiver", "metadata": {"_processed_at": "2021-01-01T00:00:00", "url": "https://example.com"}, "media": []}'},
|
||||
headers={'Authorization': 'Bearer test-token'})
|
||||
|
||||
|
|
|
@ -1,6 +1,4 @@
|
|||
from datetime import datetime, timezone
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from auto_archiver.core import Metadata, Media
|
||||
|
@ -9,8 +7,8 @@ from auto_archiver.modules.gsheet_feeder import GWorksheet
|
|||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_gworksheet():
|
||||
mock_gworksheet = MagicMock(spec=GWorksheet)
|
||||
def mock_gworksheet(mocker):
|
||||
mock_gworksheet = mocker.MagicMock(spec=GWorksheet)
|
||||
mock_gworksheet.col_exists.return_value = True
|
||||
mock_gworksheet.get_cell.return_value = ""
|
||||
mock_gworksheet.get_row.return_value = {}
|
||||
|
@ -18,14 +16,14 @@ def mock_gworksheet():
|
|||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_metadata():
|
||||
metadata: Metadata = MagicMock(spec=Metadata)
|
||||
def mock_metadata(mocker):
|
||||
metadata: Metadata = mocker.MagicMock(spec=Metadata)
|
||||
metadata.get_url.return_value = "http://example.com"
|
||||
metadata.status = "done"
|
||||
metadata.get_title.return_value = "Example Title"
|
||||
metadata.get.return_value = "Example Content"
|
||||
metadata.get_timestamp.return_value = "2025-01-01T00:00:00"
|
||||
metadata.get_final_media.return_value = MagicMock(spec=Media)
|
||||
metadata.get_final_media.return_value = mocker.MagicMock(spec=Media)
|
||||
metadata.get_all_media.return_value = []
|
||||
metadata.get_media_by_id.return_value = None
|
||||
metadata.get_first_image.return_value = None
|
||||
|
@ -47,21 +45,21 @@ def metadata():
|
|||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_media():
|
||||
def mock_media(mocker):
|
||||
"""Fixture for a mock Media object."""
|
||||
mock_media = MagicMock(spec=Media)
|
||||
mock_media = mocker.MagicMock(spec=Media)
|
||||
mock_media.urls = ["http://example.com/media"]
|
||||
mock_media.get.return_value = "not-calculated"
|
||||
return mock_media
|
||||
|
||||
@pytest.fixture
|
||||
def gsheets_db(mock_gworksheet, setup_module):
|
||||
def gsheets_db(mock_gworksheet, setup_module, mocker):
|
||||
db = setup_module("gsheet_db", {
|
||||
"allow_worksheets": "set()",
|
||||
"block_worksheets": "set()",
|
||||
"use_sheet_names_in_stored_paths": "True",
|
||||
})
|
||||
db._retrieve_gsheet = MagicMock(return_value=(mock_gworksheet, 1))
|
||||
db._retrieve_gsheet = mocker.MagicMock(return_value=(mock_gworksheet, 1))
|
||||
return db
|
||||
|
||||
|
||||
|
@ -109,27 +107,26 @@ def test_aborted(gsheets_db, mock_metadata, mock_gworksheet):
|
|||
mock_gworksheet.set_cell.assert_called_once_with(1, 'status', '')
|
||||
|
||||
|
||||
def test_done(gsheets_db, metadata, mock_gworksheet, expected_calls):
|
||||
with patch("auto_archiver.modules.gsheet_db.gsheet_db.get_current_timestamp", return_value='2025-02-01T00:00:00+00:00'):
|
||||
gsheets_db.done(metadata)
|
||||
def test_done(gsheets_db, metadata, mock_gworksheet, expected_calls, mocker):
|
||||
mocker.patch("auto_archiver.modules.gsheet_db.gsheet_db.get_current_timestamp", return_value='2025-02-01T00:00:00+00:00')
|
||||
gsheets_db.done(metadata)
|
||||
mock_gworksheet.batch_set_cell.assert_called_once_with(expected_calls)
|
||||
|
||||
|
||||
def test_done_cached(gsheets_db, metadata, mock_gworksheet):
|
||||
with patch("auto_archiver.modules.gsheet_db.gsheet_db.get_current_timestamp", return_value='2025-02-01T00:00:00+00:00'):
|
||||
gsheets_db.done(metadata, cached=True)
|
||||
def test_done_cached(gsheets_db, metadata, mock_gworksheet, mocker):
|
||||
mocker.patch("auto_archiver.modules.gsheet_db.gsheet_db.get_current_timestamp", return_value='2025-02-01T00:00:00+00:00')
|
||||
gsheets_db.done(metadata, cached=True)
|
||||
|
||||
# Verify the status message includes "[cached]"
|
||||
call_args = mock_gworksheet.batch_set_cell.call_args[0][0]
|
||||
assert any(call[2].startswith("[cached]") for call in call_args)
|
||||
|
||||
|
||||
def test_done_missing_media(gsheets_db, metadata, mock_gworksheet):
|
||||
def test_done_missing_media(gsheets_db, metadata, mock_gworksheet, mocker):
|
||||
# clear media from metadata
|
||||
metadata.media = []
|
||||
with patch("auto_archiver.modules.gsheet_db.gsheet_db.get_current_timestamp",
|
||||
return_value='2025-02-01T00:00:00+00:00'):
|
||||
gsheets_db.done(metadata)
|
||||
mocker.patch("auto_archiver.modules.gsheet_db.gsheet_db.get_current_timestamp", return_value='2025-02-01T00:00:00+00:00')
|
||||
gsheets_db.done(metadata)
|
||||
# Verify nothing media-related gets updated
|
||||
call_args = mock_gworksheet.batch_set_cell.call_args[0][0]
|
||||
media_fields = {'archive', 'screenshot', 'thumbnail', 'wacz', 'replaywebpage'}
|
||||
|
|
|
@ -1,6 +1,5 @@
|
|||
import datetime
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
|
@ -9,18 +8,18 @@ from auto_archiver.modules.meta_enricher import MetaEnricher
|
|||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_metadata():
|
||||
def mock_metadata(mocker):
|
||||
"""Creates a mock Metadata object."""
|
||||
mock: Metadata = MagicMock(spec=Metadata)
|
||||
mock: Metadata = mocker.MagicMock(spec=Metadata)
|
||||
mock.get_url.return_value = "https://example.com"
|
||||
mock.is_empty.return_value = False # Default to not empty
|
||||
mock.get_all_media.return_value = []
|
||||
return mock
|
||||
|
||||
@pytest.fixture
|
||||
def mock_media():
|
||||
def mock_media(mocker):
|
||||
"""Creates a mock Media object."""
|
||||
mock: Media = MagicMock(spec=Media)
|
||||
mock: Media = mocker.MagicMock(spec=Media)
|
||||
mock.filename = "mock_file.txt"
|
||||
return mock
|
||||
|
||||
|
@ -90,14 +89,14 @@ def test_enrich_file_sizes_no_media(meta_enricher, metadata):
|
|||
assert metadata.get("total_size") == "0.0 bytes"
|
||||
|
||||
|
||||
def test_enrich_archive_duration(meta_enricher, metadata):
|
||||
def test_enrich_archive_duration(meta_enricher, metadata, mocker):
|
||||
# Set fixed "processed at" time in the past
|
||||
processed_at = datetime.now(timezone.utc) - timedelta(minutes=10, seconds=30)
|
||||
metadata.set("_processed_at", processed_at)
|
||||
# patch datetime
|
||||
with patch("datetime.datetime") as mock_datetime:
|
||||
mock_now = datetime.now(timezone.utc)
|
||||
mock_datetime.now.return_value = mock_now
|
||||
meta_enricher.enrich_archive_duration(metadata)
|
||||
mock_datetime = mocker.patch("datetime.datetime")
|
||||
mock_now = datetime.now(timezone.utc)
|
||||
mock_datetime.now.return_value = mock_now
|
||||
meta_enricher.enrich_archive_duration(metadata)
|
||||
|
||||
assert metadata.get("archive_duration_seconds") == 630
|
|
@ -1,14 +1,13 @@
|
|||
from unittest.mock import MagicMock, patch, Mock
|
||||
|
||||
import pytest
|
||||
|
||||
from auto_archiver.core import Metadata, Media
|
||||
from auto_archiver.core import Media
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_media():
|
||||
def mock_media(mocker):
|
||||
"""Creates a mock Media object."""
|
||||
mock: Media = MagicMock(spec=Media)
|
||||
mock: Media = mocker.MagicMock(spec=Media)
|
||||
mock.filename = "mock_file.txt"
|
||||
return mock
|
||||
|
||||
|
@ -26,8 +25,8 @@ def enricher(setup_module, mock_binary_dependencies):
|
|||
("", {}),
|
||||
],
|
||||
)
|
||||
@patch("subprocess.run")
|
||||
def test_get_metadata(mock_run, enricher, output, expected):
|
||||
def test_get_metadata(enricher, output, expected, mocker):
|
||||
mock_run = mocker.patch("subprocess.run")
|
||||
mock_run.return_value.stdout = output
|
||||
mock_run.return_value.stderr = ""
|
||||
mock_run.return_value.returncode = 0
|
||||
|
@ -39,17 +38,17 @@ def test_get_metadata(mock_run, enricher, output, expected):
|
|||
)
|
||||
|
||||
|
||||
@patch("subprocess.run")
|
||||
def test_get_metadata_exiftool_not_found(mock_run, enricher):
|
||||
def test_get_metadata_exiftool_not_found(enricher, mocker):
|
||||
mock_run = mocker.patch("subprocess.run")
|
||||
mock_run.side_effect = FileNotFoundError
|
||||
result = enricher.get_metadata("test.jpg")
|
||||
assert result == {}
|
||||
|
||||
|
||||
def test_enrich_sets_metadata(enricher):
|
||||
media1 = Mock(filename="img1.jpg")
|
||||
media2 = Mock(filename="img2.jpg")
|
||||
metadata = Mock()
|
||||
def test_enrich_sets_metadata(enricher, mocker):
|
||||
media1 = mocker.Mock(filename="img1.jpg")
|
||||
media2 = mocker.Mock(filename="img2.jpg")
|
||||
metadata = mocker.Mock()
|
||||
metadata.media = [media1, media2]
|
||||
enricher.get_metadata = lambda f: {"key": "value"} if f == "img1.jpg" else {}
|
||||
|
||||
|
@ -60,24 +59,23 @@ def test_enrich_sets_metadata(enricher):
|
|||
assert metadata.media == [media1, media2]
|
||||
|
||||
|
||||
def test_enrich_empty_media(enricher):
|
||||
metadata = Mock()
|
||||
def test_enrich_empty_media(enricher, mocker):
|
||||
metadata = mocker.Mock()
|
||||
metadata.media = []
|
||||
# Should not raise errors
|
||||
enricher.enrich(metadata)
|
||||
|
||||
|
||||
@patch("loguru.logger.error")
|
||||
@patch("subprocess.run")
|
||||
def test_get_metadata_error_handling(mock_run, mock_logger_error, enricher):
|
||||
mock_run.side_effect = Exception("Test error")
|
||||
def test_get_metadata_error_handling(enricher, mocker):
|
||||
mocker.patch("subprocess.run", side_effect=Exception("Test error"))
|
||||
mock_log = mocker.patch("loguru.logger.error")
|
||||
result = enricher.get_metadata("test.jpg")
|
||||
assert result == {}
|
||||
mock_logger_error.assert_called_once()
|
||||
assert "Error occurred: " in mock_log.call_args[0][0]
|
||||
|
||||
|
||||
@patch("subprocess.run")
|
||||
def test_metadata_pickle(mock_run, enricher, unpickle):
|
||||
def test_metadata_pickle(enricher, unpickle, mocker):
|
||||
mock_run = mocker.patch("subprocess.run")
|
||||
# Uses pickled values
|
||||
mock_run.return_value = unpickle("metadata_enricher_exif.pickle")
|
||||
metadata = unpickle("metadata_enricher_ytshort_input.pickle")
|
||||
|
@ -86,4 +84,5 @@ def test_metadata_pickle(mock_run, enricher, unpickle):
|
|||
expected_media = expected.media
|
||||
actual_media = metadata.media
|
||||
assert len(expected_media) == len(actual_media)
|
||||
assert actual_media[0].properties.get("metadata") == expected_media[0].properties.get("metadata")
|
||||
assert actual_media[0].properties.get("metadata") == expected_media[0].properties.get("metadata")
|
||||
|
||||
|
|
|
@ -1,5 +1,3 @@
|
|||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
from PIL import UnidentifiedImageError
|
||||
|
||||
|
@ -21,11 +19,11 @@ def metadata_with_images():
|
|||
return m
|
||||
|
||||
|
||||
def test_successful_enrich(metadata_with_images):
|
||||
def test_successful_enrich(metadata_with_images, mocker):
|
||||
with (
|
||||
patch("pdqhash.compute", return_value=([1, 0, 1, 0] * 64, 100)),
|
||||
patch("PIL.Image.open"),
|
||||
patch.object(Media, "is_image", return_value=True) as mock_is_image,
|
||||
mocker.patch("pdqhash.compute", return_value=([1, 0, 1, 0] * 64, 100)),
|
||||
mocker.patch("PIL.Image.open"),
|
||||
mocker.patch.object(Media, "is_image", return_value=True) as mock_is_image,
|
||||
):
|
||||
enricher = PdqHashEnricher()
|
||||
enricher.enrich(metadata_with_images)
|
||||
|
@ -35,27 +33,24 @@ def test_successful_enrich(metadata_with_images):
|
|||
assert media.get("pdq_hash") is not None
|
||||
|
||||
|
||||
def test_enrich_skip_non_image(metadata_with_images):
|
||||
with (
|
||||
patch.object(Media, "is_image", return_value=False),
|
||||
patch("pdqhash.compute") as mock_pdq,
|
||||
):
|
||||
enricher = PdqHashEnricher()
|
||||
enricher.enrich(metadata_with_images)
|
||||
mock_pdq.assert_not_called()
|
||||
def test_enrich_skip_non_image(metadata_with_images, mocker):
|
||||
mocker.patch.object(Media, "is_image", return_value=False)
|
||||
mock_pdq = mocker.patch("pdqhash.compute")
|
||||
|
||||
enricher = PdqHashEnricher()
|
||||
enricher.enrich(metadata_with_images)
|
||||
mock_pdq.assert_not_called()
|
||||
|
||||
|
||||
def test_enrich_handles_corrupted_image(metadata_with_images):
|
||||
with (
|
||||
patch("PIL.Image.open", side_effect=UnidentifiedImageError("Corrupted image")),
|
||||
patch("pdqhash.compute") as mock_pdq,
|
||||
patch("loguru.logger.error") as mock_logger,
|
||||
):
|
||||
enricher = PdqHashEnricher()
|
||||
enricher.enrich(metadata_with_images)
|
||||
def test_enrich_handles_corrupted_image(metadata_with_images, mocker):
|
||||
mocker.patch("PIL.Image.open", side_effect=UnidentifiedImageError("Corrupted image"))
|
||||
mock_pdq = mocker.patch("pdqhash.compute")
|
||||
mock_logger = mocker.patch("loguru.logger.error")
|
||||
enricher = PdqHashEnricher()
|
||||
enricher.enrich(metadata_with_images)
|
||||
|
||||
assert mock_logger.call_count == len(metadata_with_images.media)
|
||||
mock_pdq.assert_not_called()
|
||||
assert mock_logger.call_count == len(metadata_with_images.media)
|
||||
mock_pdq.assert_not_called()
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
|
@ -66,19 +61,18 @@ def test_enrich_handles_corrupted_image(metadata_with_images):
|
|||
("regular-image", True),
|
||||
]
|
||||
)
|
||||
def test_enrich_excludes_by_filetype(media_id, should_have_hash):
|
||||
def test_enrich_excludes_by_filetype(media_id, should_have_hash, mocker):
|
||||
metadata = Metadata()
|
||||
metadata.set_url("https://example.com")
|
||||
metadata.add_media(Media(filename="image.jpg").set("id", media_id))
|
||||
|
||||
with (
|
||||
patch("pdqhash.compute", return_value=([1, 0, 1, 0] * 64, 100)),
|
||||
patch("PIL.Image.open"),
|
||||
patch.object(Media, "is_image", return_value=True),
|
||||
):
|
||||
enricher = PdqHashEnricher()
|
||||
enricher.enrich(metadata)
|
||||
mocker.patch("pdqhash.compute", return_value=([1, 0, 1, 0] * 64, 100))
|
||||
mocker.patch("PIL.Image.open")
|
||||
mocker.patch.object(Media, "is_image", return_value=True)
|
||||
|
||||
media_item = metadata.media[0]
|
||||
assert (media_item.get("pdq_hash") is not None) == should_have_hash
|
||||
enricher = PdqHashEnricher()
|
||||
enricher.enrich(metadata)
|
||||
|
||||
media_item = metadata.media[0]
|
||||
assert (media_item.get("pdq_hash") is not None) == should_have_hash
|
||||
|
||||
|
|
|
@ -1,5 +1,4 @@
|
|||
import base64
|
||||
from unittest.mock import patch, MagicMock
|
||||
|
||||
import pytest
|
||||
from selenium.common.exceptions import TimeoutException
|
||||
|
@ -9,53 +8,47 @@ from auto_archiver.modules.screenshot_enricher import ScreenshotEnricher
|
|||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_selenium_env():
|
||||
# Patches Selenium calls and driver checks in one place.
|
||||
with (
|
||||
patch("shutil.which") as mock_which,
|
||||
patch("auto_archiver.utils.webdriver.CookieSettingDriver") as mock_driver_class,
|
||||
patch(
|
||||
"selenium.webdriver.common.selenium_manager.SeleniumManager.binary_paths"
|
||||
) as mock_binary_paths,
|
||||
patch("pathlib.Path.is_file", return_value=True),
|
||||
patch("subprocess.Popen") as mock_popen,
|
||||
patch(
|
||||
"selenium.webdriver.common.service.Service.is_connectable",
|
||||
return_value=True,
|
||||
),
|
||||
patch("selenium.webdriver.FirefoxOptions") as mock_firefox_options,
|
||||
):
|
||||
# Mock driver existence
|
||||
def mock_which_side_effect(dep):
|
||||
return "/mock/geckodriver" if dep == "geckodriver" else None
|
||||
def mock_selenium_env(mocker):
|
||||
"""Patches Selenium calls and driver checks in one place."""
|
||||
|
||||
mock_which.side_effect = mock_which_side_effect
|
||||
# Mock binary paths
|
||||
mock_binary_paths.return_value = {
|
||||
"driver_path": "/mock/driver",
|
||||
"browser_path": "/mock/browser",
|
||||
}
|
||||
# Popen
|
||||
mock_proc = MagicMock()
|
||||
mock_proc.poll.return_value = None
|
||||
mock_popen.return_value = mock_proc
|
||||
# CookieSettingDriver -> returns a mock driver
|
||||
mock_driver = MagicMock()
|
||||
mock_driver_class.return_value = mock_driver
|
||||
# FirefoxOptions
|
||||
mock_options_instance = MagicMock()
|
||||
mock_firefox_options.return_value = mock_options_instance
|
||||
yield mock_driver, mock_driver_class, mock_options_instance
|
||||
# Patch external dependencies
|
||||
mock_which = mocker.patch("shutil.which")
|
||||
mock_driver_class = mocker.patch("auto_archiver.utils.webdriver.CookieSettingDriver")
|
||||
mock_binary_paths = mocker.patch("selenium.webdriver.common.selenium_manager.SeleniumManager.binary_paths")
|
||||
mock_is_file = mocker.patch("pathlib.Path.is_file", return_value=True)
|
||||
mock_popen = mocker.patch("subprocess.Popen")
|
||||
mock_is_connectable = mocker.patch("selenium.webdriver.common.service.Service.is_connectable", return_value=True)
|
||||
mock_firefox_options = mocker.patch("selenium.webdriver.FirefoxOptions")
|
||||
# Define side effect for `shutil.which`
|
||||
def mock_which_side_effect(dep):
|
||||
return "/mock/geckodriver" if dep == "geckodriver" else None
|
||||
mock_which.side_effect = mock_which_side_effect
|
||||
|
||||
# Mock binary paths
|
||||
mock_binary_paths.return_value = {
|
||||
"driver_path": "/mock/driver",
|
||||
"browser_path": "/mock/browser",
|
||||
}
|
||||
# Mock `subprocess.Popen`
|
||||
mock_proc = mocker.MagicMock()
|
||||
mock_proc.poll.return_value = None
|
||||
mock_popen.return_value = mock_proc
|
||||
# Mock `CookieSettingDriver`
|
||||
mock_driver = mocker.MagicMock()
|
||||
mock_driver_class.return_value = mock_driver
|
||||
# Mock `FirefoxOptions`
|
||||
mock_options_instance = mocker.MagicMock()
|
||||
mock_firefox_options.return_value = mock_options_instance
|
||||
yield mock_driver, mock_driver_class, mock_options_instance
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def common_patches(tmp_path):
|
||||
with (
|
||||
patch("auto_archiver.utils.url.is_auth_wall", return_value=False),
|
||||
patch("os.path.join", return_value=str(tmp_path / "test.png")),
|
||||
patch("time.sleep"),
|
||||
):
|
||||
yield
|
||||
def common_patches(tmp_path, mocker):
|
||||
"""Patches common utilities used across multiple tests."""
|
||||
mocker.patch("auto_archiver.utils.url.is_auth_wall", return_value=False)
|
||||
mocker.patch("os.path.join", return_value=str(tmp_path / "test.png"))
|
||||
mocker.patch("time.sleep")
|
||||
yield
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
|
@ -117,37 +110,38 @@ def test_enrich_auth_wall(
|
|||
common_patches,
|
||||
url,
|
||||
is_auth,
|
||||
mocker
|
||||
):
|
||||
# Testing with and without is_auth_wall
|
||||
mock_driver, mock_driver_class, _ = mock_selenium_env
|
||||
with patch("auto_archiver.utils.url.is_auth_wall", return_value=is_auth):
|
||||
metadata_with_video.set_url(url)
|
||||
screenshot_enricher.enrich(metadata_with_video)
|
||||
mocker.patch("auto_archiver.utils.url.is_auth_wall", return_value=is_auth)
|
||||
metadata_with_video.set_url(url)
|
||||
screenshot_enricher.enrich(metadata_with_video)
|
||||
|
||||
if is_auth:
|
||||
mock_driver.get.assert_not_called()
|
||||
assert len(metadata_with_video.media) == 1
|
||||
assert metadata_with_video.media[0].properties.get("id") == "video1"
|
||||
else:
|
||||
mock_driver.get.assert_called_once_with(url)
|
||||
assert len(metadata_with_video.media) == 2
|
||||
assert metadata_with_video.media[1].properties.get("id") == "screenshot"
|
||||
if is_auth:
|
||||
mock_driver.get.assert_not_called()
|
||||
assert len(metadata_with_video.media) == 1
|
||||
assert metadata_with_video.media[0].properties.get("id") == "video1"
|
||||
else:
|
||||
mock_driver.get.assert_called_once_with(url)
|
||||
assert len(metadata_with_video.media) == 2
|
||||
assert metadata_with_video.media[1].properties.get("id") == "screenshot"
|
||||
|
||||
|
||||
def test_handle_timeout_exception(
|
||||
screenshot_enricher, metadata_with_video, mock_selenium_env
|
||||
screenshot_enricher, metadata_with_video, mock_selenium_env, mocker
|
||||
):
|
||||
mock_driver, mock_driver_class, mock_options_instance = mock_selenium_env
|
||||
|
||||
mock_driver.get.side_effect = TimeoutException
|
||||
with patch("loguru.logger.info") as mock_log:
|
||||
screenshot_enricher.enrich(metadata_with_video)
|
||||
mock_log.assert_called_once_with("TimeoutException loading page for screenshot")
|
||||
assert len(metadata_with_video.media) == 1
|
||||
mock_log = mocker.patch("loguru.logger.info")
|
||||
screenshot_enricher.enrich(metadata_with_video)
|
||||
mock_log.assert_called_once_with("TimeoutException loading page for screenshot")
|
||||
assert len(metadata_with_video.media) == 1
|
||||
|
||||
|
||||
def test_handle_general_exception(
|
||||
screenshot_enricher, metadata_with_video, mock_selenium_env
|
||||
screenshot_enricher, metadata_with_video, mock_selenium_env, mocker
|
||||
):
|
||||
"""Test proper handling of unexpected general exceptions"""
|
||||
mock_driver, mock_driver_class, mock_options_instance = mock_selenium_env
|
||||
|
@ -155,47 +149,43 @@ def test_handle_general_exception(
|
|||
mock_driver.get.return_value = None
|
||||
mock_driver.save_screenshot.side_effect = Exception("Unexpected Error")
|
||||
|
||||
with patch("loguru.logger.error") as mock_log:
|
||||
screenshot_enricher.enrich(metadata_with_video)
|
||||
# Verify that the exception was logged with the log
|
||||
mock_log.assert_called_once_with(
|
||||
"Got error while loading webdriver for screenshot enricher: Unexpected Error"
|
||||
)
|
||||
# And no new media was added due to the error
|
||||
assert len(metadata_with_video.media) == 1
|
||||
mock_log = mocker.patch("loguru.logger.error")
|
||||
screenshot_enricher.enrich(metadata_with_video)
|
||||
# Verify that the exception was logged with the log
|
||||
mock_log.assert_called_once_with(
|
||||
"Got error while loading webdriver for screenshot enricher: Unexpected Error"
|
||||
)
|
||||
# And no new media was added due to the error
|
||||
assert len(metadata_with_video.media) == 1
|
||||
|
||||
|
||||
def test_pdf_creation(screenshot_enricher, metadata_with_video, mock_selenium_env):
|
||||
def test_pdf_creation(mocker, screenshot_enricher, metadata_with_video, mock_selenium_env):
|
||||
"""Test PDF creation when save_to_pdf is enabled"""
|
||||
mock_driver, mock_driver_class, mock_options_instance = mock_selenium_env
|
||||
|
||||
# Override the save_to_pdf option
|
||||
screenshot_enricher.save_to_pdf = True
|
||||
# Mock the print_page method to return base64-encoded content
|
||||
mock_driver.print_page.return_value = base64.b64encode(b"fake_pdf_content").decode(
|
||||
"utf-8"
|
||||
mock_driver.print_page.return_value = base64.b64encode(b"fake_pdf_content").decode("utf-8")
|
||||
# Patch functions with mocker
|
||||
mock_os_path_join = mocker.patch("os.path.join", side_effect=lambda *args: f"{args[-1]}")
|
||||
mock_random_str = mocker.patch(
|
||||
"auto_archiver.modules.screenshot_enricher.screenshot_enricher.random_str",
|
||||
return_value="fixed123",
|
||||
)
|
||||
with (
|
||||
patch("os.path.join", side_effect=lambda *args: f"{args[-1]}"),
|
||||
patch(
|
||||
"auto_archiver.modules.screenshot_enricher.screenshot_enricher.random_str",
|
||||
return_value="fixed123",
|
||||
),
|
||||
patch("builtins.open", new_callable=MagicMock()) as mock_open,
|
||||
patch("loguru.logger.error") as mock_log,
|
||||
):
|
||||
screenshot_enricher.enrich(metadata_with_video)
|
||||
mock_open = mocker.patch("builtins.open", new_callable=mocker.mock_open)
|
||||
mock_log_error = mocker.patch("loguru.logger.error")
|
||||
|
||||
# Verify screenshot and PDF creation
|
||||
mock_driver.save_screenshot.assert_called_once()
|
||||
mock_driver.print_page.assert_called_once_with(mock_driver.print_options)
|
||||
screenshot_enricher.enrich(metadata_with_video)
|
||||
# Verify screenshot and PDF creation
|
||||
mock_driver.save_screenshot.assert_called_once()
|
||||
mock_driver.print_page.assert_called_once_with(mock_driver.print_options)
|
||||
# Check that PDF file was opened and written
|
||||
mock_open.assert_any_call("pdf_fixed123.pdf", "wb")
|
||||
|
||||
# Check that PDF file was opened and written
|
||||
mock_open.assert_any_call("pdf_fixed123.pdf", "wb")
|
||||
# Ensure both screenshot and PDF were added as media
|
||||
assert len(metadata_with_video.media) == 3 # Original video + screenshot + PDF
|
||||
assert metadata_with_video.media[1].properties.get("id") == "screenshot"
|
||||
assert metadata_with_video.media[2].properties.get("id") == "pdf"
|
||||
# Ensure both screenshot and PDF were added as media
|
||||
assert len(metadata_with_video.media) == 3
|
||||
assert metadata_with_video.media[1].properties.get("id") == "screenshot"
|
||||
assert metadata_with_video.media[2].properties.get("id") == "pdf"
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
|
|
|
@ -1,6 +1,4 @@
|
|||
import ssl
|
||||
from unittest.mock import patch, mock_open
|
||||
|
||||
import pytest
|
||||
|
||||
from auto_archiver.core import Metadata, Media
|
||||
|
@ -35,22 +33,22 @@ def test_empty_metadata(metadata, enricher):
|
|||
assert enricher.enrich(metadata) is None
|
||||
|
||||
|
||||
def test_ssl_enrich(metadata, enricher):
|
||||
with patch("ssl.get_server_certificate", return_value="TEST_CERT"), \
|
||||
patch("builtins.open", mock_open()) as mock_file:
|
||||
media_len_before = len(metadata.media)
|
||||
def test_ssl_enrich(metadata, enricher, mocker):
|
||||
mocker.patch("ssl.get_server_certificate", return_value="TEST_CERT")
|
||||
mock_file = mocker.patch("builtins.open", mocker.mock_open())
|
||||
media_len_before = len(metadata.media)
|
||||
enricher.enrich(metadata)
|
||||
|
||||
ssl.get_server_certificate.assert_called_once_with(("example.com", 443))
|
||||
mock_file.assert_called_once_with(f"{enricher.tmp_dir}/example-com.pem", "w")
|
||||
mock_file().write.assert_called_once_with("TEST_CERT")
|
||||
assert len(metadata.media) == media_len_before + 1
|
||||
# Ensure the certificate is added to metadata
|
||||
assert any(media.filename.endswith("example-com.pem") for media in metadata.media)
|
||||
|
||||
|
||||
def test_ssl_error_handling(enricher, metadata, mocker):
|
||||
mocker.patch("ssl.get_server_certificate", side_effect=ssl.SSLError("SSL error"))
|
||||
with pytest.raises(ssl.SSLError, match="SSL error"):
|
||||
enricher.enrich(metadata)
|
||||
|
||||
ssl.get_server_certificate.assert_called_once_with(("example.com", 443))
|
||||
mock_file.assert_called_once_with(f"{enricher.tmp_dir}/example-com.pem", "w")
|
||||
mock_file().write.assert_called_once_with("TEST_CERT")
|
||||
assert len(metadata.media) == media_len_before + 1
|
||||
# Ensure the certificate is added to metadata
|
||||
assert any(media.filename.endswith("example-com.pem") for media in metadata.media)
|
||||
|
||||
|
||||
def test_ssl_error_handling(enricher, metadata):
|
||||
with patch("ssl.get_server_certificate", side_effect=ssl.SSLError("SSL error")):
|
||||
with pytest.raises(ssl.SSLError, match="SSL error"):
|
||||
enricher.enrich(metadata)
|
||||
|
||||
|
|
|
@ -1,5 +1,4 @@
|
|||
import pytest
|
||||
from unittest.mock import patch, MagicMock
|
||||
from auto_archiver.core import Metadata, Media
|
||||
from auto_archiver.modules.thumbnail_enricher import ThumbnailEnricher
|
||||
|
||||
|
@ -22,32 +21,30 @@ def metadata_with_video():
|
|||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_ffmpeg_environment():
|
||||
def mock_ffmpeg_environment(mocker):
|
||||
# Mocking all the ffmpeg calls in one place
|
||||
with (
|
||||
patch("ffmpeg.input") as mock_ffmpeg_input,
|
||||
patch("os.makedirs") as mock_makedirs,
|
||||
patch.object(Media, "is_video", return_value=True),
|
||||
patch(
|
||||
"ffmpeg.probe",
|
||||
return_value={
|
||||
"streams": [
|
||||
{"codec_type": "video", "duration": "120"}
|
||||
] # Default 2-minute duration, but can override in tests
|
||||
},
|
||||
) as mock_probe,
|
||||
):
|
||||
mock_output = MagicMock()
|
||||
mock_ffmpeg_input.return_value.filter.return_value.output.return_value = (
|
||||
mock_output
|
||||
)
|
||||
mock_ffmpeg_input = mocker.patch("ffmpeg.input")
|
||||
mock_makedirs = mocker.patch("os.makedirs")
|
||||
mocker.patch.object(Media, "is_video", return_value=True),
|
||||
mock_probe = mocker.patch(
|
||||
"ffmpeg.probe",
|
||||
return_value={
|
||||
"streams": [
|
||||
{"codec_type": "video", "duration": "120"}
|
||||
] # Default 2-minute duration, but can override in tests
|
||||
},
|
||||
)
|
||||
mock_output = mocker.MagicMock()
|
||||
mock_ffmpeg_input.return_value.filter.return_value.output.return_value = (
|
||||
mock_output
|
||||
)
|
||||
|
||||
yield {
|
||||
"mock_ffmpeg_input": mock_ffmpeg_input,
|
||||
"mock_makedirs": mock_makedirs,
|
||||
"mock_output": mock_output,
|
||||
"mock_probe": mock_probe,
|
||||
}
|
||||
return {
|
||||
"mock_ffmpeg_input": mock_ffmpeg_input,
|
||||
"mock_makedirs": mock_makedirs,
|
||||
"mock_output": mock_output,
|
||||
"mock_probe": mock_probe,
|
||||
}
|
||||
|
||||
|
||||
@pytest.mark.parametrize("thumbnails_per_minute, max_thumbnails, expected_count", [
|
||||
|
@ -68,28 +65,26 @@ def test_enrich_thumbnail_limits(
|
|||
thumbnails = metadata_with_video.media[0].get("thumbnails")
|
||||
assert len(thumbnails) == expected_count
|
||||
|
||||
def test_enrich_handles_probe_failure(thumbnail_enricher, metadata_with_video):
|
||||
with (
|
||||
patch("ffmpeg.probe", side_effect=Exception("Probe error")),
|
||||
patch("os.makedirs"),
|
||||
patch("loguru.logger.error") as mock_logger,
|
||||
patch.object(Media, "is_video", return_value=True),
|
||||
):
|
||||
thumbnail_enricher.enrich(metadata_with_video)
|
||||
# Ensure error was logged
|
||||
mock_logger.assert_called_with(
|
||||
f"error getting duration of video video.mp4: Probe error"
|
||||
)
|
||||
# Ensure no thumbnails were created
|
||||
thumbnails = metadata_with_video.media[0].get("thumbnails")
|
||||
assert thumbnails is None
|
||||
def test_enrich_handles_probe_failure(thumbnail_enricher, metadata_with_video, mocker):
|
||||
|
||||
mocker.patch("ffmpeg.probe", side_effect=Exception("Probe error"))
|
||||
mocker.patch("os.makedirs")
|
||||
mock_logger = mocker.patch("loguru.logger.error")
|
||||
mocker.patch.object(Media, "is_video", return_value=True)
|
||||
|
||||
thumbnail_enricher.enrich(metadata_with_video)
|
||||
# Ensure error was logged
|
||||
mock_logger.assert_called_with(
|
||||
f"error getting duration of video video.mp4: Probe error"
|
||||
)
|
||||
# Ensure no thumbnails were created
|
||||
thumbnails = metadata_with_video.media[0].get("thumbnails")
|
||||
assert thumbnails is None
|
||||
|
||||
|
||||
def test_enrich_skips_non_video_files(thumbnail_enricher, metadata_with_video):
|
||||
with (
|
||||
patch.object(Media, "is_video", return_value=False),
|
||||
patch("ffmpeg.input") as mock_ffmpeg,
|
||||
):
|
||||
def test_enrich_skips_non_video_files(thumbnail_enricher, metadata_with_video, mocker):
|
||||
mocker.patch.object(Media, "is_video", return_value=False)
|
||||
mock_ffmpeg = mocker.patch("ffmpeg.input")
|
||||
thumbnail_enricher.enrich(metadata_with_video)
|
||||
mock_ffmpeg.assert_not_called()
|
||||
|
||||
|
@ -102,21 +97,21 @@ def test_enrich_skips_non_video_files(thumbnail_enricher, metadata_with_video):
|
|||
(12, 20, 2), # test caught by t/min
|
||||
])
|
||||
def test_enrich_handles_short_video(
|
||||
thumbnail_enricher, metadata_with_video, mock_ffmpeg_environment, thumbnails_per_minute, max_thumbnails, expected_count
|
||||
thumbnail_enricher, metadata_with_video, mock_ffmpeg_environment, thumbnails_per_minute, max_thumbnails, expected_count, mocker
|
||||
):
|
||||
# override mock duration
|
||||
fake_duration = 10
|
||||
with patch(
|
||||
mocker.patch(
|
||||
"ffmpeg.probe",
|
||||
return_value={ "streams": [{"codec_type": "video", "duration": str(fake_duration)}]},
|
||||
):
|
||||
thumbnail_enricher.thumbnails_per_minute = thumbnails_per_minute
|
||||
thumbnail_enricher.max_thumbnails = max_thumbnails
|
||||
)
|
||||
thumbnail_enricher.thumbnails_per_minute = thumbnails_per_minute
|
||||
thumbnail_enricher.max_thumbnails = max_thumbnails
|
||||
|
||||
thumbnail_enricher.enrich(metadata_with_video)
|
||||
assert mock_ffmpeg_environment["mock_output"].run.call_count == expected_count
|
||||
thumbnails = metadata_with_video.media[0].get("thumbnails")
|
||||
assert len(thumbnails) == expected_count
|
||||
thumbnail_enricher.enrich(metadata_with_video)
|
||||
assert mock_ffmpeg_environment["mock_output"].run.call_count == expected_count
|
||||
thumbnails = metadata_with_video.media[0].get("thumbnails")
|
||||
assert len(thumbnails) == expected_count
|
||||
|
||||
|
||||
def test_uses_existing_duration(
|
||||
|
@ -128,28 +123,26 @@ def test_uses_existing_duration(
|
|||
assert mock_ffmpeg_environment["mock_output"].run.call_count == 4
|
||||
|
||||
|
||||
def test_enrich_metadata_structure(thumbnail_enricher, metadata_with_video, mock_ffmpeg_environment):
|
||||
def test_enrich_metadata_structure(thumbnail_enricher, metadata_with_video, mock_ffmpeg_environment, mocker):
|
||||
fake_duration = 120
|
||||
with patch("ffmpeg.probe", return_value={
|
||||
'streams': [{'codec_type': 'video', 'duration': str(fake_duration)}]
|
||||
}):
|
||||
thumbnail_enricher.thumbnails_per_minute = 2
|
||||
thumbnail_enricher.max_thumbnails = 4
|
||||
mocker.patch("ffmpeg.probe", return_value={'streams': [{'codec_type': 'video', 'duration': str(fake_duration)}]})
|
||||
thumbnail_enricher.thumbnails_per_minute = 2
|
||||
thumbnail_enricher.max_thumbnails = 4
|
||||
|
||||
thumbnail_enricher.enrich(metadata_with_video)
|
||||
thumbnail_enricher.enrich(metadata_with_video)
|
||||
|
||||
media_item = metadata_with_video.media[0]
|
||||
thumbnails = media_item.get("thumbnails")
|
||||
media_item = metadata_with_video.media[0]
|
||||
thumbnails = media_item.get("thumbnails")
|
||||
|
||||
# Assert normal metadata
|
||||
assert media_item.get("id") == "video1"
|
||||
assert media_item.get("duration") == fake_duration
|
||||
# Evenly spaced timestamps
|
||||
expected_timestamps = ["24.000s", "48.000s", "72.000s", "96.000s"]
|
||||
assert thumbnails is not None
|
||||
assert len(thumbnails) == 4
|
||||
# Assert normal metadata
|
||||
assert media_item.get("id") == "video1"
|
||||
assert media_item.get("duration") == fake_duration
|
||||
# Evenly spaced timestamps
|
||||
expected_timestamps = ["24.000s", "48.000s", "72.000s", "96.000s"]
|
||||
assert thumbnails is not None
|
||||
assert len(thumbnails) == 4
|
||||
|
||||
for index, thumbnail in enumerate(thumbnails):
|
||||
assert thumbnail.filename is not None
|
||||
assert thumbnail.properties.get("id") == f"thumbnail_{index}"
|
||||
assert thumbnail.properties.get("timestamp") == expected_timestamps[index]
|
||||
for index, thumbnail in enumerate(thumbnails):
|
||||
assert thumbnail.filename is not None
|
||||
assert thumbnail.properties.get("id") == f"thumbnail_{index}"
|
||||
assert thumbnail.properties.get("timestamp") == expected_timestamps[index]
|
||||
|
|
|
@ -1,18 +1,14 @@
|
|||
import shutil
|
||||
import sys
|
||||
import pytest
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
from auto_archiver.core import Metadata, Media
|
||||
from auto_archiver.modules.s3_storage import S3Storage
|
||||
|
||||
from auto_archiver.modules.whisper_enricher import WhisperEnricher
|
||||
|
||||
|
||||
TEST_S3_URL = "http://cdn.example.com/test.mp4"
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def enricher():
|
||||
def enricher(mocker):
|
||||
"""Fixture with mocked S3 and API dependencies"""
|
||||
config = {
|
||||
"api_endpoint": "http://testapi",
|
||||
|
@ -22,7 +18,7 @@ def enricher():
|
|||
"action": "translate",
|
||||
"steps": {"storages": ["s3_storage"]}
|
||||
}
|
||||
mock_s3 = MagicMock(spec=S3Storage)
|
||||
mock_s3 = mocker.MagicMock(spec=S3Storage)
|
||||
mock_s3.get_cdn_url.return_value = TEST_S3_URL
|
||||
instance = WhisperEnricher()
|
||||
instance.name = "whisper_enricher"
|
||||
|
@ -43,16 +39,16 @@ def metadata():
|
|||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_requests():
|
||||
with patch("auto_archiver.modules.whisper_enricher.whisper_enricher.requests") as mock_requests:
|
||||
mock_response = MagicMock()
|
||||
mock_response.status_code = 201
|
||||
mock_response.json.return_value = {"id": "job123"}
|
||||
mock_requests.post.return_value = mock_response
|
||||
yield mock_requests
|
||||
def mock_requests(mocker):
|
||||
mock_requests = mocker.patch("auto_archiver.modules.whisper_enricher.whisper_enricher.requests")
|
||||
mock_response = mocker.MagicMock()
|
||||
mock_response.status_code = 201
|
||||
mock_response.json.return_value = {"id": "job123"}
|
||||
mock_requests.post.return_value = mock_response
|
||||
yield mock_requests
|
||||
|
||||
|
||||
def test_successful_job_submission(enricher, metadata, mock_requests):
|
||||
def test_successful_job_submission(enricher, metadata, mock_requests, mocker):
|
||||
"""Test successful media processing with S3 configured"""
|
||||
whisper, mock_s3 = enricher
|
||||
# Configure mock S3 URL to match test expectation
|
||||
|
@ -65,13 +61,13 @@ def test_successful_job_submission(enricher, metadata, mock_requests):
|
|||
metadata.media = [m]
|
||||
|
||||
# Mock the complete API interaction chain
|
||||
mock_status_response = MagicMock()
|
||||
mock_status_response = mocker.MagicMock()
|
||||
mock_status_response.status_code = 200
|
||||
mock_status_response.json.return_value = {
|
||||
"status": "success",
|
||||
"meta": {}
|
||||
}
|
||||
mock_artifacts_response = MagicMock()
|
||||
mock_artifacts_response = mocker.MagicMock()
|
||||
mock_artifacts_response.status_code = 200
|
||||
mock_artifacts_response.json.return_value = [{
|
||||
"data": [{"start": 0, "end": 5, "text": "test transcript"}]
|
||||
|
@ -93,35 +89,39 @@ def test_successful_job_submission(enricher, metadata, mock_requests):
|
|||
# Verify job status checks
|
||||
assert mock_requests.get.call_count == 2
|
||||
assert "artifact_0_text" in metadata.media[0].get("whisper_model")
|
||||
assert metadata.media[0].get("whisper_model") == {'artifact_0_text': 'test transcript', 'job_artifacts_check': 'http://testapi/jobs/job123/artifacts', 'job_id': 'job123', 'job_status_check': 'http://testapi/jobs/job123'}
|
||||
assert metadata.media[0].get("whisper_model") == {'artifact_0_text': 'test transcript',
|
||||
'job_artifacts_check': 'http://testapi/jobs/job123/artifacts',
|
||||
'job_id': 'job123',
|
||||
'job_status_check': 'http://testapi/jobs/job123'}
|
||||
|
||||
|
||||
|
||||
def test_submit_job(enricher):
|
||||
def test_submit_job(enricher, mocker):
|
||||
"""Test job submission method"""
|
||||
whisper, _ = enricher
|
||||
m = Media("test.mp4")
|
||||
m.add_url(TEST_S3_URL)
|
||||
with patch("auto_archiver.modules.whisper_enricher.whisper_enricher.requests") as mock_requests:
|
||||
mock_response = MagicMock()
|
||||
mock_response.status_code = 201
|
||||
mock_response.json.return_value = {"id": "job123"}
|
||||
mock_requests.post.return_value = mock_response
|
||||
job_id = whisper.submit_job(m)
|
||||
mock_requests = mocker.patch("auto_archiver.modules.whisper_enricher.whisper_enricher.requests")
|
||||
mock_response = mocker.MagicMock()
|
||||
mock_response.status_code = 201
|
||||
mock_response.json.return_value = {"id": "job123"}
|
||||
mock_requests.post.return_value = mock_response
|
||||
job_id = whisper.submit_job(m)
|
||||
assert job_id == "job123"
|
||||
|
||||
def test_submit_raises_status(enricher):
|
||||
|
||||
def test_submit_raises_status(enricher, mocker):
|
||||
whisper, _ = enricher
|
||||
m = Media("test.mp4")
|
||||
m.add_url(TEST_S3_URL)
|
||||
with patch("auto_archiver.modules.whisper_enricher.whisper_enricher.requests") as mock_requests:
|
||||
mock_response = MagicMock()
|
||||
mock_response.status_code = 400
|
||||
mock_response.json.return_value = {"id": "job123"}
|
||||
mock_requests.post.return_value = mock_response
|
||||
with pytest.raises(AssertionError) as exc_info:
|
||||
whisper.submit_job(m)
|
||||
assert str(exc_info.value) == "calling the whisper api http://testapi returned a non-success code: 400"
|
||||
mock_requests = mocker.patch("auto_archiver.modules.whisper_enricher.whisper_enricher.requests")
|
||||
mock_response = mocker.MagicMock()
|
||||
mock_response.status_code = 400
|
||||
mock_response.json.return_value = {"id": "job123"}
|
||||
mock_requests.post.return_value = mock_response
|
||||
with pytest.raises(AssertionError) as exc_info:
|
||||
whisper.submit_job(m)
|
||||
assert str(exc_info.value) == "calling the whisper api http://testapi returned a non-success code: 400"
|
||||
|
||||
|
||||
# @pytest.mark.parametrize("test_url, status", ["http://cdn.example.com/test.mp4",])
|
||||
def test_submit_job_fails(enricher):
|
||||
|
@ -131,5 +131,3 @@ def test_submit_job_fails(enricher):
|
|||
m.add_url("http://cdn.wrongurl.com/test.mp4")
|
||||
with pytest.raises(AssertionError):
|
||||
whisper.submit_job(m)
|
||||
|
||||
|
||||
|
|
|
@ -1,15 +1,12 @@
|
|||
from datetime import datetime
|
||||
from typing import Type
|
||||
|
||||
import pytest
|
||||
from unittest.mock import patch, MagicMock
|
||||
|
||||
from auto_archiver.core import Metadata
|
||||
from auto_archiver.modules.instagram_api_extractor.instagram_api_extractor import InstagramAPIExtractor
|
||||
from .test_extractor_base import TestExtractorBase
|
||||
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_user_response():
|
||||
return {
|
||||
|
@ -115,74 +112,74 @@ class TestInstagramAPIExtractor(TestExtractorBase):
|
|||
# test gets text (metadata title)
|
||||
pass
|
||||
|
||||
def test_download_profile_basic(self, metadata, mock_user_response):
|
||||
def test_download_profile_basic(self, metadata, mock_user_response, mocker):
|
||||
"""Test basic profile download without full_profile"""
|
||||
with patch.object(self.extractor, 'call_api') as mock_call, \
|
||||
patch.object(self.extractor, 'download_from_url') as mock_download:
|
||||
# Mock API responses
|
||||
mock_call.return_value = mock_user_response
|
||||
mock_download.return_value = "profile.jpg"
|
||||
mock_call = mocker.patch.object(self.extractor, 'call_api')
|
||||
mock_download = mocker.patch.object(self.extractor, 'download_from_url')
|
||||
# Mock API responses
|
||||
mock_call.return_value = mock_user_response
|
||||
mock_download.return_value = "profile.jpg"
|
||||
|
||||
result = self.extractor.download_profile(metadata, "test_user")
|
||||
assert result.status == "insta profile: success"
|
||||
assert result.get_title() == "Test User"
|
||||
assert result.get("data") == self.extractor.cleanup_dict(mock_user_response["user"])
|
||||
# Verify profile picture download
|
||||
mock_call.assert_called_once_with("v2/user/by/username", {"username": "test_user"})
|
||||
mock_download.assert_called_once_with("http://example.com/profile.jpg")
|
||||
assert len(result.media) == 1
|
||||
assert result.media[0].filename == "profile.jpg"
|
||||
result = self.extractor.download_profile(metadata, "test_user")
|
||||
assert result.status == "insta profile: success"
|
||||
assert result.get_title() == "Test User"
|
||||
assert result.get("data") == self.extractor.cleanup_dict(mock_user_response["user"])
|
||||
# Verify profile picture download
|
||||
mock_call.assert_called_once_with("v2/user/by/username", {"username": "test_user"})
|
||||
mock_download.assert_called_once_with("http://example.com/profile.jpg")
|
||||
assert len(result.media) == 1
|
||||
assert result.media[0].filename == "profile.jpg"
|
||||
|
||||
def test_download_profile_full(self, metadata, mock_user_response, mock_story_response):
|
||||
def test_download_profile_full(self, metadata, mock_user_response, mock_story_response, mocker):
|
||||
"""Test full profile download with stories/posts"""
|
||||
with patch.object(self.extractor, 'call_api') as mock_call, \
|
||||
patch.object(self.extractor, 'download_all_posts') as mock_posts, \
|
||||
patch.object(self.extractor, 'download_all_highlights') as mock_highlights, \
|
||||
patch.object(self.extractor, 'download_all_tagged') as mock_tagged, \
|
||||
patch.object(self.extractor, '_download_stories_reusable') as mock_stories:
|
||||
mock_call = mocker.patch.object(self.extractor, 'call_api')
|
||||
mock_posts = mocker.patch.object(self.extractor, 'download_all_posts')
|
||||
mock_highlights = mocker.patch.object(self.extractor, 'download_all_highlights')
|
||||
mock_tagged = mocker.patch.object(self.extractor, 'download_all_tagged')
|
||||
mock_stories = mocker.patch.object(self.extractor, '_download_stories_reusable')
|
||||
|
||||
self.extractor.full_profile = True
|
||||
mock_call.side_effect = [
|
||||
mock_user_response,
|
||||
mock_story_response
|
||||
]
|
||||
mock_highlights.return_value = None
|
||||
mock_stories.return_value = mock_story_response
|
||||
mock_posts.return_value = None
|
||||
mock_tagged.return_value = None
|
||||
self.extractor.full_profile = True
|
||||
mock_call.side_effect = [
|
||||
mock_user_response,
|
||||
mock_story_response
|
||||
]
|
||||
mock_highlights.return_value = None
|
||||
mock_stories.return_value = mock_story_response
|
||||
mock_posts.return_value = None
|
||||
mock_tagged.return_value = None
|
||||
|
||||
result = self.extractor.download_profile(metadata, "test_user")
|
||||
assert result.get("#stories") == len(mock_story_response)
|
||||
mock_posts.assert_called_once_with(result, "123")
|
||||
assert "errors" not in result.metadata
|
||||
result = self.extractor.download_profile(metadata, "test_user")
|
||||
assert result.get("#stories") == len(mock_story_response)
|
||||
mock_posts.assert_called_once_with(result, "123")
|
||||
assert "errors" not in result.metadata
|
||||
|
||||
def test_download_profile_not_found(self, metadata):
|
||||
def test_download_profile_not_found(self, metadata, mocker):
|
||||
"""Test profile not found error"""
|
||||
with patch.object(self.extractor, 'call_api') as mock_call:
|
||||
mock_call.return_value = {"user": None}
|
||||
with pytest.raises(AssertionError) as exc_info:
|
||||
self.extractor.download_profile(metadata, "invalid_user")
|
||||
assert "User invalid_user not found" in str(exc_info.value)
|
||||
mock_call = mocker.patch.object(self.extractor, 'call_api')
|
||||
mock_call.return_value = {"user": None}
|
||||
with pytest.raises(AssertionError) as exc_info:
|
||||
self.extractor.download_profile(metadata, "invalid_user")
|
||||
assert "User invalid_user not found" in str(exc_info.value)
|
||||
|
||||
def test_download_profile_error_handling(self, metadata, mock_user_response):
|
||||
def test_download_profile_error_handling(self, metadata, mock_user_response, mocker):
|
||||
"""Test error handling in full profile mode"""
|
||||
with (patch.object(self.extractor, 'call_api') as mock_call, \
|
||||
patch.object(self.extractor, 'download_all_highlights') as mock_highlights, \
|
||||
patch.object(self.extractor, 'download_all_tagged') as mock_tagged, \
|
||||
patch.object(self.extractor, '_download_stories_reusable') as stories_tagged, \
|
||||
patch.object(self.extractor, 'download_all_posts') as mock_posts
|
||||
):
|
||||
self.extractor.full_profile = True
|
||||
mock_call.side_effect = [
|
||||
mock_user_response,
|
||||
Exception("Stories API failed"),
|
||||
Exception("Posts API failed")
|
||||
]
|
||||
mock_highlights.return_value = None
|
||||
mock_tagged.return_value = None
|
||||
stories_tagged.return_value = None
|
||||
mock_posts.return_value = None
|
||||
result = self.extractor.download_profile(metadata, "test_user")
|
||||
mock_call = mocker.patch.object(self.extractor, 'call_api')
|
||||
mock_highlights = mocker.patch.object(self.extractor, 'download_all_highlights')
|
||||
mock_tagged = mocker.patch.object(self.extractor, 'download_all_tagged')
|
||||
stories_tagged = mocker.patch.object(self.extractor, '_download_stories_reusable')
|
||||
mock_posts = mocker.patch.object(self.extractor, 'download_all_posts')
|
||||
|
||||
assert result.is_success()
|
||||
assert "Error downloading stories for test_user" in result.metadata["errors"]
|
||||
self.extractor.full_profile = True
|
||||
mock_call.side_effect = [
|
||||
mock_user_response,
|
||||
Exception("Stories API failed"),
|
||||
Exception("Posts API failed")
|
||||
]
|
||||
mock_highlights.return_value = None
|
||||
mock_tagged.return_value = None
|
||||
stories_tagged.return_value = None
|
||||
mock_posts.return_value = None
|
||||
result = self.extractor.download_profile(metadata, "test_user")
|
||||
|
||||
assert result.is_success()
|
||||
assert "Error downloading stories for test_user" in result.metadata["errors"]
|
|
@ -1,5 +1,4 @@
|
|||
import os
|
||||
from unittest.mock import patch, MagicMock
|
||||
|
||||
import pytest
|
||||
|
||||
|
@ -11,16 +10,10 @@ TESTFILES = os.path.join(os.path.dirname(__file__), "testfiles")
|
|||
|
||||
|
||||
@pytest.fixture
|
||||
def patch_extractor_methods(request, setup_module):
|
||||
with patch.object(InstagramTbotExtractor, '_prepare_session_file', return_value=None), \
|
||||
patch.object(InstagramTbotExtractor, '_initialize_telegram_client', return_value=None):
|
||||
yield
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def mock_sleep():
|
||||
"""Globally mock time.sleep to avoid delays."""
|
||||
with patch("time.sleep") as mock_sleep:
|
||||
yield mock_sleep
|
||||
def patch_extractor_methods(request, setup_module, mocker):
|
||||
mocker.patch.object(InstagramTbotExtractor, '_prepare_session_file', return_value=None)
|
||||
mocker.patch.object(InstagramTbotExtractor, '_initialize_telegram_client', return_value=None)
|
||||
yield
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
|
@ -33,16 +26,16 @@ def metadata_sample():
|
|||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_telegram_client():
|
||||
def mock_telegram_client(mocker):
|
||||
"""Fixture to mock TelegramClient interactions."""
|
||||
with patch("auto_archiver.modules.instagram_tbot_extractor.client") as mock_client:
|
||||
instance = MagicMock()
|
||||
mock_client.return_value = instance
|
||||
yield instance
|
||||
mock_client = mocker.patch("auto_archiver.modules.instagram_tbot_extractor.client")
|
||||
instance = mocker.MagicMock()
|
||||
mock_client.return_value = instance
|
||||
return instance
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def extractor(setup_module, patch_extractor_methods):
|
||||
def extractor(setup_module, patch_extractor_methods, mocker):
|
||||
extractor_module = "instagram_tbot_extractor"
|
||||
config = {
|
||||
"api_id": 12345,
|
||||
|
@ -51,7 +44,7 @@ def extractor(setup_module, patch_extractor_methods):
|
|||
"timeout": 4
|
||||
}
|
||||
extractor = setup_module(extractor_module, config)
|
||||
extractor.client = MagicMock()
|
||||
extractor.client = mocker.MagicMock()
|
||||
extractor.session_file = "test_session"
|
||||
return extractor
|
||||
|
||||
|
@ -60,20 +53,20 @@ def test_non_instagram_url(extractor, metadata_sample):
|
|||
metadata_sample.set_url("https://www.youtube.com")
|
||||
assert extractor.download(metadata_sample) is False
|
||||
|
||||
def test_download_success(extractor, metadata_sample):
|
||||
with patch.object(extractor, "_send_url_to_bot", return_value=(MagicMock(), 101)), \
|
||||
patch.object(extractor, "_process_messages", return_value="Sample Instagram post caption"):
|
||||
result = extractor.download(metadata_sample)
|
||||
|
||||
def test_download_success(extractor, metadata_sample, mocker):
|
||||
mocker.patch.object(extractor, "_send_url_to_bot", return_value=(mocker.MagicMock(), 101))
|
||||
mocker.patch.object(extractor, "_process_messages", return_value="Sample Instagram post caption")
|
||||
result = extractor.download(metadata_sample)
|
||||
assert result.is_success()
|
||||
assert result.status == "insta-via-bot: success"
|
||||
assert result.metadata.get("title") == "Sample Instagram post caption"
|
||||
|
||||
|
||||
def test_download_invalid(extractor, metadata_sample):
|
||||
with patch.object(extractor, "_send_url_to_bot", return_value=(MagicMock(), 101)), \
|
||||
patch.object(extractor, "_process_messages", return_value="You must enter a URL to a post"):
|
||||
assert extractor.download(metadata_sample) is False
|
||||
|
||||
def test_download_invalid(extractor, metadata_sample, mocker):
|
||||
mocker.patch.object(extractor, "_send_url_to_bot", return_value=(mocker.MagicMock(), 101))
|
||||
mocker.patch.object(extractor, "_process_messages", return_value="You must enter a URL to a post")
|
||||
assert extractor.download(metadata_sample) is False
|
||||
|
||||
|
||||
@pytest.mark.skip(reason="Requires authentication.")
|
||||
|
@ -89,8 +82,12 @@ class TestInstagramTbotExtractorReal(TestExtractorBase):
|
|||
}
|
||||
|
||||
@pytest.mark.parametrize("url, expected_status, message, len_media", [
|
||||
("https://www.instagram.com/p/C4QgLbrIKXG", "insta-via-bot: success", "Are you new to Bellingcat? - The way we share our investigations is different. 💭\nWe want you to read our story but also learn ou", 6),
|
||||
("https://www.instagram.com/reel/DEVLK8qoIbg/", "insta-via-bot: success", "Our volunteer community is at the centre of many incredible Bellingcat investigations and tools. Stephanie Ladel is one such vol", 3),
|
||||
("https://www.instagram.com/p/C4QgLbrIKXG", "insta-via-bot: success",
|
||||
"Are you new to Bellingcat? - The way we share our investigations is different. 💭\nWe want you to read our story but also learn ou",
|
||||
6),
|
||||
("https://www.instagram.com/reel/DEVLK8qoIbg/", "insta-via-bot: success",
|
||||
"Our volunteer community is at the centre of many incredible Bellingcat investigations and tools. Stephanie Ladel is one such vol",
|
||||
3),
|
||||
# instagram tbot not working (potentially intermittently?) for stories - replace with a live story to retest
|
||||
# ("https://www.instagram.com/stories/bellingcatofficial/3556336382743057476/", False, "Media not found or unavailable"),
|
||||
# Seems to be working intermittently for highlights
|
||||
|
|
|
@ -2,27 +2,23 @@ from typing import Type
|
|||
|
||||
import gspread
|
||||
import pytest
|
||||
from unittest.mock import patch, MagicMock
|
||||
from auto_archiver.modules.gsheet_feeder import GsheetsFeeder
|
||||
from auto_archiver.core import Metadata, Feeder
|
||||
|
||||
|
||||
def test_setup_without_sheet_and_sheet_id(setup_module):
|
||||
def test_setup_without_sheet_and_sheet_id(setup_module, mocker):
|
||||
# Ensure setup() raises AssertionError if neither sheet nor sheet_id is set.
|
||||
with patch("gspread.service_account"):
|
||||
with pytest.raises(AssertionError):
|
||||
setup_module(
|
||||
"gsheet_feeder",
|
||||
{"service_account": "dummy.json", "sheet": None, "sheet_id": None},
|
||||
)
|
||||
mocker.patch("gspread.service_account")
|
||||
with pytest.raises(AssertionError):
|
||||
setup_module(
|
||||
"gsheet_feeder",
|
||||
{"service_account": "dummy.json", "sheet": None, "sheet_id": None},
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def gsheet_feeder(setup_module) -> GsheetsFeeder:
|
||||
with patch("gspread.service_account"):
|
||||
feeder = setup_module(
|
||||
"gsheet_feeder",
|
||||
{
|
||||
def gsheet_feeder(setup_module, mocker) -> GsheetsFeeder:
|
||||
config: dict = {
|
||||
"service_account": "dummy.json",
|
||||
"sheet": "test-auto-archiver",
|
||||
"sheet_id": None,
|
||||
|
@ -46,9 +42,13 @@ def gsheet_feeder(setup_module) -> GsheetsFeeder:
|
|||
"allow_worksheets": set(),
|
||||
"block_worksheets": set(),
|
||||
"use_sheet_names_in_stored_paths": True,
|
||||
},
|
||||
)
|
||||
feeder.gsheets_client = MagicMock()
|
||||
}
|
||||
mocker.patch("gspread.service_account")
|
||||
feeder = setup_module(
|
||||
"gsheet_feeder",
|
||||
config
|
||||
)
|
||||
feeder.gsheets_client = mocker.MagicMock()
|
||||
return feeder
|
||||
|
||||
|
||||
|
@ -129,56 +129,56 @@ def test__set_metadata_with_folder(gsheet_feeder: GsheetsFeeder):
|
|||
],
|
||||
)
|
||||
def test_open_sheet_with_name_or_id(
|
||||
setup_module, sheet, sheet_id, expected_method, expected_arg, description
|
||||
setup_module, sheet, sheet_id, expected_method, expected_arg, description, mocker
|
||||
):
|
||||
"""Ensure open_sheet() correctly opens by name or ID based on configuration."""
|
||||
with patch("gspread.service_account") as mock_service_account:
|
||||
mock_client = MagicMock()
|
||||
mock_service_account.return_value = mock_client
|
||||
mock_client.open.return_value = "MockSheet"
|
||||
mock_client.open_by_key.return_value = "MockSheet"
|
||||
mock_service_account = mocker.patch("gspread.service_account")
|
||||
mock_client = mocker.MagicMock()
|
||||
mock_service_account.return_value = mock_client
|
||||
mock_client.open.return_value = "MockSheet"
|
||||
mock_client.open_by_key.return_value = "MockSheet"
|
||||
|
||||
# Setup module with parameterized values
|
||||
feeder = setup_module(
|
||||
"gsheet_feeder",
|
||||
{"service_account": "dummy.json", "sheet": sheet, "sheet_id": sheet_id},
|
||||
)
|
||||
sheet_result = feeder.open_sheet()
|
||||
# Validate the correct method was called
|
||||
getattr(mock_client, expected_method).assert_called_once_with(
|
||||
expected_arg
|
||||
), f"Failed: {description}"
|
||||
assert sheet_result == "MockSheet", f"Failed: {description}"
|
||||
# Setup module with parameterized values
|
||||
feeder = setup_module(
|
||||
"gsheet_feeder",
|
||||
{"service_account": "dummy.json", "sheet": sheet, "sheet_id": sheet_id},
|
||||
)
|
||||
sheet_result = feeder.open_sheet()
|
||||
# Validate the correct method was called
|
||||
getattr(mock_client, expected_method).assert_called_once_with(
|
||||
expected_arg
|
||||
), f"Failed: {description}"
|
||||
assert sheet_result == "MockSheet", f"Failed: {description}"
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("setup_module")
|
||||
def test_open_sheet_with_sheet_id(setup_module):
|
||||
def test_open_sheet_with_sheet_id(setup_module, mocker):
|
||||
"""Ensure open_sheet() correctly opens a sheet by ID."""
|
||||
with patch("gspread.service_account") as mock_service_account:
|
||||
mock_client = MagicMock()
|
||||
mock_service_account.return_value = mock_client
|
||||
mock_client.open_by_key.return_value = "MockSheet"
|
||||
feeder = setup_module(
|
||||
"gsheet_feeder",
|
||||
{"service_account": "dummy.json", "sheet": None, "sheet_id": "ABC123"},
|
||||
)
|
||||
sheet = feeder.open_sheet()
|
||||
mock_client.open_by_key.assert_called_once_with("ABC123")
|
||||
assert sheet == "MockSheet"
|
||||
mock_service_account = mocker.patch("gspread.service_account")
|
||||
mock_client = mocker.MagicMock()
|
||||
mock_service_account.return_value = mock_client
|
||||
mock_client.open_by_key.return_value = "MockSheet"
|
||||
feeder = setup_module(
|
||||
"gsheet_feeder",
|
||||
{"service_account": "dummy.json", "sheet": None, "sheet_id": "ABC123"},
|
||||
)
|
||||
sheet = feeder.open_sheet()
|
||||
mock_client.open_by_key.assert_called_once_with("ABC123")
|
||||
assert sheet == "MockSheet"
|
||||
|
||||
|
||||
def test_should_process_sheet(setup_module):
|
||||
with patch("gspread.service_account"):
|
||||
gdb = setup_module(
|
||||
"gsheet_feeder",
|
||||
{
|
||||
"service_account": "dummy.json",
|
||||
"sheet": "TestSheet",
|
||||
"sheet_id": None,
|
||||
"allow_worksheets": {"TestSheet", "Sheet2"},
|
||||
"block_worksheets": {"Sheet3"},
|
||||
},
|
||||
)
|
||||
def test_should_process_sheet(setup_module, mocker):
|
||||
mocker.patch("gspread.service_account")
|
||||
gdb = setup_module(
|
||||
"gsheet_feeder",
|
||||
{
|
||||
"service_account": "dummy.json",
|
||||
"sheet": "TestSheet",
|
||||
"sheet_id": None,
|
||||
"allow_worksheets": {"TestSheet", "Sheet2"},
|
||||
"block_worksheets": {"Sheet3"},
|
||||
},
|
||||
)
|
||||
assert gdb.should_process_sheet("TestSheet") == True
|
||||
assert gdb.should_process_sheet("Sheet3") == False
|
||||
# False if allow_worksheets is set
|
||||
|
|
|
@ -1,14 +1,13 @@
|
|||
# Note this isn't a feeder, but contained as utility of the gsheet feeder module
|
||||
import pytest
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
from auto_archiver.modules.gsheet_feeder import GWorksheet
|
||||
|
||||
|
||||
class TestGWorksheet:
|
||||
@pytest.fixture
|
||||
def mock_worksheet(self):
|
||||
mock_ws = MagicMock()
|
||||
def mock_worksheet(self, mocker):
|
||||
mock_ws = mocker.MagicMock()
|
||||
mock_ws.get_values.return_value = [
|
||||
["Link", "Archive Status", "Archive Location", "Archive Date"],
|
||||
["url1", "archived", "filepath1", "2023-01-01"],
|
||||
|
@ -137,8 +136,8 @@ class TestGWorksheet:
|
|||
assert gworksheet.to_a1(row, col) == expected
|
||||
|
||||
# Test empty worksheet
|
||||
def test_empty_worksheet_initialization(self):
|
||||
mock_ws = MagicMock()
|
||||
def test_empty_worksheet_initialization(self, mocker):
|
||||
mock_ws = mocker.MagicMock()
|
||||
mock_ws.get_values.return_value = []
|
||||
g = GWorksheet(mock_ws)
|
||||
assert g.headers == []
|
||||
|
|
|
@ -1,6 +1,5 @@
|
|||
from typing import Type
|
||||
import pytest
|
||||
from unittest.mock import MagicMock, patch
|
||||
from auto_archiver.core import Media
|
||||
from auto_archiver.modules.s3_storage import S3Storage
|
||||
|
||||
|
@ -11,7 +10,6 @@ class TestS3Storage:
|
|||
"""
|
||||
module_name: str = "s3_storage"
|
||||
storage: Type[S3Storage]
|
||||
s3: MagicMock
|
||||
config: dict = {
|
||||
"path_generator": "flat",
|
||||
"filename_generator": "static",
|
||||
|
@ -25,13 +23,14 @@ class TestS3Storage:
|
|||
"private": False,
|
||||
}
|
||||
|
||||
@patch('boto3.client')
|
||||
@pytest.fixture(autouse=True)
|
||||
def setup_storage(self, setup_module):
|
||||
def setup_storage(self, setup_module, mocker):
|
||||
self.s3 = S3Storage()
|
||||
self.storage = setup_module(self.module_name, self.config)
|
||||
|
||||
def test_client_initialization(self):
|
||||
"""Test that S3 client is initialized with correct parameters"""
|
||||
|
||||
assert self.storage.s3 is not None
|
||||
assert self.storage.s3.meta.region_name == 'test-region'
|
||||
|
||||
|
@ -44,81 +43,63 @@ class TestS3Storage:
|
|||
media.key = "another/path.jpg"
|
||||
assert self.storage.get_cdn_url(media) == "https://cdn.example.com/another/path.jpg"
|
||||
|
||||
def test_uploadf_sets_acl_public(self):
|
||||
def test_uploadf_sets_acl_public(self, mocker):
|
||||
media = Media("test.txt")
|
||||
mock_file = MagicMock()
|
||||
with patch.object(self.storage.s3, 'upload_fileobj') as mock_s3_upload, \
|
||||
patch.object(self.storage, 'is_upload_needed', return_value=True):
|
||||
self.storage.uploadf(mock_file, media)
|
||||
mock_s3_upload.assert_called_once_with(
|
||||
mock_file,
|
||||
Bucket='test-bucket',
|
||||
Key=media.key,
|
||||
ExtraArgs={'ACL': 'public-read', 'ContentType': 'text/plain'}
|
||||
)
|
||||
mock_file = mocker.MagicMock()
|
||||
mock_s3_upload = mocker.patch.object(self.storage.s3, 'upload_fileobj')
|
||||
mocker.patch.object(self.storage, 'is_upload_needed', return_value=True)
|
||||
self.storage.uploadf(mock_file, media)
|
||||
mock_s3_upload.assert_called_once_with(
|
||||
mock_file,
|
||||
Bucket='test-bucket',
|
||||
Key=media.key,
|
||||
ExtraArgs={'ACL': 'public-read', 'ContentType': 'text/plain'}
|
||||
)
|
||||
|
||||
def test_upload_decision_logic(self):
|
||||
def test_upload_decision_logic(self, mocker):
|
||||
"""Test is_upload_needed under different conditions"""
|
||||
media = Media("test.txt")
|
||||
# Test default state (random_no_duplicate=False)
|
||||
assert self.storage.is_upload_needed(media) is True
|
||||
# Set duplicate checking config to true:
|
||||
|
||||
self.storage.random_no_duplicate = True
|
||||
with patch('auto_archiver.modules.s3_storage.s3_storage.calculate_file_hash') as mock_calc_hash, \
|
||||
patch.object(self.storage, 'file_in_folder') as mock_file_in_folder:
|
||||
mock_calc_hash.return_value = 'beepboop123beepboop123beepboop123'
|
||||
mock_file_in_folder.return_value = 'existing_key.txt'
|
||||
# Test duplicate result
|
||||
assert self.storage.is_upload_needed(media) is False
|
||||
assert media.key == 'existing_key.txt'
|
||||
mock_file_in_folder.assert_called_with(
|
||||
# (first 24 chars of hash)
|
||||
'no-dups/beepboop123beepboop123be'
|
||||
)
|
||||
mock_calc_hash = mocker.patch('auto_archiver.modules.s3_storage.s3_storage.calculate_file_hash', return_value='beepboop123beepboop123beepboop123')
|
||||
mock_file_in_folder = mocker.patch.object(self.storage, 'file_in_folder', return_value='existing_key.txt')
|
||||
assert self.storage.is_upload_needed(media) is False
|
||||
assert media.key == 'existing_key.txt'
|
||||
mock_file_in_folder.assert_called_with('no-dups/beepboop123beepboop123be')
|
||||
|
||||
|
||||
@patch.object(S3Storage, 'file_in_folder')
|
||||
def test_skips_upload_when_duplicate_exists(self, mock_file_in_folder):
|
||||
def test_skips_upload_when_duplicate_exists(self, mocker):
|
||||
"""Test that upload skips when file_in_folder finds existing object"""
|
||||
self.storage.random_no_duplicate = True
|
||||
mock_file_in_folder.return_value = "existing_folder/existing_file.txt"
|
||||
# Create test media with calculated hash
|
||||
mock_file_in_folder = mocker.patch.object(S3Storage, 'file_in_folder', return_value="existing_folder/existing_file.txt")
|
||||
media = Media("test.txt")
|
||||
media.key = "original_path.txt"
|
||||
with patch('auto_archiver.modules.s3_storage.s3_storage.calculate_file_hash') as mock_calculate_hash:
|
||||
mock_calculate_hash.return_value = "beepboop123beepboop123beepboop123"
|
||||
# Verify upload
|
||||
assert self.storage.is_upload_needed(media) is False
|
||||
assert media.key == "existing_folder/existing_file.txt"
|
||||
assert media.get("previously archived") is True
|
||||
with patch.object(self.storage.s3, 'upload_fileobj') as mock_upload:
|
||||
result = self.storage.uploadf(None, media)
|
||||
mock_upload.assert_not_called()
|
||||
assert result is True
|
||||
mock_calculate_hash = mocker.patch('auto_archiver.modules.s3_storage.s3_storage.calculate_file_hash', return_value="beepboop123beepboop123beepboop123")
|
||||
assert self.storage.is_upload_needed(media) is False
|
||||
assert media.key == "existing_folder/existing_file.txt"
|
||||
assert media.get("previously archived") is True
|
||||
mock_upload = mocker.patch.object(self.storage.s3, 'upload_fileobj')
|
||||
result = self.storage.uploadf(None, media)
|
||||
mock_upload.assert_not_called()
|
||||
assert result is True
|
||||
|
||||
@patch.object(S3Storage, 'is_upload_needed')
|
||||
def test_uploads_with_correct_parameters(self, mock_upload_needed):
|
||||
def test_uploads_with_correct_parameters(self, mocker):
|
||||
media = Media("test.txt")
|
||||
media.key = "original_key.txt"
|
||||
mock_upload_needed.return_value = True
|
||||
mocker.patch.object(S3Storage, 'is_upload_needed', return_value=True)
|
||||
media.mimetype = 'image/png'
|
||||
mock_file = MagicMock()
|
||||
mock_file = mocker.MagicMock()
|
||||
mock_upload = mocker.patch.object(self.storage.s3, 'upload_fileobj')
|
||||
self.storage.uploadf(mock_file, media)
|
||||
mock_upload.assert_called_once_with(
|
||||
mock_file,
|
||||
Bucket='test-bucket',
|
||||
Key='original_key.txt',
|
||||
ExtraArgs={
|
||||
'ACL': 'public-read',
|
||||
'ContentType': 'image/png'
|
||||
}
|
||||
)
|
||||
|
||||
with patch.object(self.storage.s3, 'upload_fileobj') as mock_upload:
|
||||
self.storage.uploadf(mock_file, media)
|
||||
# verify call occured with these params
|
||||
mock_upload.assert_called_once_with(
|
||||
mock_file,
|
||||
Bucket='test-bucket',
|
||||
Key='original_key.txt',
|
||||
ExtraArgs={
|
||||
'ACL': 'public-read',
|
||||
'ContentType': 'image/png'
|
||||
}
|
||||
)
|
||||
|
||||
def test_file_in_folder_exists(self):
|
||||
with patch.object(self.storage.s3, 'list_objects') as mock_list_objects:
|
||||
mock_list_objects.return_value = {'Contents': [{'Key': 'path/to/file.txt'}]}
|
||||
assert self.storage.file_in_folder('path/to/') == 'path/to/file.txt'
|
||||
def test_file_in_folder_exists(self, mocker):
|
||||
mock_list_objects = mocker.patch.object(self.storage.s3, 'list_objects', return_value={'Contents': [{'Key': 'path/to/file.txt'}]})
|
||||
assert self.storage.file_in_folder('path/to/') == 'path/to/file.txt'
|
||||
|
|
|
@ -1,44 +1,57 @@
|
|||
from typing import Type
|
||||
import pytest
|
||||
from unittest.mock import MagicMock, patch
|
||||
from oauth2client import service_account
|
||||
|
||||
from auto_archiver.core import Media
|
||||
from auto_archiver.modules.gdrive_storage import GDriveStorage
|
||||
from auto_archiver.core.metadata import Metadata
|
||||
from tests.storages.test_storage_base import TestStorageBase
|
||||
|
||||
|
||||
class TestGDriveStorage:
|
||||
"""
|
||||
Test suite for GDriveStorage.
|
||||
"""
|
||||
|
||||
@pytest.fixture
|
||||
def gdrive_storage(setup_module, mocker):
|
||||
module_name: str = "gdrive_storage"
|
||||
storage: Type[GDriveStorage]
|
||||
storage: GDriveStorage
|
||||
config: dict = {'path_generator': 'url',
|
||||
'filename_generator': 'static',
|
||||
'root_folder_id': "fake_root_folder_id",
|
||||
'oauth_token': None,
|
||||
'service_account': 'fake_service_account.json'
|
||||
}
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def gdrive(self, setup_module):
|
||||
with patch('google.oauth2.service_account.Credentials.from_service_account_file') as mock_creds:
|
||||
self.storage = setup_module(self.module_name, self.config)
|
||||
|
||||
def test_initialize_fails_with_non_existent_creds(self):
|
||||
"""
|
||||
Test that the Google Drive service raises a FileNotFoundError when the service account file does not exist.
|
||||
"""
|
||||
# Act and Assert
|
||||
with pytest.raises(FileNotFoundError) as exc_info:
|
||||
self.storage.setup()
|
||||
assert "No such file or directory" in str(exc_info.value)
|
||||
mocker.patch('google.oauth2.service_account.Credentials.from_service_account_file')
|
||||
return setup_module(module_name, config)
|
||||
|
||||
|
||||
def test_path_parts(self):
|
||||
media = Media(filename="test.jpg")
|
||||
media.key = "folder1/folder2/test.jpg"
|
||||
def test_initialize_fails_with_non_existent_creds(setup_module):
|
||||
"""Test that the Google Drive service raises a FileNotFoundError when the service account file does not exist.
|
||||
(and isn't mocked)
|
||||
"""
|
||||
config: dict = {'path_generator': 'url',
|
||||
'filename_generator': 'static',
|
||||
'root_folder_id': "fake_root_folder_id",
|
||||
'oauth_token': None,
|
||||
'service_account': 'fake_service_account.json'
|
||||
}
|
||||
with pytest.raises(FileNotFoundError) as exc_info:
|
||||
setup_module("gdrive_storage", config)
|
||||
assert "No such file or directory" in str(exc_info.value)
|
||||
|
||||
|
||||
def test_get_id_from_parent_and_name(gdrive_storage, mocker):
|
||||
"""Test _get_id_from_parent_and_name returns correct id from an API result."""
|
||||
fake_list = mocker.MagicMock()
|
||||
fake_list.execute.return_value = {"files": [{"id": "123", "name": "testname"}]}
|
||||
fake_service = mocker.MagicMock()
|
||||
# mock the files.list return value
|
||||
fake_service.files.return_value.list.return_value = fake_list
|
||||
gdrive_storage.service = fake_service
|
||||
result = gdrive_storage._get_id_from_parent_and_name("parent", "mock", retries=1, use_mime_type=False)
|
||||
assert result == "123"
|
||||
|
||||
def test_path_parts():
|
||||
media = Media(filename="test.jpg")
|
||||
media.key = "folder1/folder2/test.jpg"
|
||||
|
||||
|
||||
|
||||
@pytest.mark.skip(reason="Requires real credentials")
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
import hashlib
|
||||
import json
|
||||
from datetime import datetime, timezone
|
||||
from unittest.mock import Mock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
|
@ -44,20 +43,19 @@ class TestURLExpansion:
|
|||
("https://example.com", "https://example.com"),
|
||||
("https://t.co/test", "https://expanded.url")
|
||||
])
|
||||
def test_expand_url(self, input_url, expected):
|
||||
mock_response = Mock()
|
||||
def test_expand_url(self, input_url, expected, mocker):
|
||||
mock_response = mocker.Mock()
|
||||
mock_response.url = "https://expanded.url"
|
||||
with patch('requests.get', return_value=mock_response):
|
||||
mocker.patch('requests.get', return_value=mock_response)
|
||||
result = expand_url(input_url)
|
||||
assert result == expected
|
||||
|
||||
result = expand_url(input_url)
|
||||
assert result == expected
|
||||
|
||||
def test_expand_url_handles_errors(self, caplog):
|
||||
with patch('requests.get', side_effect=Exception("Connection error")):
|
||||
url = "https://t.co/error"
|
||||
result = expand_url(url)
|
||||
assert result == url
|
||||
assert f"Failed to expand url {url}" in caplog.text
|
||||
def test_expand_url_handles_errors(self, caplog, mocker):
|
||||
mocker.patch('requests.get', side_effect=Exception("Connection error"))
|
||||
url = "https://t.co/error"
|
||||
result = expand_url(url)
|
||||
assert result == url
|
||||
assert f"Failed to expand url {url}" in caplog.text
|
||||
|
||||
class TestAttributeHandling:
|
||||
class Sample:
|
||||
|
|
Ładowanie…
Reference in New Issue