Initial merge of Atlos Feeder and DB

pull/226/head
erinhmclark 2025-02-27 11:18:10 +00:00
rodzic d775e4612e
commit d1c8d4ba0e
4 zmienionych plików z 146 dodań i 1 usunięć

Wyświetl plik

@ -0,0 +1 @@
from .atlos_feeder import AtlosFeeder

Wyświetl plik

@ -0,0 +1,42 @@
{
"name": "Atlos Feeder Database",
"type": ["feeder", "database"],
"entry_point": "atlos_feeder_db::AtlosFeederDb",
"requires_setup": True,
"dependencies": {
"python": ["loguru", "requests"],
},
"configs": {
"api_token": {
"type": "str",
"required": True,
"help": "An Atlos API token. For more information, see https://docs.atlos.org/technical/api/",
},
"atlos_url": {
"default": "https://platform.atlos.org",
"help": "The URL of your Atlos instance (e.g., https://platform.atlos.org), without a trailing slash.",
"type": "str"
},
},
"description": """
AtlosFeederDb: A feeder module that integrates with the Atlos API to fetch source material URLs for archival,
along with a database option to output archival results.
Feeder: A feeder module that integrates with the Atlos API to fetch source material URLs for archival.
### Features
- Connects to the Atlos API to retrieve a list of source material URLs.
- Filters source materials based on visibility, processing status, and metadata.
- Converts filtered source materials into `Metadata` objects with the relevant `atlos_id` and URL.
- Iterates through paginated results using a cursor for efficient API interaction.
- Outputs archival results to the Atlos API for storage and tracking.
- Updates failure status with error details when archiving fails.
- Processes and formats metadata, including ISO formatting for datetime fields.
- Skips processing for items without an Atlos ID.
### Notes
- Requires an Atlos API endpoint and a valid API token for authentication.
- Ensures only unprocessed, visible, and ready-to-archive URLs are returned.
- Handles pagination transparently when retrieving data from the Atlos API.
"""
}

Wyświetl plik

@ -0,0 +1,100 @@
import requests
from typing import Union
from loguru import logger
from auto_archiver.core import Database
from auto_archiver.core import Feeder
from auto_archiver.core import Metadata
class AtlosFeederDb(Feeder, Database):
def __iter__(self) -> Metadata:
# Get all the urls from the Atlos API
count = 0
cursor = None
while True:
response = requests.get(
f"{self.atlos_url}/api/v2/source_material",
headers={"Authorization": f"Bearer {self.api_token}"},
params={"cursor": cursor},
)
data = response.json()
response.raise_for_status()
cursor = data["next"]
for item in data["results"]:
if (
item["source_url"] not in [None, ""]
and (
item["metadata"]
.get("auto_archiver", {})
.get("processed", False)
!= True
)
and item["visibility"] == "visible"
and item["status"] not in ["processing", "pending"]
):
yield Metadata().set_url(item["source_url"]).set(
"atlos_id", item["id"]
)
count += 1
if len(data["results"]) == 0 or cursor is None:
break
def failed(self, item: Metadata, reason: str) -> None:
"""Update DB accordingly for failure"""
# If the item has no Atlos ID, there's nothing for us to do
if not item.metadata.get("atlos_id"):
logger.info(f"Item {item.get_url()} has no Atlos ID, skipping")
return
requests.post(
f"{self.atlos_url}/api/v2/source_material/metadata/{item.metadata['atlos_id']}/auto_archiver",
headers={"Authorization": f"Bearer {self.api_token}"},
json={"metadata": {"processed": True, "status": "error", "error": reason}},
).raise_for_status()
logger.info(
f"Stored failure for {item.get_url()} (ID {item.metadata['atlos_id']}) on Atlos: {reason}"
)
def fetch(self, item: Metadata) -> Union[Metadata, bool]:
"""check and fetch if the given item has been archived already, each
database should handle its own caching, and configuration mechanisms"""
return False
def _process_metadata(self, item: Metadata) -> dict:
"""Process metadata for storage on Atlos. Will convert any datetime
objects to ISO format."""
return {
k: v.isoformat() if hasattr(v, "isoformat") else v
for k, v in item.metadata.items()
}
def done(self, item: Metadata, cached: bool = False) -> None:
"""archival result ready - should be saved to DB"""
if not item.metadata.get("atlos_id"):
logger.info(f"Item {item.get_url()} has no Atlos ID, skipping")
return
requests.post(
f"{self.atlos_url}/api/v2/source_material/metadata/{item.metadata['atlos_id']}/auto_archiver",
headers={"Authorization": f"Bearer {self.api_token}"},
json={
"metadata": dict(
processed=True,
status="success",
results=self._process_metadata(item),
)
},
).raise_for_status()
logger.info(
f"Stored success for {item.get_url()} (ID {item.metadata['atlos_id']}) on Atlos"
)

Wyświetl plik

@ -7,6 +7,7 @@ from loguru import logger
from auto_archiver.core import Media, Metadata
from auto_archiver.core import Storage
from auto_archiver.utils import calculate_file_hash
class AtlosStorage(Storage):
@ -37,7 +38,8 @@ class AtlosStorage(Storage):
return False
media_hash = self._hash(media)
# media_hash = calculate_file_hash(media.filename, hash_algo=hashlib.sha256, chunksize=4096)
# Check whether the media has already been uploaded
source_material = requests.get(
f"{self.atlos_url}/api/v2/source_material/{atlos_id}",