Merge branch 'more_mainifests' into load_modules

pull/224/head
Patrick Robertson 2025-01-27 11:05:56 +01:00
commit 14e2479599
140 changed files with 1183 additions and 615 deletions

View file

@ -218,7 +218,7 @@ configurations:
## Running on Google Sheets Feeder (gsheet_feeder)
The `--gsheet_feeder.sheet` property is the name of the Google Sheet to check for URLs.
This sheet must have been shared with the Google Service account used by `gspread`.
This sheet must also have specific columns (case-insensitive) in the `header` as specified in [Gsheet.configs](src/auto_archiver/utils/gsheet.py). The default names of these columns and their purpose is:
This sheet must also have specific columns (case-insensitive) in the `header` as specified in [gsheet_feeder.__manifest__.py](src/auto_archiver/modules/gsheet_feeder/__manifest__.py). The default names of these columns and their purpose is:
Inputs:
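For reference, the default column mapping defined in that manifest (the same values appear in the gsheet_feeder `__manifest__.py` further down in this diff) is:

# default sheet column names: keys are internal names, values are the header labels
columns = {
    'url': 'link',
    'status': 'archive status',
    'folder': 'destination folder',
    'archive': 'archive location',
    'date': 'archive date',
    'thumbnail': 'thumbnail',
    'timestamp': 'upload timestamp',
    'title': 'upload title',
    'text': 'text content',
    'screenshot': 'screenshot',
    'hash': 'hash',
    'pdq_hash': 'perceptual hashes',
    'wacz': 'wacz',
    'replaywebpage': 'replaywebpage',
}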

View file

@ -1,8 +0,0 @@
"""
Archivers are responsible for retrieving the content from various external platforms.
They act as specialized modules, each tailored to interact with a specific platform,
service, or data source. The archivers collectively enable the tool to comprehensively
collect and preserve a variety of content types, such as posts, images, videos and metadata.
"""
from .archiver import Archiver

View file

@ -0,0 +1,6 @@
from .database import Database
from .enricher import Enricher
from .feeder import Feeder
from .storage import Storage
from .extractor import Extractor
from .formatter import Formatter

View file

@ -3,13 +3,13 @@ from dataclasses import dataclass
from abc import abstractmethod, ABC
from typing import Union
from ..core import Metadata, Step
from auto_archiver.core import Metadata, Step
@dataclass
class Database(Step, ABC):
name = "database"
name = "database"
def __init__(self, config: dict) -> None:
# without this STEP.__init__ is not called
super().__init__(config)

View file

@ -0,0 +1,31 @@
"""
Enrichers are modular components that enhance archived content by adding
context, metadata, or additional processing.
These add additional information to the context, such as screenshots, hashes, and metadata.
They are designed to work within the archiving pipeline, operating on `Metadata` objects after
the archiving step and before storage or formatting.
Enrichers are optional but highly useful for making the archived data more powerful.
"""
from __future__ import annotations
from dataclasses import dataclass
from abc import abstractmethod, ABC
from auto_archiver.core import Metadata, Step
@dataclass
class Enricher(Step, ABC):
"""Base classes and utilities for enrichers in the Auto-Archiver system."""
name = "enricher"
def __init__(self, config: dict) -> None:
# without this STEP.__init__ is not called
super().__init__(config)
# only for typing...
def init(name: str, config: dict) -> Enricher:
return Step.init(name, config, Enricher)
@abstractmethod
def enrich(self, to_enrich: Metadata) -> None: pass
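For orientation (not part of this commit), a minimal concrete enricher built on this base class could look like the sketch below; NoteEnricher and the "note" field are made up for illustration.

from loguru import logger

from auto_archiver.base_processors import Enricher
from auto_archiver.core import Metadata

class NoteEnricher(Enricher):
    """Illustrative enricher: adds a simple note to the archived item."""
    name = "note_enricher"

    def enrich(self, to_enrich: Metadata) -> None:
        url = to_enrich.get_url()
        logger.debug(f"adding archival note for {url=}")
        # Metadata.set(key, value) is the same call used by the built-in enrichers
        to_enrich.set("note", f"enriched by {self.name}")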

View file

@ -1,7 +1,7 @@
""" The `archiver` module defines the base functionality for implementing archivers in the media archiving framework.
This class provides common utility methods and a standard interface for archivers.
""" The `extractor` module defines the base functionality for implementing extractors in the media archiving framework.
This class provides common utility methods and a standard interface for extractors.
Factory method to initialize an archiver instance based on its name.
Factory method to initialize an extractor instance based on its name.
"""
@ -15,32 +15,32 @@ import mimetypes, requests
from loguru import logger
from retrying import retry
from ..core import Metadata, Step, ArchivingContext
from ..core import Metadata, ArchivingContext
@dataclass
class Archiver:
class Extractor:
"""
Base class for implementing archivers in the media archiving framework.
Base class for implementing extractors in the media archiving framework.
Subclasses must implement the `download` method to define platform-specific behavior.
"""
def setup(self) -> None:
# used when archivers need to login or do other one-time setup
# used when extractors need to login or do other one-time setup
pass
def cleanup(self) -> None:
# called when archivers are done, or upon errors, cleanup any resources
# called when extractors are done, or upon errors, cleanup any resources
pass
def sanitize_url(self, url: str) -> str:
# used to clean unnecessary URL parameters OR unfurl redirect links
return url
def suitable(self, url: str) -> bool:
"""
Returns True if this archiver can handle the given URL
Returns True if this extractor can handle the given URL
Should be overridden by subclasses
"""
return True
@ -84,10 +84,10 @@ class Archiver:
for chunk in d.iter_content(chunk_size=8192):
f.write(chunk)
return to_filename
except requests.RequestException as e:
logger.warning(f"Failed to fetch the Media URL: {e}")
@abstractmethod
def download(self, item: Metadata) -> Metadata:
pass
pass
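To make the contract concrete, here is a minimal extractor sketch (illustrative only, not part of this commit; ExampleExtractor and the example.com check are made up):

from auto_archiver.base_processors import Extractor
from auto_archiver.core import Metadata

class ExampleExtractor(Extractor):
    name = "example_extractor"

    def suitable(self, url: str) -> bool:
        # only claim URLs from the (hypothetical) platform this extractor handles
        return "example.com" in url

    def download(self, item: Metadata) -> Metadata:
        url = item.get_url()
        # a real extractor would fetch the media here and attach it to the Metadata
        item.set_title(f"archived copy of {url}")
        return item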

View file

@ -1,8 +1,8 @@
from __future__ import annotations
from dataclasses import dataclass
from abc import abstractmethod
from ..core import Metadata
from ..core import Step
from auto_archiver.core import Metadata
from auto_archiver.core import Step
@dataclass

View file

@ -1,7 +1,7 @@
from __future__ import annotations
from dataclasses import dataclass
from abc import abstractmethod
from ..core import Metadata, Media, Step
from auto_archiver.core import Metadata, Media, Step
@dataclass

View file

@ -4,10 +4,10 @@ from dataclasses import dataclass
from typing import IO, Optional
import os
from ..utils.misc import random_str
from auto_archiver.utils.misc import random_str
from ..core import Media, Step, ArchivingContext, Metadata
from ..enrichers import HashEnricher
from auto_archiver.core import Media, Step, ArchivingContext, Metadata
from auto_archiver.modules.hash_enricher.hash_enricher import HashEnricher
from loguru import logger
from slugify import slugify
@ -15,29 +15,6 @@ from slugify import slugify
@dataclass
class Storage(Step):
name = "storage"
PATH_GENERATOR_OPTIONS = ["flat", "url", "random"]
FILENAME_GENERATOR_CHOICES = ["random", "static"]
def __init__(self, config: dict) -> None:
# without this STEP.__init__ is not called
super().__init__(config)
assert self.path_generator in Storage.PATH_GENERATOR_OPTIONS, f"path_generator must be one of {Storage.PATH_GENERATOR_OPTIONS}"
assert self.filename_generator in Storage.FILENAME_GENERATOR_CHOICES, f"filename_generator must be one of {Storage.FILENAME_GENERATOR_CHOICES}"
@staticmethod
def configs() -> dict:
return {
"path_generator": {
"default": "url",
"help": "how to store the file in terms of directory structure: 'flat' sets to root; 'url' creates a directory based on the provided URL; 'random' creates a random directory.",
"choices": Storage.PATH_GENERATOR_OPTIONS
},
"filename_generator": {
"default": "random",
"help": "how to name stored files: 'random' creates a random string; 'static' uses a replicable strategy such as a hash.",
"choices": Storage.FILENAME_GENERATOR_CHOICES
}
}
def init(name: str, config: dict) -> Storage:
# only for typing...
@ -68,19 +45,27 @@ class Storage(Step):
folder = ArchivingContext.get("folder", "")
filename, ext = os.path.splitext(media.filename)
# path_generator logic
if self.path_generator == "flat":
# Handle path_generator logic
path_generator = ArchivingContext.get("path_generator", "url")
if path_generator == "flat":
path = ""
filename = slugify(filename) # in case it comes with os.sep
elif self.path_generator == "url": path = slugify(url)
elif self.path_generator == "random":
filename = slugify(filename) # Ensure filename is slugified
elif path_generator == "url":
path = slugify(url)
elif path_generator == "random":
path = ArchivingContext.get("random_path", random_str(24), True)
else:
raise ValueError(f"Invalid path_generator: {path_generator}")
# filename_generator logic
if self.filename_generator == "random": filename = random_str(24)
elif self.filename_generator == "static":
# Handle filename_generator logic
filename_generator = ArchivingContext.get("filename_generator", "random")
if filename_generator == "random":
filename = random_str(24)
elif filename_generator == "static":
he = HashEnricher({"hash_enricher": {"algorithm": ArchivingContext.get("hash_enricher.algorithm"), "chunksize": 1.6e7}})
hd = he.calculate_hash(media.filename)
filename = hd[:24]
else:
raise ValueError(f"Invalid filename_generator: {filename_generator}")
media.key = os.path.join(folder, path, f"{filename}{ext}")
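As a concrete illustration of how these pieces combine into media.key (all values below are made up, assuming path_generator="url" and filename_generator="random"):

import os
from slugify import slugify

folder = "my-folder"
url = "https://example.com/some-post"
ext = ".jpg"
filename = "aB3dE6gH9jK2mN5pQ8sT1vW4"  # stands in for random_str(24)

key = os.path.join(folder, slugify(url), f"{filename}{ext}")
# -> "my-folder/https-example-com-some-post/aB3dE6gH9jK2mN5pQ8sT1vW4.jpg"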

View file

@ -20,7 +20,7 @@ from typing import Any, List
# configurable_parents = [
# Feeder,
# Enricher,
# Archiver,
# Extractor,
# Database,
# Storage,
# Formatter
@ -28,7 +28,7 @@ from typing import Any, List
# ]
# feeder: Feeder
# formatter: Formatter
# archivers: List[Archiver] = field(default_factory=[])
# extractors: List[Extractor] = field(default_factory=[])
# enrichers: List[Enricher] = field(default_factory=[])
# storages: List[Storage] = field(default_factory=[])
# databases: List[Database] = field(default_factory=[])

View file

@ -1,4 +0,0 @@
""" Databases are used to store the outputs from running the Autp Archiver.
"""

View file

@ -1,12 +0,0 @@
"""
Enrichers are modular components that enhance archived content by adding
context, metadata, or additional processing.
These add additional information to the context, such as screenshots, hashes, and metadata.
They are designed to work within the archiving pipeline, operating on `Metadata` objects after
the archiving step and before storage or formatting.
Enrichers are optional but highly useful for making the archived data more powerful.
"""

View file

@ -1,22 +0,0 @@
""" Base classes and utilities for enrichers in the Auto-Archiver system.
"""
from __future__ import annotations
from dataclasses import dataclass
from abc import abstractmethod, ABC
from ..core import Metadata, Step
@dataclass
class Enricher(Step, ABC):
name = "enricher"
def __init__(self, config: dict) -> None:
# without this STEP.__init__ is not called
super().__init__(config)
# only for typing...
def init(name: str, config: dict) -> Enricher:
return Step.init(name, config, Enricher)
@abstractmethod
def enrich(self, to_enrich: Metadata) -> None: pass

View file

@ -1,3 +0,0 @@
""" Feeders handle the input of media into the Auto Archiver.
"""

View file

@ -1,41 +0,0 @@
from loguru import logger
import csv
from . import Feeder
from ..core import Metadata, ArchivingContext
from ..utils import url_or_none
class CSVFeeder(Feeder):
@staticmethod
def configs() -> dict:
return {
"files": {
"default": None,
"help": "Path to the input file(s) to read the URLs from, comma separated. \
Input files should be formatted with one URL per line",
"cli_set": lambda cli_val, cur_val: list(set(cli_val.split(",")))
},
"column": {
"default": None,
"help": "Column number or name to read the URLs from, 0-indexed",
}
}
def __iter__(self) -> Metadata:
url_column = self.column or 0
for file in self.files:
with open(file, "r") as f:
reader = csv.reader(f)
first_row = next(reader)
if not(url_or_none(first_row[url_column])):
# it's a header row, skip it
logger.debug(f"Skipping header row: {first_row}")
for row in reader:
url = row[0]
logger.debug(f"Processing {url}")
yield Metadata().set_url(url)
ArchivingContext.set("folder", "cli")
logger.success(f"Processed {len(self.urls)} URL(s)")

View file

@ -1 +0,0 @@
""" Formatters for the output of the content. """

View file

@ -0,0 +1 @@
from api_db import AAApiDb

View file

@ -0,0 +1,33 @@
{
"name": "Auto-Archiver API Database",
"type": ["database"],
"entry_point": "api_db:AAApiDb",
"requires_setup": True,
"external_dependencies": {
"python": ["requests",
"loguru"],
},
"configs": {
"api_endpoint": {"default": None, "help": "API endpoint where calls are made to"},
"api_token": {"default": None, "help": "API Bearer token."},
"public": {"default": False, "help": "whether the URL should be publicly available via the API"},
"author_id": {"default": None, "help": "which email to assign as author"},
"group_id": {"default": None, "help": "which group of users have access to the archive in case public=false as author"},
"allow_rearchive": {"default": True, "help": "if False then the API database will be queried prior to any archiving operations and stop if the link has already been archived", "type": "bool",},
"store_results": {"default": True, "help": "when set, will send the results to the API database.", "type": "bool",},
"tags": {"default": [], "help": "what tags to add to the archived URL",}
},
"description": """
Provides integration with the Auto-Archiver API for querying and storing archival data.
### Features
- **API Integration**: Supports querying for existing archives and submitting results.
- **Duplicate Prevention**: Avoids redundant archiving when `allow_rearchive` is disabled.
- **Configurable**: Supports settings like API endpoint, authentication token, tags, and permissions.
- **Tagging and Metadata**: Adds tags and manages metadata for archives.
- **Optional Storage**: Archives results conditionally based on configuration.
### Setup
Requires access to an Auto-Archiver API instance and a valid API token.
""",
}

View file

@ -2,8 +2,8 @@ from typing import Union
import requests, os
from loguru import logger
from . import Database
from ..core import Metadata
from auto_archiver.base_processors import Database
from auto_archiver.core import Metadata
class AAApiDb(Database):
@ -19,18 +19,7 @@ class AAApiDb(Database):
self.store_results = bool(self.store_results)
self.assert_valid_string("api_endpoint")
@staticmethod
def configs() -> dict:
return {
"api_endpoint": {"default": None, "help": "API endpoint where calls are made to"},
"api_token": {"default": None, "help": "API Bearer token."},
"public": {"default": False, "help": "whether the URL should be publicly available via the API"},
"author_id": {"default": None, "help": "which email to assign as author"},
"group_id": {"default": None, "help": "which group of users have access to the archive in case public=false as author"},
"allow_rearchive": {"default": True, "help": "if False then the API database will be queried prior to any archiving operations and stop if the link has already been archived"},
"store_results": {"default": True, "help": "when set, will send the results to the API database."},
"tags": {"default": [], "help": "what tags to add to the archived URL", "cli_set": lambda cli_val, cur_val: set(cli_val.split(","))},
}
def fetch(self, item: Metadata) -> Union[Metadata, bool]:
""" query the database for the existence of this item.
Helps avoid re-archiving the same URL multiple times.

View file

@ -0,0 +1 @@
from .atlos import AtlosStorage

View file

@ -0,0 +1,40 @@
{
"name": "atlos_storage",
"type": ["storage"],
"requires_setup": True,
"external_dependencies": {"python": ["loguru", "requests"], "bin": [""]},
"configs": {
"path_generator": {
"default": "url",
"help": "how to store the file in terms of directory structure: 'flat' sets to root; 'url' creates a directory based on the provided URL; 'random' creates a random directory.",
},
"filename_generator": {
"default": "random",
"help": "how to name stored files: 'random' creates a random string; 'static' uses a replicable strategy such as a hash.",
},
"api_token": {
"default": None,
"help": "An Atlos API token. For more information, see https://docs.atlos.org/technical/api/",
"type": "str",
},
"atlos_url": {
"default": "https://platform.atlos.org",
"help": "The URL of your Atlos instance (e.g., https://platform.atlos.org), without a trailing slash.",
"type": "str",
},
},
"description": """
AtlosStorage: A storage module for saving media files to the Atlos platform.
### Features
- Uploads media files to Atlos using Atlos-specific APIs.
- Automatically calculates SHA-256 hashes of media files for integrity verification.
- Skips uploads for files that already exist on Atlos with the same hash.
- Supports attaching metadata, such as `atlos_id`, to the uploaded files.
- Provides CDN-like URLs for accessing uploaded media.
### Notes
- Requires Atlos API configuration, including `atlos_url` and `api_token`.
- Files are linked to an `atlos_id` in the metadata, ensuring proper association with Atlos source materials.
""",
}
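The duplicate-skipping behaviour described above relies on hashing the file; a minimal sketch of a chunked SHA-256 helper (illustrative only, not the actual AtlosStorage code):

import hashlib

def sha256_of_file(path: str, chunksize: int = 16 * 1024 * 1024) -> str:
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        # read the file in chunks so large media never has to fit in memory
        while chunk := f.read(chunksize):
            digest.update(chunk)
    return digest.hexdigest()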

View file

@ -4,9 +4,9 @@ from loguru import logger
import requests
import hashlib
from ..core import Media, Metadata
from ..storages import Storage
from ..utils import get_atlos_config_options
from auto_archiver.core import Media, Metadata
from auto_archiver.base_processors import Storage
from auto_archiver.utils import get_atlos_config_options
class AtlosStorage(Storage):
@ -15,10 +15,6 @@ class AtlosStorage(Storage):
def __init__(self, config: dict) -> None:
super().__init__(config)
@staticmethod
def configs() -> dict:
return dict(Storage.configs(), **get_atlos_config_options())
def get_cdn_url(self, _media: Media) -> str:
# It's not always possible to provide an exact URL, because it's
# possible that the media once uploaded could have been copied to

View file

@ -0,0 +1 @@
from atlos_db import AtlosDb

View file

@ -0,0 +1,36 @@
{
"name": "Atlos Database",
"type": ["database"],
"entry_point": "atlos_db:AtlosDb",
"requires_setup": True,
"external_dependencies":
{"python": ["loguru",
""],
"bin": [""]},
"configs": {
"api_token": {
"default": None,
"help": "An Atlos API token. For more information, see https://docs.atlos.org/technical/api/",
},
"atlos_url": {
"default": "https://platform.atlos.org",
"help": "The URL of your Atlos instance (e.g., https://platform.atlos.org), without a trailing slash.",
"type": "str"
},
},
"description": """
Handles integration with the Atlos platform for managing archival results.
### Features
- Outputs archival results to the Atlos API for storage and tracking.
- Updates failure status with error details when archiving fails.
- Processes and formats metadata, including ISO formatting for datetime fields.
- Skips processing for items without an Atlos ID.
### Setup
Required configs:
- atlos_url: Base URL for the Atlos API.
- api_token: Authentication token for API access.
"""
,
}

View file

@ -1,13 +1,14 @@
import os
from typing import Union
from loguru import logger
from csv import DictWriter
from dataclasses import asdict
import requests
from . import Database
from ..core import Metadata
from ..utils import get_atlos_config_options
from auto_archiver.base_processors import Database
from auto_archiver.core import Metadata
from auto_archiver.utils import get_atlos_config_options
class AtlosDb(Database):
@ -21,10 +22,6 @@ class AtlosDb(Database):
# without this STEP.__init__ is not called
super().__init__(config)
@staticmethod
def configs() -> dict:
return get_atlos_config_options()
def failed(self, item: Metadata, reason: str) -> None:
"""Update DB accordingly for failure"""
# If the item has no Atlos ID, there's nothing for us to do

View file

@ -0,0 +1,13 @@
def get_atlos_config_options():
return {
"api_token": {
"default": None,
"help": "An Atlos API token. For more information, see https://docs.atlos.org/technical/api/",
"type": str
},
"atlos_url": {
"default": "https://platform.atlos.org",
"help": "The URL of your Atlos instance (e.g., https://platform.atlos.org), without a trailing slash.",
"type": str
},
}

View file

@ -0,0 +1 @@
from .atlos_feeder import AtlosFeeder

View file

@ -0,0 +1,34 @@
{
"name": "Atlos Feeder",
"type": ["feeder"],
"requires_setup": True,
"external_dependencies": {
"python": ["loguru", "requests"],
},
"configs": {
"api_token": {
"default": None,
"help": "An Atlos API token. For more information, see https://docs.atlos.org/technical/api/",
"type": "str"
},
"atlos_url": {
"default": "https://platform.atlos.org",
"help": "The URL of your Atlos instance (e.g., https://platform.atlos.org), without a trailing slash.",
"type": "str"
},
},
"description": """
AtlosFeeder: A feeder module that integrates with the Atlos API to fetch source material URLs for archival.
### Features
- Connects to the Atlos API to retrieve a list of source material URLs.
- Filters source materials based on visibility, processing status, and metadata.
- Converts filtered source materials into `Metadata` objects with the relevant `atlos_id` and URL.
- Iterates through paginated results using a cursor for efficient API interaction.
### Notes
- Requires an Atlos API endpoint and a valid API token for authentication.
- Ensures only unprocessed, visible, and ready-to-archive URLs are returned.
- Handles pagination transparently when retrieving data from the Atlos API.
"""
}

View file

@ -1,9 +1,9 @@
from loguru import logger
import requests
from . import Feeder
from ..core import Metadata, ArchivingContext
from ..utils import get_atlos_config_options
from auto_archiver.base_processors import Feeder
from auto_archiver.core import Metadata, ArchivingContext
from auto_archiver.utils import get_atlos_config_options
class AtlosFeeder(Feeder):
@ -15,10 +15,6 @@ class AtlosFeeder(Feeder):
if type(self.api_token) != str:
raise Exception("Atlos Feeder did not receive an Atlos API token")
@staticmethod
def configs() -> dict:
return get_atlos_config_options()
def __iter__(self) -> Metadata:
# Get all the urls from the Atlos API
count = 0

View file

@ -0,0 +1 @@
from .cli_feeder import CLIFeeder

View file

@ -0,0 +1,23 @@
{
"name": "CLI Feeder",
"type": ["feeder"],
"requires_setup": False,
"external_dependencies": {
"python": ["loguru"],
},
"configs": {
"urls": {
"default": None,
"help": "URL(s) to archive, either a single URL or a list of urls, should not come from config.yaml",
},
},
"description": """
Processes URLs to archive passed via the command line and feeds them into the archiving pipeline.
### Features
- Takes a single URL or a list of URLs provided via the command line.
- Converts each URL into a `Metadata` object and yields it for processing.
- Ensures URLs are processed only if they are explicitly provided.
"""
}

View file

@ -1,7 +1,7 @@
from loguru import logger
from . import Feeder
from ..core import Metadata, ArchivingContext
from auto_archiver.base_processors import Feeder
from auto_archiver.core import Metadata, ArchivingContext
class CLIFeeder(Feeder):
@ -13,16 +13,6 @@ class CLIFeeder(Feeder):
if type(self.urls) != list or len(self.urls) == 0:
raise Exception("CLI Feeder did not receive any URL to process")
@staticmethod
def configs() -> dict:
return {
"urls": {
"default": None,
"help": "URL(s) to archive, either a single URL or a list of urls, should not come from config.yaml",
"cli_set": lambda cli_val, cur_val: list(set(cli_val.split(",")))
},
}
def __iter__(self) -> Metadata:
for url in self.urls:
logger.debug(f"Processing {url}")

View file

@ -0,0 +1 @@
from .console_db import ConsoleDb

View file

@ -0,0 +1,22 @@
{
"name": "Console Database",
"type": ["database"],
"requires_setup": False,
"external_dependencies": {
"python": ["loguru"],
},
"description": """
Provides a simple database implementation that outputs archival results and status updates to the console.
### Features
- Logs the status of archival tasks directly to the console, including:
- started
- failed (with error details)
- aborted
- done (with optional caching status)
- Useful for debugging or lightweight setups where no external database is required.
### Setup
No additional configuration is required.
""",
}

View file

@ -1,7 +1,7 @@
from loguru import logger
from . import Database
from ..core import Metadata
from auto_archiver.base_processors import Database
from auto_archiver.core import Metadata
class ConsoleDb(Database):
@ -14,10 +14,6 @@ class ConsoleDb(Database):
# without this STEP.__init__ is not called
super().__init__(config)
@staticmethod
def configs() -> dict:
return {}
def started(self, item: Metadata) -> None:
logger.warning(f"STARTED {item}")

View file

@ -0,0 +1 @@
from .csv_db import CSVDb

View file

@ -0,0 +1,22 @@
{
"name": "csv_db",
"type": ["database"],
"requires_setup": False,
"external_dependencies": {"python": ["loguru"]
},
"configs": {
"csv_file": {"default": "db.csv", "help": "CSV file name"}
},
"description": """
Handles exporting archival results to a CSV file.
### Features
- Saves archival metadata as rows in a CSV file.
- Automatically creates the CSV file with a header if it does not exist.
- Appends new metadata entries to the existing file.
### Setup
Required config:
- csv_file: Path to the CSV file where results will be stored (default: "db.csv").
""",
}

View file

@ -3,8 +3,8 @@ from loguru import logger
from csv import DictWriter
from dataclasses import asdict
from . import Database
from ..core import Metadata
from auto_archiver.base_processors import Database
from auto_archiver.core import Metadata
class CSVDb(Database):
@ -18,11 +18,6 @@ class CSVDb(Database):
super().__init__(config)
self.assert_valid_string("csv_file")
@staticmethod
def configs() -> dict:
return {
"csv_file": {"default": "db.csv", "help": "CSV file name"}
}
def done(self, item: Metadata, cached: bool=False) -> None:
"""archival result ready - should be saved to DB"""

View file

@ -0,0 +1 @@
from .csv_feeder import CSVFeeder

View file

@ -0,0 +1,32 @@
{
"name": "CSV Feeder",
"type": ["feeder"],
"requires_setup": False,
"external_dependencies": {
"python": ["loguru"],
"bin": [""]
},
"configs": {
"files": {
"default": None,
"help": "Path to the input file(s) to read the URLs from, comma separated. \
Input files should be formatted with one URL per line",
},
"column": {
"default": None,
"help": "Column number or name to read the URLs from, 0-indexed",
}
},
"description": """
Reads URLs from CSV files and feeds them into the archiving process.
### Features
- Supports reading URLs from multiple input files, specified as a comma-separated list.
- Allows specifying the column number or name to extract URLs from.
- Skips header rows if the first value is not a valid URL.
- Integrates with the `ArchivingContext` to manage URL feeding.
### Setup
- Input files should be formatted with one URL per line.
"""
}

View file

@ -0,0 +1,27 @@
from loguru import logger
import csv
from auto_archiver.base_processors import Feeder
from auto_archiver.core import Metadata, ArchivingContext
from auto_archiver.utils import url_or_none
class CSVFeeder(Feeder):
name = "csv_feeder"
def __iter__(self) -> Metadata:
url_column = self.column or 0
for file in self.files:
with open(file, "r") as f:
reader = csv.reader(f)
first_row = next(reader)
if not(url_or_none(first_row[url_column])):
# it's a header row, skip it
logger.debug(f"Skipping header row: {first_row}")
for row in reader:
url = row[0]
logger.debug(f"Processing {url}")
yield Metadata().set_url(url)
ArchivingContext.set("folder", "cli")
logger.success(f"Processed {len(self.urls)} URL(s)")

View file

@ -0,0 +1 @@
from .gdrive_storage import GDriveStorage

View file

@ -0,0 +1,43 @@
{
"name": "Google Drive Storage",
"type": ["storage"],
"requires_setup": True,
"external_dependencies": {
"python": [
"loguru",
"google-api-python-client",
"google-auth",
"google-auth-oauthlib",
"google-auth-httplib2"
],
},
"configs": {
"path_generator": {
"default": "url",
"help": "how to store the file in terms of directory structure: 'flat' sets to root; 'url' creates a directory based on the provided URL; 'random' creates a random directory.",
"choices": ["flat", "url", "random"],
},
"filename_generator": {
"default": "random",
"help": "how to name stored files: 'random' creates a random string; 'static' uses a replicable strategy such as a hash.",
"choices": ["random", "static"],
},
"root_folder_id": {"default": None, "help": "root google drive folder ID to use as storage, found in URL: 'https://drive.google.com/drive/folders/FOLDER_ID'"},
"oauth_token": {"default": None, "help": "JSON filename with Google Drive OAuth token: check auto-archiver repository scripts folder for create_update_gdrive_oauth_token.py. NOTE: storage used will count towards owner of GDrive folder, therefore it is best to use oauth_token_filename over service_account."},
"service_account": {"default": "secrets/service_account.json", "help": "service account JSON file path, same as used for Google Sheets. NOTE: storage used will count towards the developer account."},
},
"description": """
GDriveStorage: A storage module for saving archived content to Google Drive.
### Features
- Saves media files to Google Drive, organizing them into folders based on the provided path structure.
- Supports OAuth token-based authentication or service account credentials for API access.
- Automatically creates folders in Google Drive if they don't exist.
- Retrieves CDN URLs for stored files, enabling easy sharing and access.
### Notes
- Requires setup with either a Google OAuth token or a service account JSON file.
- Files are uploaded to the specified `root_folder_id` and organized by the `media.key` structure.
- Automatically handles Google Drive API token refreshes for long-running jobs.
"""
}

View file

@ -9,8 +9,8 @@ from google.oauth2 import service_account
from google.oauth2.credentials import Credentials
from google.auth.transport.requests import Request
from ..core import Media
from . import Storage
from auto_archiver.core import Media
from auto_archiver.base_processors import Storage
class GDriveStorage(Storage):
@ -58,16 +58,6 @@ class GDriveStorage(Storage):
self.service = build('drive', 'v3', credentials=creds)
@staticmethod
def configs() -> dict:
return dict(
Storage.configs(),
** {
"root_folder_id": {"default": None, "help": "root google drive folder ID to use as storage, found in URL: 'https://drive.google.com/drive/folders/FOLDER_ID'"},
"oauth_token": {"default": None, "help": "JSON filename with Google Drive OAuth token: check auto-archiver repository scripts folder for create_update_gdrive_oauth_token.py. NOTE: storage used will count towards owner of GDrive folder, therefore it is best to use oauth_token_filename over service_account."},
"service_account": {"default": "secrets/service_account.json", "help": "service account JSON file path, same as used for Google Sheets. NOTE: storage used will count towards the developer account."},
})
def get_cdn_url(self, media: Media) -> str:
"""
only support files saved in a folder for GD

View file

@ -1,17 +1,12 @@
import os
import mimetypes
import requests
from loguru import logger
from auto_archiver.core.context import ArchivingContext
from auto_archiver.archivers.archiver import Archiver
from auto_archiver.base_processors.extractor import Extractor
from auto_archiver.core.metadata import Metadata, Media
from .dropin import GenericDropin, InfoExtractor
class Bluesky(GenericDropin):
def create_metadata(self, post: dict, ie_instance: InfoExtractor, archiver: Archiver, url: str) -> Metadata:
def create_metadata(self, post: dict, ie_instance: InfoExtractor, archiver: Extractor, url: str) -> Metadata:
result = Metadata()
result.set_url(url)
result.set_title(post["record"]["text"])
@ -42,7 +37,7 @@ class Bluesky(GenericDropin):
def _download_bsky_embeds(self, post: dict, archiver: Archiver) -> list[Media]:
def _download_bsky_embeds(self, post: dict, archiver: Extractor) -> list[Media]:
"""
Iterates over image(s) or video in a Bluesky post and downloads them
"""

View file

@ -1,6 +1,6 @@
from yt_dlp.extractor.common import InfoExtractor
from auto_archiver.core.metadata import Metadata
from auto_archiver.archivers.archiver import Archiver
from auto_archiver.base_processors.extractor import Extractor
class GenericDropin:
"""Base class for dropins for the generic extractor.
@ -30,7 +30,7 @@ class GenericDropin:
raise NotImplementedError("This method should be implemented in the subclass")
def create_metadata(self, post: dict, ie_instance: InfoExtractor, archiver: Archiver, url: str) -> Metadata:
def create_metadata(self, post: dict, ie_instance: InfoExtractor, archiver: Extractor, url: str) -> Metadata:
"""
This method should create a Metadata object from the post data.
"""

View file

@ -5,10 +5,10 @@ from yt_dlp.extractor.common import InfoExtractor
from loguru import logger
from auto_archiver.archivers.archiver import Archiver
from auto_archiver.base_processors.extractor import Extractor
from ...core import Metadata, Media, ArchivingContext
class GenericExtractor(Archiver):
class GenericExtractor(Extractor):
name = "youtubedl_archiver" #left as is for backwards compat
_dropins = {}

View file

@ -2,7 +2,7 @@ from typing import Type
from auto_archiver.utils import traverse_obj
from auto_archiver.core.metadata import Metadata, Media
from auto_archiver.archivers.archiver import Archiver
from auto_archiver.base_processors.extractor import Extractor
from yt_dlp.extractor.common import InfoExtractor
from dateutil.parser import parse as parse_dt
@ -19,7 +19,7 @@ class Truth(GenericDropin):
def skip_ytdlp_download(self, url, ie_instance: Type[InfoExtractor]) -> bool:
return True
def create_metadata(self, post: dict, ie_instance: InfoExtractor, archiver: Archiver, url: str) -> Metadata:
def create_metadata(self, post: dict, ie_instance: InfoExtractor, archiver: Extractor, url: str) -> Metadata:
"""
Creates metadata from a truth social post

View file

@ -6,7 +6,7 @@ from slugify import slugify
from auto_archiver.core.metadata import Metadata, Media
from auto_archiver.utils import UrlUtil
from auto_archiver.archivers.archiver import Archiver
from auto_archiver.base_processors.extractor import Extractor
from .dropin import GenericDropin, InfoExtractor
@ -32,7 +32,7 @@ class Twitter(GenericDropin):
twid = ie_instance._match_valid_url(url).group('id')
return ie_instance._extract_status(twid=twid)
def create_metadata(self, tweet: dict, ie_instance: InfoExtractor, archiver: Archiver, url: str) -> Metadata:
def create_metadata(self, tweet: dict, ie_instance: InfoExtractor, archiver: Extractor, url: str) -> Metadata:
result = Metadata()
try:
if not tweet.get("user") or not tweet.get("created_at"):

View file

@ -0,0 +1 @@
from .gsheet_db import GsheetsDb

View file

@ -0,0 +1,36 @@
{
"name": "Google Sheets Database",
"type": ["database"],
"requires_setup": True,
"external_dependencies": {
"python": ["loguru", "gspread", "python-slugify"],
},
"configs": {
"allow_worksheets": {
"default": set(),
"help": "(CSV) only worksheets whose name is included in allow are included (overrides worksheet_block), leave empty so all are allowed",
},
"block_worksheets": {
"default": set(),
"help": "(CSV) explicitly block some worksheets from being processed",
},
"use_sheet_names_in_stored_paths": {
"default": True,
"help": "if True the stored files path will include 'workbook_name/worksheet_name/...'",
}
},
"description": """
GsheetsDatabase:
Handles integration with Google Sheets for tracking archival tasks.
### Features
- Updates a Google Sheet with the status of the archived URLs, including in progress, success or failure, and method used.
- Saves metadata such as title, text, timestamp, hashes, screenshots, and media URLs to designated columns.
- Formats media-specific metadata, such as thumbnails and PDQ hashes for the sheet.
- Skips redundant updates for empty or invalid data fields.
### Notes
- Currently works only with metadata provided by GsheetFeeder.
- Requires configuration of a linked Google Sheet and appropriate API credentials.
"""
}

View file

@ -1,12 +1,13 @@
from typing import Union, Tuple
import datetime
from urllib.parse import quote
from loguru import logger
from . import Database
from ..core import Metadata, Media, ArchivingContext
from ..utils import GWorksheet
from auto_archiver.base_processors import Database
from auto_archiver.core import Metadata, Media, ArchivingContext
from auto_archiver.modules.gsheet_feeder import GWorksheet
class GsheetsDb(Database):
@ -20,10 +21,6 @@ class GsheetsDb(Database):
# without this STEP.__init__ is not called
super().__init__(config)
@staticmethod
def configs() -> dict:
return {}
def started(self, item: Metadata) -> None:
logger.warning(f"STARTED {item}")
gw, row = self._retrieve_gsheet(item)
@ -108,5 +105,4 @@ class GsheetsDb(Database):
elif self.sheet_id:
print(self.sheet_id)
return gw, row

View file

@ -0,0 +1,2 @@
from .gworksheet import GWorksheet
from .gsheet_feeder import GsheetsFeeder

View file

@ -0,0 +1,65 @@
{
"name": "Google Sheets Feeder",
"type": ["feeder"],
"entry_point": "GsheetsFeeder",
"requires_setup": True,
"external_dependencies": {
"python": ["loguru", "gspread", "python-slugify"],
},
"configs": {
"sheet": {"default": None, "help": "name of the sheet to archive"},
"sheet_id": {"default": None, "help": "(alternative to sheet name) the id of the sheet to archive"},
"header": {"default": 1, "help": "index of the header row (starts at 1)"},
"service_account": {"default": "secrets/service_account.json", "help": "service account JSON file path"},
"columns": {
"default": {
'url': 'link',
'status': 'archive status',
'folder': 'destination folder',
'archive': 'archive location',
'date': 'archive date',
'thumbnail': 'thumbnail',
'timestamp': 'upload timestamp',
'title': 'upload title',
'text': 'text content',
'screenshot': 'screenshot',
'hash': 'hash',
'pdq_hash': 'perceptual hashes',
'wacz': 'wacz',
'replaywebpage': 'replaywebpage',
},
"help": "names of columns in the google sheet (stringified JSON object)",
"type": "auto_archiver.utils.json_loader",
},
"allow_worksheets": {
"default": set(),
"help": "(CSV) only worksheets whose name is included in allow are included (overrides worksheet_block), leave empty so all are allowed",
},
"block_worksheets": {
"default": set(),
"help": "(CSV) explicitly block some worksheets from being processed",
},
"use_sheet_names_in_stored_paths": {
"default": True,
"help": "if True the stored files path will include 'workbook_name/worksheet_name/...'",
"type": "bool",
}
},
"description": """
GsheetsFeeder
A Google Sheets-based feeder for the Auto Archiver.
This reads data from Google Sheets and filters rows based on user-defined rules.
The filtered rows are processed into `Metadata` objects.
### Features
- Validates the sheet structure and filters rows based on input configurations.
- Processes only worksheets allowed by the `allow_worksheets` and `block_worksheets` configurations.
- Ensures only rows with valid URLs and unprocessed statuses are included for archival.
- Supports organizing stored files into folder paths based on sheet and worksheet names.
### Notes
- Requires a Google Service Account JSON file for authentication. Suggested location is `secrets/gsheets_service_account.json`.
- Create the sheet using the template provided in the docs.
"""
}

View file

@ -8,45 +8,62 @@ The filtered rows are processed into `Metadata` objects.
- validates the sheet's structure and filters rows based on input configurations.
- Ensures only rows with valid URLs and unprocessed statuses are included.
"""
import gspread, os
import os
import gspread
from loguru import logger
from slugify import slugify
# from . import Enricher
from . import Feeder
from ..core import Metadata, ArchivingContext
from ..utils import Gsheets, GWorksheet
from auto_archiver.base_processors import Feeder
from auto_archiver.core import Metadata, ArchivingContext
from . import GWorksheet
class GsheetsFeeder(Gsheets, Feeder):
class GsheetsFeeder(Feeder):
name = "gsheet_feeder"
def __init__(self, config: dict) -> None:
# without this STEP.__init__ is not called
super().__init__(config)
self.gsheets_client = gspread.service_account(filename=self.service_account)
def __init__(self) -> None:
"""
Initializes the GsheetsFeeder with preloaded configurations.
"""
super().__init__()
# Initialize the gspread client with the provided service account file
# self.gsheets_client = gspread.service_account(filename=self.config["service_account"])
#
# # Set up feeder-specific configurations from the config
# self.sheet_name = config.get("sheet")
# self.sheet_id = config.get("sheet_id")
# self.header = config.get("header", 1)
# self.columns = config.get("columns", {})
# assert self.sheet_name or self.sheet_id, (
# "You need to define either a 'sheet' name or a 'sheet_id' in your manifest."
# )
# # Configuration attributes
# self.sheet = config.get("sheet")
# self.sheet_id = config.get("sheet_id")
# self.header = config.get("header", 1)
# self.columns = config.get("columns", {})
# self.allow_worksheets = config.get("allow_worksheets", set())
# self.block_worksheets = config.get("block_worksheets", set())
# self.use_sheet_names_in_stored_paths = config.get("use_sheet_names_in_stored_paths", True)
# Ensure the header is an integer
# try:
# self.header = int(self.header)
# except ValueError:
# pass
# assert isinstance(self.header, int), f"Header must be an integer, got {type(self.header)}"
# assert self.sheet or self.sheet_id, "Either 'sheet' or 'sheet_id' must be defined."
#
def open_sheet(self):
if self.sheet:
return self.gsheets_client.open(self.sheet)
else: # self.sheet_id
return self.gsheets_client.open_by_key(self.sheet_id)
@staticmethod
def configs() -> dict:
return dict(
Gsheets.configs(),
** {
"allow_worksheets": {
"default": set(),
"help": "(CSV) only worksheets whose name is included in allow are included (overrides worksheet_block), leave empty so all are allowed",
"cli_set": lambda cli_val, cur_val: set(cli_val.split(","))
},
"block_worksheets": {
"default": set(),
"help": "(CSV) explicitly block some worksheets from being processed",
"cli_set": lambda cli_val, cur_val: set(cli_val.split(","))
},
"use_sheet_names_in_stored_paths": {
"default": True,
"help": "if True the stored files path will include 'workbook_name/worksheet_name/...'",
}
})
def __iter__(self) -> Metadata:
sh = self.open_sheet()

View file

@ -0,0 +1 @@
from .hash_enricher import HashEnricher

View file

@ -0,0 +1,28 @@
{
"name": "Hash Enricher",
"type": ["enricher"],
"requires_setup": False,
"external_dependencies": {
"python": ["loguru"],
},
"configs": {
"algorithm": {"default": "SHA-256", "help": "hash algorithm to use", "choices": ["SHA-256", "SHA3-512"]},
# TODO add non-negative requirement to match previous implementation?
"chunksize": {"default": 1.6e7, "help": "number of bytes to use when reading files in chunks (if this value is too large you will run out of RAM), default is 16MB"},
},
"description": """
Generates cryptographic hashes for media files to ensure data integrity and authenticity.
### Features
- Calculates cryptographic hashes (SHA-256 or SHA3-512) for media files stored in `Metadata` objects.
- Ensures content authenticity, integrity validation, and duplicate identification.
- Efficiently processes large files by reading file bytes in configurable chunk sizes.
- Supports dynamic configuration of hash algorithms and chunk sizes.
- Updates media metadata with the computed hash value in the format `<algorithm>:<hash>`.
### Notes
- Default hash algorithm is SHA-256, but SHA3-512 is also supported.
- Chunk size defaults to 16 MB but can be adjusted based on memory requirements.
- Useful for workflows requiring hash-based content validation or deduplication.
""",
}
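The enricher can also be driven directly, mirroring the usage in the Storage base class shown earlier in this diff; a minimal sketch with a made-up file path:

from auto_archiver.modules.hash_enricher import HashEnricher

he = HashEnricher({"hash_enricher": {"algorithm": "SHA-256", "chunksize": 1.6e7}})
file_hash = he.calculate_hash("archived/example-media.jpg")  # path is illustrative
print(f"SHA-256:{file_hash}")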

View file

@ -10,8 +10,8 @@ making it suitable for handling large files efficiently.
import hashlib
from loguru import logger
from . import Enricher
from ..core import Metadata, ArchivingContext
from auto_archiver.base_processors import Enricher
from auto_archiver.core import Metadata, ArchivingContext
class HashEnricher(Enricher):
@ -40,18 +40,15 @@ class HashEnricher(Enricher):
else:
self.chunksize = self.configs()["chunksize"]["default"]
self.chunksize = int(self.chunksize)
try:
self.chunksize = int(self.chunksize)
except ValueError:
raise ValueError(f"Invalid chunksize value: {self.chunksize}. Must be an integer.")
assert self.chunksize >= -1, "read length must be non-negative or -1"
ArchivingContext.set("hash_enricher.algorithm", self.algorithm, keep_on_reset=True)
@staticmethod
def configs() -> dict:
return {
"algorithm": {"default": "SHA-256", "help": "hash algorithm to use", "choices": ["SHA-256", "SHA3-512"]},
"chunksize": {"default": int(1.6e7), "help": "number of bytes to use when reading files in chunks (if this value is too large you will run out of RAM), default is 16MB"},
}
def enrich(self, to_enrich: Metadata) -> None:
url = to_enrich.get_url()
logger.debug(f"calculating media hashes for {url=} (using {self.algorithm})")

View file

@ -0,0 +1 @@
from .html_formatter import HtmlFormatter

View file

@ -0,0 +1,13 @@
{
"name": "HTML Formatter",
"type": ["formatter"],
"requires_setup": False,
"external_dependencies": {
"python": ["loguru", "jinja2"],
"bin": [""]
},
"configs": {
"detect_thumbnails": {"default": True, "help": "if true will group by thumbnails generated by thumbnail enricher by id 'thumbnail_00'"}
},
"description": """ """,
}

View file

@ -7,11 +7,11 @@ from loguru import logger
import json
import base64
from ..version import __version__
from ..core import Metadata, Media, ArchivingContext
from . import Formatter
from ..enrichers import HashEnricher
from ..utils.misc import random_str
from auto_archiver.version import __version__
from auto_archiver.core import Metadata, Media, ArchivingContext
from auto_archiver.base_processors import Formatter
from auto_archiver.modules.hash_enricher import HashEnricher
from auto_archiver.utils.misc import random_str
@dataclass
@ -28,12 +28,6 @@ class HtmlFormatter(Formatter):
})
self.template = self.environment.get_template("html_template.html")
@staticmethod
def configs() -> dict:
return {
"detect_thumbnails": {"default": True, "help": "if true will group by thumbnails generated by thumbnail enricher by id 'thumbnail_00'"}
}
def format(self, item: Metadata) -> Media:
url = item.get_url()
if item.is_empty():

View file

@ -0,0 +1 @@
from .instagram_api_extractor import InstagramAPIExtractor

View file

@ -1,15 +1,13 @@
{
"name": "Instagram API Archiver",
"name": "Instagram API Extractor",
"type": ["extractor"],
"entry_point": "instagram_api_archiver:InstagramApiArchiver",
"depends": ["core"],
"external_dependencies":
{"python": ["requests",
"loguru",
"retrying",
"tqdm",],
},
"no_setup_required": False,
"requires_setup": True,
"configs": {
"access_token": {"default": None, "help": "a valid instagrapi-api token"},
"api_endpoint": {"default": None, "help": "API endpoint to use"},
@ -26,5 +24,22 @@
"help": "if true, will remove empty values from the json output",
},
},
"description": "",
"description": """
Archives various types of Instagram content using the Instagrapi API.
### Features
- Connects to an Instagrapi API deployment to fetch Instagram profiles, posts, stories, highlights, reels, and tagged content.
- Supports advanced configuration options, including:
- Full profile download (all posts, stories, highlights, and tagged content).
- Limiting the number of posts to fetch for large profiles.
- Minimising JSON output to remove empty fields and redundant data.
- Provides robust error handling and retries for API calls.
- Ensures efficient media scraping, including handling nested or carousel media items.
- Adds downloaded media and metadata to the result for further processing.
### Notes
- Requires a valid Instagrapi API token (`access_token`) and API endpoint (`api_endpoint`).
- Full-profile downloads can be limited by setting `full_profile_max_posts`.
- Designed to fetch content in batches for large profiles, minimising API load.
""",
}

View file

@ -1,5 +1,5 @@
"""
The `instagram_api_archiver` module provides tools for archiving various types of Instagram content
The `instagram_api_extractor` module provides tools for archiving various types of Instagram content
using the [Instagrapi API](https://github.com/subzeroid/instagrapi).
Connects to an Instagrapi API deployment and allows for downloading Instagram user profiles,
@ -16,19 +16,19 @@ from loguru import logger
from retrying import retry
from tqdm import tqdm
from auto_archiver.archivers import Archiver
from auto_archiver.base_processors import Extractor
from auto_archiver.core import Media
from auto_archiver.core import Metadata
class InstagramAPIArchiver(Archiver):
class InstagramAPIExtractor(Extractor):
"""
Uses an https://github.com/subzeroid/instagrapi API deployment to fetch instagram posts data
# TODO: improvement collect aggregates of locations[0].location and mentions for all posts
"""
name = "instagram_api_archiver"
name = "instagram_api_extractor"
global_pattern = re.compile(
r"(?:(?:http|https):\/\/)?(?:www.)?(?:instagram.com)\/(stories(?:\/highlights)?|p|reel)?\/?([^\/\?]*)\/?(\d+)?"
@ -45,25 +45,6 @@ class InstagramAPIArchiver(Archiver):
self.full_profile = bool(self.full_profile)
self.minimize_json_output = bool(self.minimize_json_output)
@staticmethod
def configs() -> dict:
return {
"access_token": {"default": None, "help": "a valid instagrapi-api token"},
"api_endpoint": {"default": None, "help": "API endpoint to use"},
"full_profile": {
"default": False,
"help": "if true, will download all posts, tagged posts, stories, and highlights for a profile, if false, will only download the profile pic and information.",
},
"full_profile_max_posts": {
"default": 0,
"help": "Use to limit the number of posts to download when full_profile is true. 0 means no limit. limit is applied softly since posts are fetched in batch, once to: posts, tagged posts, and highlights",
},
"minimize_json_output": {
"default": True,
"help": "if true, will remove empty values from the json output",
},
}
def download(self, item: Metadata) -> Metadata:
url = item.get_url()

Wyświetl plik

@ -0,0 +1 @@
from .instagram_extractor import InstagramExtractor

View file

@ -1,13 +1,13 @@
{
"name": "Instagram Archiver",
"name": "Instagram Extractor",
"type": ["extractor"],
"entry_point": "instagram_archiver:InstagramArchiver",
"depends": ["core"],
"external_dependencies": {
"python": ["instaloader",
"loguru",],
"python": [
"instaloader",
"loguru",
],
},
"no_setup_required": False,
"requires_setup": True,
"configs": {
"username": {"default": None, "help": "a valid Instagram username"},
"password": {

View file

@ -7,15 +7,15 @@ import re, os, shutil, traceback
import instaloader # https://instaloader.github.io/as-module.html
from loguru import logger
from auto_archiver.archivers import Archiver
from auto_archiver.base_processors import Extractor
from auto_archiver.core import Metadata
from auto_archiver.core import Media
class InstagramArchiver(Archiver):
class InstagramExtractor(Extractor):
"""
Uses Instaloader to download either a post (inc images, videos, text) or as much as possible from a profile (posts, stories, highlights, ...)
"""
name = "instagram_archiver"
name = "instagram_extractor"
# NB: post regex should be tested before profile
# https://regex101.com/r/MGPquX/1
@ -45,16 +45,7 @@ class InstagramArchiver(Archiver):
except Exception as e2:
logger.error(f"Unable to finish login (retrying from file): {e2}\n{traceback.format_exc()}")
@staticmethod
def configs() -> dict:
return {
"username": {"default": None, "help": "a valid Instagram username"},
"password": {"default": None, "help": "the corresponding Instagram account password"},
"download_folder": {"default": "instaloader", "help": "name of a folder to temporarily download content to"},
"session_file": {"default": "secrets/instaloader.session", "help": "path to the instagram session which saves session credentials"},
#TODO: fine-grain
# "download_stories": {"default": True, "help": "if the link is to a user profile: whether to get stories information"},
}
def download(self, item: Metadata) -> Metadata:
url = item.get_url()
@ -76,7 +67,7 @@ class InstagramArchiver(Archiver):
elif len(profile_matches):
result = self.download_profile(url, profile_matches[0])
except Exception as e:
logger.error(f"Failed to download with instagram archiver due to: {e}, make sure your account credentials are valid.")
logger.error(f"Failed to download with instagram extractor due to: {e}, make sure your account credentials are valid.")
finally:
shutil.rmtree(self.download_folder, ignore_errors=True)
return result

View file

@ -0,0 +1 @@
from .instagram_tbot_extractor import InstagramTbotExtractor

View file

@ -1,8 +1,6 @@
{
"name": "Instagram Telegram Bot Archiver",
"name": "Instagram Telegram Bot Extractor",
"type": ["extractor"],
"entry_point": "instagram_tbot_archiver:InstagramTbotArchiver",
"depends": ["core", "utils"],
"external_dependencies": {"python": ["loguru",
"telethon",],
},
@ -14,7 +12,7 @@
"timeout": {"default": 45, "help": "timeout to fetch the instagram content in seconds."},
},
"description": """
The `InstagramTbotArchiver` module uses a Telegram bot (`instagram_load_bot`) to fetch and archive Instagram content,
The `InstagramTbotExtractor` module uses a Telegram bot (`instagram_load_bot`) to fetch and archive Instagram content,
such as posts and stories. It leverages the Telethon library to interact with the Telegram API, sending Instagram URLs
to the bot and downloading the resulting media and metadata. The downloaded content is stored as `Media` objects and
returned as part of a `Metadata` object.
@ -27,7 +25,7 @@ returned as part of a `Metadata` object.
### Setup
To use the `InstagramTbotArchiver`, you need to provide the following configuration settings:
To use the `InstagramTbotExtractor`, you need to provide the following configuration settings:
- **API ID and Hash**: Telegram API credentials obtained from [my.telegram.org/apps](https://my.telegram.org/apps).
- **Session File**: Optional path to store the Telegram session file for future use.

View file

@ -1,5 +1,5 @@
"""
InstagramTbotArchiver Module
InstagramTbotExtractor Module
This module provides functionality to archive Instagram content (posts, stories, etc.) using a Telegram bot (`instagram_load_bot`).
It interacts with the Telegram API via the Telethon library to send Instagram URLs to the bot, which retrieves the
@ -15,18 +15,18 @@ from sqlite3 import OperationalError
from loguru import logger
from telethon.sync import TelegramClient
from auto_archiver.archivers import Archiver
from auto_archiver.base_processors import Extractor
from auto_archiver.core import Metadata, Media, ArchivingContext
from auto_archiver.utils import random_str
class InstagramTbotArchiver(Archiver):
class InstagramTbotExtractor(Extractor):
"""
calls a telegram bot to fetch instagram posts/stories... and gets available media from it
https://github.com/adw0rd/instagrapi
https://t.me/instagram_load_bot
"""
name = "instagram_tbot_archiver"
name = "instagram_tbot_extractor"
def __init__(self, config: dict) -> None:
super().__init__(config)
@ -34,15 +34,6 @@ class InstagramTbotArchiver(Archiver):
self.assert_valid_string("api_hash")
self.timeout = int(self.timeout)
@staticmethod
def configs() -> dict:
return {
"api_id": {"default": None, "help": "telegram API_ID value, go to https://my.telegram.org/apps"},
"api_hash": {"default": None, "help": "telegram API_HASH value, go to https://my.telegram.org/apps"},
"session_file": {"default": "secrets/anon-insta", "help": "optional, records the telegram login session for future usage, '.session' will be appended to the provided value."},
"timeout": {"default": 45, "help": "timeout to fetch the instagram content in seconds."},
}
def setup(self) -> None:
"""
1. makes a copy of session_file that is removed in cleanup
@ -58,7 +49,7 @@ class InstagramTbotArchiver(Archiver):
try:
self.client = TelegramClient(self.session_file, self.api_id, self.api_hash)
except OperationalError as e:
logger.error(f"Unable to access the {self.session_file} session, please make sure you don't use the same session file here and in telethon_archiver. if you do then disable at least one of the archivers for the 1st time you setup telethon session: {e}")
logger.error(f"Unable to access the {self.session_file} session, please make sure you don't use the same session file here and in telethon_extractor. if you do then disable at least one of the archivers for the 1st time you setup telethon session: {e}")
with self.client.start():
logger.success(f"SETUP {self.name} login works.")

View file

@ -0,0 +1 @@
from .local import LocalStorage

View file

@ -0,0 +1,35 @@
{
"name": "Local Storage",
"type": ["storage"],
"requires_setup": False,
"external_dependencies": {
"python": ["loguru"],
},
"configs": {
"path_generator": {
"default": "url",
"help": "how to store the file in terms of directory structure: 'flat' sets to root; 'url' creates a directory based on the provided URL; 'random' creates a random directory.",
"choices": ["flat", "url", "random"],
},
"filename_generator": {
"default": "random",
"help": "how to name stored files: 'random' creates a random string; 'static' uses a replicable strategy such as a hash.",
"choices": ["random", "static"],
},
"save_to": {"default": "./archived", "help": "folder where to save archived content"},
"save_absolute": {"default": False, "help": "whether the path to the stored file is absolute or relative in the output result inc. formatters (WARN: leaks the file structure)"},
},
"description": """
LocalStorage: A storage module for saving archived content locally on the filesystem.
### Features
- Saves archived media files to a specified folder on the local filesystem.
- Maintains file metadata during storage using `shutil.copy2`.
- Supports both absolute and relative paths for stored files, configurable via `save_absolute`.
- Automatically creates directories as needed for storing files.
### Notes
- Default storage folder is `./archived`, but this can be changed via the `save_to` configuration.
- The `save_absolute` option can reveal the file structure in output formats; use with caution.
"""
}
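As an illustration only (this is not the module's code, and the helper name is hypothetical), the `path_generator` and `filename_generator` choices could be interpreted roughly as:

```python
import hashlib
import os
import uuid
from urllib.parse import quote


def build_relative_path(url: str, content: bytes,
                        path_generator: str = "url",
                        filename_generator: str = "random") -> str:
    # directory part
    if path_generator == "flat":
        folder = ""                       # everything goes in the root of save_to
    elif path_generator == "url":
        folder = quote(url, safe="")      # directory derived from the URL
    else:                                 # "random"
        folder = uuid.uuid4().hex
    # file name part
    if filename_generator == "static":
        name = hashlib.sha256(content).hexdigest()  # replicable, hash-based name
    else:                                 # "random"
        name = uuid.uuid4().hex
    return os.path.join(folder, name)
```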

Wyświetl plik

@ -4,8 +4,8 @@ from typing import IO
import os
from loguru import logger
from ..core import Media
from ..storages import Storage
from auto_archiver.core import Media
from auto_archiver.base_processors import Storage
class LocalStorage(Storage):
@ -15,15 +15,6 @@ class LocalStorage(Storage):
super().__init__(config)
os.makedirs(self.save_to, exist_ok=True)
@staticmethod
def configs() -> dict:
return dict(
Storage.configs(),
** {
"save_to": {"default": "./archived", "help": "folder where to save archived content"},
"save_absolute": {"default": False, "help": "whether the path to the stored file is absolute or relative in the output result inc. formatters (WARN: leaks the file structure)"},
})
def get_cdn_url(self, media: Media) -> str:
# TODO: is this viable with Storage.configs on path/filename?
dest = os.path.join(self.save_to, media.key)

Wyświetl plik

@ -0,0 +1 @@
from .meta_enricher import MetaEnricher

Wyświetl plik

@ -0,0 +1,22 @@
{
"name": "Archive Metadata Enricher",
"type": ["enricher"],
"requires_setup": False,
"external_dependencies": {
"python": ["loguru"],
},
"description": """
Adds metadata about archive operations, including file sizes and archive duration.
To be included at the end of all enrichments.
### Features
- Calculates the total size of all archived media files, storing the result in human-readable and byte formats.
- Computes the duration of the archival process, storing the elapsed time in seconds.
- Ensures all enrichments are performed only if the `Metadata` object contains valid data.
- Adds detailed metadata to provide insights into file sizes and archival performance.
### Notes
- Skips enrichment if no media or metadata is available in the `Metadata` object.
- File sizes are calculated with `os.stat`, ensuring accurate byte-level reporting.
""",
}

Wyświetl plik

@ -2,8 +2,8 @@ import datetime
import os
from loguru import logger
from . import Enricher
from ..core import Metadata
from auto_archiver.base_processors import Enricher
from auto_archiver.core import Metadata
class MetaEnricher(Enricher):
@ -17,10 +17,6 @@ class MetaEnricher(Enricher):
# without this STEP.__init__ is not called
super().__init__(config)
@staticmethod
def configs() -> dict:
return {}
def enrich(self, to_enrich: Metadata) -> None:
url = to_enrich.get_url()
if to_enrich.is_empty():
@ -28,7 +24,7 @@ class MetaEnricher(Enricher):
return
logger.debug(f"calculating archive metadata information for {url=}")
self.enrich_file_sizes(to_enrich)
self.enrich_archive_duration(to_enrich)
@ -40,10 +36,10 @@ class MetaEnricher(Enricher):
media.set("bytes", file_stats.st_size)
media.set("size", self.human_readable_bytes(file_stats.st_size))
total_size += file_stats.st_size
to_enrich.set("total_bytes", total_size)
to_enrich.set("total_size", self.human_readable_bytes(total_size))
def human_readable_bytes(self, size: int) -> str:
# receives a number of bytes and returns a human-readable size
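A plausible implementation of the helper above, shown only as a sketch since its body is cut from this hunk:

```python
def human_readable_bytes(size: float) -> str:
    # walk up the units until the value fits below 1024
    for unit in ["bytes", "KB", "MB", "GB", "TB"]:
        if size < 1024:
            return f"{size:.1f} {unit}"
        size /= 1024
    return f"{size:.1f} PB"
```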

Wyświetl plik

@ -0,0 +1 @@
from .metadata_enricher import MetadataEnricher

Wyświetl plik

@ -0,0 +1,22 @@
{
"name": "Media Metadata Enricher",
"type": ["enricher"],
"requires_setup": False,
"external_dependencies": {
"python": ["loguru"],
"bin": ["exiftool"]
},
"description": """
Extracts embedded metadata from files using ExifTool.
### Features
- Uses ExifTool to extract detailed metadata from media files.
- Processes file-specific data like camera settings, geolocation, timestamps, and other embedded metadata.
- Adds extracted metadata to the corresponding `Media` object within the `Metadata`.
### Notes
- Requires ExifTool to be installed and accessible via the system's PATH.
- Skips enrichment for files where metadata extraction fails.
"""
}
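A hedged sketch of the kind of ExifTool call this enricher relies on (not the module's exact code); `exiftool` must be on the PATH:

```python
import json
import subprocess


def exiftool_metadata(filename: str) -> dict:
    try:
        # `exiftool -json FILE` prints a JSON array with one object per file
        out = subprocess.check_output(["exiftool", "-json", filename], text=True)
        return json.loads(out)[0]
    except (subprocess.CalledProcessError, FileNotFoundError, json.JSONDecodeError):
        return {}  # metadata extraction failed; the enricher skips this file
```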

Wyświetl plik

@ -2,8 +2,8 @@ import subprocess
import traceback
from loguru import logger
from . import Enricher
from ..core import Metadata
from auto_archiver.base_processors import Enricher
from auto_archiver.core import Metadata
class MetadataEnricher(Enricher):
@ -16,9 +16,6 @@ class MetadataEnricher(Enricher):
# without this STEP.__init__ is not called
super().__init__(config)
@staticmethod
def configs() -> dict:
return {}
def enrich(self, to_enrich: Metadata) -> None:
url = to_enrich.get_url()

Wyświetl plik

@ -0,0 +1 @@
from .mute_formatter import MuteFormatter

Wyświetl plik

@ -0,0 +1,9 @@
{
"name": "Mute Formatter",
"type": ["formatter"],
"requires_setup": False,
"external_dependencies": {
},
"description": """ Default formatter.
""",
}

Wyświetl plik

@ -0,0 +1 @@
from .pdq_hash_enricher import PdqHashEnricher

Wyświetl plik

@ -0,0 +1,21 @@
{
"name": "PDQ Hash Enricher",
"type": ["enricher"],
"requires_setup": False,
"external_dependencies": {
"python": ["loguru", "pdqhash", "numpy", "Pillow"],
},
"description": """
PDQ Hash Enricher for generating perceptual hashes of media files.
### Features
- Calculates perceptual hashes for image files using the PDQ hashing algorithm.
- Enables detection of duplicate or near-duplicate visual content.
- Processes images stored in `Metadata` objects, adding computed hashes to the corresponding `Media` entries.
- Skips non-image media or files unsuitable for hashing (e.g., corrupted or unsupported formats).
### Notes
- Best used after enrichers like `thumbnail_enricher` or `screenshot_enricher` to ensure images are available.
- Uses the `pdqhash` library to compute 256-bit perceptual hashes, which are stored as hexadecimal strings.
"""
}
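A minimal sketch of computing the 256-bit PDQ hash for one image; the exact hex conversion used by the module is an assumption here:

```python
import numpy as np
import pdqhash
from PIL import Image


def pdq_hex(path: str) -> str:
    image = np.asarray(Image.open(path).convert("RGB"))
    hash_vector, quality = pdqhash.compute(image)  # 256 bits plus a quality score
    bits = "".join(str(int(b)) for b in hash_vector)
    return hex(int(bits, 2))[2:].zfill(64)         # 256 bits -> 64 hex characters
```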

Wyświetl plik

@ -16,8 +16,8 @@ import numpy as np
from PIL import Image, UnidentifiedImageError
from loguru import logger
from . import Enricher
from ..core import Metadata
from auto_archiver.base_processors import Enricher
from auto_archiver.core import Metadata
class PdqHashEnricher(Enricher):
@ -31,10 +31,6 @@ class PdqHashEnricher(Enricher):
# Without this STEP.__init__ is not called
super().__init__(config)
@staticmethod
def configs() -> dict:
return {}
def enrich(self, to_enrich: Metadata) -> None:
url = to_enrich.get_url()
logger.debug(f"calculating perceptual hashes for {url=}")

Wyświetl plik

@ -0,0 +1 @@
from .s3 import S3Storage

Wyświetl plik

@ -0,0 +1,49 @@
{
"name": "S3 Storage",
"type": ["storage"],
"requires_setup": True,
"external_dependencies": {
"python": ["boto3", "loguru"],
},
"configs": {
"path_generator": {
"default": "url",
"help": "how to store the file in terms of directory structure: 'flat' sets to root; 'url' creates a directory based on the provided URL; 'random' creates a random directory.",
"choices": ["flat", "url", "random"],
},
"filename_generator": {
"default": "random",
"help": "how to name stored files: 'random' creates a random string; 'static' uses a replicable strategy such as a hash.",
"choices": ["random", "static"],
},
"bucket": {"default": None, "help": "S3 bucket name"},
"region": {"default": None, "help": "S3 region name"},
"key": {"default": None, "help": "S3 API key"},
"secret": {"default": None, "help": "S3 API secret"},
"random_no_duplicate": {"default": False, "help": f"if set, it will override `path_generator`, `filename_generator` and `folder`. It will check if the file already exists and if so it will not upload it again. Creates a new root folder path `{NO_DUPLICATES_FOLDER}`"},
"endpoint_url": {
"default": 'https://{region}.digitaloceanspaces.com',
"help": "S3 bucket endpoint, {region} are inserted at runtime"
},
"cdn_url": {
"default": 'https://{bucket}.{region}.cdn.digitaloceanspaces.com/{key}',
"help": "S3 CDN url, {bucket}, {region} and {key} are inserted at runtime"
},
"private": {"default": False, "help": "if true S3 files will not be readable online"},
},
"description": """
S3Storage: A storage module for saving media files to an S3-compatible object storage.
### Features
- Uploads media files to an S3 bucket with customizable configurations.
- Supports `random_no_duplicate` mode to avoid duplicate uploads by checking existing files based on SHA-256 hashes.
- Automatically generates unique paths for files when duplicates are found.
- Configurable endpoint and CDN URL for different S3-compatible providers.
- Supports both private and public file storage, with public files being readable online.
### Notes
- Requires S3 credentials (API key and secret) and a bucket name to function.
- The `random_no_duplicate` option ensures no duplicate uploads by leveraging hash-based folder structures.
- Uses `boto3` for interaction with the S3 API.
"""
}
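To make the templated `endpoint_url` and `cdn_url` concrete, a sketch with hypothetical bucket, region and credentials (not taken from this diff):

```python
import boto3

region, bucket = "fra1", "my-archive-bucket"   # placeholders
endpoint_url = "https://{region}.digitaloceanspaces.com".format(region=region)

s3 = boto3.client(
    "s3",
    region_name=region,
    endpoint_url=endpoint_url,
    aws_access_key_id="KEY",                   # placeholder
    aws_secret_access_key="SECRET",            # placeholder
)
key = "some-url-folder/example.jpg"
s3.upload_file("archived/example.jpg", bucket, key,
               ExtraArgs={"ACL": "public-read"})  # omit the ACL when private=True

cdn_url = "https://{bucket}.{region}.cdn.digitaloceanspaces.com/{key}".format(
    bucket=bucket, region=region, key=key)
```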

Wyświetl plik

@ -2,10 +2,11 @@
from typing import IO
import boto3, os
from ..utils.misc import random_str
from ..core import Media
from ..storages import Storage
from ..enrichers import HashEnricher
from auto_archiver.utils.misc import random_str
from auto_archiver.core import Media
from auto_archiver.base_processors import Storage
# TODO
from auto_archiver.modules.hash_enricher import HashEnricher
from loguru import logger
NO_DUPLICATES_FOLDER = "no-dups/"
@ -25,27 +26,6 @@ class S3Storage(Storage):
if self.random_no_duplicate:
logger.warning("random_no_duplicate is set to True, this will override `path_generator`, `filename_generator` and `folder`.")
@staticmethod
def configs() -> dict:
return dict(
Storage.configs(),
** {
"bucket": {"default": None, "help": "S3 bucket name"},
"region": {"default": None, "help": "S3 region name"},
"key": {"default": None, "help": "S3 API key"},
"secret": {"default": None, "help": "S3 API secret"},
"random_no_duplicate": {"default": False, "help": f"if set, it will override `path_generator`, `filename_generator` and `folder`. It will check if the file already exists and if so it will not upload it again. Creates a new root folder path `{NO_DUPLICATES_FOLDER}`"},
"endpoint_url": {
"default": 'https://{region}.digitaloceanspaces.com',
"help": "S3 bucket endpoint, {region} are inserted at runtime"
},
"cdn_url": {
"default": 'https://{bucket}.{region}.cdn.digitaloceanspaces.com/{key}',
"help": "S3 CDN url, {bucket}, {region} and {key} are inserted at runtime"
},
"private": {"default": False, "help": "if true S3 files will not be readable online"},
})
def get_cdn_url(self, media: Media) -> str:
return self.cdn_url.format(bucket=self.bucket, region=self.region, key=media.key)

Wyświetl plik

@ -0,0 +1 @@
from .screenshot_enricher import ScreenshotEnricher

Wyświetl plik

@ -0,0 +1,30 @@
{
"name": "Screenshot Enricher",
"type": ["enricher"],
"requires_setup": True,
"external_dependencies": {
"python": ["loguru", "selenium"],
"bin": ["chromedriver"]
},
"configs": {
"width": {"default": 1280, "help": "width of the screenshots"},
"height": {"default": 720, "help": "height of the screenshots"},
"timeout": {"default": 60, "help": "timeout for taking the screenshot"},
"sleep_before_screenshot": {"default": 4, "help": "seconds to wait for the pages to load before taking screenshot"},
"http_proxy": {"default": "", "help": "http proxy to use for the webdriver, eg http://proxy-user:password@proxy-ip:port"},
"save_to_pdf": {"default": False, "help": "save the page as pdf along with the screenshot. PDF saving options can be adjusted with the 'print_options' parameter"},
"print_options": {"default": {}, "help": "options to pass to the pdf printer"}
},
"description": """
Captures screenshots and optionally saves web pages as PDFs using a WebDriver.
### Features
- Takes screenshots of web pages, with configurable width, height, and timeout settings.
- Optionally saves pages as PDFs, with additional configuration for PDF printing options.
- Skips URLs detected as authentication walls instead of attempting to capture them.
- Integrates seamlessly with the metadata enrichment pipeline, adding screenshots and PDFs as media.
### Notes
- Requires a WebDriver (e.g., ChromeDriver) installed and accessible via the system's PATH.
"""
}
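A sketch of the WebDriver capture this enricher performs, using the default width/height/sleep values above; headless Chrome and the output path are assumptions:

```python
import time

from selenium import webdriver

options = webdriver.ChromeOptions()
options.add_argument("--headless=new")
driver = webdriver.Chrome(options=options)      # needs chromedriver on the PATH
try:
    driver.set_window_size(1280, 720)           # width / height
    driver.set_page_load_timeout(60)            # timeout
    driver.get("https://example.com")
    time.sleep(4)                               # sleep_before_screenshot
    driver.save_screenshot("screenshot.png")
finally:
    driver.quit()
```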

Wyświetl plik

@ -5,24 +5,15 @@ import base64
from selenium.common.exceptions import TimeoutException
from . import Enricher
from ..utils import Webdriver, UrlUtil, random_str
from ..core import Media, Metadata, ArchivingContext
from auto_archiver.base_processors import Enricher
from auto_archiver.utils import Webdriver, UrlUtil, random_str
from auto_archiver.core import Media, Metadata, ArchivingContext
class ScreenshotEnricher(Enricher):
name = "screenshot_enricher"
@staticmethod
def configs() -> dict:
return {
"width": {"default": 1280, "help": "width of the screenshots"},
"height": {"default": 720, "help": "height of the screenshots"},
"timeout": {"default": 60, "help": "timeout for taking the screenshot"},
"sleep_before_screenshot": {"default": 4, "help": "seconds to wait for the pages to load before taking screenshot"},
"http_proxy": {"default": "", "help": "http proxy to use for the webdriver, eg http://proxy-user:password@proxy-ip:port"},
"save_to_pdf": {"default": False, "help": "save the page as pdf along with the screenshot. PDF saving options can be adjusted with the 'print_options' parameter"},
"print_options": {"default": {}, "help": "options to pass to the pdf printer"}
}
def __init__(self, config: dict) -> None:
super().__init__(config)
def enrich(self, to_enrich: Metadata) -> None:
url = to_enrich.get_url()

Wyświetl plik

@ -0,0 +1 @@
from .ssl_enricher import SSLEnricher

Wyświetl plik

@ -0,0 +1,22 @@
{
"name": "SSL Certificate Enricher",
"type": ["enricher"],
"requires_setup": False,
"external_dependencies": {
"python": ["loguru", "python-slugify"],
},
"configs": {
"skip_when_nothing_archived": {"default": True, "help": "if true, will skip enriching when no media is archived"},
},
"description": """
Retrieves SSL certificate information for a domain and stores it as a file.
### Features
- Fetches SSL certificates for domains using the HTTPS protocol.
- Stores certificates in PEM format and adds them as media to the metadata.
- Skips enrichment if no media has been archived, based on the `skip_when_nothing_archived` configuration.
### Notes
- Requires the target URL to use the HTTPS scheme; other schemes are not supported.
"""
}
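For reference, fetching a domain's certificate in PEM format can be done with the standard library alone; the module's exact approach may differ:

```python
import ssl
from urllib.parse import urlparse

url = "https://example.com/some/page"           # placeholder
domain = urlparse(url).netloc
pem_cert = ssl.get_server_certificate((domain, 443))
with open(f"{domain}.pem", "w") as f:           # stored and attached as media
    f.write(pem_cert)
```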

Some files were not shown because too many files have changed in this diff.