Further cleanup

* Partially removes the ArchivingOrchestrator
* Removes the cli_feeder module and makes its behaviour the default: URLs can now be passed directly on the command line, without the cumbersome --cli_feeder.urls. Just run: auto-archiver https://my.url.com
* More unit tests
* Improved error handling
pull/189/head
Patrick Robertson 2025-01-30 16:43:09 +01:00
parent 953011f368
commit d6b4b7a932
27 changed files with 417 additions and 191 deletions

View file

@@ -3,7 +3,7 @@ from auto_archiver.core.orchestrator import ArchivingOrchestrator
 import sys

 def main():
-    ArchivingOrchestrator().run(sys.argv)
+    ArchivingOrchestrator().run(sys.argv[1:])

 if __name__ == "__main__":
     main()
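Context for the change: `sys.argv[0]` is the program name, so passing the full `sys.argv` through would hand the orchestrator a stray first positional argument. A minimal standalone sketch (not the project's actual parser) of why the slice matters:

```python
import argparse
import sys

parser = argparse.ArgumentParser(prog="auto-archiver")
parser.add_argument("urls", nargs="*", default=[])

# parse_args(None) falls back to sys.argv[1:] automatically, but when an
# explicit list is passed, the program name must be stripped first, hence
# run(sys.argv[1:]).
args = parser.parse_args(sys.argv[1:])
print(args.urls)
```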

View file

@@ -0,0 +1,100 @@
from urllib.parse import urlparse
from typing import Mapping, Any
from abc import ABC
from copy import deepcopy, copy
from tempfile import TemporaryDirectory

from loguru import logger


class BaseModule(ABC):
    """
    Base module class. All modules should inherit from this class.

    The exact methods a class implements will depend on the type of module it is,
    however all modules have a .setup(config: dict) method to run any setup code
    (e.g. logging in to a site, spinning up a browser etc.)

    See BaseModule.MODULE_TYPES for the types of modules you can create, noting that
    a subclass can be of multiple types. For example, a module that extracts data from
    a website and stores it in a database would be both an 'extractor' and a 'database' module.

    Each module is a python package, and should have a __manifest__.py file in the
    same directory as the module file. The __manifest__.py specifies the module information
    like name, author, version, dependencies etc. See BaseModule._DEFAULT_MANIFEST for the
    default manifest structure.
    """

    MODULE_TYPES = [
        'feeder',
        'extractor',
        'enricher',
        'database',
        'storage',
        'formatter'
    ]

    _DEFAULT_MANIFEST = {
        'name': '',  # the display name of the module
        'author': 'Bellingcat',  # creator of the module, leave this as Bellingcat or set your own name!
        'type': [],  # the type of the module, can be one or more of BaseModule.MODULE_TYPES
        'requires_setup': True,  # whether or not this module requires additional setup such as setting API Keys or installing additional software
        'description': '',  # a description of the module
        'dependencies': {},  # external dependencies, e.g. python packages or binaries, in dictionary format
        'entry_point': '',  # the entry point for the module, in the format 'module_name::ClassName'. This can be left blank to use the default entry point of module_name::ModuleName
        'version': '1.0',  # the version of the module
        'configs': {}  # any configuration options this module has, these will be exposed to the user in the config file or via the command line
    }

    config: Mapping[str, Any]
    authentication: Mapping[str, Mapping[str, str]]
    name: str

    # this is set by the orchestrator prior to archiving
    tmp_dir: TemporaryDirectory = None

    def setup(self, config: dict):
        authentication = config.get('authentication', {})
        # extract out concatenated sites
        for key, val in copy(authentication).items():
            if "," in key:
                for site in key.split(","):
                    authentication[site] = val
                del authentication[key]

        # this is important. Each instance is given its own deepcopied config, so modules cannot
        # change values to affect other modules
        config = deepcopy(config)
        authentication = deepcopy(config.pop('authentication', {}))

        self.authentication = authentication
        self.config = config
        for key, val in config.get(self.name, {}).items():
            setattr(self, key, val)

    def repr(self):
        return f"Module<'{self.display_name}' (config: {self.config[self.name]})>"

    def auth_for_site(self, site: str) -> dict:
        # TODO: think about if/how we can deal with sites that have multiple domains (main one is x.com/twitter.com)
        # for now, just hard code those.
        # SECURITY: parse the domain using urllib
        site = urlparse(site).netloc
        # add the 'www' version of the site to the list of sites to check
        for to_try in [site, f"www.{site}"]:
            if to_try in self.authentication:
                return self.authentication[to_try]

        # do a fuzzy string match just to print a warning - don't use it since it's insecure
        for key in self.authentication.keys():
            if key in site or site in key:
                logger.warning(f"Could not find exact authentication information for site '{site}'. \
                                 did find information for '{key}' which is close, is this what you meant? \
                                 If so, edit your authentication settings to make sure it exactly matches.")

        return {}
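A minimal usage sketch of the new authentication lookup; the module name, sites and credentials below are invented for illustration, and the import path assumes the class lives at `auto_archiver.core.base_module`:

```python
from auto_archiver.core.base_module import BaseModule

class DummyModule(BaseModule):
    name = "dummy_module"

m = DummyModule()
m.setup({
    "authentication": {
        # comma-separated keys are expanded to one entry per site by setup()
        "x.com,twitter.com": {"api_key": "k", "api_secret": "s"},
        "facebook.com": {"username": "u", "password": "p"},
    }
})

# lookup is by netloc, so full URLs work; the 'www.' variant is also tried,
# but note a 'www.' URL only matches a config key that itself includes 'www.'
assert m.auth_for_site("https://twitter.com/some/post") == {"api_key": "k", "api_secret": "s"}
assert m.auth_for_site("https://facebook.com/page") == {"username": "u", "password": "p"}
assert m.auth_for_site("https://unknown.example/") == {}
```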

View file

@@ -15,8 +15,14 @@ from .module import BaseModule
 from typing import Any, List, Type, Tuple

-yaml = YAML()
+yaml: YAML = YAML()
+
+b = yaml.load("""
+# This is a comment
+site.com,site2.com:
+  key: value
+  key2: value2
+""")

 EMPTY_CONFIG = yaml.load("""
 # Auto Archiver Configuration
 # Steps are the modules that will be run in the order they are defined
@@ -25,6 +31,24 @@ steps:""" + "".join([f"\n {module}s: []" for module in BaseModule.MODULE_TYPES
 """
 # Global configuration

+# Authentication
+# a dictionary of authentication information that can be used by extractors to log into websites.
+# you can use a comma separated list for multiple domains on the same line (common use case: x.com,twitter.com)
+# Common login 'types' are username/password, cookie, api key/token.
+# Some Examples:
+# facebook.com:
+#   username: "my_username"
+#   password: "my_password"
+# or for a site that uses an API key:
+# twitter.com,x.com:
+#   api_key
+#   api_secret
+# youtube.com:
+#   cookie: "login_cookie=value ; other_cookie=123" # multiple 'key=value' pairs should be separated by ;
+authentication: {}

 # These are the global configurations that are used by the modules
 logging:
@@ -136,12 +160,9 @@ def read_yaml(yaml_filename: str) -> CommentedMap:
 # TODO: make this tidier/find a way to notify of which keys should not be stored
-def store_yaml(config: CommentedMap, yaml_filename: str, do_not_store_keys: List[Tuple[str, str]] = []) -> None:
+def store_yaml(config: CommentedMap, yaml_filename: str) -> None:
     config_to_save = deepcopy(config)
-    for key1, key2 in do_not_store_keys:
-        if key1 in config_to_save and key2 in config_to_save[key1]:
-            del config_to_save[key1][key2]
+    config_to_save.pop('urls', None)
     with open(yaml_filename, "w", encoding="utf-8") as outf:
         yaml.dump(config_to_save, outf)
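The new `store_yaml` drops the transient `urls` key so one-off command-line URLs are never persisted to the configuration file. A minimal round-trip sketch with ruamel.yaml (file name invented); ruamel's round-trip mode also preserves the comments seen in EMPTY_CONFIG:

```python
from copy import deepcopy
from ruamel.yaml import YAML

yaml = YAML()
config = yaml.load("""
# comments survive a ruamel.yaml round-trip
steps:
  feeders: [cli_feeder]
urls: ["https://example.com"]
""")

config_to_save = deepcopy(config)
config_to_save.pop('urls', None)  # transient CLI input, not configuration

with open("orchestration.example.yaml", "w", encoding="utf-8") as outf:
    yaml.dump(config_to_save, outf)  # saved file keeps the comments but no urls
```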

View file

@@ -54,11 +54,3 @@ class ArchivingContext:
         ac.configs = {k: v for k, v in ac.configs.items() if k in ac.keep_on_reset}

     # ---- custom getters/setters for widely used context values

-    @staticmethod
-    def set_tmp_dir(tmp_dir: str):
-        ArchivingContext.get_instance().configs["tmp_dir"] = tmp_dir
-
-    @staticmethod
-    def get_tmp_dir() -> str:
-        return ArchivingContext.get_instance().configs.get("tmp_dir")

View file

@@ -12,7 +12,6 @@ from dataclasses import dataclass
 import mimetypes
 import os
-import mimetypes
 import requests
 from loguru import logger
 from retrying import retry
@@ -71,7 +70,7 @@ class Extractor(BaseModule):
         to_filename = url.split('/')[-1].split('?')[0]
         if len(to_filename) > 64:
             to_filename = to_filename[-64:]
-        to_filename = os.path.join(ArchivingContext.get_tmp_dir(), to_filename)
+        to_filename = os.path.join(self.tmp_dir, to_filename)
         if verbose: logger.debug(f"downloading {url[0:50]=} {to_filename=}")
         headers = {
             'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.138 Safari/537.36'

View file

@@ -7,7 +7,6 @@ from __future__ import annotations
 from dataclasses import dataclass
 from typing import List
-from abc import ABC
 import shutil
 import ast
 import copy
@@ -17,63 +16,12 @@ import os
 from os.path import join, dirname
 from loguru import logger

 import auto_archiver
+from .base_module import BaseModule

 _LAZY_LOADED_MODULES = {}

 MANIFEST_FILE = "__manifest__.py"

-class BaseModule(ABC):
-    """
-    Base module class. All modules should inherit from this class.
-
-    The exact methods a class implements will depend on the type of module it is,
-    however all modules have a .setup(config: dict) method to run any setup code
-    (e.g. logging in to a site, spinning up a browser etc.)
-
-    See BaseModule.MODULE_TYPES for the types of modules you can create, noting that
-    a subclass can be of multiple types. For example, a module that extracts data from
-    a website and stores it in a database would be both an 'extractor' and a 'database' module.
-
-    Each module is a python package, and should have a __manifest__.py file in the
-    same directory as the module file. The __manifest__.py specifies the module information
-    like name, author, version, dependencies etc. See BaseModule._DEFAULT_MANIFEST for the
-    default manifest structure.
-    """
-
-    MODULE_TYPES = [
-        'feeder',
-        'extractor',
-        'enricher',
-        'database',
-        'storage',
-        'formatter'
-    ]
-
-    _DEFAULT_MANIFEST = {
-        'name': '',  # the display name of the module
-        'author': 'Bellingcat',  # creator of the module, leave this as Bellingcat or set your own name!
-        'type': [],  # the type of the module, can be one or more of BaseModule.MODULE_TYPES
-        'requires_setup': True,  # whether or not this module requires additional setup such as setting API Keys or installing additional softare
-        'description': '',  # a description of the module
-        'dependencies': {},  # external dependencies, e.g. python packages or binaries, in dictionary format
-        'entry_point': '',  # the entry point for the module, in the format 'module_name::ClassName'. This can be left blank to use the default entry point of module_name::ModuleName
-        'version': '1.0',  # the version of the module
-        'configs': {}  # any configuration options this module has, these will be exposed to the user in the config file or via the command line
-    }
-
-    config: dict
-    name: str
-
-    def setup(self, config: dict):
-        self.config = config
-        for key, val in config.get(self.name, {}).items():
-            setattr(self, key, val)
-
-    def repr(self):
-        return f"Module<'{self.display_name}' (config: {self.config[self.name]})>"
-
 def setup_paths(paths: list[str]) -> None:
     """

View file

@@ -5,12 +5,15 @@
 """

 from __future__ import annotations
-from typing import Generator, Union, List
+from typing import Generator, Union, List, Type
 from urllib.parse import urlparse
 from ipaddress import ip_address
 import argparse
 import os
 import sys
+import json
+from tempfile import TemporaryDirectory
+import traceback

 from rich_argparse import RichHelpFormatter
@@ -18,17 +21,46 @@ from .context import ArchivingContext
 from .metadata import Metadata
 from ..version import __version__
-from .config import read_yaml, store_yaml, to_dot_notation, merge_dicts, EMPTY_CONFIG, DefaultValidatingParser
+from .config import yaml, read_yaml, store_yaml, to_dot_notation, merge_dicts, EMPTY_CONFIG, DefaultValidatingParser
 from .module import available_modules, LazyBaseModule, get_module, setup_paths
-from . import validators
+from . import validators, Feeder, Extractor, Database, Storage, Formatter, Enricher
 from .module import BaseModule
-import tempfile, traceback

 from loguru import logger

 DEFAULT_CONFIG_FILE = "orchestration.yaml"

+class JsonParseAction(argparse.Action):
+    def __call__(self, parser, namespace, values, option_string=None):
+        try:
+            setattr(namespace, self.dest, json.loads(values))
+        except json.JSONDecodeError as e:
+            raise argparse.ArgumentTypeError(f"Invalid JSON input for argument '{self.dest}': {e}")
+
+class AuthenticationJsonParseAction(JsonParseAction):
+    def __call__(self, parser, namespace, values, option_string=None):
+        super().__call__(parser, namespace, values, option_string)
+        auth_dict = getattr(namespace, self.dest)
+        if isinstance(auth_dict, str):
+            # if it's a string, treat it as a path to a JSON/YAML file
+            try:
+                with open(auth_dict, 'r') as f:
+                    try:
+                        auth_dict = json.load(f)
+                    except json.JSONDecodeError:
+                        # maybe it's yaml, try that
+                        auth_dict = yaml.load(f)
+            except:
+                pass
+        if not isinstance(auth_dict, dict):
+            raise argparse.ArgumentTypeError("Authentication must be a dictionary of site names and their authentication methods")
+        for site, auth in auth_dict.items():
+            if not isinstance(site, str) or not isinstance(auth, dict):
+                raise argparse.ArgumentTypeError("Authentication must be a dictionary of site names and their authentication methods")
+        setattr(namespace, self.dest, auth_dict)
+
 class UniqueAppendAction(argparse.Action):
     def __call__(self, parser, namespace, values, option_string=None):
         if not hasattr(namespace, self.dest):
@@ -39,8 +71,6 @@ class UniqueAppendAction(argparse.Action):

 class ArchivingOrchestrator:
-    _do_not_store_keys = []
-
     def setup_basic_parser(self):
         parser = argparse.ArgumentParser(
             prog="auto-archiver",
@@ -52,7 +82,7 @@ class ArchivingOrchestrator:
             epilog="Check the code at https://github.com/bellingcat/auto-archiver",
             formatter_class=RichHelpFormatter,
         )
-        parser.add_argument('--help', '-h', action='store_true', dest='help', help='show this help message and exit')
+        parser.add_argument('--help', '-h', action='store_true', dest='help', help='show a full help message and exit')
         parser.add_argument('--version', action='version', version=__version__)
         parser.add_argument('--config', action='store', dest="config_file", help='the filename of the YAML configuration file (defaults to \'config.yaml\')', default=DEFAULT_CONFIG_FILE)
         parser.add_argument('--mode', action='store', dest='mode', type=str, choices=['simple', 'full'], help='the mode to run the archiver in', default='simple')
@@ -80,7 +110,6 @@ class ArchivingOrchestrator:
         # only load the modules enabled in config
         # TODO: if some steps are empty (e.g. 'feeders' is empty), should we default to the 'simple' ones? Or only if they are ALL empty?
         enabled_modules = []
-        # first loads the modules from the config file, then from the command line
         for config in [yaml_config['steps'], basic_config.__dict__]:
             for module_type in BaseModule.MODULE_TYPES:
@@ -120,7 +149,7 @@ class ArchivingOrchestrator:
         if (self.config != yaml_config and basic_config.store) or not os.path.isfile(basic_config.config_file):
             logger.info(f"Storing configuration file to {basic_config.config_file}")
-            store_yaml(self.config, basic_config.config_file, self._do_not_store_keys)
+            store_yaml(self.config, basic_config.config_file)

         return self.config
@@ -128,18 +157,29 @@ class ArchivingOrchestrator:
         if not parser:
             parser = self.parser

-        parser.add_argument('--feeders', dest='steps.feeders', nargs='+', help='the feeders to use', action=UniqueAppendAction)
+        # allow passing URLs directly on the command line
+        parser.add_argument('urls', nargs='*', default=[], help='URL(s) to archive, either a single URL or a list of urls, should not come from config.yaml')
+        parser.add_argument('--feeders', dest='steps.feeders', nargs='+', default=['cli_feeder'], help='the feeders to use', action=UniqueAppendAction)
         parser.add_argument('--enrichers', dest='steps.enrichers', nargs='+', help='the enrichers to use', action=UniqueAppendAction)
         parser.add_argument('--extractors', dest='steps.extractors', nargs='+', help='the extractors to use', action=UniqueAppendAction)
         parser.add_argument('--databases', dest='steps.databases', nargs='+', help='the databases to use', action=UniqueAppendAction)
         parser.add_argument('--storages', dest='steps.storages', nargs='+', help='the storages to use', action=UniqueAppendAction)
         parser.add_argument('--formatters', dest='steps.formatters', nargs='+', help='the formatter to use', action=UniqueAppendAction)
+        parser.add_argument('--authentication', dest='authentication', help='A dictionary of sites and their authentication methods \
+                            (token, username etc.) that extractors can use to log into \
+                            a website. If passing this on the command line, use a JSON string. \
+                            You may also pass a path to a valid JSON/YAML file which will be parsed.',
+                            default={},
+                            action=AuthenticationJsonParseAction)

         # logging arguments
         parser.add_argument('--logging.level', action='store', dest='logging.level', choices=['INFO', 'DEBUG', 'ERROR', 'WARNING'], help='the logging level to use', default='INFO')
         parser.add_argument('--logging.file', action='store', dest='logging.file', help='the logging file to write to', default=None)
         parser.add_argument('--logging.rotation', action='store', dest='logging.rotation', help='the logging rotation to use', default=None)

     def add_module_args(self, modules: list[LazyBaseModule] = None, parser: argparse.ArgumentParser = None) -> None:
         if not modules:
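A standalone sketch of how the new positional `urls` argument and the JSON-parsing action compose on the command line (trimmed to the two additions; real invocations go through the orchestrator's full parser):

```python
import argparse
import json

class JsonParseAction(argparse.Action):
    def __call__(self, parser, namespace, values, option_string=None):
        # store the decoded object instead of the raw string
        setattr(namespace, self.dest, json.loads(values))

parser = argparse.ArgumentParser(prog="auto-archiver")
parser.add_argument('urls', nargs='*', default=[])
parser.add_argument('--authentication', default={}, action=JsonParseAction)

args = parser.parse_args([
    "https://example.com/post/1",
    "--authentication", '{"example.com": {"cookie": "session=abc"}}',
])
assert args.urls == ["https://example.com/post/1"]
assert args.authentication == {"example.com": {"cookie": "session=abc"}}
```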
@@ -147,6 +187,7 @@ class ArchivingOrchestrator:
         module: LazyBaseModule
         for module in modules:
             if not module.configs:
                 # this module has no configs, don't show anything in the help
                 # (TODO: do we want to show something about this module though, like a description?)
@@ -155,12 +196,6 @@ class ArchivingOrchestrator:
             group = parser.add_argument_group(module.display_name or module.name, f"{module.description[:100]}...")
             for name, kwargs in module.configs.items():
-                # TODO: go through all the manifests and make sure we're not breaking anything with removing cli_set
-                # in most cases it'll mean replacing it with 'type': 'str' or 'type': 'int' or something
-                do_not_store = kwargs.pop('do_not_store', False)
-                if do_not_store:
-                    self._do_not_store_keys.append((module.name, name))
-
                 if not kwargs.get('metavar', None):
                     # make a nicer metavar, metavar is what's used in the help, e.g. --cli_feeder.urls [METAVAR]
                     kwargs['metavar'] = name.upper()
@@ -208,8 +243,7 @@ class ArchivingOrchestrator:
             step_items = []
             modules_to_load = self.config['steps'][f"{module_type}s"]
-            assert modules_to_load, f"No {module_type}s were configured. Make sure to set at least one {module_type} \
-                                        in your configuration file or on the command line (using --{module_type}s)"
+            assert modules_to_load, f"No {module_type}s were configured. Make sure to set at least one {module_type} in your configuration file or on the command line (using --{module_type}s)"

             def check_steps_ok():
                 if not len(step_items):
@@ -223,12 +257,37 @@ class ArchivingOrchestrator:
                     exit()

             for module in modules_to_load:
+                if module == 'cli_feeder':
+                    urls = self.config['urls']
+                    if not urls:
+                        logger.error("No URLs provided. Please provide at least one URL to archive, or set up a feeder.")
+                        self.basic_parser.print_help()
+                        exit()
+
+                    # cli_feeder is a pseudo module, it just takes the command line args
+                    def feed(self) -> Generator[Metadata]:
+                        for url in urls:
+                            logger.debug(f"Processing URL: '{url}'")
+                            yield Metadata().set_url(url)
+                            ArchivingContext.set("folder", "cli")
+
+                    pseudo_module = type('CLIFeeder', (Feeder,), {
+                        'name': 'cli_feeder',
+                        'display_name': 'CLI Feeder',
+                        '__iter__': feed
+                    })()
+
+                    pseudo_module.__iter__ = feed
+                    step_items.append(pseudo_module)
+                    continue
+
                 if module in invalid_modules:
                     continue
                 try:
                     loaded_module: BaseModule = get_module(module, self.config)
                 except (KeyboardInterrupt, Exception) as e:
-                    logger.error(f"Error during setup of archivers: {e}\n{traceback.format_exc()}")
+                    logger.error(f"Error during setup of modules: {e}\n{traceback.format_exc()}")
                     if module_type == 'extractor' and loaded_module.name == module:
                         loaded_module.cleanup()
                     exit()
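The pseudo-module uses the three-argument `type(name, bases, dict)` to build a class at runtime. One subtlety: Python resolves dunder methods such as `__iter__` on the class, not the instance, so it is the class-dict entry that makes the object iterable, and the later instance assignment is effectively a no-op. A generic sketch of the pattern:

```python
def feed(self):
    yield from ["https://example.com/a", "https://example.com/b"]

# build a one-off class whose instances are iterable
PseudoFeeder = type('PseudoFeeder', (object,), {'name': 'cli_feeder', '__iter__': feed})
print(list(PseudoFeeder()))  # ['https://example.com/a', 'https://example.com/b']

# by contrast, an instance-level __iter__ is ignored by iter()
Empty = type('Empty', (object,), {})
broken = Empty()
broken.__iter__ = lambda: iter([])
# iter(broken) would raise TypeError: dunder lookup happens on the type
```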
@@ -285,13 +344,18 @@ class ArchivingOrchestrator:

     def cleanup(self) -> None:
         logger.info("Cleaning up")
-        for e in self.config['steps']['extractors']:
+        for e in self.extractors:
             e.cleanup()

     def feed(self) -> Generator[Metadata]:
-        for feeder in self.config['steps']['feeders']:
+        url_count = 0
+        for feeder in self.feeders:
             for item in feeder:
                 yield self.feed_item(item)
+                url_count += 1
+        logger.success(f"Processed {url_count} URL(s)")

         self.cleanup()

     def feed_item(self, item: Metadata) -> Metadata:
@@ -300,22 +364,33 @@ class ArchivingOrchestrator:
         - catches keyboard interruptions to do a clean exit
         - catches any unexpected error, logs it, and does a clean exit
         """
+        tmp_dir: TemporaryDirectory = None
         try:
-            ArchivingContext.reset()
-            with tempfile.TemporaryDirectory(dir="./") as tmp_dir:
-                ArchivingContext.set_tmp_dir(tmp_dir)
-                return self.archive(item)
+            tmp_dir = TemporaryDirectory(dir="./")
+            # set tmp_dir on all modules
+            for m in self.all_modules:
+                m.tmp_dir = tmp_dir.name
+            return self.archive(item)
         except KeyboardInterrupt:
             # catches keyboard interruptions to do a clean exit
             logger.warning(f"caught interrupt on {item=}")
-            for d in self.config['steps']['databases']: d.aborted(item)
+            for d in self.databases:
+                d.aborted(item)
             self.cleanup()
             exit()
         except Exception as e:
             logger.error(f'Got unexpected error on item {item}: {e}\n{traceback.format_exc()}')
-            for d in self.config['steps']['databases']:
-                if type(e) == AssertionError: d.failed(item, str(e))
-                else: d.failed(item, reason="unexpected error")
+            for d in self.databases:
+                if type(e) == AssertionError:
+                    d.failed(item, str(e))
+                else:
+                    d.failed(item, reason="unexpected error")
+        finally:
+            if tmp_dir:
+                # remove the tmp_dir from all modules
+                for m in self.all_modules:
+                    m.tmp_dir = None
+                tmp_dir.cleanup()

     def archive(self, result: Metadata) -> Union[Metadata, None]:
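Replacing the `with` block with an explicit `TemporaryDirectory` lets one directory be shared by every module for the duration of a single item and still be removed reliably, since the `finally` clause runs whether archiving succeeds, fails, or is interrupted. A condensed sketch of the lifecycle with stand-in modules:

```python
import os
from tempfile import TemporaryDirectory

class FakeModule:          # stand-in for real auto-archiver modules
    tmp_dir = None

modules = [FakeModule(), FakeModule()]
tmp_dir = None
try:
    tmp_dir = TemporaryDirectory(dir="./")
    for m in modules:
        m.tmp_dir = tmp_dir.name           # .name is the directory path string
    # ... archiving happens here, modules write files under their tmp_dir ...
    open(os.path.join(modules[0].tmp_dir, "example.txt"), "w").close()
finally:
    if tmp_dir:
        for m in modules:
            m.tmp_dir = None               # don't leave dangling paths behind
        tmp_dir.cleanup()                  # deletes the directory and its contents
```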
@@ -328,31 +403,38 @@ class ArchivingOrchestrator:
         5. Store all downloaded/generated media
         6. Call selected Formatter and store formatted if needed
         """
         original_url = result.get_url().strip()
-        self.assert_valid_url(original_url)
+        try:
+            self.assert_valid_url(original_url)
+        except AssertionError as e:
+            logger.error(f"Error archiving URL {original_url}: {e}")
+            raise e

         # 1 - sanitize - each archiver is responsible for cleaning/expanding its own URLs
         url = original_url
-        for a in self.config["steps"]["extractors"]: url = a.sanitize_url(url)
+        for a in self.extractors:
+            url = a.sanitize_url(url)

         result.set_url(url)
         if original_url != url: result.set("original_url", original_url)

         # 2 - notify start to DBs, propagate already archived if feature enabled in DBs
         cached_result = None
-        for d in self.config["steps"]["databases"]:
+        for d in self.databases:
             d.started(result)
             if (local_result := d.fetch(result)):
                 cached_result = (cached_result or Metadata()).merge(local_result)
         if cached_result:
             logger.debug("Found previously archived entry")
-            for d in self.config["steps"]["databases"]:
+            for d in self.databases:
                 try: d.done(cached_result, cached=True)
                 except Exception as e:
                     logger.error(f"ERROR database {d.name}: {e}: {traceback.format_exc()}")
             return cached_result

         # 3 - call extractors until one succeeds
-        for a in self.config["steps"]["extractors"]:
+        for a in self.extractors:
             logger.info(f"Trying extractor {a.name} for {url}")
             try:
                 result.merge(a.download(result))
@@ -361,7 +443,7 @@ class ArchivingOrchestrator:
                 logger.error(f"ERROR archiver {a.name}: {e}: {traceback.format_exc()}")

         # 4 - call enrichers to work with archived content
-        for e in self.config["steps"]["enrichers"]:
+        for e in self.enrichers:
             try: e.enrich(result)
             except Exception as exc:
                 logger.error(f"ERROR enricher {e.name}: {exc}: {traceback.format_exc()}")
@@ -370,7 +452,7 @@ class ArchivingOrchestrator:
         result.store()

         # 6 - format and store formatted if needed
-        if final_media := self.config["steps"]["formatters"][0].format(result):
+        if final_media := self.formatters[0].format(result):
             final_media.store(url=url, metadata=result)
             result.set_final_media(final_media)
@@ -378,7 +460,7 @@ class ArchivingOrchestrator:
             result.status = "nothing archived"

         # signal completion to databases and archivers
-        for d in self.config["steps"]["databases"]:
+        for d in self.databases:
             try: d.done(result)
             except Exception as e:
                 logger.error(f"ERROR database {d.name}: {e}: {traceback.format_exc()}")
@@ -404,3 +486,43 @@ class ArchivingOrchestrator:
         assert not ip.is_reserved, f"Invalid IP used"
         assert not ip.is_link_local, f"Invalid IP used"
         assert not ip.is_private, f"Invalid IP used"
+
+    # Helper Properties
+
+    @property
+    def feeders(self) -> List[Type[Feeder]]:
+        return self._get_property('feeders')
+
+    @property
+    def extractors(self) -> List[Type[Extractor]]:
+        return self._get_property('extractors')
+
+    @property
+    def enrichers(self) -> List[Type[Enricher]]:
+        return self._get_property('enrichers')
+
+    @property
+    def databases(self) -> List[Type[Database]]:
+        return self._get_property('databases')
+
+    @property
+    def storages(self) -> List[Type[Storage]]:
+        return self._get_property('storages')
+
+    @property
+    def formatters(self) -> List[Type[Formatter]]:
+        return self._get_property('formatters')
+
+    @property
+    def all_modules(self) -> List[Type[BaseModule]]:
+        return self.feeders + self.extractors + self.enrichers + self.databases + self.storages + self.formatters
+
+    def _get_property(self, prop):
+        try:
+            f = self.config['steps'][prop]
+            if not (isinstance(f[0], BaseModule) or isinstance(f[0], LazyBaseModule)):
+                raise TypeError
+            return f
+        except:
+            exit("Property called prior to full initialisation")

View file

@@ -0,0 +1,40 @@
from loguru import logger
import time, os

from selenium.common.exceptions import TimeoutException

from auto_archiver.core import Enricher
from ..utils import Webdriver, UrlUtil, random_str
from ..core import Media, Metadata

class ScreenshotEnricher(Enricher):
    name = "screenshot_enricher"

    @staticmethod
    def configs() -> dict:
        return {
            "width": {"default": 1280, "help": "width of the screenshots"},
            "height": {"default": 720, "help": "height of the screenshots"},
            "timeout": {"default": 60, "help": "timeout for taking the screenshot"},
            "sleep_before_screenshot": {"default": 4, "help": "seconds to wait for the pages to load before taking screenshot"},
            "http_proxy": {"default": "", "help": "http proxy to use for the webdriver, eg http://proxy-user:password@proxy-ip:port"},
        }

    def enrich(self, to_enrich: Metadata) -> None:
        url = to_enrich.get_url()

        if UrlUtil.is_auth_wall(url):
            logger.debug(f"[SKIP] SCREENSHOT since url is behind AUTH WALL: {url=}")
            return

        logger.debug(f"Enriching screenshot for {url=}")
        with Webdriver(self.width, self.height, self.timeout, 'facebook.com' in url, http_proxy=self.http_proxy) as driver:
            try:
                driver.get(url)
                time.sleep(int(self.sleep_before_screenshot))
                screenshot_file = os.path.join(self.tmp_dir, f"screenshot_{random_str(8)}.png")
                driver.save_screenshot(screenshot_file)
                to_enrich.add_media(Media(filename=screenshot_file), id="screenshot")
            except TimeoutException:
                logger.info("TimeoutException loading page for screenshot")
            except Exception as e:
                logger.error(f"Got error while loading webdriver for screenshot enricher: {e}")

View file

@@ -0,0 +1,38 @@
from loguru import logger
import csv

from . import Feeder
from ..core import Metadata, ArchivingContext
from ..utils import url_or_none

class CSVFeeder(Feeder):

    @staticmethod
    def configs() -> dict:
        return {
            "files": {
                "default": None,
                "help": "Path to the input file(s) to read the URLs from, comma separated. \
                        Input files should be formatted with one URL per line",
                "cli_set": lambda cli_val, cur_val: list(set(cli_val.split(",")))
            },
            "column": {
                "default": None,
                "help": "Column number or name to read the URLs from, 0-indexed",
            }
        }

    def __iter__(self) -> Metadata:
        url_column = self.column or 0
        for file in self.files:
            with open(file, "r") as f:
                reader = csv.reader(f)
                first_row = next(reader)
                if not(url_or_none(first_row[url_column])):
                    # it's a header row, skip it
                    pass
                for row in reader:
                    url = row[0]
                    logger.debug(f"Processing {url}")
                    yield Metadata().set_url(url)

        ArchivingContext.set("folder", "cli")

View file

@@ -40,5 +40,3 @@ class AtlosFeeder(Feeder):
             if len(data["results"]) == 0 or cursor is None:
                 break
-
-        logger.success(f"Processed {count} URL(s)")

View file

@@ -1 +0,0 @@
from .cli_feeder import CLIFeeder

View file

@ -1,27 +0,0 @@
{
"name": "CLI Feeder",
"type": ["feeder"],
"requires_setup": False,
"dependencies": {
"python": ["loguru"],
},
'entry_point': 'cli_feeder::CLIFeeder',
"configs": {
"urls": {
"help": "URL(s) to archive, either a single URL or a list of urls, should not come from config.yaml",
"nargs": "+",
"required": True,
"do_not_store": True,
"metavar": "INPUT URLS",
},
},
"description": """
Processes URLs to archive passed via the command line and feeds them into the archiving pipeline.
### Features
- Takes a single URL or a list of URLs provided via the command line.
- Converts each URL into a `Metadata` object and yields it for processing.
- Ensures URLs are processed only if they are explicitly provided.
"""
}

View file

@@ -1,15 +0,0 @@
from loguru import logger

from auto_archiver.core import Feeder
from auto_archiver.core import Metadata, ArchivingContext

class CLIFeeder(Feeder):
    def __iter__(self) -> Metadata:
        for url in self.urls:
            logger.debug(f"Processing URL: '{url}'")
            yield Metadata().set_url(url)
            ArchivingContext.set("folder", "cli")

        logger.success(f"Processed {len(self.urls)} URL(s)")

View file

@@ -26,7 +26,6 @@
     - Supports reading URLs from multiple input files, specified as a comma-separated list.
     - Allows specifying the column number or name to extract URLs from.
    - Skips header rows if the first value is not a valid URL.
-    - Integrates with the `ArchivingContext` to manage URL feeding.

     ### Setup
     - Input files should be formatted with one URL per line.

View file

@@ -21,5 +21,3 @@ class CSVFeeder(Feeder):
                 logger.debug(f"Processing {url}")
                 yield Metadata().set_url(url)
                 ArchivingContext.set("folder", "cli")
-
-        logger.success(f"Processed {len(self.urls)} URL(s)")

View file

@@ -270,7 +270,11 @@ class GenericExtractor(Extractor):
             logger.debug('Using Facebook cookie')
             yt_dlp.utils.std_headers['cookie'] = self.facebook_cookie

-        ydl_options = {'outtmpl': os.path.join(ArchivingContext.get_tmp_dir(), f'%(id)s.%(ext)s'), 'quiet': False, 'noplaylist': not self.allow_playlist , 'writesubtitles': self.subtitles, 'writeautomaticsub': self.subtitles, "live_from_start": self.live_from_start, "proxy": self.proxy, "max_downloads": self.max_downloads, "playlistend": self.max_downloads}
+        ydl_options = {'outtmpl': os.path.join(self.tmp_dir, f'%(id)s.%(ext)s'),
+                       'quiet': False, 'noplaylist': not self.allow_playlist,
+                       'writesubtitles': self.subtitles, 'writeautomaticsub': self.subtitles,
+                       "live_from_start": self.live_from_start, "proxy": self.proxy,
+                       "max_downloads": self.max_downloads, "playlistend": self.max_downloads}

         if item.netloc in ['youtube.com', 'www.youtube.com']:
             if self.cookies_from_browser:

View file

@@ -7,7 +7,7 @@ import json
 import base64

 from auto_archiver.version import __version__
-from auto_archiver.core import Metadata, Media, ArchivingContext
+from auto_archiver.core import Metadata, Media
 from auto_archiver.core import Formatter
 from auto_archiver.modules.hash_enricher import HashEnricher
 from auto_archiver.utils.misc import random_str
@@ -46,7 +46,7 @@ class HtmlFormatter(Formatter):
             version=__version__
         )

-        html_path = os.path.join(ArchivingContext.get_tmp_dir(), f"formatted{random_str(24)}.html")
+        html_path = os.path.join(self.tmp_dir, f"formatted{random_str(24)}.html")
         with open(html_path, mode="w", encoding="utf-8") as outf:
             outf.write(content)
         final_media = Media(filename=html_path, _mimetype="text/html")

View file

@@ -7,7 +7,7 @@ from selenium.common.exceptions import TimeoutException

 from auto_archiver.core import Enricher
 from auto_archiver.utils import Webdriver, UrlUtil, random_str
-from auto_archiver.core import Media, Metadata, ArchivingContext
+from auto_archiver.core import Media, Metadata

 class ScreenshotEnricher(Enricher):
@@ -23,11 +23,11 @@ class ScreenshotEnricher(Enricher):
             try:
                 driver.get(url)
                 time.sleep(int(self.sleep_before_screenshot))
-                screenshot_file = os.path.join(ArchivingContext.get_tmp_dir(), f"screenshot_{random_str(8)}.png")
+                screenshot_file = os.path.join(self.tmp_dir, f"screenshot_{random_str(8)}.png")
                 driver.save_screenshot(screenshot_file)
                 to_enrich.add_media(Media(filename=screenshot_file), id="screenshot")
                 if self.save_to_pdf:
-                    pdf_file = os.path.join(ArchivingContext.get_tmp_dir(), f"pdf_{random_str(8)}.pdf")
+                    pdf_file = os.path.join(self.tmp_dir, f"pdf_{random_str(8)}.pdf")
                     pdf = driver.print_page(driver.print_options)
                     with open(pdf_file, "wb") as f:
                         f.write(base64.b64decode(pdf))

View file

@@ -23,6 +23,6 @@ class SSLEnricher(Enricher):
         logger.debug(f"fetching SSL certificate for {domain=} in {url=}")
         cert = ssl.get_server_certificate((domain, 443))

-        cert_fn = os.path.join(ArchivingContext.get_tmp_dir(), f"{slugify(domain)}.pem")
+        cert_fn = os.path.join(self.tmp_dir, f"{slugify(domain)}.pem")
         with open(cert_fn, "w") as f: f.write(cert)
         to_enrich.add_media(Media(filename=cert_fn), id="ssl_certificate")

View file

@@ -9,7 +9,7 @@ from tqdm import tqdm
 import re, time, json, os

 from auto_archiver.core import Extractor
-from auto_archiver.core import Metadata, Media, ArchivingContext
+from auto_archiver.core import Metadata, Media
 from auto_archiver.utils import random_str
@@ -120,7 +120,7 @@ class TelethonArchiver(Extractor):
         media_posts = self._get_media_posts_in_group(chat, post)
         logger.debug(f'got {len(media_posts)=} for {url=}')

-        tmp_dir = ArchivingContext.get_tmp_dir()
+        tmp_dir = self.tmp_dir

         group_id = post.grouped_id if post.grouped_id is not None else post.id
         title = post.message

View file

@@ -28,7 +28,7 @@ class ThumbnailEnricher(Enricher):
         logger.debug(f"generating thumbnails for {to_enrich.get_url()}")
         for m_id, m in enumerate(to_enrich.media[::]):
             if m.is_video():
-                folder = os.path.join(ArchivingContext.get_tmp_dir(), random_str(24))
+                folder = os.path.join(self.tmp_dir, random_str(24))
                 os.makedirs(folder, exist_ok=True)
                 logger.debug(f"generating thumbnails for {m.filename}")
                 duration = m.get("duration")

View file

@@ -9,9 +9,7 @@ from asn1crypto import pem
 import certifi

 from auto_archiver.core import Enricher
-from auto_archiver.core import Metadata, ArchivingContext, Media
-from auto_archiver.core import Extractor
+from auto_archiver.core import Metadata, Media

 class TimestampingEnricher(Enricher):
     """
@@ -33,7 +31,7 @@ class TimestampingEnricher(Enricher):
             logger.warning(f"No hashes found in {url=}")
             return

-        tmp_dir = ArchivingContext.get_tmp_dir()
+        tmp_dir = self.tmp_dir
         hashes_fn = os.path.join(tmp_dir, "hashes.txt")

         data_to_sign = "\n".join(hashes)
@@ -93,7 +91,7 @@ class TimestampingEnricher(Enricher):
         cert_chain = []
         for cert in path:
-            cert_fn = os.path.join(ArchivingContext.get_tmp_dir(), f"{str(cert.serial_number)[:20]}.crt")
+            cert_fn = os.path.join(self.tmp_dir, f"{str(cert.serial_number)[:20]}.crt")
             with open(cert_fn, "wb") as f:
                 f.write(cert.dump())
             cert_chain.append(Media(filename=cert_fn).set("subject", cert.subject.native["common_name"]))

View file

@@ -3,7 +3,7 @@ from vk_url_scraper import VkScraper

 from auto_archiver.utils.misc import dump_payload
 from auto_archiver.core import Extractor
-from auto_archiver.core import Metadata, Media, ArchivingContext
+from auto_archiver.core import Metadata, Media

 class VkExtractor(Extractor):
@@ -35,7 +35,7 @@ class VkExtractor(Extractor):
         result.set_content(dump_payload(vk_scrapes))

-        filenames = self.vks.download_media(vk_scrapes, ArchivingContext.get_tmp_dir())
+        filenames = self.vks.download_media(vk_scrapes, self.tmp_dir)
         for filename in filenames:
             result.add_media(Media(filename))

View file

@@ -5,7 +5,7 @@ from zipfile import ZipFile
 from loguru import logger
 from warcio.archiveiterator import ArchiveIterator

-from auto_archiver.core import Media, Metadata, ArchivingContext
+from auto_archiver.core import Media, Metadata
 from auto_archiver.core import Extractor, Enricher
 from auto_archiver.utils import UrlUtil, random_str
@@ -51,7 +51,7 @@ class WaczExtractorEnricher(Enricher, Extractor):
         url = to_enrich.get_url()

         collection = random_str(8)
-        browsertrix_home_host = self.browsertrix_home_host or os.path.abspath(ArchivingContext.get_tmp_dir())
+        browsertrix_home_host = self.browsertrix_home_host or os.path.abspath(self.tmp_dir)
         browsertrix_home_container = self.browsertrix_home_container or browsertrix_home_host

         cmd = [
@@ -154,7 +154,7 @@ class WaczExtractorEnricher(Enricher, Extractor):
         logger.info(f"WACZ extract_media or extract_screenshot flag is set, extracting media from {wacz_filename=}")

         # unzipping the .wacz
-        tmp_dir = ArchivingContext.get_tmp_dir()
+        tmp_dir = self.tmp_dir
         unzipped_dir = os.path.join(tmp_dir, "unzipped")
         with ZipFile(wacz_filename, 'r') as z_obj:
             z_obj.extractall(path=unzipped_dir)

View file

@@ -3,4 +3,3 @@ import tempfile
 from auto_archiver.core.context import ArchivingContext

 ArchivingContext.reset(full_reset=True)
-ArchivingContext.set_tmp_dir(tempfile.gettempdir())

View file

@@ -2,6 +2,7 @@
 pytest conftest file, for shared fixtures and configuration
 """
+from tempfile import TemporaryDirectory
 from typing import Dict, Tuple
 import hashlib
 import pytest
@@ -25,8 +26,13 @@ def setup_module(request):

         m = get_module(module_name, {module_name: config})

+        # add the tmp_dir to the module
+        tmp_dir = TemporaryDirectory()
+        m.tmp_dir = tmp_dir
+
         def cleanup():
             _LAZY_LOADED_MODULES.pop(module_name)
+            tmp_dir.cleanup()

         request.addfinalizer(cleanup)
         return m
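Since modules now read `self.tmp_dir` instead of a global context, any test that drives a module directly must supply one; the fixture above does this and removes the directory through a finalizer. A minimal self-contained sketch of the same pattern (fixture and module names invented):

```python
import os
from tempfile import TemporaryDirectory
import pytest

@pytest.fixture
def module_with_tmp_dir(request):
    class FakeModule:                       # stand-in for a real module
        tmp_dir = None

    m = FakeModule()
    tmp_dir = TemporaryDirectory()
    m.tmp_dir = tmp_dir.name                # modules expect a path string
    request.addfinalizer(tmp_dir.cleanup)   # removed after the test runs
    return m

def test_module_writes_into_tmp_dir(module_with_tmp_dir):
    path = os.path.join(module_with_tmp_dir.tmp_dir, "out.bin")
    open(path, "wb").close()
    assert os.path.exists(path)
```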

View file

@@ -1,6 +1,6 @@
 import pytest
 import sys
-from argparse import ArgumentParser
+from argparse import ArgumentParser, ArgumentTypeError

 from auto_archiver.core.orchestrator import ArchivingOrchestrator
 from auto_archiver.version import __version__
 from auto_archiver.core.config import read_yaml, store_yaml
@@ -113,16 +113,23 @@ def test_get_required_values_from_config(orchestrator, test_args, tmp_path):
     # run the orchestrator
     orchestrator.run(["--config", tmp_file, "--module_paths", TEST_MODULES])
     assert orchestrator.config is not None
-    # should run OK, since there are no missing required fields
-    # basic_args = basic_parser.parse_known_args(test_args)
-    # test_yaml = read_yaml(TEST_ORCHESTRATION)
-    # test_yaml['example_module'] = {'required_field': 'some_value'}
-    # # monkey patch the example_module to have a 'configs' setting of 'my_var' with required=True
-    # # load the module first
-    # m = get_module_lazy("example_module")
-    # orchestrator.setup_complete_parser(basic_args, test_yaml, unused_args=[])
-    # assert orchestrator.config is not None

+def test_load_authentication_string(orchestrator, test_args):
+    orchestrator.run(test_args + ["--authentication", '{"facebook.com": {"username": "my_username", "password": "my_password"}}'])
+    assert orchestrator.config['authentication'] == {"facebook.com": {"username": "my_username", "password": "my_password"}}
+
+def test_load_authentication_string_concat_site(orchestrator, test_args):
+    orchestrator.run(test_args + ["--authentication", '{"x.com,twitter.com": {"api_key": "my_key"}}'])
+    assert orchestrator.config['authentication'] == {"x.com": {"api_key": "my_key"},
+                                                     "twitter.com": {"api_key": "my_key"}}
+
+def test_load_invalid_authentication_string(orchestrator, test_args):
+    with pytest.raises(ArgumentTypeError):
+        orchestrator.run(test_args + ["--authentication", "{\''invalid_json"])
+
+def test_load_authentication_invalid_dict(orchestrator, test_args):
+    with pytest.raises(ArgumentTypeError):
+        orchestrator.run(test_args + ["--authentication", "[true, false]"])