Add direct Atlos integration (#137)

* Add Atlos feeder

* Add Atlos db

* Add Atlos storage

* Fix Atlos storages

* Fix Atlos feeder

* Only include URLs in Atlos feeder once they're processed

* Remove print

* Add Atlos documentation to README

* Formatting fixes

* Don't archive existing material

* avoid KeyError in atlos_db

* version bump

---------

Co-authored-by: msramalho <19508417+msramalho@users.noreply.github.com>
pull/138/head v0.11.1
R. Miles McCain 2024-04-15 22:25:17 +04:00 zatwierdzone przez GitHub
rodzic eb37f0b45b
commit f603400d0d
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: B5690EEEBB952194
16 zmienionych plików z 278 dodań i 18 usunięć

1
.gitignore vendored
Wyświetl plik

@ -28,3 +28,4 @@ orchestration.yaml
auto_archiver.egg-info* auto_archiver.egg-info*
logs* logs*
*.csv *.csv
archived/

Wyświetl plik

@ -177,6 +177,38 @@ To use Google Drive storage you need the id of the shared folder in the `config.
#### Telethon + Instagram with telegram bot #### Telethon + Instagram with telegram bot
The first time you run, you will be prompted to do a authentication with the phone number associated, alternatively you can put your `anon.session` in the root. The first time you run, you will be prompted to do a authentication with the phone number associated, alternatively you can put your `anon.session` in the root.
#### Atlos
When integrating with [Atlos](https://atlos.org), you will need to provide an API token in your configuration. You can learn more about Atlos and how to get an API token [here](https://docs.atlos.org/technical/api). You will have to provide this token to the `atlos_feeder`, `atlos_storage`, and `atlos_db` steps in your orchestration file. If you use a custom or self-hosted Atlos instance, you can also specify the `atlos_url` option to point to your custom instance's URL. For example:
```yaml
# orchestration.yaml content
steps:
feeder: atlos_feeder
archivers: # order matters
- youtubedl_archiver
enrichers:
- thumbnail_enricher
- hash_enricher
formatter: html_formatter
storages:
- atlos_storage
databases:
- console_db
- atlos_db
configurations:
atlos_feeder:
atlos_url: "https://platform.atlos.org" # optional
api_token: "...your API token..."
atlos_db:
atlos_url: "https://platform.atlos.org" # optional
api_token: "...your API token..."
atlos_storage:
atlos_url: "https://platform.atlos.org" # optional
api_token: "...your API token..."
hash_enricher:
algorithm: "SHA-256"
```
## Running on Google Sheets Feeder (gsheet_feeder) ## Running on Google Sheets Feeder (gsheet_feeder)
The `--gsheet_feeder.sheet` property is the name of the Google Sheet to check for URLs. The `--gsheet_feeder.sheet` property is the name of the Google Sheet to check for URLs.

Wyświetl plik

@ -25,10 +25,11 @@ class Media:
_mimetype: str = None # eg: image/jpeg _mimetype: str = None # eg: image/jpeg
_stored: bool = field(default=False, repr=False, metadata=config(exclude=lambda _: True)) # always exclude _stored: bool = field(default=False, repr=False, metadata=config(exclude=lambda _: True)) # always exclude
def store(self: Media, override_storages: List = None, url: str = "url-not-available"): def store(self: Media, override_storages: List = None, url: str = "url-not-available", metadata: Any = None):
# stores the media into the provided/available storages [Storage] # 'Any' typing for metadata to avoid circular imports. Stores the media
# repeats the process for its properties, in case they have inner media themselves # into the provided/available storages [Storage] repeats the process for
# for now it only goes down 1 level but it's easy to make it recursive if needed # its properties, in case they have inner media themselves for now it
# only goes down 1 level but it's easy to make it recursive if needed.
storages = override_storages or ArchivingContext.get("storages") storages = override_storages or ArchivingContext.get("storages")
if not len(storages): if not len(storages):
logger.warning(f"No storages found in local context or provided directly for {self.filename}.") logger.warning(f"No storages found in local context or provided directly for {self.filename}.")
@ -36,7 +37,7 @@ class Media:
for s in storages: for s in storages:
for any_media in self.all_inner_media(include_self=True): for any_media in self.all_inner_media(include_self=True):
s.store(any_media, url) s.store(any_media, url, metadata=metadata)
def all_inner_media(self, include_self=False): def all_inner_media(self, include_self=False):
""" Media can be inside media properties, examples include transformations on original media. """ Media can be inside media properties, examples include transformations on original media.

Wyświetl plik

@ -48,7 +48,7 @@ class Metadata:
self.remove_duplicate_media_by_hash() self.remove_duplicate_media_by_hash()
storages = override_storages or ArchivingContext.get("storages") storages = override_storages or ArchivingContext.get("storages")
for media in self.media: for media in self.media:
media.store(override_storages=storages, url=self.get_url()) media.store(override_storages=storages, url=self.get_url(), metadata=self)
def set(self, key: str, val: Any) -> Metadata: def set(self, key: str, val: Any) -> Metadata:
self.metadata[key] = val self.metadata[key] = val

Wyświetl plik

@ -120,7 +120,7 @@ class ArchivingOrchestrator:
# 6 - format and store formatted if needed # 6 - format and store formatted if needed
if (final_media := self.formatter.format(result)): if (final_media := self.formatter.format(result)):
final_media.store(url=url) final_media.store(url=url, metadata=result)
result.set_final_media(final_media) result.set_final_media(final_media)
if result.is_empty(): if result.is_empty():

Wyświetl plik

@ -3,3 +3,4 @@ from .gsheet_db import GsheetsDb
from .console_db import ConsoleDb from .console_db import ConsoleDb
from .csv_db import CSVDb from .csv_db import CSVDb
from .api_db import AAApiDb from .api_db import AAApiDb
from .atlos_db import AtlosDb

Wyświetl plik

@ -0,0 +1,79 @@
import os
from typing import Union
from loguru import logger
from csv import DictWriter
from dataclasses import asdict
import requests
from . import Database
from ..core import Metadata
from ..utils import get_atlos_config_options
class AtlosDb(Database):
"""
Outputs results to Atlos
"""
name = "atlos_db"
def __init__(self, config: dict) -> None:
# without this STEP.__init__ is not called
super().__init__(config)
@staticmethod
def configs() -> dict:
return get_atlos_config_options()
def failed(self, item: Metadata, reason: str) -> None:
"""Update DB accordingly for failure"""
# If the item has no Atlos ID, there's nothing for us to do
if not item.metadata.get("atlos_id"):
logger.info(f"Item {item.get_url()} has no Atlos ID, skipping")
return
requests.post(
f"{self.atlos_url}/api/v2/source_material/metadata/{item.metadata['atlos_id']}/auto_archiver",
headers={"Authorization": f"Bearer {self.api_token}"},
json={"metadata": {"processed": True, "status": "error", "error": reason}},
).raise_for_status()
logger.info(
f"Stored failure for {item.get_url()} (ID {item.metadata['atlos_id']}) on Atlos: {reason}"
)
def fetch(self, item: Metadata) -> Union[Metadata, bool]:
"""check and fetch if the given item has been archived already, each
database should handle its own caching, and configuration mechanisms"""
return False
def _process_metadata(self, item: Metadata) -> dict:
"""Process metadata for storage on Atlos. Will convert any datetime
objects to ISO format."""
return {
k: v.isoformat() if hasattr(v, "isoformat") else v
for k, v in item.metadata.items()
}
def done(self, item: Metadata, cached: bool = False) -> None:
"""archival result ready - should be saved to DB"""
if not item.metadata.get("atlos_id"):
logger.info(f"Item {item.get_url()} has no Atlos ID, skipping")
return
requests.post(
f"{self.atlos_url}/api/v2/source_material/metadata/{item.metadata['atlos_id']}/auto_archiver",
headers={"Authorization": f"Bearer {self.api_token}"},
json={
"metadata": dict(
processed=True,
status="success",
results=self._process_metadata(item),
)
},
).raise_for_status()
logger.info(
f"Stored success for {item.get_url()} (ID {item.metadata['atlos_id']}) on Atlos"
)

Wyświetl plik

@ -44,7 +44,7 @@ class WhisperEnricher(Enricher):
job_results = {} job_results = {}
for i, m in enumerate(to_enrich.media): for i, m in enumerate(to_enrich.media):
if m.is_video() or m.is_audio(): if m.is_video() or m.is_audio():
m.store(url=url) m.store(url=url, metadata=to_enrich)
try: try:
job_id = self.submit_job(m) job_id = self.submit_job(m)
job_results[job_id] = False job_results[job_id] = False

Wyświetl plik

@ -1,3 +1,4 @@
from.feeder import Feeder from.feeder import Feeder
from .gsheet_feeder import GsheetsFeeder from .gsheet_feeder import GsheetsFeeder
from .cli_feeder import CLIFeeder from .cli_feeder import CLIFeeder
from .atlos_feeder import AtlosFeeder

Wyświetl plik

@ -0,0 +1,56 @@
from loguru import logger
import requests
from . import Feeder
from ..core import Metadata, ArchivingContext
from ..utils import get_atlos_config_options
class AtlosFeeder(Feeder):
name = "atlos_feeder"
def __init__(self, config: dict) -> None:
# without this STEP.__init__ is not called
super().__init__(config)
if type(self.api_token) != str:
raise Exception("Atlos Feeder did not receive an Atlos API token")
@staticmethod
def configs() -> dict:
return get_atlos_config_options()
def __iter__(self) -> Metadata:
# Get all the urls from the Atlos API
count = 0
cursor = None
while True:
response = requests.get(
f"{self.atlos_url}/api/v2/source_material",
headers={"Authorization": f"Bearer {self.api_token}"},
params={"cursor": cursor},
)
data = response.json()
response.raise_for_status()
cursor = data["next"]
for item in data["results"]:
if (
item["source_url"] not in [None, ""]
and (
item["metadata"]
.get("auto_archiver", {})
.get("processed", False)
!= True
)
and item["visibility"] == "visible"
and item["status"] not in ["processing", "pending"]
):
yield Metadata().set_url(item["source_url"]).set(
"atlos_id", item["id"]
)
count += 1
if len(data["results"]) == 0 or cursor is None:
break
logger.success(f"Processed {count} URL(s)")

Wyświetl plik

@ -2,3 +2,4 @@ from .storage import Storage
from .s3 import S3Storage from .s3 import S3Storage
from .local import LocalStorage from .local import LocalStorage
from .gd import GDriveStorage from .gd import GDriveStorage
from .atlos import AtlosStorage

Wyświetl plik

@ -0,0 +1,74 @@
import os
from typing import IO, List, Optional
from loguru import logger
import requests
import hashlib
from ..core import Media, Metadata
from ..storages import Storage
from ..utils import get_atlos_config_options
class AtlosStorage(Storage):
name = "atlos_storage"
def __init__(self, config: dict) -> None:
super().__init__(config)
@staticmethod
def configs() -> dict:
return dict(Storage.configs(), **get_atlos_config_options())
def get_cdn_url(self, _media: Media) -> str:
# It's not always possible to provide an exact URL, because it's
# possible that the media once uploaded could have been copied to
# another project.
return self.atlos_url
def _hash(self, media: Media) -> str:
# Hash the media file using sha-256. We don't use the existing auto archiver
# hash because there's no guarantee that the configuerer is using sha-256, which
# is how Atlos hashes files.
sha256 = hashlib.sha256()
with open(media.filename, "rb") as f:
while True:
buf = f.read(4096)
if not buf: break
sha256.update(buf)
return sha256.hexdigest()
def upload(self, media: Media, metadata: Optional[Metadata]=None, **_kwargs) -> bool:
atlos_id = metadata.get("atlos_id")
if atlos_id is None:
logger.error(f"No Atlos ID found in metadata; can't store {media.filename} on Atlos")
return False
media_hash = self._hash(media)
# Check whether the media has already been uploaded
source_material = requests.get(
f"{self.atlos_url}/api/v2/source_material/{atlos_id}",
headers={"Authorization": f"Bearer {self.api_token}"},
).json()["result"]
existing_media = [x["file_hash_sha256"] for x in source_material.get("artifacts", [])]
if media_hash in existing_media:
logger.info(f"{media.filename} with SHA256 {media_hash} already uploaded to Atlos")
return True
# Upload the media to the Atlos API
requests.post(
f"{self.atlos_url}/api/v2/source_material/upload/{atlos_id}",
headers={"Authorization": f"Bearer {self.api_token}"},
params={
"title": media.properties
},
files={"file": (os.path.basename(media.filename), open(media.filename, "rb"))},
).raise_for_status()
logger.info(f"Uploaded {media.filename} to Atlos with ID {atlos_id} and title {media.key}")
return True
# must be implemented even if unused
def uploadf(self, file: IO[bytes], key: str, **kwargs: dict) -> bool: pass

Wyświetl plik

@ -1,12 +1,12 @@
from __future__ import annotations from __future__ import annotations
from abc import abstractmethod from abc import abstractmethod
from dataclasses import dataclass from dataclasses import dataclass
from typing import IO from typing import IO, Optional
import os import os
from ..utils.misc import random_str from ..utils.misc import random_str
from ..core import Media, Step, ArchivingContext from ..core import Media, Step, ArchivingContext, Metadata
from ..enrichers import HashEnricher from ..enrichers import HashEnricher
from loguru import logger from loguru import logger
from slugify import slugify from slugify import slugify
@ -43,12 +43,12 @@ class Storage(Step):
# only for typing... # only for typing...
return Step.init(name, config, Storage) return Step.init(name, config, Storage)
def store(self, media: Media, url: str) -> None: def store(self, media: Media, url: str, metadata: Optional[Metadata]=None) -> None:
if media.is_stored(): if media.is_stored():
logger.debug(f"{media.key} already stored, skipping") logger.debug(f"{media.key} already stored, skipping")
return return
self.set_key(media, url) self.set_key(media, url)
self.upload(media) self.upload(media, metadata=metadata)
media.add_url(self.get_cdn_url(media)) media.add_url(self.get_cdn_url(media))
@abstractmethod @abstractmethod

Wyświetl plik

@ -4,3 +4,4 @@ from .misc import *
from .webdriver import Webdriver from .webdriver import Webdriver
from .gsheet import Gsheets from .gsheet import Gsheets
from .url import UrlUtil from .url import UrlUtil
from .atlos import get_atlos_config_options

Wyświetl plik

@ -0,0 +1,13 @@
def get_atlos_config_options():
return {
"api_token": {
"default": None,
"help": "An Atlos API token. For more information, see https://docs.atlos.org/technical/api/",
"cli_set": lambda cli_val, _: cli_val
},
"atlos_url": {
"default": "https://platform.atlos.org",
"help": "The URL of your Atlos instance (e.g., https://platform.atlos.org), without a trailing slash.",
"cli_set": lambda cli_val, _: cli_val
},
}

Wyświetl plik

@ -3,7 +3,7 @@ _MAJOR = "0"
_MINOR = "11" _MINOR = "11"
# On main and in a nightly release the patch should be one ahead of the last # On main and in a nightly release the patch should be one ahead of the last
# released build. # released build.
_PATCH = "0" _PATCH = "1"
# This is mainly for nightly builds which have the suffix ".dev$DATE". See # This is mainly for nightly builds which have the suffix ".dev$DATE". See
# https://semver.org/#is-v123-a-semantic-version for the semantics. # https://semver.org/#is-v123-a-semantic-version for the semantics.
_SUFFIX = "" _SUFFIX = ""