Updated test, test metadata

pull/189/head
erinhmclark 2025-02-06 10:11:56 +00:00
rodzic 52542812dc
commit 5b0bad832f
5 zmienionych plików z 284 dodań i 36 usunięć

Wyświetl plik

@@ -104,7 +104,6 @@ class GsheetsDb(Database):
if gsheet := item.get_context("gsheet"):
gw: GWorksheet = gsheet.get("worksheet")
row: int = gsheet.get("row")
# todo doesn't exist, should be passed from
elif self.sheet_id:
print(self.sheet_id)

Wyświetl plik

@@ -37,41 +37,48 @@ class GsheetsFeeder(Feeder):
def __iter__(self) -> Metadata:
sh = self.open_sheet()
for ii, wks in enumerate(sh.worksheets()):
if not self.should_process_sheet(wks.title):
logger.debug(f"SKIPPED worksheet '{wks.title}' due to allow/block rules")
for ii, worksheet in enumerate(sh.worksheets()):
if not self.should_process_sheet(worksheet.title):
logger.debug(f"SKIPPED worksheet '{worksheet.title}' due to allow/block rules")
continue
logger.info(f'Opening worksheet {ii=}: {wks.title=} header={self.header}')
gw = GWorksheet(wks, header_row=self.header, columns=self.columns)
logger.info(f'Opening worksheet {ii=}: {worksheet.title=} header={self.header}')
gw = GWorksheet(worksheet, header_row=self.header, columns=self.columns)
if len(missing_cols := self.missing_required_columns(gw)):
logger.warning(f"SKIPPED worksheet '{wks.title}' due to missing required column(s) for {missing_cols}")
continue
for row in range(1 + self.header, gw.count_rows() + 1):
url = gw.get_cell(row, 'url').strip()
if not len(url): continue
# process and yield metadata here:
yield from self._process_rows(gw)
logger.success(f'Finished worksheet {worksheet.title}')
original_status = gw.get_cell(row, 'status')
status = gw.get_cell(row, 'status', fresh=original_status in ['', None])
# TODO: custom status parser(?) aka should_retry_from_status
if status not in ['', None]: continue
def _process_rows(self, gw: GWorksheet) -> Metadata:
for row in range(1 + self.header, gw.count_rows() + 1):
url = gw.get_cell(row, 'url').strip()
if not len(url): continue
original_status = gw.get_cell(row, 'status')
status = gw.get_cell(row, 'status', fresh=original_status in ['', None])
# TODO: custom status parser(?) aka should_retry_from_status
if status not in ['', None]: continue
# All checks done - archival process starts here
m = Metadata().set_url(url)
if gw.get_cell_or_default(row, 'folder', "") is None:
folder = ''
else:
folder = slugify(gw.get_cell_or_default(row, 'folder', "").strip())
if len(folder) and self.use_sheet_names_in_stored_paths:
folder = os.path.join(folder, slugify(self.sheet), slugify(wks.title))
# All checks done - archival process starts here
m = Metadata().set_url(url)
self._set_context(m, gw, row)
yield m
m.set_context('folder', folder)
m.set_context('worksheet', {"row": row, "worksheet": gw})
yield m
def _set_context(self, m: Metadata, gw: GWorksheet, row: int) -> Metadata:
# TODO: Check folder value not being recognised
m.set_context("gsheet", {"row": row, "worksheet": gw})
if gw.get_cell_or_default(row, 'folder', "") is None:
folder = ''
else:
folder = slugify(gw.get_cell_or_default(row, 'folder', "").strip())
if len(folder):
if self.use_sheet_names_in_stored_paths:
m.set_context("folder", os.path.join(folder, slugify(self.sheet), slugify(gw.wks.title)))
else:
m.set_context("folder", folder)
logger.success(f'Finished worksheet {wks.title}')
def should_process_sheet(self, sheet_name: str) -> bool:
if len(self.allow_worksheets) and sheet_name not in self.allow_worksheets:

Wyświetl plik

@@ -9,6 +9,7 @@ from auto_archiver.modules.instagram_api_extractor.instagram_api_extractor impor
from .test_extractor_base import TestExtractorBase
@pytest.fixture
def mock_user_response():
return {
@@ -71,11 +72,18 @@ class TestInstagramAPIExtractor(TestExtractorBase):
config = {
"access_token": "test_access_token",
"api_endpoint": "https://api.instagram.com/v1",
# "full_profile": False,
"full_profile": False,
# "full_profile_max_posts": 0,
# "minimize_json_output": True,
}
@pytest.fixture
def metadata(self):
    """Metadata pre-seeded with a test instagram profile url and netloc."""
    meta = Metadata()
    meta.set_url("https://instagram.com/test_user")
    meta.set("netloc", "instagram.com")
    return meta
@pytest.mark.parametrize("url,expected", [
("https://instagram.com/user", [("", "user", "")]),
("https://instagr.am/p/post_id", []),
@@ -88,7 +96,6 @@ class TestInstagramAPIExtractor(TestExtractorBase):
assert self.extractor.valid_url.findall(url) == expected
def test_initialize(self):
    """After initialise(), the configured API endpoint has no trailing slash."""
    self.extractor.initialise()
    endpoint = self.extractor.api_endpoint
    assert endpoint[-1] != "/"
@pytest.mark.parametrize("input_dict,expected", [
@@ -98,11 +105,85 @@ class TestInstagramAPIExtractor(TestExtractorBase):
def test_cleanup_dict(self, input_dict, expected):
assert self.extractor.cleanup_dict(input_dict) == expected
def test_download_post(self):
def test_download(self):
    # TODO: not yet implemented; placeholder keeps the suite structure visible.
    pass
def test_download_post(self, metadata, mock_user_response):
    # TODO: planned coverage, not yet implemented:
    # test with context=reel
    # test with context=post
    # test with multiple images
    # test gets text (metadata title)
    pass
def test_download_profile_basic(self, metadata, mock_user_response):
    """Test basic profile download without full_profile.

    Mocks the API lookup and the picture download, then checks the
    resulting status, title, cleaned data and attached media.
    """
    with patch.object(self.extractor, 'call_api') as mock_call, \
            patch.object(self.extractor, 'download_from_url') as mock_download:
        # Mock API responses
        mock_call.return_value = mock_user_response
        mock_download.return_value = "profile.jpg"
        # (removed a stray no-op `pass` that was left mid-body)

        result = self.extractor.download_profile(metadata, "test_user")

        assert result.status == "insta profile: success"
        assert result.get_title() == "Test User"
        assert result.get("data") == self.extractor.cleanup_dict(mock_user_response["user"])

        # Verify profile picture download
        mock_call.assert_called_once_with("v2/user/by/username", {"username": "test_user"})
        mock_download.assert_called_once_with("http://example.com/profile.jpg")
        assert len(result.media) == 1
        assert result.media[0].filename == "profile.jpg"
def test_download_profile_full(self, metadata, mock_user_response, mock_story_response):
    """Test full profile download with stories/posts"""
    with patch.object(self.extractor, 'call_api') as mock_call, \
        patch.object(self.extractor, 'download_all_posts') as mock_posts, \
        patch.object(self.extractor, 'download_all_highlights') as mock_highlights, \
        patch.object(self.extractor, 'download_all_tagged') as mock_tagged, \
        patch.object(self.extractor, '_download_stories_reusable') as mock_stories:
        self.extractor.full_profile = True
        # side_effect order matters: first call returns the user profile,
        # the second the stories payload
        mock_call.side_effect = [
            mock_user_response,
            mock_story_response
        ]
        mock_highlights.return_value = None
        mock_stories.return_value = mock_story_response
        mock_posts.return_value = None
        mock_tagged.return_value = None
        result = self.extractor.download_profile(metadata, "test_user")
        # stories count recorded and posts download triggered for user id "123"
        # NOTE(review): the "123" id presumably comes from mock_user_response — confirm
        assert result.get("#stories") == len(mock_story_response)
        mock_posts.assert_called_once_with(result, "123")
        assert "errors" not in result.metadata
def test_download_profile_not_found(self, metadata):
    """A lookup for an unknown user must raise with a clear message."""
    with patch.object(self.extractor, 'call_api') as api_mock:
        api_mock.return_value = {"user": None}
        with pytest.raises(AssertionError) as raised:
            self.extractor.download_profile(metadata, "invalid_user")
        assert "User invalid_user not found" in str(raised.value)
def test_download_profile_error_handling(self, metadata, mock_user_response):
    """Test error handling in full profile mode.

    The stories/posts API calls raise; the profile download must still
    succeed overall while recording the errors in result.metadata.
    """
    # Parenthesised with-statement only — the original redundantly mixed
    # parentheses with backslash line continuations.
    with (
        patch.object(self.extractor, 'call_api') as mock_call,
        patch.object(self.extractor, 'download_all_highlights') as mock_highlights,
        patch.object(self.extractor, 'download_all_tagged') as mock_tagged,
        patch.object(self.extractor, '_download_stories_reusable') as stories_tagged,
        patch.object(self.extractor, 'download_all_posts') as mock_posts,
    ):
        self.extractor.full_profile = True
        # first call returns the profile, later calls blow up
        mock_call.side_effect = [
            mock_user_response,
            Exception("Stories API failed"),
            Exception("Posts API failed")
        ]
        mock_highlights.return_value = None
        mock_tagged.return_value = None
        stories_tagged.return_value = None
        mock_posts.return_value = None

        result = self.extractor.download_profile(metadata, "test_user")

        assert result.is_success()
        assert "Error downloading stories for test_user" in result.metadata["errors"]
        # TODO(review): re-enable once posts errors are surfaced the same way
        # assert "Error downloading posts for test_user" in result.metadata["errors"]

Wyświetl plik

@@ -4,7 +4,7 @@ import gspread
import pytest
from unittest.mock import patch, MagicMock
from auto_archiver.modules.gsheet_feeder import GsheetsFeeder
from auto_archiver.core import Metadata, Feeder, ArchivingContext
from auto_archiver.core import Metadata, Feeder
def test_initialise_without_sheet_and_sheet_id(setup_module):
@@ -100,21 +100,21 @@ def test__process_rows(gsheet_feeder: GsheetsFeeder):
def test__set_metadata(gsheet_feeder: GsheetsFeeder, worksheet):
gsheet_feeder._set_context(worksheet, 1)
assert ArchivingContext.get("gsheet") == {"row": 1, "worksheet": worksheet}
assert Metadata.get_context("gsheet") == {"row": 1, "worksheet": worksheet}
@pytest.mark.skip(reason="Not recognising folder column")
def test__set_metadata_with_folder_pickled(gsheet_feeder: GsheetsFeeder, worksheet):
gsheet_feeder._set_context(worksheet, 7)
assert ArchivingContext.get("gsheet") == {"row": 1, "worksheet": worksheet}
assert Metadata.get_context("gsheet") == {"row": 1, "worksheet": worksheet}
def test__set_metadata_with_folder(gsheet_feeder: GsheetsFeeder):
testworksheet = TestWorksheet()
testworksheet.wks.title = "TestSheet"
gsheet_feeder._set_context(testworksheet, 6)
assert ArchivingContext.get("gsheet") == {"row": 6, "worksheet": testworksheet}
assert ArchivingContext.get("folder") == "some-folder/test-auto-archiver/testsheet"
assert Metadata.get_context("gsheet") == {"row": 6, "worksheet": testworksheet}
assert Metadata.get_context("folder") == "some-folder/test-auto-archiver/testsheet"
@pytest.mark.usefixtures("setup_module")

Wyświetl plik

@@ -0,0 +1,161 @@
import pytest
from datetime import datetime, timezone
from dataclasses import dataclass
from typing import Any
from auto_archiver.core.metadata import Metadata
@pytest.fixture
def basic_metadata():
    """Metadata instance pre-populated with a URL and a title."""
    meta = Metadata()
    meta.set_url("https://example.com")
    meta.set("title", "Test Page")
    return meta
@dataclass
class MockMedia:
    """Minimal stand-in for the project's Media class.

    Stores arbitrary key/value pairs in ``data`` and exposes the same
    ``get``/``set`` accessors the tests rely on.
    """
    filename: str = ""
    mimetype: str = ""
    # default_factory avoids the None-sentinel workaround of the original
    # (`data: dict = None`) while staying tolerant of explicit data=None.
    data: dict = field(default_factory=dict)

    def get(self, key: str, default: Any = None) -> Any:
        """Return the stored value for *key*, or *default* if absent."""
        return self.data.get(key, default) if self.data is not None else default

    def set(self, key: str, value: Any) -> None:
        """Store *value* under *key*."""
        if self.data is None:  # caller passed data=None explicitly
            self.data = {}
        self.data[key] = value
@pytest.fixture
def media_file():
    """Factory fixture building MockMedia objects with an optional hash."""
    def _build(filename="test.txt", mimetype="text/plain", hash_value=None):
        media = MockMedia(filename=filename, mimetype=mimetype)
        if hash_value:
            media.set("hash", hash_value)
        return media
    return _build
def test_initial_state():
    """A fresh Metadata has no archiver status, no media, and a timestamp."""
    fresh = Metadata()
    assert fresh.status == "no archiver"
    assert fresh.metadata == {"_processed_at": fresh.get("_processed_at")}
    assert fresh.media == []
    assert isinstance(fresh.get("_processed_at"), datetime)
def test_url_properties(basic_metadata):
    """get_url and netloc reflect the url stored by the fixture."""
    meta = basic_metadata
    assert meta.get_url() == "https://example.com"
    assert meta.netloc == "example.com"
def test_simple_merge(basic_metadata):
    """Merging pulls status and new keys in from the right-hand Metadata."""
    incoming = Metadata(status="success")
    incoming.set("title", "Test Title")
    basic_metadata.merge(incoming)
    assert basic_metadata.status == "success"
    assert basic_metadata.get("title") == "Test Title"
def test_left_merge():
    """overwrite_left=True merges right into left, combining lists and dicts."""
    left = Metadata()
    left.set("tags", ["a"])
    left.set("stats", {"views": 10})
    left.set("status", "success")

    right = Metadata()
    right.set("tags", ["b"])
    right.set("stats", {"likes": 5})
    right.set("status", "no archiver")

    left.merge(right, overwrite_left=True)
    assert left.get("status") == "no archiver"
    assert left.get("tags") == ["a", "b"]
    assert left.get("stats") == {"views": 10, "likes": 5}
def test_media_management(basic_metadata, media_file):
    """Duplicate hashes get pruned; media stay addressable by id."""
    first = media_file(hash_value="abc")
    duplicate = media_file(hash_value="abc")  # same hash as `first`
    unique = media_file(hash_value="def")
    basic_metadata.add_media(first, "m1")
    basic_metadata.add_media(duplicate, "m2")
    basic_metadata.add_media(unique)
    assert len(basic_metadata.media) == 3
    basic_metadata.remove_duplicate_media_by_hash()
    assert len(basic_metadata.media) == 2
    assert basic_metadata.get_media_by_id("m1") == first
def test_success():
    """success() flips is_success and records the context in status."""
    meta = Metadata()
    assert not meta.is_success()
    meta.success("context")
    assert meta.is_success()
    assert meta.status == "context: success"
def test_is_empty():
    """Bookkeeping-only keys do not make a Metadata non-empty."""
    meta = Metadata()
    assert meta.is_empty()
    # these keys count as meaningless ids, so is_empty stays True
    meta.set("url", "example.com")
    meta.set("total_bytes", 100)
    meta.set("archive_duration_seconds", 10)
    meta.set("_processed_at", datetime.now(timezone.utc))
    assert meta.is_empty()
def test_store():
    # TODO: storage behaviour not yet covered; placeholder only.
    pass

# Test Media operations
# Test custom getter/setters
def test_get_set_url():
    """set_url stores the url; an empty url is rejected via assertion."""
    meta = Metadata()
    meta.set_url("http://example.com")
    assert meta.get_url() == "http://example.com"
    with pytest.raises(AssertionError):
        meta.set_url("")
    # the failed set must not clobber the previously stored url
    assert meta.get("url") == "http://example.com"
def test_set_content():
    """set_content appends to any existing content without a separator."""
    meta = Metadata()
    meta.set_content("Some content")
    assert meta.get("content") == "Some content"
    # a second call appends rather than replaces
    meta.set_content("New content")
    # NOTE(review): no line break is inserted between appended chunks —
    # confirm whether the method should add one
    assert meta.get("content") == "Some contentNew content"
def test_choose_most_complex():
    # TODO: not yet implemented; placeholder.
    pass
def test_get_context():
    """Context values are stored per key and persist across later sets."""
    meta = Metadata()
    meta.set_context("somekey", "somevalue")
    assert meta.get_context("somekey") == "somevalue"
    assert meta.get_context("nonexistent") is None
    meta.set_context("anotherkey", "anothervalue")
    # the earlier key is retained alongside the new one
    assert meta.get_context("somekey") == "somevalue"
    assert meta.get_context("anotherkey") == "anothervalue"
    assert len(meta._context) == 2