Merge pull request #189 from bellingcat/add_module_tests

- Add module tests
- Fix some storage-related bugs
- Separate the module setup() method from the base module config_setup()
pull/185/head
Erin Clark 2025-02-11 13:11:41 +00:00 committed by GitHub
commit aa5ac18d6a
No key matching this signature was found in the database
GPG key ID: B5690EEEBB952194
31 changed files with 1554 additions and 186 deletions

View file

@@ -14,7 +14,7 @@ class BaseModule(ABC):
    Base module class. All modules should inherit from this class.
    The exact methods a class implements will depend on the type of module it is,
-   however all modules have a .setup(config: dict) method to run any setup code
+   however modules can have a .setup() method to run any setup code
    (e.g. logging in to a site, spinning up a browser etc.)
    See BaseModule.MODULE_TYPES for the types of modules you can create, noting that
@@ -60,7 +60,7 @@ class BaseModule(ABC):
    def storages(self) -> list:
        return self.config.get('storages', [])
-   def setup(self, config: dict):
+   def config_setup(self, config: dict):
        authentication = config.get('authentication', {})
        # extract out concatenated sites
@@ -80,6 +80,10 @@ class BaseModule(ABC):
        for key, val in config.get(self.name, {}).items():
            setattr(self, key, val)
+   def setup(self):
+       # For any additional setup required by modules, e.g. authentication
+       pass
    def auth_for_site(self, site: str, extract_cookies=True) -> Mapping[str, Any]:
        """
        Returns the authentication information for a given site. This is used to authenticate
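
For context, the change splits module initialisation into two phases: the loader applies configuration via config_setup() and then calls the parameterless setup() hook. A minimal sketch of how a module author might use the new hooks, assuming only the BaseModule API visible in this diff (the module name and attribute are hypothetical, not part of the PR):

# Minimal sketch of the new two-phase lifecycle, assuming the BaseModule API shown above.
# "example_module" and "api_token" are illustrative names only.
from auto_archiver.core import BaseModule

class ExampleModule(BaseModule):
    name = "example_module"

    def setup(self) -> None:
        # By the time setup() runs, config_setup() has already copied config values
        # (e.g. self.api_token) onto the instance, so this hook only performs side effects.
        self.session = {"token": getattr(self, "api_token", None)}

# The lazy module loader shown further down in this diff now calls, in order:
#   instance.config_setup(config)   # assign defaults + user config as attributes
#   instance.setup()                # run any additional module-specific setup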

View file

@@ -65,7 +65,7 @@ class Media:
    def is_stored(self, in_storage) -> bool:
        # checks if the media is already stored in the given storage
-       return len(self.urls) > 0 and any([u for u in self.urls if in_storage.get_cdn_url() in u])
+       return len(self.urls) > 0 and len(self.urls) == len(in_storage.config["steps"]["storages"])
    def set(self, key: str, value: Any) -> Media:
        self.properties[key] = value
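
The semantics of is_stored() change here: instead of matching the querying storage's CDN prefix, a media item now counts as stored only once it has one URL per configured storage. A rough illustration of that check with stand-in objects (FakeStorage/FakeMedia are illustrative, not the real Media/Storage classes):

# Stand-in sketch of the new is_stored() check.
class FakeStorage:
    config = {"steps": {"storages": ["gdrive_storage", "s3_storage"]}}

class FakeMedia:
    def __init__(self, urls):
        self.urls = urls
    def is_stored(self, in_storage) -> bool:
        # stored only when there is one URL per configured storage
        return len(self.urls) > 0 and len(self.urls) == len(in_storage.config["steps"]["storages"])

print(FakeMedia(["https://drive.google.com/x"]).is_stored(FakeStorage()))                            # False (1 of 2)
print(FakeMedia(["https://drive.google.com/x", "https://cdn.example/x"]).is_stored(FakeStorage()))   # True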

View file

@@ -58,7 +58,7 @@ def get_module_lazy(module_name: str, suppress_warnings: bool = False) -> LazyBaseModule:
    This has all the information about the module, but does not load the module itself or its dependencies
-   To load an actual module, call .setup() on a laz module
+   To load an actual module, call .setup() on a lazy module
    """
    if module_name in _LAZY_LOADED_MODULES:
@@ -241,7 +241,8 @@ class LazyBaseModule:
        # merge the default config with the user config
        default_config = dict((k, v['default']) for k, v in self.configs.items() if v.get('default'))
        config[self.name] = default_config | config.get(self.name, {})
-       instance.setup(config)
+       instance.config_setup(config)
+       instance.setup()
        return instance
    def __repr__(self):

View file

@@ -19,9 +19,7 @@ from auto_archiver.core import Storage
class GDriveStorage(Storage):
-   def setup(self, config: dict) -> None:
-       # Step 1: Call the BaseModule setup to dynamically assign configs
-       super().setup(config)
+   def setup(self) -> None:
        self.scopes = ['https://www.googleapis.com/auth/drive']
        # Initialize Google Drive service
        self._setup_google_drive_service()
@@ -72,9 +70,12 @@ class GDriveStorage(Storage):
        for folder in path_parts[0:-1]:
            folder_id = self._get_id_from_parent_and_name(parent_id, folder, use_mime_type=True, raise_on_missing=True)
            parent_id = folder_id
        # get id of file inside folder (or sub folder)
-       file_id = self._get_id_from_parent_and_name(folder_id, filename)
+       file_id = self._get_id_from_parent_and_name(folder_id, filename, raise_on_missing=True)
+       if not file_id:
+           logger.info(f"file {filename} not found in folder {folder_id}")
+           return None
        return f"https://drive.google.com/file/d/{file_id}/view?usp=sharing"
    def upload(self, media: Media, **kwargs) -> bool:
@@ -106,7 +107,13 @@ class GDriveStorage(Storage):
    # must be implemented even if unused
    def uploadf(self, file: IO[bytes], key: str, **kwargs: dict) -> bool: pass
-   def _get_id_from_parent_and_name(self, parent_id: str, name: str, retries: int = 1, sleep_seconds: int = 10, use_mime_type: bool = False, raise_on_missing: bool = True, use_cache=False):
+   def _get_id_from_parent_and_name(self, parent_id: str,
+                                    name: str,
+                                    retries: int = 1,
+                                    sleep_seconds: int = 10,
+                                    use_mime_type: bool = False,
+                                    raise_on_missing: bool = True,
+                                    use_cache=False):
        """
        Retrieves the id of a folder or file from its @name and the @parent_id folder
        Optionally does multiple @retries and sleeps @sleep_seconds between them

View file

@@ -1,6 +1,4 @@
from typing import Union, Tuple
-import datetime
from urllib.parse import quote
from loguru import logger
@@ -8,32 +6,33 @@ from loguru import logger
from auto_archiver.core import Database
from auto_archiver.core import Metadata, Media
from auto_archiver.modules.gsheet_feeder import GWorksheet
+from auto_archiver.utils.misc import get_current_timestamp
class GsheetsDb(Database):
    """
    NB: only works if GsheetFeeder is used.
    could be updated in the future to support non-GsheetFeeder metadata
    """
    def started(self, item: Metadata) -> None:
        logger.warning(f"STARTED {item}")
        gw, row = self._retrieve_gsheet(item)
-       gw.set_cell(row, 'status', 'Archive in progress')
+       gw.set_cell(row, "status", "Archive in progress")
-   def failed(self, item: Metadata, reason:str) -> None:
+   def failed(self, item: Metadata, reason: str) -> None:
        logger.error(f"FAILED {item}")
-       self._safe_status_update(item, f'Archive failed {reason}')
+       self._safe_status_update(item, f"Archive failed {reason}")
    def aborted(self, item: Metadata) -> None:
        logger.warning(f"ABORTED {item}")
-       self._safe_status_update(item, '')
+       self._safe_status_update(item, "")
    def fetch(self, item: Metadata) -> Union[Metadata, bool]:
        """check if the given item has been archived already"""
        return False
-   def done(self, item: Metadata, cached: bool=False) -> None:
+   def done(self, item: Metadata, cached: bool = False) -> None:
        """archival result ready - should be saved to DB"""
        logger.success(f"DONE {item.get_url()}")
        gw, row = self._retrieve_gsheet(item)
@@ -45,23 +44,25 @@ class GsheetsDb(Database):
        def batch_if_valid(col, val, final_value=None):
            final_value = final_value or val
            try:
-               if val and gw.col_exists(col) and gw.get_cell(row_values, col) == '':
+               if val and gw.col_exists(col) and gw.get_cell(row_values, col) == "":
                    cell_updates.append((row, col, final_value))
            except Exception as e:
                logger.error(f"Unable to batch {col}={final_value} due to {e}")
        status_message = item.status
        if cached:
            status_message = f"[cached] {status_message}"
-       cell_updates.append((row, 'status', status_message))
+       cell_updates.append((row, "status", status_message))
        media: Media = item.get_final_media()
        if hasattr(media, "urls"):
-           batch_if_valid('archive', "\n".join(media.urls))
-       batch_if_valid('date', True, datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=datetime.timezone.utc).isoformat())
-       batch_if_valid('title', item.get_title())
-       batch_if_valid('text', item.get("content", ""))
-       batch_if_valid('timestamp', item.get_timestamp())
-       if media: batch_if_valid('hash', media.get("hash", "not-calculated"))
+           batch_if_valid("archive", "\n".join(media.urls))
+       batch_if_valid("date", True, get_current_timestamp())
+       batch_if_valid("title", item.get_title())
+       batch_if_valid("text", item.get("content", ""))
+       batch_if_valid("timestamp", item.get_timestamp())
+       if media:
+           batch_if_valid("hash", media.get("hash", "not-calculated"))
        # merge all pdq hashes into a single string, if present
        pdq_hashes = []
@@ -70,29 +71,40 @@ class GsheetsDb(Database):
            if pdq := m.get("pdq_hash"):
                pdq_hashes.append(pdq)
        if len(pdq_hashes):
-           batch_if_valid('pdq_hash', ",".join(pdq_hashes))
-       if (screenshot := item.get_media_by_id("screenshot")) and hasattr(screenshot, "urls"):
-           batch_if_valid('screenshot', "\n".join(screenshot.urls))
+           batch_if_valid("pdq_hash", ",".join(pdq_hashes))
+       if (screenshot := item.get_media_by_id("screenshot")) and hasattr(
+           screenshot, "urls"
+       ):
+           batch_if_valid("screenshot", "\n".join(screenshot.urls))
-       if (thumbnail := item.get_first_image("thumbnail")):
+       if thumbnail := item.get_first_image("thumbnail"):
            if hasattr(thumbnail, "urls"):
-               batch_if_valid('thumbnail', f'=IMAGE("{thumbnail.urls[0]}")')
+               batch_if_valid("thumbnail", f'=IMAGE("{thumbnail.urls[0]}")')
-       if (browsertrix := item.get_media_by_id("browsertrix")):
-           batch_if_valid('wacz', "\n".join(browsertrix.urls))
-           batch_if_valid('replaywebpage', "\n".join([f'https://replayweb.page/?source={quote(wacz)}#view=pages&url={quote(item.get_url())}' for wacz in browsertrix.urls]))
+       if browsertrix := item.get_media_by_id("browsertrix"):
+           batch_if_valid("wacz", "\n".join(browsertrix.urls))
+           batch_if_valid(
+               "replaywebpage",
+               "\n".join(
+                   [
+                       f"https://replayweb.page/?source={quote(wacz)}#view=pages&url={quote(item.get_url())}"
+                       for wacz in browsertrix.urls
+                   ]
+               ),
+           )
        gw.batch_set_cell(cell_updates)
    def _safe_status_update(self, item: Metadata, new_status: str) -> None:
        try:
            gw, row = self._retrieve_gsheet(item)
-           gw.set_cell(row, 'status', new_status)
+           gw.set_cell(row, "status", new_status)
        except Exception as e:
            logger.debug(f"Unable to update sheet: {e}")
    def _retrieve_gsheet(self, item: Metadata) -> Tuple[GWorksheet, int]:
        if gsheet := item.get_context("gsheet"):
            gw: GWorksheet = gsheet.get("worksheet")
            row: int = gsheet.get("row")

View file

@@ -21,8 +21,7 @@ from . import GWorksheet
class GsheetsFeeder(Feeder):
-   def setup(self, config: dict):
-       super().setup(config)
+   def setup(self) -> None:
        self.gsheets_client = gspread.service_account(filename=self.service_account)
        # TODO mv to validators
        assert self.sheet or self.sheet_id, (
@@ -37,41 +36,48 @@ class GsheetsFeeder(Feeder):
    def __iter__(self) -> Metadata:
        sh = self.open_sheet()
-       for ii, wks in enumerate(sh.worksheets()):
-           if not self.should_process_sheet(wks.title):
-               logger.debug(f"SKIPPED worksheet '{wks.title}' due to allow/block rules")
+       for ii, worksheet in enumerate(sh.worksheets()):
+           if not self.should_process_sheet(worksheet.title):
+               logger.debug(f"SKIPPED worksheet '{worksheet.title}' due to allow/block rules")
                continue
-           logger.info(f'Opening worksheet {ii=}: {wks.title=} header={self.header}')
-           gw = GWorksheet(wks, header_row=self.header, columns=self.columns)
+           logger.info(f'Opening worksheet {ii=}: {worksheet.title=} header={self.header}')
+           gw = GWorksheet(worksheet, header_row=self.header, columns=self.columns)
            if len(missing_cols := self.missing_required_columns(gw)):
-               logger.warning(f"SKIPPED worksheet '{wks.title}' due to missing required column(s) for {missing_cols}")
+               logger.warning(f"SKIPPED worksheet '{worksheet.title}' due to missing required column(s) for {missing_cols}")
                continue
-           for row in range(1 + self.header, gw.count_rows() + 1):
-               url = gw.get_cell(row, 'url').strip()
-               if not len(url): continue
-               original_status = gw.get_cell(row, 'status')
-               status = gw.get_cell(row, 'status', fresh=original_status in ['', None])
-               # TODO: custom status parser(?) aka should_retry_from_status
-               if status not in ['', None]: continue
-               # All checks done - archival process starts here
-               m = Metadata().set_url(url)
-               if gw.get_cell_or_default(row, 'folder', "") is None:
-                   folder = ''
-               else:
-                   folder = slugify(gw.get_cell_or_default(row, 'folder', "").strip())
-               if len(folder) and self.use_sheet_names_in_stored_paths:
-                   folder = os.path.join(folder, slugify(self.sheet), slugify(wks.title))
-               m.set_context('folder', folder)
-               m.set_context('gsheet', {"row": row, "worksheet": gw})
-               yield m
-           logger.success(f'Finished worksheet {wks.title}')
+           # process and yield metadata here:
+           yield from self._process_rows(gw)
+           logger.success(f'Finished worksheet {worksheet.title}')
+   def _process_rows(self, gw: GWorksheet):
+       for row in range(1 + self.header, gw.count_rows() + 1):
+           url = gw.get_cell(row, 'url').strip()
+           if not len(url): continue
+           original_status = gw.get_cell(row, 'status')
+           status = gw.get_cell(row, 'status', fresh=original_status in ['', None])
+           # TODO: custom status parser(?) aka should_retry_from_status
+           if status not in ['', None]: continue
+           # All checks done - archival process starts here
+           m = Metadata().set_url(url)
+           self._set_context(m, gw, row)
+           yield m
+   def _set_context(self, m: Metadata, gw: GWorksheet, row: int) -> Metadata:
+       # TODO: Check folder value not being recognised
+       m.set_context("gsheet", {"row": row, "worksheet": gw})
+       if gw.get_cell_or_default(row, 'folder', "") is None:
+           folder = ''
+       else:
+           folder = slugify(gw.get_cell_or_default(row, 'folder', "").strip())
+       if len(folder):
+           if self.use_sheet_names_in_stored_paths:
+               m.set_context("folder", os.path.join(folder, slugify(self.sheet), slugify(gw.wks.title)))
+           else:
+               m.set_context("folder", folder)
    def should_process_sheet(self, sheet_name: str) -> bool:
        if len(self.allow_worksheets) and sheet_name not in self.allow_worksheets:

View file

@@ -17,9 +17,8 @@ class HtmlFormatter(Formatter):
    environment: Environment = None
    template: any = None
-   def setup(self, config: dict) -> None:
+   def setup(self) -> None:
        """Sets up the Jinja2 environment and loads the template."""
-       super().setup(config)  # Ensure the base class logic is executed
        template_dir = os.path.join(pathlib.Path(__file__).parent.resolve(), "templates/")
        self.environment = Environment(loader=FileSystemLoader(template_dir), autoescape=True)

View file

@@ -32,8 +32,7 @@ class InstagramAPIExtractor(Extractor):
        r"(?:(?:http|https):\/\/)?(?:www.)?(?:instagram.com)\/(stories(?:\/highlights)?|p|reel)?\/?([^\/\?]*)\/?(\d+)?"
    )
-   def setup(self, config: dict) -> None:
-       super().setup(config)
+   def setup(self) -> None:
        if self.api_endpoint[-1] == "/":
            self.api_endpoint = self.api_endpoint[:-1]

View file

@@ -25,8 +25,7 @@ class InstagramExtractor(Extractor):
    profile_pattern = re.compile(r"{valid_url}(\w+)".format(valid_url=valid_url))
    # TODO: links to stories
-   def setup(self, config: dict) -> None:
-       super().setup(config)
+   def setup(self) -> None:
        self.insta = instaloader.Instaloader(
            download_geotags=True, download_comments=True, compress_json=False, dirname_pattern=self.download_folder, filename_pattern="{date_utc}_UTC_{target}__{typename}"

View file

@@ -27,26 +27,36 @@ class InstagramTbotExtractor(Extractor):
    https://t.me/instagram_load_bot
    """
-   def setup(self, configs) -> None:
+   def setup(self) -> None:
        """
        1. makes a copy of session_file that is removed in cleanup
        2. checks if the session file is valid
        """
-       super().setup(configs)
        logger.info(f"SETUP {self.name} checking login...")
+       self._prepare_session_file()
+       self._initialize_telegram_client()
-       # make a copy of the session that is used exclusively with this archiver instance
+   def _prepare_session_file(self):
+       """
+       Creates a copy of the session file for exclusive use with this archiver instance.
+       Ensures that a valid session file exists before proceeding.
+       """
        new_session_file = os.path.join("secrets/", f"instabot-{time.strftime('%Y-%m-%d')}{random_str(8)}.session")
        if not os.path.exists(f"{self.session_file}.session"):
-           raise FileNotFoundError(f"session file {self.session_file}.session not found, "
-                                   f"to set this up run the setup script in scripts/telegram_setup.py")
+           raise FileNotFoundError(f"Session file {self.session_file}.session not found.")
        shutil.copy(self.session_file + ".session", new_session_file)
        self.session_file = new_session_file.replace(".session", "")
+   def _initialize_telegram_client(self):
+       """Initializes the Telegram client."""
        try:
            self.client = TelegramClient(self.session_file, self.api_id, self.api_hash)
        except OperationalError as e:
-           logger.error(f"Unable to access the {self.session_file} session, please make sure you don't use the same session file here and in telethon_extractor. if you do then disable at least one of the archivers for the 1st time you setup telethon session: {e}")
+           logger.error(
+               f"Unable to access the {self.session_file} session. "
+               "Ensure that you don't use the same session file here and in telethon_extractor. "
+               "If you do, disable at least one of the archivers for the first-time setup of the telethon session: {e}"
+           )
        with self.client.start():
            logger.success(f"SETUP {self.name} login works.")
@@ -63,32 +73,49 @@ class InstagramTbotExtractor(Extractor):
        result = Metadata()
        tmp_dir = self.tmp_dir
        with self.client.start():
-           chat = self.client.get_entity("instagram_load_bot")
-           since_id = self.client.send_message(entity=chat, message=url).id
-           attempts = 0
-           seen_media = []
-           message = ""
-           time.sleep(3)
-           # media is added before text by the bot so it can be used as a stop-logic mechanism
-           while attempts < (self.timeout - 3) and (not message or not len(seen_media)):
-               attempts += 1
-               time.sleep(1)
-               for post in self.client.iter_messages(chat, min_id=since_id):
-                   since_id = max(since_id, post.id)
-                   if post.media and post.id not in seen_media:
-                       filename_dest = os.path.join(tmp_dir, f'{chat.id}_{post.id}')
-                       media = self.client.download_media(post.media, filename_dest)
-                       if media:
-                           result.add_media(Media(media))
-                       seen_media.append(post.id)
-                   if post.message: message += post.message
+           chat, since_id = self._send_url_to_bot(url)
+           message = self._process_messages(chat, since_id, tmp_dir, result)
            if "You must enter a URL to a post" in message:
                logger.debug(f"invalid link {url=} for {self.name}: {message}")
                return False
+           # # TODO: It currently returns this as a success - is that intentional?
+           # if "Media not found or unavailable" in message:
+           #     logger.debug(f"invalid link {url=} for {self.name}: {message}")
+           #     return False
            if message:
                result.set_content(message).set_title(message[:128])
            return result.success("insta-via-bot")
+   def _send_url_to_bot(self, url: str):
+       """
+       Sends the URL to the 'instagram_load_bot' and returns (chat, since_id).
+       """
+       chat = self.client.get_entity("instagram_load_bot")
+       since_message = self.client.send_message(entity=chat, message=url)
+       return chat, since_message.id
+   def _process_messages(self, chat, since_id, tmp_dir, result):
+       attempts = 0
+       seen_media = []
+       message = ""
+       time.sleep(3)
+       # media is added before text by the bot so it can be used as a stop-logic mechanism
+       while attempts < (self.timeout - 3) and (not message or not len(seen_media)):
+           attempts += 1
+           time.sleep(1)
+           for post in self.client.iter_messages(chat, min_id=since_id):
+               since_id = max(since_id, post.id)
+               # Skip known filler message:
+               if post.message == 'The bot receives information through https://hikerapi.com/p/hJqpppqi':
+                   continue
+               if post.media and post.id not in seen_media:
+                   filename_dest = os.path.join(tmp_dir, f'{chat.id}_{post.id}')
+                   media = self.client.download_media(post.media, filename_dest)
+                   if media:
+                       result.add_media(Media(media))
+                   seen_media.append(post.id)
+               if post.message: message += post.message
+       return message.strip()

View file

@@ -3,7 +3,7 @@
    "type": ["storage"],
    "requires_setup": True,
    "dependencies": {
-       "python": ["boto3", "loguru"],
+       "python": ["hash_enricher", "boto3", "loguru"],
    },
    "configs": {
        "path_generator": {
@@ -49,5 +49,6 @@
    - Requires S3 credentials (API key and secret) and a bucket name to function.
    - The `random_no_duplicate` option ensures no duplicate uploads by leveraging hash-based folder structures.
    - Uses `boto3` for interaction with the S3 API.
+   - Depends on the `HashEnricher` module for hash calculation.
    """
}

View file

@@ -13,8 +13,7 @@ NO_DUPLICATES_FOLDER = "no-dups/"
class S3Storage(Storage):
-   def setup(self, config: dict) -> None:
-       super().setup(config)
+   def setup(self) -> None:
        self.s3 = boto3.client(
            's3',
            region_name=self.region,

View file

@@ -18,13 +18,13 @@ class TelethonExtractor(Extractor):
    invite_pattern = re.compile(r"t.me(\/joinchat){0,1}\/\+?(.+)")
-   def setup(self, config: dict) -> None:
+   def setup(self) -> None:
        """
        1. makes a copy of session_file that is removed in cleanup
        2. trigger login process for telegram or proceed if already saved in a session file
        3. joins channel_invites where needed
        """
-       super().setup(config)
        logger.info(f"SETUP {self.name} checking login...")
        # make a copy of the session that is used exclusively with this archiver instance

View file

@@ -15,9 +15,7 @@ class TwitterApiExtractor(Extractor):
    valid_url: re.Pattern = re.compile(r"(?:twitter|x).com\/(?:\#!\/)?(\w+)\/status(?:es)?\/(\d+)")
-   def setup(self, config: dict) -> None:
-       super().setup(config)
+   def setup(self) -> None:
        self.api_index = 0
        self.apis = []
        if len(self.bearer_tokens):

View file

@@ -12,8 +12,7 @@ class VkExtractor(Extractor):
    Currently only works for /wall posts
    """
-   def setup(self, config: dict) -> None:
-       super().setup(config)
+   def setup(self) -> None:
        self.vks = VkScraper(self.username, self.password, session_file=self.session_file)
    def download(self, item: Metadata) -> Metadata:
def download(self, item: Metadata) -> Metadata: def download(self, item: Metadata) -> Metadata:

View file

@@ -18,8 +18,7 @@ class WaczExtractorEnricher(Enricher, Extractor):
    When used as an archiver it will extract the media from the .WACZ archive so it can be enriched.
    """
-   def setup(self, configs) -> None:
-       super().setup(configs)
+   def setup(self) -> None:
        self.use_docker = os.environ.get('WACZ_ENABLE_DOCKER') or not os.environ.get('RUNNING_IN_DOCKER')
        self.docker_in_docker = os.environ.get('WACZ_ENABLE_DOCKER') and os.environ.get('RUNNING_IN_DOCKER')

View file

@@ -6,11 +6,15 @@
        "python": ["s3_storage", "loguru", "requests"],
    },
    "configs": {
-       "api_endpoint": {"default": None, "help": "WhisperApi api endpoint, eg: https://whisperbox-api.com/api/v1, a deployment of https://github.com/bellingcat/whisperbox-transcribe."},
-       "api_key": {"default": None, "help": "WhisperApi api key for authentication"},
+       "api_endpoint": {"required": True,
+                        "help": "WhisperApi api endpoint, eg: https://whisperbox-api.com/api/v1, a deployment of https://github.com/bellingcat/whisperbox-transcribe."},
+       "api_key": {"required": True,
+                   "help": "WhisperApi api key for authentication"},
        "include_srt": {"default": False, "help": "Whether to include a subtitle SRT (SubRip Subtitle file) for the video (can be used in video players)."},
        "timeout": {"default": 90, "help": "How many seconds to wait at most for a successful job completion."},
-       "action": {"default": "translate", "help": "which Whisper operation to execute", "choices": ["transcribe", "translate", "language_detection"]},
+       "action": {"default": "translate",
+                  "help": "which Whisper operation to execute",
+                  "choices": ["transcribe", "translate", "language_detection"]},
    },
    "description": """
    Integrates with a Whisper API service to transcribe, translate, or detect the language of audio and video files.
@@ -25,6 +29,7 @@
    ### Notes
    - Requires a Whisper API endpoint and API key for authentication.
    - Only compatible with S3-compatible storage systems for media file accessibility.
+   - ** This stores the media files in S3 prior to enriching them as Whisper requires public URLs to access the media files.
    - Handles multiple jobs and retries for failed or incomplete processing.
    """
}
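
Since api_endpoint and api_key are now required, a minimal configuration for this module could look like the dict below; the values are placeholders and the variable name is illustrative, mirroring how the test suite passes config dicts to modules:

# Hypothetical config for the whisper enricher; endpoint and key values are placeholders.
whisper_config = {
    "api_endpoint": "https://whisperbox-api.example.com/api/v1",
    "api_key": "YOUR_API_KEY",
    "include_srt": False,
    "timeout": 90,
    "action": "translate",
}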

View file

@@ -4,7 +4,6 @@ from loguru import logger
from auto_archiver.core import Enricher
from auto_archiver.core import Metadata, Media
-from auto_archiver.modules.s3_storage import S3Storage
from auto_archiver.core.module import get_module
class WhisperEnricher(Enricher):
@@ -14,18 +13,25 @@ class WhisperEnricher(Enricher):
    Only works if an S3 compatible storage is used
    """
-   def enrich(self, to_enrich: Metadata) -> None:
-       if not self._get_s3_storage():
+   def setup(self) -> None:
+       self.stores = self.config['steps']['storages']
+       self.s3 = get_module("s3_storage", self.config)
+       if not "s3_storage" in self.stores:
            logger.error("WhisperEnricher: To use the WhisperEnricher you need to use S3Storage so files are accessible publicly to the whisper service being called.")
            return
+   def enrich(self, to_enrich: Metadata) -> None:
        url = to_enrich.get_url()
        logger.debug(f"WHISPER[{self.action}]: iterating media items for {url=}.")
        job_results = {}
        for i, m in enumerate(to_enrich.media):
            if m.is_video() or m.is_audio():
-               m.store(url=url, metadata=to_enrich, storages=self.storages)
+               # TODO: this used to pass all storage items to store now
+               # Now only passing S3, the rest will get added later in the usual order (?)
+               m.store(url=url, metadata=to_enrich, storages=[self.s3])
                try:
                    job_id = self.submit_job(m)
                    job_results[job_id] = False
@@ -53,8 +59,8 @@ class WhisperEnricher(Enricher):
        to_enrich.set_content(f"\n[automatic video transcript]: {v}")
    def submit_job(self, media: Media):
-       s3 = get_module("s3_storage", self.config)
-       s3_url = s3.get_cdn_url(media)
+       s3_url = self.s3.get_cdn_url(media)
        assert s3_url in media.urls, f"Could not find S3 url ({s3_url}) in list of stored media urls "
        payload = {
            "url": s3_url,
@@ -107,10 +113,3 @@ class WhisperEnricher(Enricher):
        logger.debug(f"DELETE whisper {job_id=} result: {r_del.status_code}")
        return result
        return False
-   def _get_s3_storage(self) -> S3Storage:
-       try:
-           return next(s for s in self.storages if s.__class__ == S3Storage)
-       except:
-           logger.warning("No S3Storage instance found in storages")
-           return

View file

@@ -1,53 +0,0 @@
import json, gspread
from auto_archiver.core import BaseModule
class Gsheets(BaseModule):
name = "gsheets"
def __init__(self, config: dict) -> None:
# without this STEP.__init__ is not called
super().__init__(config)
self.gsheets_client = gspread.service_account(filename=self.service_account)
# TODO: config should be responsible for conversions
try: self.header = int(self.header)
except: pass
assert type(self.header) == int, f"header ({self.header}) value must be an integer not {type(self.header)}"
assert self.sheet is not None or self.sheet_id is not None, "You need to define either a 'sheet' name or a 'sheet_id' in your orchestration file when using gsheets."
# TODO merge this into gsheets processors manifest
@staticmethod
def configs() -> dict:
return {
"sheet": {"default": None, "help": "name of the sheet to archive"},
"sheet_id": {"default": None, "help": "(alternative to sheet name) the id of the sheet to archive"},
"header": {"default": 1, "help": "index of the header row (starts at 1)"},
"service_account": {"default": "secrets/service_account.json", "help": "service account JSON file path"},
"columns": {
"default": {
'url': 'link',
'status': 'archive status',
'folder': 'destination folder',
'archive': 'archive location',
'date': 'archive date',
'thumbnail': 'thumbnail',
'timestamp': 'upload timestamp',
'title': 'upload title',
'text': 'text content',
'screenshot': 'screenshot',
'hash': 'hash',
'pdq_hash': 'perceptual hashes',
'wacz': 'wacz',
'replaywebpage': 'replaywebpage',
},
"help": "names of columns in the google sheet (stringified JSON object)",
"cli_set": lambda cli_val, cur_val: dict(cur_val, **json.loads(cli_val))
},
}
def open_sheet(self):
if self.sheet:
return self.gsheets_client.open(self.sheet)
else: # self.sheet_id
return self.gsheets_client.open_by_key(self.sheet_id)

View file

@@ -1,9 +1,7 @@
import os
import json
import uuid
-from datetime import datetime
+from datetime import datetime, timezone
import requests
import hashlib
from loguru import logger
@@ -73,3 +71,34 @@ def calculate_file_hash(filename: str, hash_algo = hashlib.sha256, chunksize: in
        if not buf: break
        hash.update(buf)
    return hash.hexdigest()
+def get_current_datetime_iso() -> str:
+    return datetime.now(timezone.utc).replace(tzinfo=timezone.utc).isoformat()
+def get_datetime_from_str(dt_str: str, fmt: str | None = None) -> datetime | None:
+    # parse a datetime string with option of passing a specific format
+    try:
+        return datetime.strptime(dt_str, fmt) if fmt else datetime.fromisoformat(dt_str)
+    except ValueError as e:
+        logger.error(f"Unable to parse datestring {dt_str}: {e}")
+        return None
+def get_timestamp(ts, utc=True, iso=True) -> str | datetime | None:
+    # Consistent parsing of timestamps
+    # If utc=True, the timezone is set to UTC,
+    # if iso=True, the output is an iso string
+    if not ts: return
+    try:
+        if isinstance(ts, str): ts = datetime.fromisoformat(ts)
+        if isinstance(ts, (int, float)): ts = datetime.fromtimestamp(ts)
+        if utc: ts = ts.replace(tzinfo=timezone.utc)
+        if iso: return ts.isoformat()
+        return ts
+    except Exception as e:
+        logger.error(f"Unable to parse timestamp {ts}: {e}")
+        return None
+def get_current_timestamp() -> str:
+    return get_timestamp(datetime.now())
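
Assuming the helpers behave as written above, usage looks roughly like this (values are illustrative):

# Illustrative calls to the new helpers in auto_archiver.utils.misc; outputs depend on inputs/current time.
from auto_archiver.utils.misc import get_timestamp, get_current_timestamp, get_datetime_from_str

get_timestamp("2025-01-01T00:00:00")             # '2025-01-01T00:00:00+00:00' (coerced to UTC, ISO string)
get_timestamp(1735689600, iso=False)             # a datetime object instead of a string
get_current_timestamp()                          # current time as a UTC ISO string, as used by GsheetsDb.done()
get_datetime_from_str("01/01/2025", "%d/%m/%Y")  # datetime(2025, 1, 1), or None on parse failure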

View file

@@ -1,7 +1,8 @@
"""
pytest conftest file, for shared fixtures and configuration
"""
+import os
+import pickle
from tempfile import TemporaryDirectory
from typing import Dict, Tuple
import hashlib
@@ -113,4 +114,18 @@ def pytest_runtest_setup(item):
        test_name = _test_failed_incremental[cls_name].get((), None)
        # if name found, test has failed for the combination of class name & test name
        if test_name is not None:
            pytest.xfail(f"previous test failed ({test_name})")
+@pytest.fixture()
+def unpickle():
+    """
+    Returns a helper function that unpickles a file
+    ** gets the file from the test_files directory: tests/data/test_files **
+    """
+    def _unpickle(path):
+        test_data_dir = os.path.join(os.path.dirname(__file__), "data", "test_files")
+        with open(os.path.join(test_data_dir, path), "rb") as f:
+            return pickle.load(f)
+    return _unpickle
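
A test can then request the fixture and load a pre-pickled object from tests/data/test_files; the file name below is hypothetical:

# Hypothetical usage of the new unpickle fixture; "example_worksheet.pickle" is an illustrative file name.
def test_loads_pickled_fixture(unpickle):
    worksheet = unpickle("example_worksheet.pickle")
    assert worksheet is not None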

View file

@@ -0,0 +1,142 @@
from datetime import datetime, timezone
from unittest.mock import MagicMock, patch
import pytest
from auto_archiver.core import Metadata, Media
from auto_archiver.modules.gsheet_db import GsheetsDb
from auto_archiver.modules.gsheet_feeder import GWorksheet
@pytest.fixture
def mock_gworksheet():
mock_gworksheet = MagicMock(spec=GWorksheet)
mock_gworksheet.col_exists.return_value = True
mock_gworksheet.get_cell.return_value = ""
mock_gworksheet.get_row.return_value = {}
return mock_gworksheet
@pytest.fixture
def mock_metadata():
metadata: Metadata = MagicMock(spec=Metadata)
metadata.get_url.return_value = "http://example.com"
metadata.status = "done"
metadata.get_title.return_value = "Example Title"
metadata.get.return_value = "Example Content"
metadata.get_timestamp.return_value = "2025-01-01T00:00:00"
metadata.get_final_media.return_value = MagicMock(spec=Media)
metadata.get_all_media.return_value = []
metadata.get_media_by_id.return_value = None
metadata.get_first_image.return_value = None
return metadata
@pytest.fixture
def metadata():
metadata = Metadata()
metadata.add_media(Media(filename="screenshot", urls=["http://example.com/screenshot.png"]))
metadata.add_media(Media(filename="browsertrix", urls=["http://example.com/browsertrix.wacz"]))
metadata.add_media(Media(filename="thumbnail", urls=["http://example.com/thumbnail.png"]))
metadata.set_url("http://example.com")
metadata.set_title("Example Title")
metadata.set_content("Example Content")
metadata.success("my-archiver")
metadata.set("timestamp", "2025-01-01T00:00:00")
metadata.set("date", "2025-02-04T18:22:24.909112+00:00")
return metadata
@pytest.fixture
def mock_media():
"""Fixture for a mock Media object."""
mock_media = MagicMock(spec=Media)
mock_media.urls = ["http://example.com/media"]
mock_media.get.return_value = "not-calculated"
return mock_media
@pytest.fixture
def gsheets_db(mock_gworksheet, setup_module):
db = setup_module("gsheet_db", {
"allow_worksheets": "set()",
"block_worksheets": "set()",
"use_sheet_names_in_stored_paths": "True",
})
db._retrieve_gsheet = MagicMock(return_value=(mock_gworksheet, 1))
return db
@pytest.fixture
def fixed_timestamp():
"""Fixture for a fixed timestamp."""
return datetime(2025, 1, 1, tzinfo=timezone.utc)
@pytest.fixture
def expected_calls(mock_media, fixed_timestamp):
"""Fixture for the expected cell updates."""
return [
(1, 'status', 'my-archiver: success'),
(1, 'archive', 'http://example.com/screenshot.png'),
(1, 'date', '2025-02-01T00:00:00+00:00'),
(1, 'title', 'Example Title'),
(1, 'text', 'Example Content'),
(1, 'timestamp', '2025-01-01T00:00:00+00:00'),
(1, 'hash', 'not-calculated'),
# (1, 'screenshot', 'http://example.com/screenshot.png'),
# (1, 'thumbnail', '=IMAGE("http://example.com/thumbnail.png")'),
# (1, 'wacz', 'http://example.com/browsertrix.wacz'),
# (1, 'replaywebpage', 'https://replayweb.page/?source=http%3A%2F%2Fexample.com%2Fbrowsertrix.wacz#view=pages&url=')
]
def test_retrieve_gsheet(gsheets_db, metadata, mock_gworksheet):
gw, row = gsheets_db._retrieve_gsheet(metadata)
assert gw == mock_gworksheet
assert row == 1
def test_started(gsheets_db, mock_metadata, mock_gworksheet):
gsheets_db.started(mock_metadata)
mock_gworksheet.set_cell.assert_called_once_with(1, 'status', 'Archive in progress')
def test_failed(gsheets_db, mock_metadata, mock_gworksheet):
reason = "Test failure"
gsheets_db.failed(mock_metadata, reason)
mock_gworksheet.set_cell.assert_called_once_with(1, 'status', f'Archive failed {reason}')
def test_aborted(gsheets_db, mock_metadata, mock_gworksheet):
gsheets_db.aborted(mock_metadata)
mock_gworksheet.set_cell.assert_called_once_with(1, 'status', '')
def test_done(gsheets_db, metadata, mock_gworksheet, expected_calls):
with patch("auto_archiver.modules.gsheet_db.gsheet_db.get_current_timestamp", return_value='2025-02-01T00:00:00+00:00'):
gsheets_db.done(metadata)
mock_gworksheet.batch_set_cell.assert_called_once_with(expected_calls)
def test_done_cached(gsheets_db, metadata, mock_gworksheet):
with patch("auto_archiver.modules.gsheet_db.gsheet_db.get_current_timestamp", return_value='2025-02-01T00:00:00+00:00'):
gsheets_db.done(metadata, cached=True)
# Verify the status message includes "[cached]"
call_args = mock_gworksheet.batch_set_cell.call_args[0][0]
assert any(call[2].startswith("[cached]") for call in call_args)
def test_done_missing_media(gsheets_db, metadata, mock_gworksheet):
# clear media from metadata
metadata.media = []
with patch("auto_archiver.modules.gsheet_db.gsheet_db.get_current_timestamp",
return_value='2025-02-01T00:00:00+00:00'):
gsheets_db.done(metadata)
# Verify nothing media-related gets updated
call_args = mock_gworksheet.batch_set_cell.call_args[0][0]
media_fields = {'archive', 'screenshot', 'thumbnail', 'wacz', 'replaywebpage'}
assert all(call[1] not in media_fields for call in call_args)
def test_safe_status_update(gsheets_db, metadata, mock_gworksheet):
gsheets_db._safe_status_update(metadata, "Test status")
mock_gworksheet.set_cell.assert_called_once_with(1, 'status', 'Test status')

View file

@@ -0,0 +1,103 @@
import datetime
from datetime import datetime, timedelta, timezone
from unittest.mock import MagicMock, patch
import pytest
from auto_archiver.core import Metadata, Media
from auto_archiver.modules.meta_enricher import MetaEnricher
@pytest.fixture
def mock_metadata():
"""Creates a mock Metadata object."""
mock: Metadata = MagicMock(spec=Metadata)
mock.get_url.return_value = "https://example.com"
mock.is_empty.return_value = False # Default to not empty
mock.get_all_media.return_value = []
return mock
@pytest.fixture
def mock_media():
"""Creates a mock Media object."""
mock: Media = MagicMock(spec=Media)
mock.filename = "mock_file.txt"
return mock
@pytest.fixture
def metadata():
m = Metadata()
m.set_url("https://example.com")
m.set_title("Test Title")
m.set_content("Test Content")
return m
@pytest.fixture(autouse=True)
def meta_enricher(setup_module):
return setup_module(MetaEnricher, {})
def test_enrich_skips_empty_metadata(meta_enricher, mock_metadata):
"""Test that enrich() does nothing when Metadata is empty."""
mock_metadata.is_empty.return_value = True
meta_enricher.enrich(mock_metadata)
mock_metadata.get_url.assert_called_once()
def test_enrich_file_sizes(meta_enricher, metadata, tmp_path):
"""Test that enrich_file_sizes() calculates and sets file sizes correctly."""
file1 = tmp_path / "testfile_1.txt"
file2 = tmp_path / "testfile_2.txt"
file1.write_text("A" * 1000)
file2.write_text("B" * 2000)
metadata.add_media(Media(str(file1)))
metadata.add_media(Media(str(file2)))
meta_enricher.enrich_file_sizes(metadata)
# Verify individual media file sizes
media1 = metadata.get_all_media()[0]
media2 = metadata.get_all_media()[1]
assert media1.get("bytes") == 1000
assert media1.get("size") == "1000.0 bytes"
assert media2.get("bytes") == 2000
assert media2.get("size") == "2.0 KB"
assert metadata.get("total_bytes") == 3000
assert metadata.get("total_size") == "2.9 KB"
@pytest.mark.parametrize(
"size, expected",
[
(500, "500.0 bytes"),
(1024, "1.0 KB"),
(2048, "2.0 KB"),
(1048576, "1.0 MB"),
(1073741824, "1.0 GB"),
],
)
def test_human_readable_bytes(size, expected):
"""Test that human_readable_bytes() converts sizes correctly."""
enricher = MetaEnricher()
assert enricher.human_readable_bytes(size) == expected
def test_enrich_file_sizes_no_media(meta_enricher, metadata):
"""Test that enrich_file_sizes() handles empty media list gracefully."""
meta_enricher.enrich_file_sizes(metadata)
assert metadata.get("total_bytes") == 0
assert metadata.get("total_size") == "0.0 bytes"
def test_enrich_archive_duration(meta_enricher, metadata):
# Set fixed "processed at" time in the past
processed_at = datetime.now(timezone.utc) - timedelta(minutes=10, seconds=30)
metadata.set("_processed_at", processed_at)
# patch datetime
with patch("datetime.datetime") as mock_datetime:
mock_now = datetime.now(timezone.utc)
mock_datetime.now.return_value = mock_now
meta_enricher.enrich_archive_duration(metadata)
assert metadata.get("archive_duration_seconds") == 630

View file

@@ -0,0 +1,188 @@
from datetime import datetime
from typing import Type
import pytest
from unittest.mock import patch, MagicMock
from auto_archiver.core import Metadata
from auto_archiver.modules.instagram_api_extractor.instagram_api_extractor import InstagramAPIExtractor
from .test_extractor_base import TestExtractorBase
@pytest.fixture
def mock_user_response():
return {
"user": {
"pk": "123",
"username": "test_user",
"full_name": "Test User",
"profile_pic_url_hd": "http://example.com/profile.jpg",
"profile_pic_url": "http://example.com/profile_lowres.jpg"
}
}
@pytest.fixture
def mock_post_response():
return {
"id": "post_123",
"code": "abc123",
"caption_text": "Test Caption",
"taken_at": datetime.now().timestamp(),
"video_url": "http://example.com/video.mp4",
"thumbnail_url": "http://example.com/thumbnail.jpg"
}
@pytest.fixture
def mock_story_response():
return [{
"id": "story_123",
"taken_at": datetime.now().timestamp(),
"video_url": "http://example.com/story.mp4"
}]
@pytest.fixture
def mock_highlight_response():
return {
"response": {
"reels": {
"highlight:123": {
"id": "123",
"title": "Test Highlight",
"items": [{
"id": "item_123",
"taken_at": datetime.now().timestamp(),
"video_url": "http://example.com/highlight.mp4"
}]
}
}
}
}
# @pytest.mark.incremental
class TestInstagramAPIExtractor(TestExtractorBase):
"""
Test suite for InstagramAPIExtractor.
"""
extractor_module = "instagram_api_extractor"
extractor: InstagramAPIExtractor
config = {
"access_token": "test_access_token",
"api_endpoint": "https://api.instagram.com/v1",
"full_profile": False,
# "full_profile_max_posts": 0,
# "minimize_json_output": True,
}
@pytest.fixture
def metadata(self):
m = Metadata()
m.set_url("https://instagram.com/test_user")
m.set("netloc", "instagram.com")
return m
@pytest.mark.parametrize("url,expected", [
("https://instagram.com/user", [("", "user", "")]),
("https://instagr.am/p/post_id", []),
("https://youtube.com", []),
("https://www.instagram.com/reel/reel_id", [("reel", "reel_id", "")]),
("https://instagram.com/stories/highlights/123", [("stories/highlights", "123", "")]),
("https://instagram.com/stories/user/123", [("stories", "user", "123")]),
])
def test_url_parsing(self, url, expected):
assert self.extractor.valid_url.findall(url) == expected
def test_initialize(self):
assert self.extractor.api_endpoint[-1] != "/"
@pytest.mark.parametrize("input_dict,expected", [
({"x": 0, "valid": "data"}, {"valid": "data"}),
({"nested": {"y": None, "valid": [{}]}}, {"nested": {"valid": [{}]}}),
])
def test_cleanup_dict(self, input_dict, expected):
assert self.extractor.cleanup_dict(input_dict) == expected
def test_download(self):
pass
def test_download_post(self, metadata, mock_user_response):
# test with context=reel
# test with context=post
# test with multiple images
# test gets text (metadata title)
pass
def test_download_profile_basic(self, metadata, mock_user_response):
"""Test basic profile download without full_profile"""
with patch.object(self.extractor, 'call_api') as mock_call, \
patch.object(self.extractor, 'download_from_url') as mock_download:
# Mock API responses
mock_call.return_value = mock_user_response
mock_download.return_value = "profile.jpg"
result = self.extractor.download_profile(metadata, "test_user")
assert result.status == "insta profile: success"
assert result.get_title() == "Test User"
assert result.get("data") == self.extractor.cleanup_dict(mock_user_response["user"])
# Verify profile picture download
mock_call.assert_called_once_with("v2/user/by/username", {"username": "test_user"})
mock_download.assert_called_once_with("http://example.com/profile.jpg")
assert len(result.media) == 1
assert result.media[0].filename == "profile.jpg"
def test_download_profile_full(self, metadata, mock_user_response, mock_story_response):
"""Test full profile download with stories/posts"""
with patch.object(self.extractor, 'call_api') as mock_call, \
patch.object(self.extractor, 'download_all_posts') as mock_posts, \
patch.object(self.extractor, 'download_all_highlights') as mock_highlights, \
patch.object(self.extractor, 'download_all_tagged') as mock_tagged, \
patch.object(self.extractor, '_download_stories_reusable') as mock_stories:
self.extractor.full_profile = True
mock_call.side_effect = [
mock_user_response,
mock_story_response
]
mock_highlights.return_value = None
mock_stories.return_value = mock_story_response
mock_posts.return_value = None
mock_tagged.return_value = None
result = self.extractor.download_profile(metadata, "test_user")
assert result.get("#stories") == len(mock_story_response)
mock_posts.assert_called_once_with(result, "123")
assert "errors" not in result.metadata
def test_download_profile_not_found(self, metadata):
"""Test profile not found error"""
with patch.object(self.extractor, 'call_api') as mock_call:
mock_call.return_value = {"user": None}
with pytest.raises(AssertionError) as exc_info:
self.extractor.download_profile(metadata, "invalid_user")
assert "User invalid_user not found" in str(exc_info.value)
def test_download_profile_error_handling(self, metadata, mock_user_response):
"""Test error handling in full profile mode"""
with (patch.object(self.extractor, 'call_api') as mock_call, \
patch.object(self.extractor, 'download_all_highlights') as mock_highlights, \
patch.object(self.extractor, 'download_all_tagged') as mock_tagged, \
patch.object(self.extractor, '_download_stories_reusable') as stories_tagged, \
patch.object(self.extractor, 'download_all_posts') as mock_posts
):
self.extractor.full_profile = True
mock_call.side_effect = [
mock_user_response,
Exception("Stories API failed"),
Exception("Posts API failed")
]
mock_highlights.return_value = None
mock_tagged.return_value = None
stories_tagged.return_value = None
mock_posts.return_value = None
result = self.extractor.download_profile(metadata, "test_user")
assert result.is_success()
assert "Error downloading stories for test_user" in result.metadata["errors"]

View file

@@ -0,0 +1,94 @@
import os
from typing import Type
from unittest.mock import patch, MagicMock
import pytest
from auto_archiver.core import Metadata
from auto_archiver.core.extractor import Extractor
from auto_archiver.modules.instagram_tbot_extractor import InstagramTbotExtractor
from tests.extractors.test_extractor_base import TestExtractorBase
TESTFILES = os.path.join(os.path.dirname(__file__), "testfiles")
@pytest.fixture
def session_file(tmpdir):
"""Fixture to create a test session file."""
session_file = os.path.join(tmpdir, "test_session.session")
with open(session_file, "w") as f:
f.write("mock_session_data")
return session_file.replace(".session", "")
@pytest.fixture(autouse=True)
def patch_extractor_methods(request, setup_module):
with patch.object(InstagramTbotExtractor, '_prepare_session_file', return_value=None), \
patch.object(InstagramTbotExtractor, '_initialize_telegram_client', return_value=None):
if hasattr(request, 'cls') and hasattr(request.cls, 'config'):
request.cls.extractor = setup_module("instagram_tbot_extractor", request.cls.config)
yield
@pytest.fixture
def metadata_sample():
m = Metadata()
m.set_title("Test Title")
m.set_timestamp("2021-01-01T00:00:00Z")
m.set_url("https://www.instagram.com/p/1234567890")
return m
class TestInstagramTbotExtractor:
extractor_module = "instagram_tbot_extractor"
extractor: InstagramTbotExtractor
config = {
"api_id": 12345,
"api_hash": "test_api_hash",
"session_file": "test_session",
}
@pytest.fixture
def mock_telegram_client(self):
"""Fixture to mock TelegramClient interactions."""
with patch("auto_archiver.modules.instagram_tbot_extractor._initialize_telegram_client") as mock_client:
instance = MagicMock()
mock_client.return_value = instance
yield instance
def test_extractor_is_initialized(self):
assert self.extractor is not None
@patch("time.sleep")
@pytest.mark.parametrize("url, expected_status, bot_responses", [
("https://www.instagram.com/p/C4QgLbrIKXG", "insta-via-bot: success", [MagicMock(id=101, media=None, message="Are you new to Bellingcat? - The way we share our investigations is different. 💭\nWe want you to read our story but also learn ou")]),
("https://www.instagram.com/reel/DEVLK8qoIbg/", "insta-via-bot: success", [MagicMock(id=101, media=None, message="Our volunteer community is at the centre of many incredible Bellingcat investigations and tools. Stephanie Ladel is one such vol")]),
# todo tbot not working for stories :(
("https://www.instagram.com/stories/bellingcatofficial/3556336382743057476/", False, [MagicMock(id=101, media=None, message="Media not found or unavailable")]),
("https://www.youtube.com/watch?v=ymCMy8OffHM", False, []),
("https://www.instagram.com/p/INVALID", False, [MagicMock(id=101, media=None, message="You must enter a URL to a post")]),
])
def test_download(self, mock_sleep, url, expected_status, bot_responses, metadata_sample):
"""Test the `download()` method with various Instagram URLs."""
metadata_sample.set_url(url)
self.extractor.client = MagicMock()
result = self.extractor.download(metadata_sample)
pass
# TODO fully mock or use as authenticated test
# if expected_status:
# assert result.is_success()
# assert result.status == expected_status
# assert result.metadata.get("title") in [msg.message[:128] for msg in bot_responses if msg.message]
# else:
# assert result is False
# Test story
# Test expired story
# Test requires login/ access (?)
# Test post
# Test multiple images?

View file

@@ -0,0 +1,273 @@
from typing import Type
import gspread
import pytest
from unittest.mock import patch, MagicMock
from auto_archiver.modules.gsheet_feeder import GsheetsFeeder
from auto_archiver.core import Metadata, Feeder
def test_setup_without_sheet_and_sheet_id(setup_module):
# Ensure setup() raises AssertionError if neither sheet nor sheet_id is set.
with patch("gspread.service_account"):
with pytest.raises(AssertionError):
setup_module(
"gsheet_feeder",
{"service_account": "dummy.json", "sheet": None, "sheet_id": None},
)
@pytest.fixture
def gsheet_feeder(setup_module) -> GsheetsFeeder:
with patch("gspread.service_account"):
feeder = setup_module(
"gsheet_feeder",
{
"service_account": "dummy.json",
"sheet": "test-auto-archiver",
"sheet_id": None,
"header": 1,
"columns": {
"url": "link",
"status": "archive status",
"folder": "destination folder",
"archive": "archive location",
"date": "archive date",
"thumbnail": "thumbnail",
"timestamp": "upload timestamp",
"title": "upload title",
"text": "text content",
"screenshot": "screenshot",
"hash": "hash",
"pdq_hash": "perceptual hashes",
"wacz": "wacz",
"replaywebpage": "replaywebpage",
},
"allow_worksheets": set(),
"block_worksheets": set(),
"use_sheet_names_in_stored_paths": True,
},
)
feeder.gsheets_client = MagicMock()
return feeder
class MockWorksheet:
"""
mimics the bits we need from gworksheet
"""
class SheetSheet:
title = "TestSheet"
rows = [
{"row": 2, "url": "http://example.com", "status": "", "folder": ""},
{"row": 3, "url": "http://example.com", "status": "", "folder": ""},
{"row": 4, "url": "", "status": "", "folder": ""},
{"row": 5, "url": "https://another.com", "status": None, "folder": ""},
{
"row": 6,
"url": "https://another.com",
"status": "success",
"folder": "some_folder",
},
]
def __init__(self):
self.wks = self.SheetSheet()
def count_rows(self):
if not self.rows:
return 0
return max(r["row"] for r in self.rows)
def get_cell(self, row, col_name, fresh=False):
matching = next((r for r in self.rows if r["row"] == row), {})
return matching.get(col_name, "")
def get_cell_or_default(self, row, col_name, default):
matching = next((r for r in self.rows if r["row"] == row), {})
return matching.get(col_name, default)
def test__process_rows(gsheet_feeder: GsheetsFeeder):
testworksheet = MockWorksheet()
metadata_items = list(gsheet_feeder._process_rows(testworksheet))
assert len(metadata_items) == 3
assert isinstance(metadata_items[0], Metadata)
assert metadata_items[0].get("url") == "http://example.com"
def test__set_metadata(gsheet_feeder: GsheetsFeeder):
worksheet = MockWorksheet()
metadata = Metadata()
gsheet_feeder._set_context(metadata, worksheet, 1)
assert metadata.get_context("gsheet") == {"row": 1, "worksheet": worksheet}
@pytest.mark.skip(reason="Not recognising folder column")
def test__set_metadata_with_folder_pickled(gsheet_feeder: GsheetsFeeder):
    testworksheet = MockWorksheet()
    metadata = Metadata()
    gsheet_feeder._set_context(metadata, testworksheet, 7)
    assert metadata.get_context("gsheet") == {"row": 7, "worksheet": testworksheet}
def test__set_metadata_with_folder(gsheet_feeder: GsheetsFeeder):
testworksheet = MockWorksheet()
metadata = Metadata()
testworksheet.wks.title = "TestSheet"
gsheet_feeder._set_context(metadata, testworksheet, 6)
assert metadata.get_context("gsheet") == {"row": 6, "worksheet": testworksheet}
assert metadata.get_context("folder") == "some-folder/test-auto-archiver/testsheet"
@pytest.mark.usefixtures("setup_module")
@pytest.mark.parametrize(
"sheet, sheet_id, expected_method, expected_arg, description",
[
("TestSheet", None, "open", "TestSheet", "opening by sheet name"),
(None, "ABC123", "open_by_key", "ABC123", "opening by sheet ID"),
],
)
def test_open_sheet_with_name_or_id(
setup_module, sheet, sheet_id, expected_method, expected_arg, description
):
"""Ensure open_sheet() correctly opens by name or ID based on configuration."""
with patch("gspread.service_account") as mock_service_account:
mock_client = MagicMock()
mock_service_account.return_value = mock_client
mock_client.open.return_value = "MockSheet"
mock_client.open_by_key.return_value = "MockSheet"
# Setup module with parameterized values
feeder = setup_module(
"gsheet_feeder",
{"service_account": "dummy.json", "sheet": sheet, "sheet_id": sheet_id},
)
sheet_result = feeder.open_sheet()
        # Validate that the correct client method was called; assert_called_once_with
        # raises on its own, so no assert message is attached to it
        getattr(mock_client, expected_method).assert_called_once_with(expected_arg)
assert sheet_result == "MockSheet", f"Failed: {description}"
@pytest.mark.usefixtures("setup_module")
def test_open_sheet_with_sheet_id(setup_module):
"""Ensure open_sheet() correctly opens a sheet by ID."""
with patch("gspread.service_account") as mock_service_account:
mock_client = MagicMock()
mock_service_account.return_value = mock_client
mock_client.open_by_key.return_value = "MockSheet"
feeder = setup_module(
"gsheet_feeder",
{"service_account": "dummy.json", "sheet": None, "sheet_id": "ABC123"},
)
sheet = feeder.open_sheet()
mock_client.open_by_key.assert_called_once_with("ABC123")
assert sheet == "MockSheet"
def test_should_process_sheet(setup_module):
with patch("gspread.service_account"):
gdb = setup_module(
"gsheet_feeder",
{
"service_account": "dummy.json",
"sheet": "TestSheet",
"sheet_id": None,
"allow_worksheets": {"TestSheet", "Sheet2"},
"block_worksheets": {"Sheet3"},
},
)
assert gdb.should_process_sheet("TestSheet") == True
assert gdb.should_process_sheet("Sheet3") == False
# False if allow_worksheets is set
assert gdb.should_process_sheet("AnotherSheet") == False
@pytest.mark.skip(reason="Requires a real connection")
class TestGSheetsFeederReal:
"""Testing GSheetsFeeder class"""
module_name: str = "gsheet_feeder"
feeder: GsheetsFeeder
    # You must follow the setup process explained in the docs for this to work
config: dict = {
"service_account": "secrets/service_account.json",
"sheet": "test-auto-archiver",
"sheet_id": None,
"header": 1,
"columns": {
"url": "link",
"status": "archive status",
"folder": "destination folder",
"archive": "archive location",
"date": "archive date",
"thumbnail": "thumbnail",
"timestamp": "upload timestamp",
"title": "upload title",
"text": "text content",
"screenshot": "screenshot",
"hash": "hash",
"pdq_hash": "perceptual hashes",
"wacz": "wacz",
"replaywebpage": "replaywebpage",
},
"allow_worksheets": set(),
"block_worksheets": set(),
"use_sheet_names_in_stored_paths": True,
}
@pytest.fixture(autouse=True)
def setup_feeder(self, setup_module):
assert (
self.module_name is not None
), "self.module_name must be set on the subclass"
assert self.config is not None, "self.config must be a dict set on the subclass"
self.feeder: Type[Feeder] = setup_module(self.module_name, self.config)
def reset_test_sheet(self):
"""Clears test sheet and re-adds headers to ensure consistent test results."""
client = gspread.service_account(self.config["service_account"])
sheet = client.open(self.config["sheet"])
worksheet = sheet.get_worksheet(0)
worksheet.clear()
worksheet.append_row(["Link", "Archive Status"])
def test_setup(self):
assert hasattr(self.feeder, "gsheets_client")
def test_open_sheet_real_connection(self):
"""Ensure open_sheet() connects to a real Google Sheets instance."""
sheet = self.feeder.open_sheet()
assert sheet is not None, "open_sheet() should return a valid sheet instance"
assert hasattr(
sheet, "worksheets"
), "Returned object should have worksheets method"
def test_iter_yields_metadata_real_data(self):
"""Ensure __iter__() yields Metadata objects for real test sheet data."""
self.reset_test_sheet()
client = gspread.service_account(self.config["service_account"])
sheet = client.open(self.config["sheet"])
worksheet = sheet.get_worksheet(0)
        # Insert test rows directly as a temporary approach;
        # the feeder will be refactored later to make this easier to test
test_rows = [
["https://example.com", ""],
["", ""],
["https://example.com", "done"],
]
worksheet.append_rows(test_rows)
metadata_list = list(self.feeder)
# Validate that only the first row is processed
assert len(metadata_list) == 1
assert metadata_list[0].metadata.get("url") == "https://example.com"
# TODO
# Test two sheets
# test two sheets with different columns
# test folder implementation


@ -0,0 +1,144 @@
import pytest
from unittest.mock import MagicMock
from auto_archiver.modules.gsheet_feeder import GWorksheet
class TestGWorksheet:
@pytest.fixture
def mock_worksheet(self):
mock_ws = MagicMock()
mock_ws.get_values.return_value = [
["Link", "Archive Status", "Archive Location", "Archive Date"],
["url1", "archived", "filepath1", "2023-01-01"],
["url2", "pending", "filepath2", "2023-01-02"],
]
return mock_ws
@pytest.fixture
def gworksheet(self, mock_worksheet):
return GWorksheet(mock_worksheet)
# Test initialization and basic properties
def test_initialization_sets_headers(self, gworksheet):
assert gworksheet.headers == ["link", "archive status", "archive location", "archive date"]
def test_count_rows_returns_correct_value(self, gworksheet):
# inc header row
assert gworksheet.count_rows() == 3
# Test column validation and lookup
@pytest.mark.parametrize(
"col,expected_index",
[
("url", 0),
("status", 1),
("archive", 2),
("date", 3),
],
)
def test_col_index_returns_correct_index(self, gworksheet, col, expected_index):
assert gworksheet._col_index(col) == expected_index
def test_check_col_exists_raises_for_invalid_column(self, gworksheet):
with pytest.raises(Exception, match="Column invalid_col"):
gworksheet._check_col_exists("invalid_col")
# Test data retrieval
@pytest.mark.parametrize(
"row,expected",
[
(1, ["Link", "Archive Status", "Archive Location", "Archive Date"]),
(2, ["url1", "archived", "filepath1", "2023-01-01"]),
(3, ["url2", "pending", "filepath2", "2023-01-02"]),
],
)
def test_get_row_returns_correct_data(self, gworksheet, row, expected):
assert gworksheet.get_row(row) == expected
@pytest.mark.parametrize(
"row,col,expected",
[
(2, "url", "url1"),
(2, "status", "archived"),
(3, "date", "2023-01-02"),
],
)
def test_get_cell_returns_correct_value(self, gworksheet, row, col, expected):
assert gworksheet.get_cell(row, col) == expected
def test_get_cell_handles_fresh_data(self, mock_worksheet, gworksheet):
mock_worksheet.cell.return_value.value = "fresh_value"
result = gworksheet.get_cell(2, "url", fresh=True)
assert result == "fresh_value"
mock_worksheet.cell.assert_called_once_with(2, 1)
# Test edge cases and error handling
@pytest.mark.parametrize(
"when_empty,expected",
[
(True, "default"),
(False, ""),
],
)
def test_get_cell_or_default_handles_empty_values(
self, mock_worksheet, when_empty, expected
):
mock_worksheet.get_values.return_value[1][0] = "" # Empty URL cell
g = GWorksheet(mock_worksheet)
assert (
g.get_cell_or_default(
2, "url", default="default", when_empty_use_default=when_empty
)
== expected
)
def test_get_cell_or_default_handles_missing_columns(self, gworksheet):
assert (
gworksheet.get_cell_or_default(1, "invalid_col", default="safe") == "safe"
)
# Test write operations
def test_set_cell_updates_correct_position(self, mock_worksheet, gworksheet):
gworksheet.set_cell(2, "url", "new_url")
mock_worksheet.update_cell.assert_called_once_with(2, 1, "new_url")
def test_batch_set_cell_formats_requests_correctly(
self, mock_worksheet, gworksheet
):
updates = [(2, "url", "new_url"), (3, "status", "processed")]
gworksheet.batch_set_cell(updates)
expected_batch = [
{"range": "A2", "values": [["new_url"]]},
{"range": "B3", "values": [["processed"]]},
]
mock_worksheet.batch_update.assert_called_once_with(
expected_batch, value_input_option="USER_ENTERED"
)
    def test_batch_set_cell_truncates_long_values(self, mock_worksheet, gworksheet):
        # Google Sheets limits cells to 50,000 characters, so overly long values are truncated
        long_value = "x" * 50000
        gworksheet.batch_set_cell([(1, "url", long_value)])
submitted_value = mock_worksheet.batch_update.call_args[0][0][0]["values"][0][0]
assert len(submitted_value) == 49999
# Test coordinate conversion
@pytest.mark.parametrize(
"row,col,expected",
[
(1, "url", "A1"),
(2, "status", "B2"),
(3, "archive", "C3"),
(4, "date", "D4"),
],
)
def test_to_a1_conversion(self, gworksheet, row, col, expected):
assert gworksheet.to_a1(row, col) == expected
# Test empty worksheet
def test_empty_worksheet_initialization(self):
mock_ws = MagicMock()
mock_ws.get_values.return_value = []
g = GWorksheet(mock_ws)
assert g.headers == []
assert g.count_rows() == 0


@ -0,0 +1,124 @@
from typing import Type
import pytest
from unittest.mock import MagicMock, patch
from auto_archiver.core import Media
from auto_archiver.modules.s3_storage import S3Storage
class TestS3Storage:
"""
Test suite for S3Storage.
"""
module_name: str = "s3_storage"
storage: Type[S3Storage]
s3: MagicMock
config: dict = {
"path_generator": "flat",
"filename_generator": "static",
"bucket": "test-bucket",
"region": "test-region",
"key": "test-key",
"secret": "test-secret",
"random_no_duplicate": False,
"endpoint_url": "https://{region}.example.com",
"cdn_url": "https://cdn.example.com/{key}",
"private": False,
}
    @pytest.fixture(autouse=True)
def setup_storage(self, setup_module):
self.storage = setup_module(self.module_name, self.config)
def test_client_initialization(self):
"""Test that S3 client is initialized with correct parameters"""
assert self.storage.s3 is not None
assert self.storage.s3.meta.region_name == 'test-region'
def test_get_cdn_url_generation(self):
"""Test CDN URL formatting """
media = Media("test.txt")
media.key = "path/to/file.txt"
url = self.storage.get_cdn_url(media)
assert url == "https://cdn.example.com/path/to/file.txt"
media.key = "another/path.jpg"
assert self.storage.get_cdn_url(media) == "https://cdn.example.com/another/path.jpg"
def test_uploadf_sets_acl_public(self):
media = Media("test.txt")
mock_file = MagicMock()
with patch.object(self.storage.s3, 'upload_fileobj') as mock_s3_upload, \
patch.object(self.storage, 'is_upload_needed', return_value=True):
self.storage.uploadf(mock_file, media)
mock_s3_upload.assert_called_once_with(
mock_file,
Bucket='test-bucket',
Key=media.key,
ExtraArgs={'ACL': 'public-read', 'ContentType': 'text/plain'}
)
def test_upload_decision_logic(self):
"""Test is_upload_needed under different conditions"""
media = Media("test.txt")
# Test default state (random_no_duplicate=False)
assert self.storage.is_upload_needed(media) is True
# Set duplicate checking config to true:
self.storage.random_no_duplicate = True
with patch('auto_archiver.modules.s3_storage.s3_storage.calculate_file_hash') as mock_calc_hash, \
patch.object(self.storage, 'file_in_folder') as mock_file_in_folder:
mock_calc_hash.return_value = 'beepboop123beepboop123beepboop123'
mock_file_in_folder.return_value = 'existing_key.txt'
# Test duplicate result
assert self.storage.is_upload_needed(media) is False
assert media.key == 'existing_key.txt'
mock_file_in_folder.assert_called_with(
# (first 24 chars of hash)
'no-dups/beepboop123beepboop123be'
)
@patch.object(S3Storage, 'file_in_folder')
def test_skips_upload_when_duplicate_exists(self, mock_file_in_folder):
"""Test that upload skips when file_in_folder finds existing object"""
self.storage.random_no_duplicate = True
mock_file_in_folder.return_value = "existing_folder/existing_file.txt"
# Create test media with calculated hash
media = Media("test.txt")
media.key = "original_path.txt"
with patch('auto_archiver.modules.s3_storage.s3_storage.calculate_file_hash') as mock_calculate_hash:
mock_calculate_hash.return_value = "beepboop123beepboop123beepboop123"
            # Verify no upload is needed and the media key now points at the existing object
assert self.storage.is_upload_needed(media) is False
assert media.key == "existing_folder/existing_file.txt"
assert media.get("previously archived") is True
with patch.object(self.storage.s3, 'upload_fileobj') as mock_upload:
result = self.storage.uploadf(None, media)
mock_upload.assert_not_called()
assert result is True
@patch.object(S3Storage, 'is_upload_needed')
def test_uploads_with_correct_parameters(self, mock_upload_needed):
media = Media("test.txt")
media.key = "original_key.txt"
mock_upload_needed.return_value = True
media.mimetype = 'image/png'
mock_file = MagicMock()
with patch.object(self.storage.s3, 'upload_fileobj') as mock_upload:
self.storage.uploadf(mock_file, media)
            # verify the call occurred with these parameters
mock_upload.assert_called_once_with(
mock_file,
Bucket='test-bucket',
Key='original_key.txt',
ExtraArgs={
'ACL': 'public-read',
'ContentType': 'image/png'
}
)
def test_file_in_folder_exists(self):
with patch.object(self.storage.s3, 'list_objects') as mock_list_objects:
mock_list_objects.return_value = {'Contents': [{'Key': 'path/to/file.txt'}]}
assert self.storage.file_in_folder('path/to/') == 'path/to/file.txt'
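    def test_file_in_folder_no_match(self):
        # Hypothetical companion to the test above (not part of the original PR); it
        # assumes file_in_folder() returns a falsy value when list_objects yields no
        # 'Contents' for the given prefix.
        with patch.object(self.storage.s3, 'list_objects') as mock_list_objects:
            mock_list_objects.return_value = {}
            assert not self.storage.file_in_folder('path/to/')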


@ -0,0 +1,68 @@
from typing import Type
import pytest
from unittest.mock import MagicMock, patch
from auto_archiver.core import Media
from auto_archiver.modules.gdrive_storage import GDriveStorage
from auto_archiver.core.metadata import Metadata
from tests.storages.test_storage_base import TestStorageBase
class TestGDriveStorage:
"""
Test suite for GDriveStorage.
"""
module_name: str = "gdrive_storage"
storage: Type[GDriveStorage]
config: dict = {'path_generator': 'url',
'filename_generator': 'static',
'root_folder_id': "fake_root_folder_id",
'oauth_token': None,
'service_account': 'fake_service_account.json'
}
@pytest.fixture(autouse=True)
def gdrive(self, setup_module):
with patch('google.oauth2.service_account.Credentials.from_service_account_file') as mock_creds:
self.storage = setup_module(self.module_name, self.config)
def test_initialize_fails_with_non_existent_creds(self):
"""
Test that the Google Drive service raises a FileNotFoundError when the service account file does not exist.
"""
# Act and Assert
with pytest.raises(FileNotFoundError) as exc_info:
self.storage.setup()
assert "No such file or directory" in str(exc_info.value)
def test_path_parts(self):
media = Media(filename="test.jpg")
media.key = "folder1/folder2/test.jpg"
@pytest.mark.skip(reason="Requires real credentials")
@pytest.mark.download
class TestGDriveStorageConnected(TestStorageBase):
"""
'Real' tests for GDriveStorage.
"""
module_name: str = "gdrive_storage"
storage: Type[GDriveStorage]
config: dict = {'path_generator': 'url',
'filename_generator': 'static',
# TODO: replace with real root folder id
'root_folder_id': "1TVY_oJt95_dmRSEdP9m5zFy7l50TeCSk",
'oauth_token': None,
'service_account': 'secrets/service_account.json'
}
def test_initialize_with_real_credentials(self):
"""
Test that the Google Drive service can be initialized with real credentials.
"""
assert self.storage.service is not None


@ -0,0 +1,22 @@
from typing import Type
import pytest
from auto_archiver.core.metadata import Metadata
from auto_archiver.core.storage import Storage
class TestStorageBase(object):
module_name: str = None
config: dict = None
@pytest.fixture(autouse=True)
def setup_storage(self, setup_module):
assert (
self.module_name is not None
), "self.module_name must be set on the subclass"
assert self.config is not None, "self.config must be a dict set on the subclass"
self.storage: Type[Storage] = setup_module(
self.module_name, self.config
)


@ -0,0 +1,165 @@
import pytest
from datetime import datetime, timezone
from dataclasses import dataclass
from typing import Any
from auto_archiver.core.metadata import Metadata
@pytest.fixture
def basic_metadata():
m = Metadata()
m.set_url("https://example.com")
m.set("title", "Test Page")
return m
@dataclass
class MockMedia:
filename: str = ""
mimetype: str = ""
data: dict = None
def get(self, key: str, default: Any = None) -> Any:
return self.data.get(key, default) if self.data else default
def set(self, key: str, value: Any) -> None:
if not self.data:
self.data = {}
self.data[key] = value
@pytest.fixture
def media_file():
def _create(filename="test.txt", mimetype="text/plain", hash_value=None):
m = MockMedia(filename=filename, mimetype=mimetype)
if hash_value:
m.set("hash", hash_value)
return m
return _create
def test_initial_state():
m = Metadata()
assert m.status == "no archiver"
assert m.metadata == {"_processed_at": m.get("_processed_at")}
assert m.media == []
assert isinstance(m.get("_processed_at"), datetime)
def test_url_properties(basic_metadata):
assert basic_metadata.get_url() == "https://example.com"
assert basic_metadata.netloc == "example.com"
def test_simple_merge(basic_metadata):
right = Metadata(status="success")
right.set("title", "Test Title")
basic_metadata.merge(right)
assert basic_metadata.status == "success"
assert basic_metadata.get("title") == "Test Title"
def test_left_merge():
left = (
Metadata()
.set("tags", ["a"])
.set("stats", {"views": 10})
.set("status", "success")
)
right = (
Metadata()
.set("tags", ["b"])
.set("stats", {"likes": 5})
.set("status", "no archiver")
)
left.merge(right, overwrite_left=True)
assert left.get("status") == "no archiver"
assert left.get("tags") == ["a", "b"]
assert left.get("stats") == {"views": 10, "likes": 5}
def test_media_management(basic_metadata, media_file):
media1 = media_file(hash_value="abc")
media2 = media_file(hash_value="abc") # Duplicate
media3 = media_file(hash_value="def")
basic_metadata.add_media(media1, "m1")
basic_metadata.add_media(media2, "m2")
basic_metadata.add_media(media3)
assert len(basic_metadata.media) == 3
basic_metadata.remove_duplicate_media_by_hash()
assert len(basic_metadata.media) == 2
assert basic_metadata.get_media_by_id("m1") == media1
def test_success():
m = Metadata()
assert not m.is_success()
m.success("context")
assert m.is_success()
assert m.status == "context: success"
def test_is_empty():
m = Metadata()
assert m.is_empty()
    # keys that are treated as meaningless by is_empty()
(
m.set("url", "example.com")
.set("total_bytes", 100)
.set("archive_duration_seconds", 10)
.set("_processed_at", datetime.now(timezone.utc))
)
assert m.is_empty()
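def test_is_not_empty_with_meaningful_value():
    # Hypothetical companion to test_is_empty above (not part of the original PR); it
    # assumes that any key outside the "meaningless" set (e.g. "title") is enough to
    # make the Metadata object count as non-empty.
    m = Metadata()
    m.set("title", "Some title")
    assert not m.is_empty()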
def test_store():
pass
# Test Media operations
# Test custom getter/setters
def test_get_set_url():
m = Metadata()
m.set_url("http://example.com")
assert m.get_url() == "http://example.com"
with pytest.raises(AssertionError):
m.set_url("")
assert m.get("url") == "http://example.com"
def test_set_content():
m = Metadata()
m.set_content("Some content")
assert m.get("content") == "Some content"
# Test appending
m.set_content("New content")
# Do we want to add a line break to the method?
assert m.get("content") == "Some contentNew content"
def test_choose_most_complex():
pass
def test_get_context():
m = Metadata()
m.set_context("somekey", "somevalue")
assert m.get_context("somekey") == "somevalue"
assert m.get_context("nonexistent") is None
m.set_context("anotherkey", "anothervalue")
# check the previous is retained
assert m.get_context("somekey") == "somevalue"
assert m.get_context("anotherkey") == "anothervalue"
assert len(m._context) == 2
def test_choose_most_complete():
pass