Merge branch 'load_modules' into more_mainifests

# Conflicts:
#	src/auto_archiver/core/orchestrator.py
pull/183/head
erinhmclark 2025-01-23 09:19:54 +00:00
commit 9db26cdfc2
8 changed files with 119 additions and 141 deletions

View file

@@ -19,22 +19,12 @@ from ..core import Metadata, Step, ArchivingContext
@dataclass
class Archiver(Step):
class Archiver:
"""
Base class for implementing archivers in the media archiving framework.
Subclasses must implement the `download` method to define platform-specific behavior.
"""
name = "archiver"
def __init__(self, config: dict) -> None:
# without this STEP.__init__ is not called
super().__init__(config)
def init(name: str, config: dict) -> Archiver:
# only for typing...
return Step.init(name, config, Archiver)
def setup(self) -> None:
# used when archivers need to login or do other one-time setup
pass
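
A minimal sketch of a platform-specific archiver under the slimmed-down base class above: a subclass only needs a name, an optional one-time setup(), and a download() implementation. The ExampleArchiver name, the import paths, and the download signature are illustrative assumptions, not part of this commit.

from auto_archiver.archivers.archiver import Archiver  # assumed import path
from auto_archiver.core import Metadata

class ExampleArchiver(Archiver):
    name = "example_archiver"

    def setup(self) -> None:
        # one-time work such as logging in or creating an API client
        pass

    def download(self, item: Metadata) -> Metadata | bool:
        # platform-specific archiving logic would go here;
        # returning False signals the item was not handled by this archiver
        return False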

View file

@@ -1,6 +1,10 @@
""" Core modules to handle things such as orchestration, metadata and configs..
"""
from .metadata import Metadata
from .media import Media
from .step import Step
from .context import ArchivingContext
# cannot import ArchivingOrchestrator/Config to avoid circular dep
# from .orchestrator import ArchivingOrchestrator

View file

@@ -1,5 +1,7 @@
import ast
from dataclasses import dataclass, field
from typing import Type
from importlib.util import find_spec
from dataclasses import dataclass
import os
import copy
from os.path import join, dirname
@@ -8,6 +10,8 @@ from loguru import logger
import sys
import shutil
_LOADED_MODULES = {}
MODULE_TYPES = [
'feeder',
'enricher',
@@ -22,9 +26,8 @@ _DEFAULT_MANIFEST = {
'name': '',
'author': 'Bellingcat',
'requires_setup': True,
'depends': [],
'description': '',
'external_dependencies': {},
'dependencies': {},
'entry_point': '',
'version': '1.0',
'configs': {}
@@ -35,9 +38,7 @@ class Module:
name: str
display_name: str
type: list
entry_point: str
depends: list
external_dependencies: dict
dependencies: dict
requires_setup: bool
configs: dict
description: str
@@ -51,54 +52,50 @@ class Module:
if manifest:
self.display_name = manifest['name']
self.type = manifest['type']
self.entry_point = manifest['entry_point']
self.depends = manifest['depends']
self.external_dependencies = manifest['external_dependencies']
self._entry_point = manifest['entry_point']
self.dependencies = manifest['dependencies']
self.requires_setup = manifest['requires_setup']
self.configs = manifest['configs']
self.description = manifest['description']
@property
def entry_point(self):
if not self._entry_point:
# try to create the entry point from the module name
self._entry_point = f"{self.name}::{self.name.replace('_', ' ').title().replace(' ', '')}"
return self._entry_point
def __repr__(self):
return f"Module<'{self.display_name}' ({self.name})>"
def load_modules(modules):
modules = available_modules(limit_to_modules=modules, with_manifest=True)
for module in modules:
_load_module(module)
def load_module(module: str) -> object: # TODO: change return type to Step
if module in _LOADED_MODULES:
return _LOADED_MODULES[module]
# load a module by name
module = get_module(module)
if not module:
return None
# check external dependencies are installed
def check_deps(deps, check):
for dep in deps:
if not check(dep):
logger.error(f"Module '{module.name}' requires external dependency '{dep}' which is not available. Have you installed the required dependencies for the '{module.name}' module? See the README for more information.")
exit(1)
check_deps(module.dependencies.get('python', []), lambda dep: find_spec(dep))
check_deps(module.dependencies.get('bin', []), lambda dep: shutil.which(dep))
qualname = f'auto_archiver.modules.{module.name}'
logger.info(f"Loading module '{module.display_name}'...")
loaded_module = __import__(qualname)
_LOADED_MODULES[module.name] = getattr(sys.modules[qualname], module.entry_point)()
return _LOADED_MODULES[module.name]
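
The check_deps helper above probes Python dependencies with importlib.util.find_spec and binary dependencies with shutil.which before the module is imported. A self-contained sketch of the same checks against a hypothetical dependencies mapping:

import shutil
from importlib.util import find_spec

dependencies = {'python': ['yt_dlp', 'loguru'], 'bin': ['ffmpeg']}  # hypothetical manifest entry

missing_python = [d for d in dependencies.get('python', []) if find_spec(d) is None]
missing_binaries = [d for d in dependencies.get('bin', []) if shutil.which(d) is None]
if missing_python or missing_binaries:
    print(f"missing python packages: {missing_python}, missing binaries: {missing_binaries}")
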
def _load_module(module):
# first make sure that the 'depends' are installed and available in sys.args
for dependency in module.depends:
if dependency not in sys.modules:
logger.error(f"""
Module {module.name} depends on {dependency} which is not available.
Have you set up the '{module.name}' module correctly? See the README for more information.
""")
exit()
# then check the external dependencies, these are binary dependencies that should be available on the path
for dep_type, deps in module.external_dependencies.items():
if dep_type == 'python':
for dep in deps:
if dep not in sys.modules:
logger.error(f"""
Module {module.name} requires {dep} which is not available.
Have you installed the required dependencies for the '{module.name}' module? See the README for more information.
""")
elif dep_type == 'binary':
for dep in deps:
if not shutil.which(dep):
logger.error(f"""
Module {module.name} requires {dep} which is not available.
Have you installed the required dependencies for the '{module.name}' module? See the README for more information.
""")
# finally, load the module
logger.info(f"Loading module {module.display_name}")
module = __import__(module.entry_point, fromlist=[module.entry_point])
logger.info(f"Module {module.display_name} loaded")
def load_manifest(module_path):
# print(f"Loading manifest for module {module_path}")
@@ -109,7 +106,14 @@ def load_manifest(module_path):
manifest.update(ast.literal_eval(f.read()))
return manifest
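
load_manifest parses the manifest file as a plain Python dict literal with ast.literal_eval and layers it over _DEFAULT_MANIFEST, so omitted keys keep their defaults. A condensed sketch of that flow; the __manifest__.py filename and the trimmed default dict are assumptions for illustration:

import ast
import copy
from os.path import join

_DEFAULT_MANIFEST = {'name': '', 'requires_setup': True, 'entry_point': '', 'dependencies': {}, 'configs': {}}

def load_manifest_sketch(module_path: str) -> dict:
    manifest = copy.deepcopy(_DEFAULT_MANIFEST)
    with open(join(module_path, "__manifest__.py")) as f:  # assumed manifest filename
        manifest.update(ast.literal_eval(f.read()))
    return manifest
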
def available_modules(with_manifest: bool=False, limit_to_modules: List[str]= [], additional_paths: List[str] = [], ) -> List[Module]:
def get_module(module_name):
# get a module by name
try:
return available_modules(limit_to_modules=[module_name], with_manifest=True, suppress_warnings=True)[0]
except IndexError:
return None
def available_modules(with_manifest: bool=False, limit_to_modules: List[str]= [], additional_paths: List[str] = [], suppress_warnings: bool = False) -> List[Module]:
# search through all valid 'modules' paths. Default is 'modules' in the current directory
# see odoo/modules/module.py -> get_modules
@@ -142,8 +146,9 @@ def available_modules(with_manifest: bool=False, limit_to_modules: List[str]= []
manifest = {}
all_modules.append(Module(possible_module, possible_module_path, manifest))
for module in limit_to_modules:
if not any(module == m.name for m in all_modules):
logger.warning(f"Module {module} not found in available modules. Are you sure it's installed?")
if not suppress_warnings:
for module in limit_to_modules:
if not any(module == m.name for m in all_modules):
logger.warning(f"Module '{module}' not found in available modules. Are you sure it's installed?")
return all_modules

View file

@@ -19,7 +19,7 @@ from .context import ArchivingContext
from .metadata import Metadata
from ..version import __version__
from .config import read_yaml, store_yaml, to_dot_notation, merge_dicts, EMPTY_CONFIG
from .loader import available_modules, Module, MODULE_TYPES, load_modules
from .loader import available_modules, Module, MODULE_TYPES, load_module
import tempfile, traceback
from loguru import logger
@@ -85,7 +85,7 @@ class ArchivingOrchestrator:
if modules := getattr(basic_config, f"{module_type}s", []):
enabled_modules.extend(modules)
self.add_module_args(available_modules(with_manifest=True, limit_to_modules=enabled_modules), parser)
self.add_module_args(available_modules(with_manifest=True, limit_to_modules=set(enabled_modules)), parser)
elif basic_config.mode == 'simple':
simple_modules = [module for module in available_modules(with_manifest=True) if not module.requires_setup]
self.add_module_args(simple_modules, parser)
@@ -98,6 +98,7 @@ class ArchivingOrchestrator:
self.add_module_args(available_modules(with_manifest=True), parser)
breakpoint()
parser.set_defaults(**to_dot_notation(yaml_config))
# reload the parser with the new arguments, now that we have them
@@ -106,32 +107,26 @@ class ArchivingOrchestrator:
logger.warning(f"Ignoring unknown/unused arguments: {unknown}\nPerhaps you don't have this module enabled?")
# merge the new config with the old one
yaml_config = merge_dicts(vars(parsed), yaml_config)
merged_yaml_config = merge_dicts(vars(parsed), yaml_config)
if basic_config.store or not os.path.isfile(join(dirname(__file__), basic_config.config_file)):
if (merged_yaml_config != yaml_config and basic_config.store) or not os.path.isfile(basic_config.config_file):
logger.info(f"Storing configuration file to {basic_config.config_file}")
store_yaml(yaml_config, basic_config.config_file)
self.config = yaml_config
self.config = merged_yaml_config
logger.info("FEEDERS: " + ", ".join(self.config['steps']['feeders']))
logger.info("EXTRACTORS: " + ", ".join(self.config['steps']['extractors']))
logger.info("ENRICHERS: " + ", ".join(self.config['steps']['enrichers']))
logger.info("DATABASES: " + ", ".join(self.config['steps']['databases']))
logger.info("STORAGES: " + ", ".join(self.config['steps']['storages']))
logger.info("FORMATTERS: " + ", ".join(self.config['steps']['formatters']))
return self.config
def add_steps_args(self, parser: argparse.ArgumentParser = None):
if not parser:
parser = self.parser
parser.add_argument('--feeders', action='store', dest='steps.feeders', nargs='+', required=True, help='the feeders to use')
parser.add_argument('--enrichers', action='store', dest='steps.enrichers', nargs='+', required=True, help='the enrichers to use')
parser.add_argument('--extractors', action='store', dest='steps.extractors', nargs='+', required=True, help='the extractors to use')
parser.add_argument('--databases', action='store', dest='steps.databases', nargs='+', required=True, help='the databases to use')
parser.add_argument('--storages', action='store', dest='steps.storages', nargs='+', required=True, help='the storages to use')
parser.add_argument('--formatters', action='store', dest='steps.formatters', nargs='+', required=True, help='the formatter to use')
parser.add_argument('--feeders', action='store', dest='steps.feeders', nargs='+', help='the feeders to use')
parser.add_argument('--enrichers', action='store', dest='steps.enrichers', nargs='+', help='the enrichers to use')
parser.add_argument('--extractors', action='store', dest='steps.extractors', nargs='+', help='the extractors to use')
parser.add_argument('--databases', action='store', dest='steps.databases', nargs='+', help='the databases to use')
parser.add_argument('--storages', action='store', dest='steps.storages', nargs='+', help='the storages to use')
parser.add_argument('--formatters', action='store', dest='steps.formatters', nargs='+', help='the formatter to use')
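
With required=True dropped, the YAML config can supply the step lists when the CLI does not; because each argument's dest uses dot notation, the parsed namespace keys match the flattened keys that to_dot_notation and merge_dicts operate on. A small argparse-only sketch of the parsed side (the feeder name is illustrative):

import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--feeders', action='store', dest='steps.feeders', nargs='+', help='the feeders to use')
parsed = parser.parse_args(['--feeders', 'cli_feeder'])
print(vars(parsed))  # {'steps.feeders': ['cli_feeder']}
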
def add_module_args(self, modules: list[Module] = None, parser: argparse.ArgumentParser = None):
@@ -164,10 +159,35 @@ class ArchivingOrchestrator:
exit()
def install_modules(self):
modules = set()
[modules.update(*m) for m in self.config['steps'].values()]
"""
Swaps out the previous 'strings' in the config with the actual modules
"""
for module_type in MODULE_TYPES:
if module_type == 'enricher':
breakpoint()
step_items = []
modules_to_load = self.config['steps'][f"{module_type}s"]
load_modules(modules)
def check_steps_ok():
if not len(step_items):
logger.error(f"NO {module_type.upper()}S LOADED. Please check your configuration file and try again. Tried to load the following modules, but none were available: {modules_to_load}")
exit()
if (module_type == 'feeder' or module_type == 'formatter') and len(step_items) > 1:
logger.error(f"Only one feeder is allowed, found {len(step_items)} {module_type}s. Please remove one of the following from your configuration file: {modules_to_load}")
exit()
for i, module in enumerate(modules_to_load):
loaded_module = load_module(module)
if loaded_module:
step_items.append(loaded_module)
check_steps_ok()
self.config['steps'][f"{module_type}s"] = step_items
assert len(step_items) > 0, f"No {module_type}s were loaded. Please check your configuration file and try again."
self.config['steps'][f"{module_type}s"] = step_items
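
Conceptually, install_modules walks each step type and replaces the configured module-name strings with the live objects returned by load_module, dropping any that fail to load. A compressed sketch with hypothetical step names, assuming load_module returns an instance or None:

from auto_archiver.core.loader import load_module  # assumed import path

steps = {'feeders': ['cli_feeder'], 'extractors': ['generic_extractor']}  # names as read from the YAML config

for module_type in ('feeder', 'extractor'):
    loaded = [m for m in (load_module(name) for name in steps[f"{module_type}s"]) if m]
    assert loaded, f"no {module_type}s could be loaded, check the configuration"
    steps[f"{module_type}s"] = loaded  # strings swapped for module instances
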
def run(self) -> None:
self.setup_basic_parser()
@@ -190,16 +210,26 @@ class ArchivingOrchestrator:
yaml_config = read_yaml(basic_config.config_file)
self.setup_complete_parser(basic_config, yaml_config, unused_args)
self.install_modules()
logger.info("FEEDERS: " + ", ".join(m.name for m in self.config['steps']['feeders']))
logger.info("EXTRACTORS: " + ", ".join(m.name for m in self.config['steps']['extractors']))
logger.info("ENRICHERS: " + ", ".join(m.name for m in self.config['steps']['enrichers']))
logger.info("DATABASES: " + ", ".join(m.name for m in self.config['steps']['databases']))
logger.info("STORAGES: " + ", ".join(m.name for m in self.config['steps']['storages']))
logger.info("FORMATTERS: " + ", ".join(m.name for m in self.config['steps']['formatters']))
for item in self.feed():
pass
def cleanup(self)->None:
logger.info("Cleaning up")
for a in self.all_archivers_for_setup(): a.cleanup()
for e in self.config['steps']['extractors']:
breakpoint()
e.cleanup()
def feed(self) -> Generator[Metadata]:
for feeder in self.config['steps']['feeders']:
@@ -221,12 +251,12 @@ class ArchivingOrchestrator:
except KeyboardInterrupt:
# catches keyboard interruptions to do a clean exit
logger.warning(f"caught interrupt on {item=}")
for d in self.databases: d.aborted(item)
for d in self.config['steps']['databases']: d.aborted(item)
self.cleanup()
exit()
except Exception as e:
logger.error(f'Got unexpected error on item {item}: {e}\n{traceback.format_exc()}')
for d in self.databases:
for d in self.config['steps']['databases']:
if type(e) == AssertionError: d.failed(item, str(e))
else: d.failed(item)
@@ -316,7 +346,4 @@ class ArchivingOrchestrator:
assert ip.is_global, f"Invalid IP used"
assert not ip.is_reserved, f"Invalid IP used"
assert not ip.is_link_local, f"Invalid IP used"
assert not ip.is_private, f"Invalid IP used"
def all_archivers_for_setup(self) -> List[Archiver]:
return self.archivers + [e for e in self.enrichers if isinstance(e, Archiver)]
assert not ip.is_private, f"Invalid IP used"

View file

@@ -5,44 +5,7 @@ by handling user configuration, validating the steps properties, and implementin
"""
from __future__ import annotations
from dataclasses import dataclass
from inspect import ClassFoundException
from typing import Type
from abc import ABC
@dataclass
class Step(ABC):
name: str = None
def __init__(self, config: dict) -> None:
# Initialises each step by reading the relevant entries
# reads the configs into object properties
# self.config = config[self.name]
for k, v in config.get(self.name, {}).items():
self.__setattr__(k, v)
@staticmethod
def configs() -> dict: return {}
def init(name: str, config: dict, child: Type[Step]) -> Step:
"""
Attempts to instantiate a subclass of the provided `child` type
matching the given `name`.
Raises ClassFoundException if no matching subclass is found.
TODO: cannot find subclasses of child.subclasses
"""
for sub in child.__subclasses__():
if sub.name == name:
return sub(config)
raise ClassFoundException(f"Unable to initialize STEP with {name=}, check your configuration file/step names, and make sure you made the step discoverable by putting it into __init__.py")
def assert_valid_string(self, prop: str) -> None:
"""
Receives a property name and ensures it exists and is a valid non-empty string,
raising an AssertionError if not.
TODO: replace assertions with custom exceptions
"""
assert hasattr(self, prop), f"property {prop} not found"
s = getattr(self, prop)
assert s is not None and type(s) == str and len(s) > 0, f"invalid property {prop} value '{s}', it should be a valid string"
class Step:
# TODO: try and get this name from the manifest, so we don't have to set it twice
name: str

View file

@@ -0,0 +1 @@
from .generic_extractor import GenericExtractor
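
This one-line __init__.py re-exports the extractor class at package level; the manifest below points entry_point at that bare class name, which is how load_module resolves and instantiates it. A small sketch of the lookup, assuming the auto_archiver.modules.generic_extractor package path used by load_module above:

import importlib
import sys

# import the module package, then resolve the entry_point class re-exported by __init__.py
qualname = "auto_archiver.modules.generic_extractor"
importlib.import_module(qualname)
extractor_cls = getattr(sys.modules[qualname], "GenericExtractor")
extractor = extractor_cls()  # the refactor removed the config-taking constructor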

View file

@@ -2,11 +2,10 @@
'name': 'Generic Extractor',
'version': '0.1.0',
'author': 'Bellingcat',
'type': ['extractor'],
'entry_point': 'generic_extractor:GenericExtractor',
'type': ['extractor', 'feeder', 'enricher'],
'entry_point': 'GenericExtractor', # this class should be present in the __init__.py
'requires_setup': False,
'depends': ['core'],
'external_dependencies': {
'dependencies': {
'python': ['yt_dlp', 'requests', 'loguru', 'slugify'],
},
'description': """

View file

@@ -12,17 +12,6 @@ class GenericExtractor(Archiver):
name = "youtubedl_archiver" #left as is for backwards compat
_dropins = {}
def __init__(self, config: dict) -> None:
super().__init__(config)
self.subtitles = bool(self.subtitles)
self.comments = bool(self.comments)
self.livestreams = bool(self.livestreams)
self.live_from_start = bool(self.live_from_start)
self.end_means_success = bool(self.end_means_success)
self.allow_playlist = bool(self.allow_playlist)
self.max_downloads = self.max_downloads
def suitable_extractors(self, url: str) -> list[str]:
"""
Returns a list of valid extractors for the given URL"""