From 0f911543cd6da3cc1d2e3a883230c7d9171fd620 Mon Sep 17 00:00:00 2001
From: erinhmclark
Date: Wed, 5 Mar 2025 13:49:11 +0000
Subject: [PATCH] Atlos refactor

---
 .../atlos_feeder_db_storage/__init__.py       |   1 +
 .../atlos_feeder_db_storage.py                | 192 +++++++++---------
 2 files changed, 92 insertions(+), 101 deletions(-)
 create mode 100644 src/auto_archiver/modules/atlos_feeder_db_storage/__init__.py

diff --git a/src/auto_archiver/modules/atlos_feeder_db_storage/__init__.py b/src/auto_archiver/modules/atlos_feeder_db_storage/__init__.py
new file mode 100644
index 0000000..8d62823
--- /dev/null
+++ b/src/auto_archiver/modules/atlos_feeder_db_storage/__init__.py
@@ -0,0 +1 @@
+from .atlos_feeder_db_storage import AtlosFeederDbStorage
\ No newline at end of file
diff --git a/src/auto_archiver/modules/atlos_feeder_db_storage/atlos_feeder_db_storage.py b/src/auto_archiver/modules/atlos_feeder_db_storage/atlos_feeder_db_storage.py
index 7bcd74e..698cd41 100644
--- a/src/auto_archiver/modules/atlos_feeder_db_storage/atlos_feeder_db_storage.py
+++ b/src/auto_archiver/modules/atlos_feeder_db_storage/atlos_feeder_db_storage.py
@@ -1,70 +1,80 @@
 import hashlib
 import os
-from typing import IO, Optional
-from typing import Union
+from typing import IO, Iterator, Optional, Union
 
 import requests
 from loguru import logger
 
-from auto_archiver.core import Database
-from auto_archiver.core import Feeder
-from auto_archiver.core import Media
-from auto_archiver.core import Metadata
-from auto_archiver.core import Storage
+from auto_archiver.core import Database, Feeder, Media, Metadata, Storage
+from auto_archiver.utils import calculate_file_hash
 
 
 class AtlosFeederDbStorage(Feeder, Database, Storage):
-    def __iter__(self) -> Metadata:
-        # Get all the urls from the Atlos API
-        count = 0
+    @property
+    def session(self) -> requests.Session:
+        """Create and return a persistent requests session."""
+        if not hasattr(self, "_session"):
+            self._session = requests.Session()
+        return self._session
+
+    def _get(self, endpoint: str, params: Optional[dict] = None) -> dict:
+        """Wrapper for GET requests to the Atlos API."""
+        url = f"{self.atlos_url}{endpoint}"
+        response = self.session.get(
+            url, headers={"Authorization": f"Bearer {self.api_token}"}, params=params
+        )
+        response.raise_for_status()
+        return response.json()
+
+    def _post(
+        self,
+        endpoint: str,
+        json: Optional[dict] = None,
+        params: Optional[dict] = None,
+        files: Optional[dict] = None,
+    ) -> dict:
+        """Wrapper for POST requests to the Atlos API."""
+        url = f"{self.atlos_url}{endpoint}"
+        response = self.session.post(
+            url,
+            headers={"Authorization": f"Bearer {self.api_token}"},
+            json=json,
+            params=params,
+            files=files,
+        )
+        response.raise_for_status()
+        return response.json()
+
+    def __iter__(self) -> Iterator[Metadata]:
+        """Iterate over unprocessed, visible source materials from Atlos."""
         cursor = None
         while True:
-            response = requests.get(
-                f"{self.atlos_url}/api/v2/source_material",
-                headers={"Authorization": f"Bearer {self.api_token}"},
-                params={"cursor": cursor},
-            )
-            data = response.json()
-            response.raise_for_status()
-            cursor = data["next"]
-
-            for item in data["results"]:
+            data = self._get("/api/v2/source_material", params={"cursor": cursor})
+            cursor = data.get("next")
+            results = data.get("results", [])
+            for item in results:
                 if (
-                    item["source_url"] not in [None, ""]
-                    and (
-                        item["metadata"]
-                        .get("auto_archiver", {})
-                        .get("processed", False)
-                        != True
-                    )
-                    and item["visibility"] == "visible"
-                    and item["status"] not in ["processing", "pending"]
"pending"] + item.get("source_url") not in [None, ""] + and not item.get("metadata", {}).get("auto_archiver", {}).get("processed", False) + and item.get("visibility") == "visible" + and item.get("status") not in ["processing", "pending"] ): - yield Metadata().set_url(item["source_url"]).set( - "atlos_id", item["id"] - ) - count += 1 - - if len(data["results"]) == 0 or cursor is None: + yield Metadata().set_url(item["source_url"]).set("atlos_id", item["id"]) + if not results or cursor is None: break - def failed(self, item: Metadata, reason: str) -> None: - """Update DB accordingly for failure""" - # If the item has no Atlos ID, there's nothing for us to do - if not item.metadata.get("atlos_id"): + """Mark an item as failed in Atlos, if the ID exists.""" + atlos_id = item.metadata.get("atlos_id") + if not atlos_id: logger.info(f"Item {item.get_url()} has no Atlos ID, skipping") return - - requests.post( - f"{self.atlos_url}/api/v2/source_material/metadata/{item.metadata['atlos_id']}/auto_archiver", - headers={"Authorization": f"Bearer {self.api_token}"}, + self._post( + f"/api/v2/source_material/metadata/{atlos_id}/auto_archiver", json={"metadata": {"processed": True, "status": "error", "error": reason}}, - ).raise_for_status() - logger.info( - f"Stored failure for {item.get_url()} (ID {item.metadata['atlos_id']}) on Atlos: {reason}" ) + logger.info(f"Stored failure for {item.get_url()} (ID {atlos_id}) on Atlos: {reason}") def fetch(self, item: Metadata) -> Union[Metadata, bool]: """check and fetch if the given item has been archived already, each @@ -74,87 +84,67 @@ class AtlosFeederDbStorage(Feeder, Database, Storage): def _process_metadata(self, item: Metadata) -> dict: """Process metadata for storage on Atlos. Will convert any datetime objects to ISO format.""" - return { k: v.isoformat() if hasattr(v, "isoformat") else v for k, v in item.metadata.items() } def done(self, item: Metadata, cached: bool = False) -> None: - """archival result ready - should be saved to DB""" - - if not item.metadata.get("atlos_id"): + """Mark an item as successfully archived in Atlos.""" + atlos_id = item.metadata.get("atlos_id") + if not atlos_id: logger.info(f"Item {item.get_url()} has no Atlos ID, skipping") return - - requests.post( - f"{self.atlos_url}/api/v2/source_material/metadata/{item.metadata['atlos_id']}/auto_archiver", - headers={"Authorization": f"Bearer {self.api_token}"}, + self._post( + f"/api/v2/source_material/metadata/{atlos_id}/auto_archiver", json={ - "metadata": dict( - processed=True, - status="success", - results=self._process_metadata(item), - ) + "metadata": { + "processed": True, + "status": "success", + "results": self._process_metadata(item), + } }, - ).raise_for_status() - - logger.info( - f"Stored success for {item.get_url()} (ID {item.metadata['atlos_id']}) on Atlos" ) + logger.info(f"Stored success for {item.get_url()} (ID {atlos_id}) on Atlos") def get_cdn_url(self, _media: Media) -> str: - # It's not always possible to provide an exact URL, because it's - # possible that the media once uploaded could have been copied to - # another project. + """Return the base Atlos URL as the CDN URL.""" return self.atlos_url - def _hash(self, media: Media) -> str: - # Hash the media file using sha-256. We don't use the existing auto archiver - # hash because there's no guarantee that the configuerer is using sha-256, which - # is how Atlos hashes files. 
-
-        sha256 = hashlib.sha256()
-        with open(media.filename, "rb") as f:
-            while True:
-                buf = f.read(4096)
-                if not buf: break
-                sha256.update(buf)
-        return sha256.hexdigest()
-
     def upload(self, media: Media, metadata: Optional[Metadata] = None, **_kwargs) -> bool:
-        atlos_id = metadata.get("atlos_id")
-        if atlos_id is None:
-            logger.error(f"No Atlos ID found in metadata; can't store {media.filename} on Atlos")
+        """Upload a media file to Atlos if it has not been uploaded already."""
+        if metadata is None:
+            logger.error(f"No metadata provided for {media.filename}")
             return False
-        media_hash = self._hash(media)
-        # media_hash = calculate_file_hash(media.filename, hash_algo=hashlib.sha256, chunksize=4096)
+        atlos_id = metadata.get("atlos_id")
+        if not atlos_id:
+            logger.error(f"No Atlos ID found in metadata; can't store {media.filename} in Atlos.")
+            return False
+
+        media_hash = calculate_file_hash(media.filename, hash_algo=hashlib.sha256, chunksize=4096)
 
         # Check whether the media has already been uploaded
-        source_material = requests.get(
-            f"{self.atlos_url}/api/v2/source_material/{atlos_id}",
-            headers={"Authorization": f"Bearer {self.api_token}"},
-        ).json()["result"]
-        existing_media = [x["file_hash_sha256"] for x in source_material.get("artifacts", [])]
+        source_material = self._get(f"/api/v2/source_material/{atlos_id}")["result"]
+        existing_media = [
+            artifact.get("file_hash_sha256")
+            for artifact in source_material.get("artifacts", [])
+        ]
         if media_hash in existing_media:
             logger.info(f"{media.filename} with SHA256 {media_hash} already uploaded to Atlos")
             return True
 
         # Upload the media to the Atlos API
-        requests.post(
-            f"{self.atlos_url}/api/v2/source_material/upload/{atlos_id}",
-            headers={"Authorization": f"Bearer {self.api_token}"},
-            params={
-                "title": media.properties
-            },
-            files={"file": (os.path.basename(media.filename), open(media.filename, "rb"))},
-        ).raise_for_status()
-
+        with open(media.filename, "rb") as file_obj:
+            self._post(
+                f"/api/v2/source_material/upload/{atlos_id}",
+                params={"title": media.properties},
+                files={"file": (os.path.basename(media.filename), file_obj)},
+            )
         logger.info(f"Uploaded {media.filename} to Atlos with ID {atlos_id} and title {media.key}")
-
         return True
 
-    # must be implemented even if unused
     def uploadf(self, file: IO[bytes], key: str, **kwargs: dict) -> bool:
-        pass
+        """Upload a file-like object; not implemented."""
+        raise NotImplementedError("uploadf method is not implemented yet.")
+
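
Two notes follow on the techniques this patch relies on; neither is part of the diff itself.

1) Cursor pagination in __iter__. The feeder now walks the Atlos API with an opaque cursor and stops when a page is empty or the cursor runs out. The sketch below replays that loop against a stubbed _get, assuming only what the patch itself implies: the endpoint returns a JSON object of the form {"next": <cursor or None>, "results": [...]}. The fake_pages data and the iter_unprocessed name are illustrative, not part of auto_archiver.

    from typing import Iterator, Optional

    # Two fake API pages keyed by cursor, shaped the way the patch expects.
    fake_pages = {
        None: {"next": "c1", "results": [
            {"id": 1, "source_url": "https://example.com/a",
             "metadata": {}, "visibility": "visible", "status": "done"},
            {"id": 2, "source_url": "",  # filtered out: empty source_url
             "metadata": {}, "visibility": "visible", "status": "done"},
        ]},
        "c1": {"next": None, "results": [
            {"id": 3, "source_url": "https://example.com/b",
             "metadata": {"auto_archiver": {"processed": True}},  # filtered out
             "visibility": "visible", "status": "done"},
        ]},
    }

    def _get(endpoint: str, params: Optional[dict] = None) -> dict:
        # Stub standing in for AtlosFeederDbStorage._get.
        return fake_pages[(params or {}).get("cursor")]

    def iter_unprocessed() -> Iterator[dict]:
        # Mirrors the loop and filters in the patched __iter__.
        cursor = None
        while True:
            data = _get("/api/v2/source_material", params={"cursor": cursor})
            cursor = data.get("next")
            results = data.get("results", [])
            for item in results:
                if (
                    item.get("source_url") not in [None, ""]
                    and not item.get("metadata", {}).get("auto_archiver", {}).get("processed", False)
                    and item.get("visibility") == "visible"
                    and item.get("status") not in ["processing", "pending"]
                ):
                    yield item
            if not results or cursor is None:
                break

    assert [item["id"] for item in iter_unprocessed()] == [1]

Only item 1 survives the filters, and the loop terminates on the second page because its cursor is None.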
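2) Hash-based upload dedup. upload() now compares a local SHA-256 digest against each artifact's file_hash_sha256 before posting the file. The patch replaces the hand-rolled _hash() with calculate_file_hash(media.filename, hash_algo=hashlib.sha256, chunksize=4096) from auto_archiver.utils; that call signature comes straight from the diff, and the helper is assumed to return the same hex digest the removed code produced. A minimal standalone equivalent of the removed logic, for comparison:

    import hashlib

    def sha256_hexdigest(path: str, chunksize: int = 4096) -> str:
        # Chunked reads keep memory flat for large media files; the hex
        # digest matches the format Atlos stores in file_hash_sha256.
        sha256 = hashlib.sha256()
        with open(path, "rb") as f:
            while chunk := f.read(chunksize):
                sha256.update(chunk)
        return sha256.hexdigest()

As the removed comment noted, Atlos hashes files with SHA-256, so the call site pins hash_algo=hashlib.sha256 explicitly; if the helper ever defaulted to a different algorithm, the dedup check would silently stop matching Atlos's stored hashes.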