Update modules for new core structure.

pull/189/head
erinhmclark 2025-01-30 08:42:23 +00:00
rodzic 18ff36ce15
commit cddae65a90
35 zmienionych plików z 307 dodań i 233 usunięć

Wyświetl plik

@ -12,7 +12,7 @@ from googleapiclient.errors import HttpError
# Code below from https://developers.google.com/drive/api/quickstart/python
# Example invocation: py scripts/create_update_gdrive_oauth_token.py -c secrets/credentials.json -t secrets/gd-token.json
SCOPES = ['https://www.googleapis.com/auth/drive']
SCOPES = ["https://www.googleapis.com/auth/drive.file"]
@click.command(
@ -23,7 +23,7 @@ SCOPES = ['https://www.googleapis.com/auth/drive']
"-c",
type=click.Path(exists=True),
help="path to the credentials.json file downloaded from https://console.cloud.google.com/apis/credentials",
required=True
required=True,
)
@click.option(
"--token",
@ -31,59 +31,62 @@ SCOPES = ['https://www.googleapis.com/auth/drive']
type=click.Path(exists=False),
default="gd-token.json",
help="file where to place the OAuth token, defaults to gd-token.json which you must then move to where your orchestration file points to, defaults to gd-token.json",
required=True
required=True,
)
def main(credentials, token):
# The file token.json stores the user's access and refresh tokens, and is
# created automatically when the authorization flow completes for the first time.
creds = None
if os.path.exists(token):
with open(token, 'r') as stream:
with open(token, "r") as stream:
creds_json = json.load(stream)
# creds = Credentials.from_authorized_user_file(creds_json, SCOPES)
creds_json['refresh_token'] = creds_json.get("refresh_token", "")
creds_json["refresh_token"] = creds_json.get("refresh_token", "")
creds = Credentials.from_authorized_user_info(creds_json, SCOPES)
# If there are no (valid) credentials available, let the user log in.
if not creds or not creds.valid:
if creds and creds.expired and creds.refresh_token:
print('Requesting new token')
print("Requesting new token")
creds.refresh(Request())
else:
print('First run through so putting up login dialog')
print("First run through so putting up login dialog")
# credentials.json downloaded from https://console.cloud.google.com/apis/credentials
flow = InstalledAppFlow.from_client_secrets_file(credentials, SCOPES)
creds = flow.run_local_server(port=55192)
# Save the credentials for the next run
with open(token, 'w') as token:
print('Saving new token')
with open(token, "w") as token:
print("Saving new token")
token.write(creds.to_json())
else:
print('Token valid')
print("Token valid")
try:
service = build('drive', 'v3', credentials=creds)
service = build("drive", "v3", credentials=creds)
# About the user
results = service.about().get(fields="*").execute()
emailAddress = results['user']['emailAddress']
emailAddress = results["user"]["emailAddress"]
print(emailAddress)
# Call the Drive v3 API and return some files
results = service.files().list(
pageSize=10, fields="nextPageToken, files(id, name)").execute()
items = results.get('files', [])
results = (
service.files()
.list(pageSize=10, fields="nextPageToken, files(id, name)")
.execute()
)
items = results.get("files", [])
if not items:
print('No files found.')
print("No files found.")
return
print('Files:')
print("Files:")
for item in items:
print(u'{0} ({1})'.format(item['name'], item['id']))
print("{0} ({1})".format(item["name"], item["id"]))
except HttpError as error:
print(f'An error occurred: {error}')
print(f"An error occurred: {error}")
if __name__ == '__main__':
if __name__ == "__main__":
main()

Wyświetl plik

@ -0,0 +1,29 @@
"""
This script is used to create a new session file for the Telegram client.
To do this you must first create a Telegram application at https://my.telegram.org/apps
And store your id and hash in the environment variables TELEGRAM_API_ID and TELEGRAM_API_HASH.
Create a .env file, or add the following to your environment :
```
export TELEGRAM_API_ID=[YOUR_ID_HERE]
export TELEGRAM_API_HASH=[YOUR_HASH_HERE]
```
Then run this script to create a new session file.
You will need to provide your phone number and a 2FA code the first time you run this script.
"""
import os
from telethon.sync import TelegramClient
from loguru import logger
# Create a
API_ID = os.getenv("TELEGRAM_API_ID")
API_HASH = os.getenv("TELEGRAM_API_HASH")
SESSION_FILE = "secrets/anon-insta"
os.makedirs("secrets", exist_ok=True)
with TelegramClient(SESSION_FILE, API_ID, API_HASH) as client:
logger.success(f"New session file created: {SESSION_FILE}.session")

Wyświetl plik

@ -220,7 +220,7 @@ class ArchivingOrchestrator:
loaded_module: BaseModule = get_module(module, self.config)
except (KeyboardInterrupt, Exception) as e:
logger.error(f"Error during setup of archivers: {e}\n{traceback.format_exc()}")
if module_type == 'extractor':
if module_type == 'extractor' and loaded_module.name == module:
loaded_module.cleanup()
exit()

Wyświetl plik

@ -30,7 +30,7 @@ class Storage(BaseModule):
def uploadf(self, file: IO[bytes], key: str, **kwargs: dict) -> bool: pass
def upload(self, media: Media, **kwargs) -> bool:
logger.debug(f'[{self.__class__.name}] storing file {media.filename} with key {media.key}')
logger.debug(f'[{self.__class__.__name__}] storing file {media.filename} with key {media.key}')
with open(media.filename, 'rb') as f:
return self.uploadf(f, media, **kwargs)

Wyświetl plik

@ -1 +1 @@
from api_db import AAApiDb
from .api_db import AAApiDb

Wyświetl plik

@ -4,19 +4,41 @@
"entry_point": "api_db:AAApiDb",
"requires_setup": True,
"dependencies": {
"python": ["requests",
"loguru"],
"python": ["requests", "loguru"],
},
"configs": {
"api_endpoint": {"default": None, "help": "API endpoint where calls are made to"},
"api_token": {"default": None, "help": "API Bearer token."},
"public": {"default": False, "help": "whether the URL should be publicly available via the API"},
"author_id": {"default": None, "help": "which email to assign as author"},
"group_id": {"default": None, "help": "which group of users have access to the archive in case public=false as author"},
"allow_rearchive": {"default": True, "help": "if False then the API database will be queried prior to any archiving operations and stop if the link has already been archived", "type": "bool",},
"store_results": {"default": True, "help": "when set, will send the results to the API database.", "type": "bool",},
"tags": {"default": [], "help": "what tags to add to the archived URL",}
"api_endpoint": {
"default": None,
"required": True,
"help": "API endpoint where calls are made to",
},
"api_token": {"default": None,
"help": "API Bearer token."},
"public": {
"default": False,
"type": "bool",
"help": "whether the URL should be publicly available via the API",
},
"author_id": {"default": None, "help": "which email to assign as author"},
"group_id": {
"default": None,
"help": "which group of users have access to the archive in case public=false as author",
},
"allow_rearchive": {
"default": True,
"type": "bool",
"help": "if False then the API database will be queried prior to any archiving operations and stop if the link has already been archived",
},
"store_results": {
"default": True,
"type": "bool",
"help": "when set, will send the results to the API database.",
},
"tags": {
"default": [],
"help": "what tags to add to the archived URL",
},
},
"description": """
Provides integration with the Auto-Archiver API for querying and storing archival data.

Wyświetl plik

@ -1,5 +1,7 @@
from typing import Union
import requests, os
import os
import requests
from loguru import logger
from auto_archiver.core import Database
@ -7,17 +9,7 @@ from auto_archiver.core import Metadata
class AAApiDb(Database):
"""
Connects to auto-archiver-api instance
"""
def __init__(self, config: dict) -> None:
# without this STEP.__init__ is not called
super().__init__(config)
self.allow_rearchive = bool(self.allow_rearchive)
self.store_results = bool(self.store_results)
self.assert_valid_string("api_endpoint")
"""Connects to auto-archiver-api instance"""
def fetch(self, item: Metadata) -> Union[Metadata, bool]:
""" query the database for the existence of this item.

Wyświetl plik

@ -1 +0,0 @@
from .atlos import AtlosStorage

Wyświetl plik

@ -1,40 +0,0 @@
{
"name": "atlos_storage",
"type": ["storage"],
"requires_setup": True,
"dependencies": {"python": ["loguru", "requests"], "bin": [""]},
"configs": {
"path_generator": {
"default": "url",
"help": "how to store the file in terms of directory structure: 'flat' sets to root; 'url' creates a directory based on the provided URL; 'random' creates a random directory.",
},
"filename_generator": {
"default": "random",
"help": "how to name stored files: 'random' creates a random string; 'static' uses a replicable strategy such as a hash.",
},
"api_token": {
"default": None,
"help": "An Atlos API token. For more information, see https://docs.atlos.org/technical/api/",
"type": "str",
},
"atlos_url": {
"default": "https://platform.atlos.org",
"help": "The URL of your Atlos instance (e.g., https://platform.atlos.org), without a trailing slash.",
"type": "str",
},
},
"description": """
AtlosStorage: A storage module for saving media files to the Atlos platform.
### Features
- Uploads media files to Atlos using Atlos-specific APIs.
- Automatically calculates SHA-256 hashes of media files for integrity verification.
- Skips uploads for files that already exist on Atlos with the same hash.
- Supports attaching metadata, such as `atlos_id`, to the uploaded files.
- Provides CDN-like URLs for accessing uploaded media.
### Notes
- Requires Atlos API configuration, including `atlos_url` and `api_token`.
- Files are linked to an `atlos_id` in the metadata, ensuring proper association with Atlos source materials.
""",
}

Wyświetl plik

@ -1,14 +1,10 @@
import os
from typing import Union
from loguru import logger
from csv import DictWriter
from dataclasses import asdict
import requests
from loguru import logger
from auto_archiver.core import Database
from auto_archiver.core import Metadata
from auto_archiver.utils import get_atlos_config_options
class AtlosDb(Database):

Wyświetl plik

@ -8,8 +8,9 @@
"configs": {
"api_token": {
"default": None,
"type": "str",
"required": True,
"help": "An Atlos API token. For more information, see https://docs.atlos.org/technical/api/",
"type": "str"
},
"atlos_url": {
"default": "https://platform.atlos.org",

Wyświetl plik

@ -1,19 +1,12 @@
from loguru import logger
import requests
from loguru import logger
from auto_archiver.core import Feeder
from auto_archiver.core import Metadata, ArchivingContext
from auto_archiver.utils import get_atlos_config_options
from auto_archiver.core import Metadata
class AtlosFeeder(Feeder):
def __init__(self, config: dict) -> None:
# without this STEP.__init__ is not called
super().__init__(config)
if type(self.api_token) != str:
raise Exception("Atlos Feeder did not receive an Atlos API token")
def __iter__(self) -> Metadata:
# Get all the urls from the Atlos API
count = 0

Wyświetl plik

@ -1,12 +1,12 @@
import os
from typing import IO, List, Optional
from loguru import logger
import requests
import hashlib
import os
from typing import IO, Optional
import requests
from loguru import logger
from auto_archiver.core import Media, Metadata
from auto_archiver.core import Storage
from auto_archiver.utils import get_atlos_config_options
class AtlosStorage(Storage):

Wyświetl plik

@ -1,14 +1,14 @@
{
"name": "Google Drive Storage",
"type": ["storage"],
"author": "Dave Mateer",
"entry_point": "gdrive_storage::GDriveStorage",
"requires_setup": True,
"dependencies": {
"python": [
"loguru",
"google-api-python-client",
"google-auth",
"google-auth-oauthlib",
"google-auth-httplib2"
"googleapiclient",
"google",
],
},
"configs": {
@ -18,17 +18,24 @@
"choices": ["flat", "url", "random"],
},
"filename_generator": {
"default": "random",
"default": "static",
"help": "how to name stored files: 'random' creates a random string; 'static' uses a replicable strategy such as a hash.",
"choices": ["random", "static"],
},
"root_folder_id": {"default": None, "help": "root google drive folder ID to use as storage, found in URL: 'https://drive.google.com/drive/folders/FOLDER_ID'"},
"oauth_token": {"default": None, "help": "JSON filename with Google Drive OAuth token: check auto-archiver repository scripts folder for create_update_gdrive_oauth_token.py. NOTE: storage used will count towards owner of GDrive folder, therefore it is best to use oauth_token_filename over service_account."},
"root_folder_id": {"default": None,
# "required": True,
"help": "root google drive folder ID to use as storage, found in URL: 'https://drive.google.com/drive/folders/FOLDER_ID'"},
"oauth_token": {"default": None,
"help": "JSON filename with Google Drive OAuth token: check auto-archiver repository scripts folder for create_update_gdrive_oauth_token.py. NOTE: storage used will count towards owner of GDrive folder, therefore it is best to use oauth_token_filename over service_account."},
"service_account": {"default": "secrets/service_account.json", "help": "service account JSON file path, same as used for Google Sheets. NOTE: storage used will count towards the developer account."},
},
"description": """
GDriveStorage: A storage module for saving archived content to Google Drive.
Author: Dave Mateer, (And maintained by: )
Source Documentation: https://davemateer.com/2022/04/28/google-drive-with-python
### Features
- Saves media files to Google Drive, organizing them into folders based on the provided path structure.
- Supports OAuth token-based authentication or service account credentials for API access.
@ -39,5 +46,55 @@
- Requires setup with either a Google OAuth token or a service account JSON file.
- Files are uploaded to the specified `root_folder_id` and organized by the `media.key` structure.
- Automatically handles Google Drive API token refreshes for long-running jobs.
"""
## Overview
This module integrates Google Drive as a storage backend, enabling automatic folder creation and file uploads. It supports authentication via **service accounts** (recommended for automation) or **OAuth tokens** (for user-based authentication).
## Features
- Saves files to Google Drive, organizing them into structured folders.
- Supports both **service account** and **OAuth token** authentication.
- Automatically creates folders if they don't exist.
- Generates public URLs for easy file sharing.
## Setup Guide
1. **Enable Google Drive API**
- Create a Google Cloud project at [Google Cloud Console](https://console.cloud.google.com/)
- Enable the **Google Drive API**.
2. **Set Up a Google Drive Folder**
- Create a folder in **Google Drive** and copy its **folder ID** from the URL.
- Add the **folder ID** to your configuration (`orchestration.yaml`):
```yaml
root_folder_id: "FOLDER_ID"
```
3. **Authentication Options**
- **Option 1: Service Account (Recommended)**
- Create a **service account** in Google Cloud IAM.
- Download the JSON key file and save it as:
```
secrets/service_account.json
```
- **Share your Drive folder** with the service accounts `client_email` (found in the JSON file).
- **Option 2: OAuth Token (User Authentication)**
- Create OAuth **Desktop App credentials** in Google Cloud.
- Save the credentials as:
```
secrets/oauth_credentials.json
```
- Generate an OAuth token by running:
```sh
python scripts/create_update_gdrive_oauth_token.py -c secrets/oauth_credentials.json
```
Notes on the OAuth token:
Tokens are refreshed after 1 hour however keep working for 7 days (tbc)
so as long as the job doesn't last for 7 days then this method of refreshing only once per run will work
see this link for details on the token:
https://davemateer.com/2022/04/28/google-drive-with-python#tokens
"""
}

Wyświetl plik

@ -1,68 +1,69 @@
import shutil, os, time, json
import json
import os
import time
from typing import IO
from loguru import logger
from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload
from google.auth.transport.requests import Request
from google.oauth2 import service_account
from google.oauth2.credentials import Credentials
from google.auth.transport.requests import Request
from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload
from loguru import logger
from auto_archiver.core import Media
from auto_archiver.core import Storage
class GDriveStorage(Storage):
def __init__(self, config: dict) -> None:
super().__init__(config)
def setup(self, config: dict) -> None:
# Step 1: Call the BaseModule setup to dynamically assign configs
super().setup(config)
self.scopes = ['https://www.googleapis.com/auth/drive']
# Initialize Google Drive service
self._setup_google_drive_service()
SCOPES = ['https://www.googleapis.com/auth/drive']
if self.oauth_token is not None:
"""
Tokens are refreshed after 1 hour
however keep working for 7 days (tbc)
so as long as the job doesn't last for 7 days
then this method of refreshing only once per run will work
see this link for details on the token
https://davemateer.com/2022/04/28/google-drive-with-python#tokens
"""
logger.debug(f'Using GD OAuth token {self.oauth_token}')
# workaround for missing 'refresh_token' in from_authorized_user_file
with open(self.oauth_token, 'r') as stream:
creds_json = json.load(stream)
creds_json['refresh_token'] = creds_json.get("refresh_token", "")
creds = Credentials.from_authorized_user_info(creds_json, SCOPES)
# creds = Credentials.from_authorized_user_file(self.oauth_token, SCOPES)
if not creds or not creds.valid:
if creds and creds.expired and creds.refresh_token:
logger.debug('Requesting new GD OAuth token')
creds.refresh(Request())
else:
raise Exception("Problem with creds - create the token again")
# Save the credentials for the next run
with open(self.oauth_token, 'w') as token:
logger.debug('Saving new GD OAuth token')
token.write(creds.to_json())
else:
logger.debug('GD OAuth Token valid')
def _setup_google_drive_service(self):
"""Initialize Google Drive service based on provided credentials."""
if self.oauth_token:
logger.debug(f"Using Google Drive OAuth token: {self.oauth_token}")
self.service = self._initialize_with_oauth_token()
elif self.service_account:
logger.debug(f"Using Google Drive service account: {self.service_account}")
self.service = self._initialize_with_service_account()
else:
gd_service_account = self.service_account
logger.debug(f'Using GD Service Account {gd_service_account}')
creds = service_account.Credentials.from_service_account_file(gd_service_account, scopes=SCOPES)
raise ValueError("Missing credentials: either `oauth_token` or `service_account` must be provided.")
self.service = build('drive', 'v3', credentials=creds)
def _initialize_with_oauth_token(self):
"""Initialize Google Drive service with OAuth token."""
with open(self.oauth_token, 'r') as stream:
creds_json = json.load(stream)
creds_json['refresh_token'] = creds_json.get("refresh_token", "")
creds = Credentials.from_authorized_user_info(creds_json, self.scopes)
if not creds.valid and creds.expired and creds.refresh_token:
creds.refresh(Request())
with open(self.oauth_token, 'w') as token_file:
logger.debug("Saving refreshed OAuth token.")
token_file.write(creds.to_json())
elif not creds.valid:
raise ValueError("Invalid OAuth token. Please regenerate the token.")
return build('drive', 'v3', credentials=creds)
def _initialize_with_service_account(self):
"""Initialize Google Drive service with service account."""
creds = service_account.Credentials.from_service_account_file(self.service_account, scopes=self.scopes)
return build('drive', 'v3', credentials=creds)
def get_cdn_url(self, media: Media) -> str:
"""
only support files saved in a folder for GD
S3 supports folder and all stored in the root
"""
# full_name = os.path.join(self.folder, media.key)
parent_id, folder_id = self.root_folder_id, None
path_parts = media.key.split(os.path.sep)
@ -77,7 +78,7 @@ class GDriveStorage(Storage):
return f"https://drive.google.com/file/d/{file_id}/view?usp=sharing"
def upload(self, media: Media, **kwargs) -> bool:
logger.debug(f'[{self.__class__.name}] storing file {media.filename} with key {media.key}')
logger.debug(f'[{self.__class__.__name__}] storing file {media.filename} with key {media.key}')
"""
1. for each sub-folder in the path check if exists or create
2. upload file to root_id/other_paths.../filename
@ -168,8 +169,3 @@ class GDriveStorage(Storage):
gd_folder = self.service.files().create(supportsAllDrives=True, body=file_metadata, fields='id').execute()
return gd_folder.get('id')
# def exists(self, key):
# try:
# self.get_cdn_url(key)
# return True
# except: return False

Wyświetl plik

@ -4,7 +4,7 @@
"entry_point": "gsheet_db::GsheetsDb",
"requires_setup": True,
"dependencies": {
"python": ["loguru", "gspread", "python-slugify"],
"python": ["loguru", "gspread", "slugify"],
},
"configs": {
"allow_worksheets": {
@ -17,6 +17,7 @@
},
"use_sheet_names_in_stored_paths": {
"default": True,
"type": "bool",
"help": "if True the stored files path will include 'workbook_name/worksheet_name/...'",
}
},

Wyświetl plik

@ -4,7 +4,7 @@
"entry_point": "gsheet_feeder::GsheetsFeeder",
"requires_setup": True,
"dependencies": {
"python": ["loguru", "gspread", "python-slugify"],
"python": ["loguru", "gspread", "slugify"],
},
"configs": {
"sheet": {"default": None, "help": "name of the sheet to archive"},

Wyświetl plik

@ -1,6 +1,7 @@
{
"name": "Instagram API Extractor",
"type": ["extractor"],
"entry_point": "instagram_api_extractor::InstagramAPIExtractor",
"dependencies":
{"python": ["requests",
"loguru",
@ -9,24 +10,32 @@
},
"requires_setup": True,
"configs": {
"access_token": {"default": None, "help": "a valid instagrapi-api token"},
"api_endpoint": {"default": None, "help": "API endpoint to use"},
"access_token": {"default": None,
"help": "a valid instagrapi-api token"},
"api_endpoint": {"default": None,
# "required": True,
"help": "API endpoint to use"},
"full_profile": {
"default": False,
"type": "bool",
"help": "if true, will download all posts, tagged posts, stories, and highlights for a profile, if false, will only download the profile pic and information.",
},
"full_profile_max_posts": {
"default": 0,
"type": "int",
"help": "Use to limit the number of posts to download when full_profile is true. 0 means no limit. limit is applied softly since posts are fetched in batch, once to: posts, tagged posts, and highlights",
},
"minimize_json_output": {
"default": True,
"type": "bool",
"help": "if true, will remove empty values from the json output",
},
},
"description": """
Archives various types of Instagram content using the Instagrapi API.
Requires setting up an Instagrapi API deployment and providing an access token and API endpoint.
### Features
- Connects to an Instagrapi API deployment to fetch Instagram profiles, posts, stories, highlights, reels, and tagged content.
- Supports advanced configuration options, including:

Wyświetl plik

@ -32,16 +32,11 @@ class InstagramAPIExtractor(Extractor):
r"(?:(?:http|https):\/\/)?(?:www.)?(?:instagram.com)\/(stories(?:\/highlights)?|p|reel)?\/?([^\/\?]*)\/?(\d+)?"
)
def __init__(self, config: dict) -> None:
super().__init__(config)
self.assert_valid_string("access_token")
self.assert_valid_string("api_endpoint")
self.full_profile_max_posts = int(self.full_profile_max_posts)
def setup(self, config: dict) -> None:
super().setup(config)
if self.api_endpoint[-1] == "/":
self.api_endpoint = self.api_endpoint[:-1]
self.full_profile = bool(self.full_profile)
self.minimize_json_output = bool(self.minimize_json_output)
def download(self, item: Metadata) -> Metadata:
url = item.get_url()

Wyświetl plik

@ -9,9 +9,12 @@
},
"requires_setup": True,
"configs": {
"username": {"default": None, "help": "a valid Instagram username"},
"username": {"default": None,
"required": True,
"help": "a valid Instagram username"},
"password": {
"default": None,
"required": True,
"help": "the corresponding Instagram account password",
},
"download_folder": {
@ -25,9 +28,11 @@
# TODO: fine-grain
# "download_stories": {"default": True, "help": "if the link is to a user profile: whether to get stories information"},
},
"description": """Uses the Instaloader library to download content from Instagram. This class handles both individual posts
and user profiles, downloading as much information as possible, including images, videos, text, stories,
highlights, and tagged posts. Authentication is required via username/password or a session file.
"description": """
Uses the [Instaloader library](https://instaloader.github.io/as-module.html) to download content from Instagram. This class handles both individual posts
and user profiles, downloading as much information as possible, including images, videos, text, stories,
highlights, and tagged posts.
Authentication is required via username/password or a session file.
""",
}

Wyświetl plik

@ -4,7 +4,7 @@
"""
import re, os, shutil, traceback
import instaloader # https://instaloader.github.io/as-module.html
import instaloader
from loguru import logger
from auto_archiver.core import Extractor
@ -22,13 +22,9 @@ class InstagramExtractor(Extractor):
profile_pattern = re.compile(r"(?:(?:http|https):\/\/)?(?:www.)?(?:instagram.com|instagr.am|instagr.com)\/(\w+)")
# TODO: links to stories
def __init__(self, config: dict) -> None:
super().__init__(config)
# TODO: refactor how configuration validation is done
self.assert_valid_string("username")
self.assert_valid_string("password")
self.assert_valid_string("download_folder")
self.assert_valid_string("session_file")
def setup(self, config: dict) -> None:
super().setup(config)
self.insta = instaloader.Instaloader(
download_geotags=True, download_comments=True, compress_json=False, dirname_pattern=self.download_folder, filename_pattern="{date_utc}_UTC_{target}__{typename}"
)

Wyświetl plik

@ -1,15 +1,16 @@
{
"name": "Instagram Telegram Bot Extractor",
"type": ["extractor"],
"dependencies": {"python": ["loguru",
"telethon",],
"dependencies": {"python": ["loguru", "telethon",],
},
"requires_setup": True,
"configs": {
"api_id": {"default": None, "help": "telegram API_ID value, go to https://my.telegram.org/apps"},
"api_hash": {"default": None, "help": "telegram API_HASH value, go to https://my.telegram.org/apps"},
"session_file": {"default": "secrets/anon-insta", "help": "optional, records the telegram login session for future usage, '.session' will be appended to the provided value."},
"timeout": {"default": 45, "help": "timeout to fetch the instagram content in seconds."},
"timeout": {"default": 45,
"type": "int",
"help": "timeout to fetch the instagram content in seconds."},
},
"description": """
The `InstagramTbotExtractor` module uses a Telegram bot (`instagram_load_bot`) to fetch and archive Instagram content,
@ -28,6 +29,12 @@ returned as part of a `Metadata` object.
To use the `InstagramTbotExtractor`, you need to provide the following configuration settings:
- **API ID and Hash**: Telegram API credentials obtained from [my.telegram.org/apps](https://my.telegram.org/apps).
- **Session File**: Optional path to store the Telegram session file for future use.
- The session file is created automatically and should be unique for each instance.
- You may need to enter your Telegram credentials (phone) and use the a 2FA code sent to you the first time you run the extractor.:
```2025-01-30 00:43:49.348 | INFO | auto_archiver.modules.instagram_tbot_extractor.instagram_tbot_extractor:setup:36 - SETUP instagram_tbot_extractor checking login...
Please enter your phone (or bot token): +447123456789
Please enter the code you received: 00000
Signed in successfully as E C; remember to not break the ToS or you will risk an account ban!
```
""",
}

Wyświetl plik

@ -27,15 +27,19 @@ class InstagramTbotExtractor(Extractor):
https://t.me/instagram_load_bot
"""
def setup(self) -> None:
def setup(self, configs) -> None:
"""
1. makes a copy of session_file that is removed in cleanup
2. checks if the session file is valid
"""
super().setup(configs)
logger.info(f"SETUP {self.name} checking login...")
# make a copy of the session that is used exclusively with this archiver instance
new_session_file = os.path.join("secrets/", f"instabot-{time.strftime('%Y-%m-%d')}{random_str(8)}.session")
if not os.path.exists(f"{self.session_file}.session"):
raise FileNotFoundError(f"session file {self.session_file}.session not found, "
f"to set this up run the setup script in scripts/telegram_setup.py")
shutil.copy(self.session_file + ".session", new_session_file)
self.session_file = new_session_file.replace(".session", "")
@ -43,7 +47,6 @@ class InstagramTbotExtractor(Extractor):
self.client = TelegramClient(self.session_file, self.api_id, self.api_hash)
except OperationalError as e:
logger.error(f"Unable to access the {self.session_file} session, please make sure you don't use the same session file here and in telethon_extractor. if you do then disable at least one of the archivers for the 1st time you setup telethon session: {e}")
with self.client.start():
logger.success(f"SETUP {self.name} login works.")

Wyświetl plik

@ -3,7 +3,7 @@
"type": ["enricher"],
"requires_setup": False,
"dependencies": {
"python": ["loguru", "pdqhash", "numpy", "Pillow"],
"python": ["loguru", "pdqhash", "numpy", "PIL"],
},
"description": """
PDQ Hash Enricher for generating perceptual hashes of media files.

Wyświetl plik

@ -1 +1 @@
from .s3 import S3Storage
from .s3_storage import S3Storage

Wyświetl plik

@ -7,12 +7,12 @@
},
"configs": {
"path_generator": {
"default": "url",
"default": "flat",
"help": "how to store the file in terms of directory structure: 'flat' sets to root; 'url' creates a directory based on the provided URL; 'random' creates a random directory.",
"choices": ["flat", "url", "random"],
},
"filename_generator": {
"default": "random",
"default": "static",
"help": "how to name stored files: 'random' creates a random string; 'static' uses a replicable strategy such as a hash.",
"choices": ["random", "static"],
},
@ -20,7 +20,9 @@
"region": {"default": None, "help": "S3 region name"},
"key": {"default": None, "help": "S3 API key"},
"secret": {"default": None, "help": "S3 API secret"},
"random_no_duplicate": {"default": False, "help": "if set, it will override `path_generator`, `filename_generator` and `folder`. It will check if the file already exists and if so it will not upload it again. Creates a new root folder path `no-dups/`"},
"random_no_duplicate": {"default": False,
"type": "bool",
"help": "if set, it will override `path_generator`, `filename_generator` and `folder`. It will check if the file already exists and if so it will not upload it again. Creates a new root folder path `no-dups/`"},
"endpoint_url": {
"default": 'https://{region}.digitaloceanspaces.com',
"help": "S3 bucket endpoint, {region} are inserted at runtime"
@ -29,7 +31,9 @@
"default": 'https://{bucket}.{region}.cdn.digitaloceanspaces.com/{key}',
"help": "S3 CDN url, {bucket}, {region} and {key} are inserted at runtime"
},
"private": {"default": False, "help": "if true S3 files will not be readable online"},
"private": {"default": False,
"type": "bool",
"help": "if true S3 files will not be readable online"},
},
"description": """
S3Storage: A storage module for saving media files to an S3-compatible object storage.

Wyświetl plik

@ -1,19 +1,21 @@
from typing import IO
import boto3, os
from auto_archiver.utils.misc import random_str
from auto_archiver.core import Media
from auto_archiver.core import Storage
from auto_archiver.modules.hash_enricher import HashEnricher
import boto3
import os
from loguru import logger
NO_DUPLICATES_FOLDER = "no-dups/"
class S3Storage(Storage):
from auto_archiver.core import Media
from auto_archiver.core import Storage
from auto_archiver.modules.hash_enricher import HashEnricher
from auto_archiver.utils.misc import random_str
def __init__(self, config: dict) -> None:
super().__init__(config)
NO_DUPLICATES_FOLDER = "no-dups/"
class S3Storage(Storage, HashEnricher):
def setup(self, config: dict) -> None:
super().setup(config)
self.s3 = boto3.client(
's3',
region_name=self.region,
@ -21,7 +23,6 @@ class S3Storage(Storage):
aws_access_key_id=self.key,
aws_secret_access_key=self.secret
)
self.random_no_duplicate = bool(self.random_no_duplicate)
if self.random_no_duplicate:
logger.warning("random_no_duplicate is set to True, this will override `path_generator`, `filename_generator` and `folder`.")
@ -48,8 +49,7 @@ class S3Storage(Storage):
def is_upload_needed(self, media: Media) -> bool:
if self.random_no_duplicate:
# checks if a folder with the hash already exists, if so it skips the upload
he = HashEnricher({"hash_enricher": {"algorithm": "SHA-256", "chunksize": 1.6e7}})
hd = he.calculate_hash(media.filename)
hd = self.calculate_hash(media.filename)
path = os.path.join(NO_DUPLICATES_FOLDER, hd[:24])
if existing_key:=self.file_in_folder(path):
@ -61,8 +61,7 @@ class S3Storage(Storage):
_, ext = os.path.splitext(media.key)
media.key = os.path.join(path, f"{random_str(24)}{ext}")
return True
def file_in_folder(self, path:str) -> str:
# checks if path exists and is not an empty folder
if not path.endswith('/'):

Wyświetl plik

@ -3,7 +3,7 @@
"type": ["enricher"],
"requires_setup": False,
"dependencies": {
"python": ["loguru", "python-slugify"],
"python": ["loguru", "slugify"],
},
'entry_point': 'ssl_enricher::SSLEnricher',
"configs": {

Wyświetl plik

@ -3,7 +3,7 @@
"type": ["enricher"],
"requires_setup": False,
"dependencies": {
"python": ["loguru", "ffmpeg-python"],
"python": ["loguru", "ffmpeg"],
"bin": ["ffmpeg"]
},
"configs": {

Wyświetl plik

@ -4,14 +4,20 @@
"requires_setup": True,
"depends": ["core", "utils"],
"dependencies": {
"python": ["loguru",
"vk_url_scraper"],
"python": ["loguru", "vk_url_scraper"],
},
"configs": {
"username": {"default": None, "help": "valid VKontakte username"},
"password": {"default": None, "help": "valid VKontakte password"},
"session_file": {"default": "secrets/vk_config.v2.json", "help": "valid VKontakte password"},
"username": {"default": None,
"required": True,
"help": "valid VKontakte username"},
"password": {"default": None,
"required": True,
"help": "valid VKontakte password"},
"session_file": {
"default": "secrets/vk_config.v2.json",
"help": "valid VKontakte password",
},
},
"description": """
The `VkExtractor` fetches posts, text, and images from VK (VKontakte) social media pages.
This archiver is specialized for `/wall` posts and uses the `VkScraper` library to extract
@ -31,6 +37,5 @@ To use the `VkArchiver`, you must provide valid VKontakte login credentials and
Credentials can be set in the configuration file or directly via environment variables. Ensure you
have access to the VKontakte API by creating an account at [VKontakte](https://vk.com/).
"""
,
""",
}

Wyświetl plik

@ -12,10 +12,8 @@ class VkExtractor(Extractor):
Currently only works for /wall posts
"""
def __init__(self, config: dict) -> None:
super().__init__(config)
self.assert_valid_string("username")
self.assert_valid_string("password")
def setup(self, config: dict) -> None:
super().setup(config)
self.vks = VkScraper(self.username, self.password, session_file=self.session_file)
def download(self, item: Metadata) -> Metadata:

Wyświetl plik

@ -1,6 +1,7 @@
{
"name": "WACZ Enricher",
"type": ["enricher", "archiver"],
"entry_point": "wacz_enricher::WaczExtractorEnricher",
"requires_setup": True,
"dependencies": {
"python": [
@ -25,6 +26,7 @@
},
"description": """
Creates .WACZ archives of web pages using the `browsertrix-crawler` tool, with options for media extraction and screenshot saving.
[Browsertrix-crawler](https://crawler.docs.browsertrix.com/user-guide/) is a headless browser-based crawler that archives web pages in WACZ format.
### Features
- Archives web pages into .WACZ format using Docker or direct invocation of `browsertrix-crawler`.
@ -33,7 +35,7 @@
- Generates metadata from the archived page's content and structure (e.g., titles, text).
### Notes
- Requires Docker for running `browsertrix-crawler` unless explicitly disabled.
- Requires Docker for running `browsertrix-crawler` .
- Configurable via parameters for timeout, media extraction, screenshots, and proxy settings.
"""
}

Wyświetl plik

@ -18,7 +18,9 @@ class WaczExtractorEnricher(Enricher, Extractor):
When used as an archiver it will extract the media from the .WACZ archive so it can be enriched.
"""
def setup(self) -> None:
def setup(self, configs) -> None:
super().setup(configs)
self.use_docker = os.environ.get('WACZ_ENABLE_DOCKER') or not os.environ.get('RUNNING_IN_DOCKER')
self.docker_in_docker = os.environ.get('WACZ_ENABLE_DOCKER') and os.environ.get('RUNNING_IN_DOCKER')

Wyświetl plik

@ -3,7 +3,7 @@
"type": ["enricher"],
"requires_setup": True,
"dependencies": {
"python": ["loguru", "requests"],
"python": ["s3_storage", "loguru", "requests"],
},
"configs": {
"api_endpoint": {"default": None, "help": "WhisperApi api endpoint, eg: https://whisperbox-api.com/api/v1, a deployment of https://github.com/bellingcat/whisperbox-transcribe."},

Wyświetl plik

@ -5,7 +5,7 @@ from loguru import logger
from auto_archiver.core import Enricher
from auto_archiver.core import Metadata, Media, ArchivingContext
from auto_archiver.modules.s3_storage import S3Storage
from auto_archiver.core.module import get_module
class WhisperEnricher(Enricher):
"""
@ -53,7 +53,7 @@ class WhisperEnricher(Enricher):
to_enrich.set_content(f"\n[automatic video transcript]: {v}")
def submit_job(self, media: Media):
s3 = self._get_s3_storage()
s3 = get_module("s3_storage", self.config)
s3_url = s3.get_cdn_url(media)
assert s3_url in media.urls, f"Could not find S3 url ({s3_url}) in list of stored media urls "
payload = {