Refactor loader + step into module, use LazyBaseModule and BaseModule

pull/224/head
Patrick Robertson 2025-01-27 14:01:36 +01:00
rodzic 7fd95866a1
commit f68e2726f2
16 zmienionych plików z 232 dodań i 231 usunięć

Wyświetl plik

@ -3,13 +3,11 @@ from dataclasses import dataclass
from abc import abstractmethod, ABC
from typing import Union
from auto_archiver.core import Metadata, Step
from auto_archiver.core import Metadata, BaseModule
@dataclass
class Database(Step, ABC):
name = "database"
class Database(BaseModule):
def started(self, item: Metadata) -> None:
"""signals the DB that the given item archival has started"""

Wyświetl plik

@ -11,12 +11,11 @@ Enrichers are optional but highly useful for making the archived data more power
from __future__ import annotations
from dataclasses import dataclass
from abc import abstractmethod, ABC
from auto_archiver.core import Metadata, Step
from auto_archiver.core import Metadata, BaseModule
@dataclass
class Enricher(Step, ABC):
class Enricher(BaseModule):
"""Base classes and utilities for enrichers in the Auto-Archiver system."""
name = "enricher"
@abstractmethod
def enrich(self, to_enrich: Metadata) -> None: pass

Wyświetl plik

@ -25,7 +25,7 @@ class Extractor:
Subclasses must implement the `download` method to define platform-specific behavior.
"""
def setup(self) -> None:
def setup(self, *args, **kwargs) -> None:
# used when extractors need to login or do other one-time setup
pass

Wyświetl plik

@ -2,12 +2,11 @@ from __future__ import annotations
from dataclasses import dataclass
from abc import abstractmethod
from auto_archiver.core import Metadata
from auto_archiver.core import Step
from auto_archiver.core import BaseModule
@dataclass
class Feeder(Step):
name = "feeder"
class Feeder(BaseModule):
@abstractmethod
def __iter__(self) -> Metadata: return None

Wyświetl plik

@ -1,20 +1,11 @@
from __future__ import annotations
from dataclasses import dataclass
from abc import abstractmethod
from auto_archiver.core import Metadata, Media, Step
from auto_archiver.core import Metadata, Media, BaseModule
@dataclass
class Formatter(Step):
name = "formatter"
def __init__(self, config: dict) -> None:
# without this STEP.__init__ is not called
super().__init__(config)
def init(name: str, config: dict) -> Formatter:
# only for code typing
return Step.init(name, config, Formatter)
class Formatter(BaseModule):
@abstractmethod
def format(self, item: Metadata) -> Media: return None

Wyświetl plik

@ -6,19 +6,14 @@ import os
from auto_archiver.utils.misc import random_str
from auto_archiver.core import Media, Step, ArchivingContext, Metadata
from auto_archiver.core import Media, BaseModule, ArchivingContext, Metadata
from auto_archiver.modules.hash_enricher.hash_enricher import HashEnricher
from loguru import logger
from slugify import slugify
@dataclass
class Storage(Step):
name = "storage"
def init(name: str, config: dict) -> Storage:
# only for typing...
return Step.init(name, config, Storage)
class Storage(BaseModule):
def store(self, media: Media, url: str, metadata: Optional[Metadata]=None) -> None:
if media.is_stored():

Wyświetl plik

@ -3,7 +3,7 @@
"""
from .metadata import Metadata
from .media import Media
from .step import Step
from .module import BaseModule
from .context import ArchivingContext
# cannot import ArchivingOrchestrator/Config to avoid circular dep

Wyświetl plik

@ -9,7 +9,7 @@ import argparse
from ruamel.yaml import YAML, CommentedMap, add_representer
from copy import deepcopy
from .loader import MODULE_TYPES
from .module import MODULE_TYPES
from typing import Any, List, Type

Wyświetl plik

@ -1,173 +0,0 @@
import ast
from typing import Type
from importlib.util import find_spec
from dataclasses import dataclass
import os
import copy
from os.path import join, dirname
from typing import List
from loguru import logger
import sys
import shutil
_LOADED_MODULES = {}
MODULE_TYPES = [
'feeder',
'enricher',
'extractor',
'database',
'storage',
'formatter'
]
MANIFEST_FILE = "__manifest__.py"
_DEFAULT_MANIFEST = {
'name': '',
'author': 'Bellingcat',
'type': [],
'requires_setup': True,
'description': '',
'dependencies': {},
'entry_point': '',
'version': '1.0',
'configs': {}
}
@dataclass
class Module:
    """Description of a module folder: its name, location and manifest data.

    The manifest-backed attributes are always assigned. When ``manifest`` is
    empty (modules listed without reading their manifests) they fall back to
    neutral defaults, so ``repr()`` and ``entry_point`` never raise
    ``AttributeError``.
    """

    name: str
    display_name: str
    type: list
    dependencies: dict
    requires_setup: bool
    configs: dict
    description: str
    path: str
    manifest: dict

    def __init__(self, module_name, path, manifest):
        """Store the folder name, path and manifest, and unpack the manifest fields."""
        self.name = module_name
        self.path = path
        self.manifest = manifest

        # start from neutral defaults so every attribute exists even when the
        # manifest was not loaded, then let the real manifest override them
        values = {
            'name': '',
            'type': [],
            'entry_point': '',
            'dependencies': {},
            'requires_setup': True,
            'configs': {},
            'description': '',
        }
        values.update(manifest or {})

        self.display_name = values['name']
        self.type = values['type']
        self._entry_point = values['entry_point']
        self.dependencies = values['dependencies']
        self.requires_setup = values['requires_setup']
        self.configs = values['configs']
        self.description = values['description']

    @property
    def entry_point(self):
        """Return ``"<file>::<Class>"``, derived from the module name if the manifest gave none."""
        if not self._entry_point:
            # try to create the entry point from the module name,
            # e.g. 'cli_feeder' -> 'cli_feeder::CliFeeder'
            self._entry_point = f"{self.name}::{self.name.replace('_', ' ').title().replace(' ', '')}"
        return self._entry_point

    def __repr__(self):
        return f"Module<'{self.display_name}' ({self.name})>"
def load_module(module: str) -> object:  # TODO: change return type to Step
    """Import the named module and return an instance of its entry-point class.

    Instances are cached in ``_LOADED_MODULES``, so each module is only
    instantiated once. Returns ``None`` when no module with that name is
    found. Terminates the process with exit status 1 when one of the
    dependencies declared in the module's manifest is not available.
    """
    if module in _LOADED_MODULES:
        return _LOADED_MODULES[module]

    # look the module up by name (kept in a separate variable so the
    # string parameter is not rebound to a different type)
    module_info = get_module(module)
    if not module_info:
        return None

    def python_dep_available(dep):
        # find_spec imports the parents of a dotted name and raises
        # ModuleNotFoundError when a parent is missing; treat that as
        # "not installed" so the friendly error below is shown instead
        try:
            return find_spec(dep) is not None
        except ImportError:
            return False

    # check external dependencies are installed
    def check_deps(deps, check):
        for dep in deps:
            if not check(dep):
                logger.error(f"Module '{module_info.name}' requires external dependency '{dep}' which is not available. Have you installed the required dependencies for the '{module_info.name}' module? See the README for more information.")
                # sys.exit rather than the site-provided exit(), which is
                # not guaranteed to exist (e.g. when running with -S)
                sys.exit(1)

    check_deps(module_info.dependencies.get('python', []), python_dep_available)
    check_deps(module_info.dependencies.get('bin', []), shutil.which)

    qualname = f'auto_archiver.modules.{module_info.name}'

    logger.info(f"Loading module '{module_info.display_name}'...")
    # first import the whole module, to make sure it's working properly
    __import__(qualname)

    # then import the file for the entry point
    file_name, class_name = module_info.entry_point.split('::')
    sub_qualname = f'{qualname}.{file_name}'
    __import__(sub_qualname, fromlist=[module_info.entry_point])

    # finally, get the class instance
    instance = getattr(sys.modules[sub_qualname], class_name)()
    if not getattr(instance, 'name', None):
        instance.name = module_info.name

    _LOADED_MODULES[module_info.name] = instance
    return _LOADED_MODULES[module_info.name]
def load_manifest(module_path):
    """Read the manifest file in *module_path* and merge it over the default manifest.

    The file content is parsed with ``ast.literal_eval``, so it is never
    executed. When parsing fails the error is logged and the manifest built
    so far is returned instead of raising.
    """
    manifest = copy.deepcopy(_DEFAULT_MANIFEST)

    # explicit encoding: the platform default is locale-dependent and could
    # otherwise fail on non-ASCII text in a manifest
    with open(join(module_path, MANIFEST_FILE), encoding="utf-8") as f:
        try:
            # the read stays inside the try: UnicodeDecodeError is a ValueError
            manifest.update(ast.literal_eval(f.read()))
        except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError) as e:
            logger.error(f"Error loading manifest from file {module_path}/{MANIFEST_FILE}: {e}")
    return manifest
def get_module(module_name):
    """Return the module called *module_name* with its manifest loaded, or None if there is none."""
    matches = available_modules(limit_to_modules=[module_name], with_manifest=True)
    if not matches:
        return None
    return matches[0]
def available_modules(with_manifest: bool=False, limit_to_modules: List[str]= [], additional_paths: List[str] = [], suppress_warnings: bool = False) -> List[Module]:
    """List the modules found in the default 'modules' folder and in *additional_paths*.

    A sub-folder counts as a module when it contains the manifest file.
    *limit_to_modules*, when non-empty, restricts the result to those folder
    names. With *with_manifest* the manifest is parsed, otherwise each Module
    gets an empty one. Unless *suppress_warnings* is set, a warning is logged
    for every requested name that was not found.
    """
    # search through all valid 'modules' paths. Default is 'modules' in the current directory
    # see odoo/modules/module.py -> get_modules
    search_roots = [join(dirname(dirname((__file__))), "modules")] + additional_paths

    found = []
    for root in search_roots:
        # walk through each entry in the folder and keep those with a manifest
        try:
            entries = os.listdir(root)
        except FileNotFoundError:
            logger.warning(f"Module folder {root} does not exist")
            continue

        for entry in entries:
            if limit_to_modules and entry not in limit_to_modules:
                continue

            entry_path = join(root, entry)
            if not os.path.isfile(join(entry_path, MANIFEST_FILE)):
                continue

            # parse manifest (if requested) and add to list of available modules
            manifest = load_manifest(entry_path) if with_manifest else {}
            found.append(Module(entry, entry_path, manifest))

    if not suppress_warnings:
        found_names = [m.name for m in found]
        for wanted in limit_to_modules:
            if wanted not in found_names:
                logger.warning(f"Module '{wanted}' not found. Are you sure it's installed?")

    return found

Wyświetl plik

@ -0,0 +1,196 @@
"""
Defines the BaseModule abstract base class, which acts as a blueprint for modules in the archiving pipeline
by applying user configuration, and the LazyBaseModule class, which finds modules on disk, reads their manifests and implements dynamic instantiation.
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import List
from abc import ABC
import shutil
import ast
import copy
import sys
from importlib.util import find_spec
import os
from os.path import join, dirname
from loguru import logger
_LAZY_LOADED_MODULES = {}
MODULE_TYPES = [
'feeder',
'extractor',
'enricher',
'database',
'storage',
'formatter'
]
MANIFEST_FILE = "__manifest__.py"
_DEFAULT_MANIFEST = {
'name': '',
'author': 'Bellingcat',
'type': [],
'requires_setup': True,
'description': '',
'dependencies': {},
'entry_point': '',
'version': '1.0',
'configs': {}
}
class BaseModule(ABC):
    """Common base for pipeline modules; copies the module's own settings onto the instance."""

    config: dict
    name: str

    def setup(self, config: dict):
        """Keep the full *config* and expose the section stored under ``self.name`` as attributes."""
        self.config = config
        own_settings = config.get(self.name, {})
        for option in own_settings:
            setattr(self, option, own_settings[option])
def get_module(module_name: str, additional_paths: List[str] = []):
    """Return the lazy handle for *module_name*, reusing the cached one when present.

    On a cache miss the first match from ``available_modules`` is cached and
    returned; an ``IndexError`` propagates when nothing matches.
    """
    try:
        return _LAZY_LOADED_MODULES[module_name]
    except KeyError:
        pass

    matches = available_modules(additional_paths=additional_paths, limit_to_modules=[module_name])
    found = matches[0]
    _LAZY_LOADED_MODULES[module_name] = found
    return found
def available_modules(with_manifest: bool=False, limit_to_modules: List[str]= [], additional_paths: List[str] = [], suppress_warnings: bool = False) -> List[LazyBaseModule]:
    """List lazy handles for the modules in the default 'modules' folder and in *additional_paths*.

    A sub-folder counts as a module when it contains the manifest file.
    *limit_to_modules*, when non-empty, restricts the result to those folder
    names. Unless *suppress_warnings* is set, a warning is logged for every
    requested name that was not found. *with_manifest* is accepted but not
    read in this function.
    """
    # search through all valid 'modules' paths. Default is 'modules' in the current directory
    # see odoo/modules/module.py -> get_modules
    search_roots = [join(dirname(dirname((__file__))), "modules")] + additional_paths

    found = []
    for root in search_roots:
        # walk through each entry in the folder and keep those with a manifest
        try:
            entries = os.listdir(root)
        except FileNotFoundError:
            logger.warning(f"Module folder {root} does not exist")
            continue

        for entry in entries:
            if limit_to_modules and entry not in limit_to_modules:
                continue

            entry_path = join(root, entry)
            has_manifest = os.path.isfile(join(entry_path, MANIFEST_FILE))
            if has_manifest:
                found.append(LazyBaseModule(entry, entry_path))

    if not suppress_warnings:
        found_names = [m.name for m in found]
        for wanted in limit_to_modules:
            if wanted not in found_names:
                logger.warning(f"Module '{wanted}' not found. Are you sure it's installed?")

    return found
@dataclass
class LazyBaseModule:
    """Lazy handle on a module folder.

    Construction only records ``name`` and ``path``. The manifest file is read
    on the first access to ``manifest`` (or to a property built on it), and the
    entry-point class is imported and instantiated on the first ``load()``.
    """

    name: str
    # the next four are only assigned once the manifest has been read
    display_name: str
    type: list
    requires_setup: bool
    description: str
    path: str
    _manifest: dict = None
    _instance: BaseModule = None
    _entry_point: str = None

    def __init__(self, module_name, path):
        self.name = module_name
        self.path = path

    @property
    def entry_point(self):
        """Return ``"<file>::<Class>"``, derived from the module name if the manifest gave none."""
        # reading self.manifest also stores the manifest's entry point on the instance
        if not self._entry_point and not self.manifest['entry_point']:
            # try to create the entry point from the module name,
            # e.g. 'cli_feeder' -> 'cli_feeder::CliFeeder'
            self._entry_point = f"{self.name}::{self.name.replace('_', ' ').title().replace(' ', '')}"
        return self._entry_point

    @property
    def dependencies(self):
        """The 'dependencies' entry of the manifest."""
        return self.manifest['dependencies']

    @property
    def configs(self):
        """The 'configs' entry of the manifest."""
        return self.manifest['configs']

    @property
    def manifest(self):
        """Return the manifest merged over the defaults, reading the file on first access.

        The file is parsed with ``ast.literal_eval``, so it is never executed.
        A parse failure is logged and the manifest built so far is used.
        """
        if self._manifest:
            return self._manifest

        manifest = copy.deepcopy(_DEFAULT_MANIFEST)

        # explicit encoding: the platform default is locale-dependent and could
        # otherwise fail on non-ASCII text in a manifest
        with open(join(self.path, MANIFEST_FILE), encoding="utf-8") as f:
            try:
                manifest.update(ast.literal_eval(f.read()))
            except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError) as e:
                logger.error(f"Error loading manifest from file {self.path}/{MANIFEST_FILE}: {e}")

        self._manifest = manifest
        self.display_name = manifest['name']
        self.type = manifest['type']
        self._entry_point = manifest['entry_point']
        self.requires_setup = manifest['requires_setup']
        self.description = manifest['description']

        return manifest

    def load(self):
        """Import the module and return an instance of its entry-point class (created once).

        Terminates the process with exit status 1 when a dependency declared
        in the manifest is not available.
        """
        if self._instance is not None:
            return self._instance

        def python_dep_available(dep):
            # find_spec imports the parents of a dotted name and raises
            # ModuleNotFoundError when a parent is missing; treat that as
            # "not installed" so the friendly error below is shown instead
            try:
                return find_spec(dep) is not None
            except ImportError:
                return False

        # check external dependencies are installed
        def check_deps(deps, check):
            for dep in deps:
                if not check(dep):
                    logger.error(f"Module '{self.name}' requires external dependency '{dep}' which is not available. Have you installed the required dependencies for the '{self.name}' module? See the README for more information.")
                    # sys.exit rather than the site-provided exit(), which is
                    # not guaranteed to exist (e.g. when running with -S)
                    sys.exit(1)

        check_deps(self.dependencies.get('python', []), python_dep_available)
        check_deps(self.dependencies.get('bin', []), shutil.which)

        logger.debug(f"Loading module '{self.display_name}'...")

        # try the bare module name first, then the built-in modules package;
        # if neither imports, the entry-point import below raises
        for qualname in [self.name, f'auto_archiver.modules.{self.name}']:
            try:
                # first import the whole module, to make sure it's working properly
                __import__(qualname)
                break
            except ImportError:
                pass

        # then import the file for the entry point
        file_name, class_name = self.entry_point.split('::')
        sub_qualname = f'{qualname}.{file_name}'
        __import__(sub_qualname, fromlist=[self.entry_point])

        # finally, get the class instance
        instance = getattr(sys.modules[sub_qualname], class_name)()
        if not getattr(instance, 'name', None):
            instance.name = self.name

        if not getattr(instance, 'display_name', None):
            instance.display_name = self.display_name

        self._instance = instance
        return instance

    def __repr__(self):
        # display_name only exists after the manifest was read; fall back to
        # the folder name so repr() works (and does no file I/O) before that
        display_name = getattr(self, 'display_name', self.name)
        return f"Module<'{display_name}' ({self.name})>"

Wyświetl plik

@ -19,8 +19,9 @@ from .context import ArchivingContext
from .metadata import Metadata
from ..version import __version__
from .config import read_yaml, store_yaml, to_dot_notation, merge_dicts, EMPTY_CONFIG
from .loader import available_modules, Module, MODULE_TYPES, load_module
from .module import available_modules, LazyBaseModule, MODULE_TYPES, get_module
from . import validators
from .module import BaseModule
import tempfile, traceback
from loguru import logger
@ -107,7 +108,7 @@ class ArchivingOrchestrator:
else:
# load all modules, they're not using the 'simple' mode
self.add_module_args(available_modules(with_manifest=True), parser)
parser.set_defaults(**to_dot_notation(yaml_config))
# reload the parser with the new arguments, now that we have them
@ -147,22 +148,27 @@ class ArchivingOrchestrator:
parser.add_argument('--logging.file', action='store', dest='logging.file', help='the logging file to write to', default=None)
parser.add_argument('--logging.rotation', action='store', dest='logging.rotation', help='the logging rotation to use', default=None)
def add_module_args(self, modules: list[Module] = None, parser: argparse.ArgumentParser = None):
# additional modules
parser.add_argument('--additional-modules', dest='additional_modules', nargs='+', help='additional paths to search for modules', action=UniqueAppendAction)
def add_module_args(self, modules: list[LazyBaseModule] = None, parser: argparse.ArgumentParser = None):
if not modules:
modules = available_modules(with_manifest=True)
module: Module
module: LazyBaseModule
for module in modules:
if not module.configs:
# this module has no configs, don't show anything in the help
# (TODO: do we want to show something about this module though, like a description?)
continue
group = parser.add_argument_group(module.display_name or module.name, f"{module.description[:100]}...")
for name, kwargs in module.configs.items():
# TODO: go through all the manifests and make sure we're not breaking anything with removing cli_set
# in most cases it'll mean replacing it with 'type': 'str' or 'type': 'int' or something
kwargs.pop('cli_set', None)
kwargs['dest'] = f"{module.name}.{kwargs.pop('dest', name)}"
try:
kwargs['type'] = __builtins__.get(kwargs.get('type'), str)
@ -210,10 +216,11 @@ class ArchivingOrchestrator:
logger.error(f"Only one {module_type} is allowed, found {len(step_items)} {module_type}s. Please remove one of the following from your configuration file: {modules_to_load}")
exit()
for i, module in enumerate(modules_to_load):
for module in modules_to_load:
if module in invalid_modules:
continue
loaded_module = load_module(module)
loaded_module: BaseModule = get_module(module).load()
loaded_module.setup(self.config)
if not loaded_module:
invalid_modules.append(module)
continue
@ -238,6 +245,8 @@ class ArchivingOrchestrator:
if basic_config.help:
self.show_help()
logger.info(f"======== Welcome to the AUTO ARCHIVER ({__version__}) ==========")
# load the config file
yaml_config = {}
@ -252,12 +261,9 @@ class ArchivingOrchestrator:
self.install_modules()
logger.info("FEEDERS: " + ", ".join(m.name for m in self.config['steps']['feeders']))
logger.info("EXTRACTORS: " + ", ".join(m.name for m in self.config['steps']['extractors']))
logger.info("ENRICHERS: " + ", ".join(m.name for m in self.config['steps']['enrichers']))
logger.info("DATABASES: " + ", ".join(m.name for m in self.config['steps']['databases']))
logger.info("STORAGES: " + ", ".join(m.name for m in self.config['steps']['storages']))
logger.info("FORMATTERS: " + ", ".join(m.name for m in self.config['steps']['formatters']))
# log out the modules that were loaded
for module_type in MODULE_TYPES:
logger.info(f"{module_type.upper()}S: " + ", ".join(m.display_name for m in self.config['steps'][f"{module_type}s"]))
for item in self.feed():
pass

Wyświetl plik

@ -1,11 +0,0 @@
"""
Defines the Step abstract base class, which acts as a blueprint for steps in the archiving pipeline
by handling user configuration, validating the steps properties, and implementing dynamic instantiation.
"""
from __future__ import annotations
class Step:
# Nothing to see here :)
pass

Wyświetl plik

@ -3,3 +3,5 @@
def example_validator(value):
    """Return whether "example" is contained in *value*."""
    contains_example = "example" in value
    return contains_example
def positive_number(value):
    """Return whether *value* is strictly greater than zero."""
    is_positive = value > 0
    return is_positive

Wyświetl plik

@ -8,9 +8,9 @@
'entry_point': 'cli_feeder::CLIFeeder',
"configs": {
"urls": {
"default": None,
"help": "URL(s) to archive, either a single URL or a list of urls, should not come from config.yaml",
"nargs": "+",
"required": True,
},
},
"description": """

Wyświetl plik

@ -5,11 +5,10 @@ from auto_archiver.core import Metadata, ArchivingContext
class CLIFeeder(Feeder):
name = "cli_feeder"
def __iter__(self) -> Metadata:
for url in self.urls:
logger.debug(f"Processing {url}")
logger.debug(f"Processing URL: '{url}'")
yield Metadata().set_url(url)
ArchivingContext.set("folder", "cli")

Wyświetl plik

@ -1,5 +1,5 @@
{
"name": "csv_db",
"name": "CSV Database",
"type": ["database"],
"requires_setup": False,
"external_dependencies": {"python": ["loguru"]