kopia lustrzana https://github.com/bellingcat/auto-archiver
Update Atlos tests
rodzic
0f911543cd
commit
b9c2f98f46
|
@ -1 +0,0 @@
|
|||
from .atlos_db import AtlosDb
|
|
@ -1,38 +0,0 @@
|
|||
{
|
||||
"name": "Atlos Database",
|
||||
"type": ["database"],
|
||||
"entry_point": "atlos_db::AtlosDb",
|
||||
"requires_setup": True,
|
||||
"dependencies":
|
||||
{"python": ["loguru",
|
||||
""],
|
||||
"bin": [""]},
|
||||
"configs": {
|
||||
"api_token": {
|
||||
"default": None,
|
||||
"help": "An Atlos API token. For more information, see https://docs.atlos.org/technical/api/",
|
||||
"required": True,
|
||||
"type": "str",
|
||||
},
|
||||
"atlos_url": {
|
||||
"default": "https://platform.atlos.org",
|
||||
"help": "The URL of your Atlos instance (e.g., https://platform.atlos.org), without a trailing slash.",
|
||||
"type": "str"
|
||||
},
|
||||
},
|
||||
"description": """
|
||||
Handles integration with the Atlos platform for managing archival results.
|
||||
|
||||
### Features
|
||||
- Outputs archival results to the Atlos API for storage and tracking.
|
||||
- Updates failure status with error details when archiving fails.
|
||||
- Processes and formats metadata, including ISO formatting for datetime fields.
|
||||
- Skips processing for items without an Atlos ID.
|
||||
|
||||
### Setup
|
||||
Required configs:
|
||||
- atlos_url: Base URL for the Atlos API.
|
||||
- api_token: Authentication token for API access.
|
||||
"""
|
||||
,
|
||||
}
|
|
@ -1,66 +0,0 @@
|
|||
from typing import Union
|
||||
|
||||
import requests
|
||||
from loguru import logger
|
||||
|
||||
from auto_archiver.core import Database
|
||||
from auto_archiver.core import Metadata
|
||||
|
||||
|
||||
class AtlosDb(Database):
|
||||
"""
|
||||
Outputs results to Atlos
|
||||
"""
|
||||
|
||||
def failed(self, item: Metadata, reason: str) -> None:
|
||||
"""Update DB accordingly for failure"""
|
||||
# If the item has no Atlos ID, there's nothing for us to do
|
||||
if not item.metadata.get("atlos_id"):
|
||||
logger.info(f"Item {item.get_url()} has no Atlos ID, skipping")
|
||||
return
|
||||
|
||||
requests.post(
|
||||
f"{self.atlos_url}/api/v2/source_material/metadata/{item.metadata['atlos_id']}/auto_archiver",
|
||||
headers={"Authorization": f"Bearer {self.api_token}"},
|
||||
json={"metadata": {"processed": True, "status": "error", "error": reason}},
|
||||
).raise_for_status()
|
||||
logger.info(
|
||||
f"Stored failure for {item.get_url()} (ID {item.metadata['atlos_id']}) on Atlos: {reason}"
|
||||
)
|
||||
|
||||
def fetch(self, item: Metadata) -> Union[Metadata, bool]:
|
||||
"""check and fetch if the given item has been archived already, each
|
||||
database should handle its own caching, and configuration mechanisms"""
|
||||
return False
|
||||
|
||||
def _process_metadata(self, item: Metadata) -> dict:
|
||||
"""Process metadata for storage on Atlos. Will convert any datetime
|
||||
objects to ISO format."""
|
||||
|
||||
return {
|
||||
k: v.isoformat() if hasattr(v, "isoformat") else v
|
||||
for k, v in item.metadata.items()
|
||||
}
|
||||
|
||||
def done(self, item: Metadata, cached: bool = False) -> None:
|
||||
"""archival result ready - should be saved to DB"""
|
||||
|
||||
if not item.metadata.get("atlos_id"):
|
||||
logger.info(f"Item {item.get_url()} has no Atlos ID, skipping")
|
||||
return
|
||||
|
||||
requests.post(
|
||||
f"{self.atlos_url}/api/v2/source_material/metadata/{item.metadata['atlos_id']}/auto_archiver",
|
||||
headers={"Authorization": f"Bearer {self.api_token}"},
|
||||
json={
|
||||
"metadata": dict(
|
||||
processed=True,
|
||||
status="success",
|
||||
results=self._process_metadata(item),
|
||||
)
|
||||
},
|
||||
).raise_for_status()
|
||||
|
||||
logger.info(
|
||||
f"Stored success for {item.get_url()} (ID {item.metadata['atlos_id']}) on Atlos"
|
||||
)
|
|
@ -1 +0,0 @@
|
|||
from .atlos_feeder import AtlosFeeder
|
|
@ -1,34 +0,0 @@
|
|||
{
|
||||
"name": "Atlos Feeder",
|
||||
"type": ["feeder"],
|
||||
"requires_setup": True,
|
||||
"dependencies": {
|
||||
"python": ["loguru", "requests"],
|
||||
},
|
||||
"configs": {
|
||||
"api_token": {
|
||||
"type": "str",
|
||||
"required": True,
|
||||
"help": "An Atlos API token. For more information, see https://docs.atlos.org/technical/api/",
|
||||
},
|
||||
"atlos_url": {
|
||||
"default": "https://platform.atlos.org",
|
||||
"help": "The URL of your Atlos instance (e.g., https://platform.atlos.org), without a trailing slash.",
|
||||
"type": "str"
|
||||
},
|
||||
},
|
||||
"description": """
|
||||
AtlosFeeder: A feeder module that integrates with the Atlos API to fetch source material URLs for archival.
|
||||
|
||||
### Features
|
||||
- Connects to the Atlos API to retrieve a list of source material URLs.
|
||||
- Filters source materials based on visibility, processing status, and metadata.
|
||||
- Converts filtered source materials into `Metadata` objects with the relevant `atlos_id` and URL.
|
||||
- Iterates through paginated results using a cursor for efficient API interaction.
|
||||
|
||||
### Notes
|
||||
- Requires an Atlos API endpoint and a valid API token for authentication.
|
||||
- Ensures only unprocessed, visible, and ready-to-archive URLs are returned.
|
||||
- Handles pagination transparently when retrieving data from the Atlos API.
|
||||
"""
|
||||
}
|
|
@ -1,42 +0,0 @@
|
|||
import requests
|
||||
from loguru import logger
|
||||
|
||||
from auto_archiver.core import Feeder
|
||||
from auto_archiver.core import Metadata
|
||||
|
||||
|
||||
class AtlosFeeder(Feeder):
|
||||
|
||||
def __iter__(self) -> Metadata:
|
||||
# Get all the urls from the Atlos API
|
||||
count = 0
|
||||
cursor = None
|
||||
while True:
|
||||
response = requests.get(
|
||||
f"{self.atlos_url}/api/v2/source_material",
|
||||
headers={"Authorization": f"Bearer {self.api_token}"},
|
||||
params={"cursor": cursor},
|
||||
)
|
||||
data = response.json()
|
||||
response.raise_for_status()
|
||||
cursor = data["next"]
|
||||
|
||||
for item in data["results"]:
|
||||
if (
|
||||
item["source_url"] not in [None, ""]
|
||||
and (
|
||||
item["metadata"]
|
||||
.get("auto_archiver", {})
|
||||
.get("processed", False)
|
||||
!= True
|
||||
)
|
||||
and item["visibility"] == "visible"
|
||||
and item["status"] not in ["processing", "pending"]
|
||||
):
|
||||
yield Metadata().set_url(item["source_url"]).set(
|
||||
"atlos_id", item["id"]
|
||||
)
|
||||
count += 1
|
||||
|
||||
if len(data["results"]) == 0 or cursor is None:
|
||||
break
|
|
@ -1 +0,0 @@
|
|||
from .atlos_feeder import AtlosFeeder
|
|
@ -11,12 +11,9 @@ from auto_archiver.utils import calculate_file_hash
|
|||
|
||||
class AtlosFeederDbStorage(Feeder, Database, Storage):
|
||||
|
||||
@property
|
||||
def session(self) -> requests.Session:
|
||||
def setup(self) -> requests.Session:
|
||||
"""create and return a persistent session."""
|
||||
if not hasattr(self, "_session"):
|
||||
self._session = requests.Session()
|
||||
return self._session
|
||||
self.session = requests.Session()
|
||||
|
||||
def _get(self, endpoint: str, params: Optional[dict] = None) -> dict:
|
||||
"""Wrapper for GET requests to the Atlos API."""
|
||||
|
|
|
@ -1 +0,0 @@
|
|||
from .atlos_storage import AtlosStorage
|
|
@ -1,32 +0,0 @@
|
|||
{
|
||||
"name": "Atlos Storage",
|
||||
"type": ["storage"],
|
||||
"requires_setup": True,
|
||||
"dependencies": {
|
||||
"python": ["loguru", "boto3"],
|
||||
"bin": []
|
||||
},
|
||||
"description": """
|
||||
Stores media files in a [Atlos](https://www.atlos.org/).
|
||||
|
||||
### Features
|
||||
- Saves media files to Atlos, organizing them into folders based on the provided path structure.
|
||||
|
||||
### Notes
|
||||
- Requires setup with Atlos credentials.
|
||||
- Files are uploaded to the specified `root_folder_id` and organized by the `media.key` structure.
|
||||
""",
|
||||
"configs": {
|
||||
"api_token": {
|
||||
"default": None,
|
||||
"help": "An Atlos API token. For more information, see https://docs.atlos.org/technical/api/",
|
||||
"required": True,
|
||||
"type": "str"
|
||||
},
|
||||
"atlos_url": {
|
||||
"default": "https://platform.atlos.org",
|
||||
"help": "The URL of your Atlos instance (e.g., https://platform.atlos.org), without a trailing slash.",
|
||||
"type": "str"
|
||||
},
|
||||
}
|
||||
}
|
|
@ -1,68 +0,0 @@
|
|||
import hashlib
|
||||
import os
|
||||
from typing import IO, Optional
|
||||
|
||||
import requests
|
||||
from loguru import logger
|
||||
|
||||
from auto_archiver.core import Media, Metadata
|
||||
from auto_archiver.core import Storage
|
||||
from auto_archiver.utils import calculate_file_hash
|
||||
|
||||
|
||||
class AtlosStorage(Storage):
|
||||
|
||||
def get_cdn_url(self, _media: Media) -> str:
|
||||
# It's not always possible to provide an exact URL, because it's
|
||||
# possible that the media once uploaded could have been copied to
|
||||
# another project.
|
||||
return self.atlos_url
|
||||
|
||||
def _hash(self, media: Media) -> str:
|
||||
# Hash the media file using sha-256. We don't use the existing auto archiver
|
||||
# hash because there's no guarantee that the configuerer is using sha-256, which
|
||||
# is how Atlos hashes files.
|
||||
|
||||
sha256 = hashlib.sha256()
|
||||
with open(media.filename, "rb") as f:
|
||||
while True:
|
||||
buf = f.read(4096)
|
||||
if not buf: break
|
||||
sha256.update(buf)
|
||||
return sha256.hexdigest()
|
||||
|
||||
def upload(self, media: Media, metadata: Optional[Metadata]=None, **_kwargs) -> bool:
|
||||
atlos_id = metadata.get("atlos_id")
|
||||
if atlos_id is None:
|
||||
logger.error(f"No Atlos ID found in metadata; can't store {media.filename} on Atlos")
|
||||
return False
|
||||
|
||||
media_hash = self._hash(media)
|
||||
# media_hash = calculate_file_hash(media.filename, hash_algo=hashlib.sha256, chunksize=4096)
|
||||
|
||||
# Check whether the media has already been uploaded
|
||||
source_material = requests.get(
|
||||
f"{self.atlos_url}/api/v2/source_material/{atlos_id}",
|
||||
headers={"Authorization": f"Bearer {self.api_token}"},
|
||||
).json()["result"]
|
||||
existing_media = [x["file_hash_sha256"] for x in source_material.get("artifacts", [])]
|
||||
if media_hash in existing_media:
|
||||
logger.info(f"{media.filename} with SHA256 {media_hash} already uploaded to Atlos")
|
||||
return True
|
||||
|
||||
# Upload the media to the Atlos API
|
||||
requests.post(
|
||||
f"{self.atlos_url}/api/v2/source_material/upload/{atlos_id}",
|
||||
headers={"Authorization": f"Bearer {self.api_token}"},
|
||||
params={
|
||||
"title": media.properties
|
||||
},
|
||||
files={"file": (os.path.basename(media.filename), open(media.filename, "rb"))},
|
||||
).raise_for_status()
|
||||
|
||||
logger.info(f"Uploaded {media.filename} to Atlos with ID {atlos_id} and title {media.key}")
|
||||
|
||||
return True
|
||||
|
||||
# must be implemented even if unused
|
||||
def uploadf(self, file: IO[bytes], key: str, **kwargs: dict) -> bool: pass
|
|
@ -2,7 +2,7 @@ import pytest
|
|||
from datetime import datetime
|
||||
|
||||
from auto_archiver.core import Metadata
|
||||
from auto_archiver.modules.atlos_db import AtlosDb
|
||||
from auto_archiver.modules.atlos_feeder_db_storage import AtlosFeederDbStorage as AtlosDb
|
||||
|
||||
|
||||
class FakeAPIResponse:
|
||||
|
@ -12,19 +12,28 @@ class FakeAPIResponse:
|
|||
self._data = data
|
||||
self.raise_error = raise_error
|
||||
|
||||
def json(self) -> dict:
|
||||
return self._data
|
||||
|
||||
def raise_for_status(self) -> None:
|
||||
if self.raise_error:
|
||||
raise Exception("HTTP error")
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def atlos_db(setup_module) -> AtlosDb:
|
||||
def atlos_db(setup_module, mocker) -> AtlosDb:
|
||||
"""Fixture for AtlosDb."""
|
||||
configs: dict = {
|
||||
"api_token": "abc123",
|
||||
"atlos_url": "https://platform.atlos.org",
|
||||
}
|
||||
return setup_module("atlos_db", configs)
|
||||
mocker.patch("requests.Session")
|
||||
atlos_feeder = setup_module("atlos_feeder_db_storage", configs)
|
||||
fake_session = mocker.MagicMock()
|
||||
# Configure the default response to have no results so that __iter__ terminates
|
||||
fake_session.get.return_value = FakeAPIResponse({"next": None, "results": []})
|
||||
atlos_feeder.session = fake_session
|
||||
return atlos_feeder
|
||||
|
||||
|
||||
def test_failed_no_atlos_id(atlos_db, metadata, mocker):
|
||||
|
@ -38,25 +47,20 @@ def test_failed_with_atlos_id(atlos_db, metadata, mocker):
|
|||
"""Test failed() posts failure when atlos_id is present."""
|
||||
metadata.set("atlos_id", 42)
|
||||
fake_resp = FakeAPIResponse({}, raise_error=False)
|
||||
post_mock = mocker.patch("requests.post", return_value=fake_resp)
|
||||
post_mock = mocker.patch.object(atlos_db, "_post", return_value=fake_resp)
|
||||
atlos_db.failed(metadata, "failure reason")
|
||||
expected_url = (
|
||||
f"{atlos_db.atlos_url}/api/v2/source_material/metadata/42/auto_archiver"
|
||||
)
|
||||
expected_headers = {"Authorization": f"Bearer {atlos_db.api_token}"}
|
||||
expected_endpoint = f"/api/v2/source_material/metadata/42/auto_archiver"
|
||||
expected_json = {
|
||||
"metadata": {"processed": True, "status": "error", "error": "failure reason"}
|
||||
}
|
||||
post_mock.assert_called_once_with(
|
||||
expected_url, headers=expected_headers, json=expected_json
|
||||
)
|
||||
post_mock.assert_called_once_with(expected_endpoint, json=expected_json)
|
||||
|
||||
|
||||
def test_failed_http_error(atlos_db, metadata, mocker):
|
||||
"""Test failed() raises exception on HTTP error."""
|
||||
metadata.set("atlos_id", 42)
|
||||
fake_resp = FakeAPIResponse({}, raise_error=True)
|
||||
mocker.patch("requests.post", return_value=fake_resp)
|
||||
# Patch _post to raise an exception instead of returning a fake response.
|
||||
mocker.patch.object(atlos_db, "_post", side_effect=Exception("HTTP error"))
|
||||
with pytest.raises(Exception, match="HTTP error"):
|
||||
atlos_db.failed(metadata, "failure reason")
|
||||
|
||||
|
@ -81,12 +85,9 @@ def test_done_with_atlos_id(atlos_db, metadata, mocker):
|
|||
now = datetime.now()
|
||||
metadata.set("timestamp", now)
|
||||
fake_resp = FakeAPIResponse({}, raise_error=False)
|
||||
post_mock = mocker.patch("requests.post", return_value=fake_resp)
|
||||
post_mock = mocker.patch.object(atlos_db, "_post", return_value=fake_resp)
|
||||
atlos_db.done(metadata)
|
||||
expected_url = (
|
||||
f"{atlos_db.atlos_url}/api/v2/source_material/metadata/99/auto_archiver"
|
||||
)
|
||||
expected_headers = {"Authorization": f"Bearer {atlos_db.api_token}"}
|
||||
expected_endpoint = f"/api/v2/source_material/metadata/99/auto_archiver"
|
||||
expected_results = metadata.metadata.copy()
|
||||
expected_results["timestamp"] = now.isoformat()
|
||||
expected_json = {
|
||||
|
@ -96,15 +97,13 @@ def test_done_with_atlos_id(atlos_db, metadata, mocker):
|
|||
"results": expected_results,
|
||||
}
|
||||
}
|
||||
post_mock.assert_called_once_with(
|
||||
expected_url, headers=expected_headers, json=expected_json
|
||||
)
|
||||
post_mock.assert_called_once_with(expected_endpoint, json=expected_json)
|
||||
|
||||
|
||||
def test_done_http_error(atlos_db, metadata, mocker):
|
||||
"""Test done() raises exception on HTTP error."""
|
||||
"""Test done() raises an exception on HTTP error."""
|
||||
metadata.set("atlos_id", 123)
|
||||
fake_resp = FakeAPIResponse({}, raise_error=True)
|
||||
mocker.patch("requests.post", return_value=fake_resp)
|
||||
# Patch _post to raise an exception.
|
||||
mocker.patch.object(atlos_db, "_post", side_effect=Exception("HTTP error"))
|
||||
with pytest.raises(Exception, match="HTTP error"):
|
||||
atlos_db.done(metadata)
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
import pytest
|
||||
from auto_archiver.modules.atlos_feeder import AtlosFeeder
|
||||
from auto_archiver.modules.atlos_feeder_db_storage import AtlosFeederDbStorage as AtlosFeeder
|
||||
|
||||
|
||||
class FakeAPIResponse:
|
||||
|
@ -18,23 +18,26 @@ class FakeAPIResponse:
|
|||
|
||||
|
||||
@pytest.fixture
|
||||
def atlos_feeder(setup_module) -> AtlosFeeder:
|
||||
def atlos_feeder(setup_module, mocker) -> AtlosFeeder:
|
||||
"""Fixture for AtlosFeeder."""
|
||||
configs: dict = {
|
||||
"api_token": "abc123",
|
||||
"atlos_url": "https://platform.atlos.org",
|
||||
}
|
||||
return setup_module("atlos_feeder", configs)
|
||||
mocker.patch("requests.Session")
|
||||
atlos_feeder = setup_module("atlos_feeder_db_storage", configs)
|
||||
fake_session = mocker.MagicMock()
|
||||
# Configure the default response to have no results so that __iter__ terminates
|
||||
fake_session.get.return_value = FakeAPIResponse({"next": None, "results": []})
|
||||
atlos_feeder.session = fake_session
|
||||
return atlos_feeder
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_atlos_api(mocker):
|
||||
"""Fixture to mock requests to Atlos API."""
|
||||
def mock_atlos_api(atlos_feeder):
|
||||
"""Fixture to update the atlos_feeder.session.get side_effect."""
|
||||
def _mock_responses(responses):
|
||||
mocker.patch(
|
||||
"requests.get",
|
||||
side_effect=[FakeAPIResponse(data) for data in responses],
|
||||
)
|
||||
atlos_feeder.session.get.side_effect = [FakeAPIResponse(data) for data in responses]
|
||||
return _mock_responses
|
||||
|
||||
|
||||
|
@ -100,9 +103,7 @@ def test_atlos_feeder_no_results(atlos_feeder, mock_atlos_api):
|
|||
|
||||
def test_atlos_feeder_http_error(atlos_feeder, mocker):
|
||||
"""Test raises an exception on HTTP error."""
|
||||
mocker.patch(
|
||||
"requests.get",
|
||||
return_value=FakeAPIResponse({"next": None, "results": []}, raise_error=True),
|
||||
)
|
||||
fake_response = FakeAPIResponse({"next": None, "results": []}, raise_error=True)
|
||||
atlos_feeder.session.get.side_effect = [fake_response]
|
||||
with pytest.raises(Exception, match="HTTP error"):
|
||||
list(atlos_feeder)
|
||||
|
|
|
@ -2,7 +2,7 @@ import os
|
|||
import hashlib
|
||||
import pytest
|
||||
from auto_archiver.core import Media, Metadata
|
||||
from auto_archiver.modules.atlos_storage import AtlosStorage
|
||||
from auto_archiver.modules.atlos_feeder_db_storage import AtlosFeederDbStorage as AtlosStorage
|
||||
|
||||
|
||||
class FakeAPIResponse:
|
||||
|
@ -21,13 +21,19 @@ class FakeAPIResponse:
|
|||
|
||||
|
||||
@pytest.fixture
|
||||
def atlos_storage(setup_module) -> AtlosStorage:
|
||||
def atlos_storage(setup_module, mocker) -> AtlosStorage:
|
||||
"""Fixture for AtlosStorage."""
|
||||
configs: dict = {
|
||||
"api_token": "abc123",
|
||||
"atlos_url": "https://platform.atlos.org",
|
||||
}
|
||||
return setup_module("atlos_storage", configs)
|
||||
mocker.patch("requests.Session")
|
||||
atlos_feeder = setup_module("atlos_feeder_db_storage", configs)
|
||||
mock_session = mocker.MagicMock()
|
||||
# Configure the default response to have no results so that __iter__ terminates
|
||||
mock_session.get.return_value = FakeAPIResponse({"next": None, "results": []})
|
||||
atlos_feeder.session = mock_session
|
||||
return atlos_feeder
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
|
@ -49,17 +55,6 @@ def test_get_cdn_url(atlos_storage: AtlosStorage) -> None:
|
|||
assert url == atlos_storage.atlos_url
|
||||
|
||||
|
||||
def test_hash(tmp_path, atlos_storage: AtlosStorage) -> None:
|
||||
"""Test _hash() computes the correct SHA-256 hash of a file."""
|
||||
content = b"hello world"
|
||||
file_path = tmp_path / "test.txt"
|
||||
file_path.write_bytes(content)
|
||||
media = Media(filename="dummy.mp4")
|
||||
media.filename = str(file_path)
|
||||
expected_hash = hashlib.sha256(content).hexdigest()
|
||||
assert atlos_storage._hash(media) == expected_hash
|
||||
|
||||
|
||||
def test_upload_no_atlos_id(tmp_path, atlos_storage: AtlosStorage, media: Media, mocker) -> None:
|
||||
"""Test upload() returns False when metadata lacks atlos_id."""
|
||||
metadata = Metadata() # atlos_id not set
|
||||
|
@ -69,74 +64,49 @@ def test_upload_no_atlos_id(tmp_path, atlos_storage: AtlosStorage, media: Media,
|
|||
post_mock.assert_not_called()
|
||||
|
||||
|
||||
def test_upload_already_uploaded(atlos_storage: AtlosStorage,
|
||||
metadata: Metadata,
|
||||
media: Media,
|
||||
tmp_path,
|
||||
mocker) -> None:
|
||||
def test_upload_already_uploaded(atlos_storage: AtlosStorage, metadata: Metadata, media: Media, mocker) -> None:
|
||||
"""Test upload() returns True if media hash already exists."""
|
||||
content = b"media content"
|
||||
metadata.set("atlos_id", 101)
|
||||
media_hash = hashlib.sha256(content).hexdigest()
|
||||
fake_get = FakeAPIResponse({
|
||||
"result": {"artifacts": [{"file_hash_sha256": media_hash}]}
|
||||
})
|
||||
get_mock = mocker.patch("requests.get", return_value=fake_get)
|
||||
post_mock = mocker.patch("requests.post")
|
||||
fake_get_response = {"result": {"artifacts": [{"file_hash_sha256": media_hash}]}}
|
||||
get_mock = mocker.patch.object(atlos_storage, "_get", return_value=fake_get_response)
|
||||
post_mock = mocker.patch.object(atlos_storage, "_post")
|
||||
result = atlos_storage.upload(media, metadata)
|
||||
assert result is True
|
||||
get_mock.assert_called_once()
|
||||
post_mock.assert_not_called()
|
||||
|
||||
|
||||
def test_upload_not_uploaded(tmp_path, atlos_storage: AtlosStorage,
|
||||
metadata: Metadata,
|
||||
media: Media,
|
||||
mocker) -> None:
|
||||
def test_upload_not_uploaded(tmp_path, atlos_storage: AtlosStorage, metadata: Metadata, media: Media, mocker) -> None:
|
||||
"""Test upload() uploads media when not already present."""
|
||||
metadata.set("atlos_id", 202)
|
||||
fake_get = FakeAPIResponse({
|
||||
"result": {"artifacts": [{"file_hash_sha256": "different_hash"}]}
|
||||
})
|
||||
get_mock = mocker.patch("requests.get", return_value=fake_get)
|
||||
fake_post = FakeAPIResponse({}, raise_error=False)
|
||||
post_mock = mocker.patch("requests.post", return_value=fake_post)
|
||||
fake_get_response = {"result": {"artifacts": [{"file_hash_sha256": "different_hash"}]}}
|
||||
get_mock = mocker.patch.object(atlos_storage, "_get", return_value=fake_get_response)
|
||||
fake_post_response = {"result": "uploaded"}
|
||||
post_mock = mocker.patch.object(atlos_storage, "_post", return_value=fake_post_response)
|
||||
result = atlos_storage.upload(media, metadata)
|
||||
assert result is True
|
||||
|
||||
get_mock.assert_called_once()
|
||||
post_mock.assert_called_once()
|
||||
expected_url = f"{atlos_storage.atlos_url}/api/v2/source_material/upload/202"
|
||||
expected_endpoint = f"/api/v2/source_material/upload/202"
|
||||
call_args = post_mock.call_args[0]
|
||||
assert call_args[0] == expected_endpoint
|
||||
call_kwargs = post_mock.call_args[1]
|
||||
expected_headers = {"Authorization": f"Bearer {atlos_storage.api_token}"}
|
||||
expected_params = {"title": media.properties}
|
||||
call_kwargs = post_mock.call_args.kwargs
|
||||
assert call_kwargs["headers"] == expected_headers
|
||||
assert call_kwargs["params"] == expected_params
|
||||
# Verify the URL passed to requests.post.
|
||||
posted_url = call_kwargs.get("url") or post_mock.call_args.args[0]
|
||||
assert posted_url == expected_url
|
||||
# Verify files parameter contains the correct filename.
|
||||
file_tuple = call_kwargs["files"]["file"]
|
||||
assert file_tuple[0] == os.path.basename(media.filename)
|
||||
|
||||
|
||||
def test_upload_post_http_error(tmp_path,
|
||||
atlos_storage: AtlosStorage,
|
||||
metadata: Metadata,
|
||||
media: Media,
|
||||
mocker) -> None:
|
||||
def test_upload_post_http_error(tmp_path, atlos_storage: AtlosStorage, metadata: Metadata, media: Media, mocker) -> None:
|
||||
"""Test upload() propagates HTTP error during POST."""
|
||||
metadata.set("atlos_id", 303)
|
||||
fake_get = FakeAPIResponse({
|
||||
"result": {"artifacts": []}
|
||||
})
|
||||
mocker.patch("requests.get", return_value=fake_get)
|
||||
fake_post = FakeAPIResponse({}, raise_error=True)
|
||||
mocker.patch("requests.post", return_value=fake_post)
|
||||
fake_get_response = {"result": {"artifacts": []}}
|
||||
mocker.patch.object(atlos_storage, "_get", return_value=fake_get_response)
|
||||
mocker.patch.object(atlos_storage, "_post", side_effect=Exception("HTTP error"))
|
||||
with pytest.raises(Exception, match="HTTP error"):
|
||||
atlos_storage.upload(media, metadata)
|
||||
|
||||
|
||||
def test_uploadf_not_implemented(atlos_storage: AtlosStorage) -> None:
|
||||
"""Test uploadf() returns None (not implemented)."""
|
||||
result = atlos_storage.uploadf(None, "dummy")
|
||||
assert result is None
|
||||
|
|
Ładowanie…
Reference in New Issue