kopia lustrzana https://github.com/bellingcat/auto-archiver
Fix up loading/storing configs + unit tests
rodzic
65ef46d01e
commit
b27bf8ffeb
|
@ -6,12 +6,17 @@ flexible setup in various environments.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import argparse
|
import argparse
|
||||||
import yaml
|
from ruamel.yaml import YAML, CommentedMap
|
||||||
|
from ruamel.yaml.comments import CommentedMap
|
||||||
|
|
||||||
from dataclasses import dataclass, field
|
from dataclasses import dataclass, field
|
||||||
from collections import OrderedDict
|
from collections import OrderedDict
|
||||||
|
from collections.abc import Iterable
|
||||||
|
from copy import deepcopy
|
||||||
from .loader import MODULE_TYPES
|
from .loader import MODULE_TYPES
|
||||||
|
|
||||||
|
from typing import Any, List
|
||||||
|
|
||||||
# configurable_parents = [
|
# configurable_parents = [
|
||||||
# Feeder,
|
# Feeder,
|
||||||
# Enricher,
|
# Enricher,
|
||||||
|
@ -50,21 +55,16 @@ from .loader import MODULE_TYPES
|
||||||
# parser.add_argument('--config', action='store', dest='config', help='the filename of the YAML configuration file (defaults to \'config.yaml\')', default='orchestration.yaml')
|
# parser.add_argument('--config', action='store', dest='config', help='the filename of the YAML configuration file (defaults to \'config.yaml\')', default='orchestration.yaml')
|
||||||
# parser.add_argument('--version', action='version', version=__version__)
|
# parser.add_argument('--version', action='version', version=__version__)
|
||||||
|
|
||||||
EMPTY_CONFIG = {
|
EMPTY_CONFIG = CommentedMap(**{
|
||||||
"steps": dict((f"{module_type}s", []) for module_type in MODULE_TYPES)
|
"steps": dict((f"{module_type}s", []) for module_type in MODULE_TYPES)
|
||||||
}
|
})
|
||||||
class LoadFromFile (argparse.Action):
|
|
||||||
def __call__ (self, parser, namespace, values, option_string = None):
|
|
||||||
with values as f:
|
|
||||||
# parse arguments in the file and store them in the target namespace
|
|
||||||
parser.parse_args(f.read().split(), namespace)
|
|
||||||
|
|
||||||
def to_dot_notation(yaml_conf: str) -> argparse.ArgumentParser:
|
def to_dot_notation(yaml_conf: CommentedMap | dict) -> argparse.ArgumentParser:
|
||||||
dotdict = {}
|
dotdict = {}
|
||||||
|
|
||||||
def process_subdict(subdict, prefix=""):
|
def process_subdict(subdict, prefix=""):
|
||||||
for key, value in subdict.items():
|
for key, value in subdict.items():
|
||||||
if type(value) == dict:
|
if is_dict_type(value):
|
||||||
process_subdict(value, f"{prefix}{key}.")
|
process_subdict(value, f"{prefix}{key}.")
|
||||||
else:
|
else:
|
||||||
dotdict[f"{prefix}{key}"] = value
|
dotdict[f"{prefix}{key}"] = value
|
||||||
|
@ -72,31 +72,64 @@ def to_dot_notation(yaml_conf: str) -> argparse.ArgumentParser:
|
||||||
process_subdict(yaml_conf)
|
process_subdict(yaml_conf)
|
||||||
return dotdict
|
return dotdict
|
||||||
|
|
||||||
def merge_dicts(dotdict, yaml_dict):
|
def from_dot_notation(dotdict: dict) -> dict:
|
||||||
def process_subdict(subdict, prefix=""):
|
normal_dict = {}
|
||||||
for key, value in subdict.items():
|
|
||||||
if "." in key:
|
def add_part(key, value, current_dict):
|
||||||
keys = key.split(".")
|
if "." in key:
|
||||||
subdict = yaml_dict
|
key_parts = key.split(".")
|
||||||
for k in keys[:-1]:
|
current_dict.setdefault(key_parts[0], {})
|
||||||
subdict = subdict.setdefault(k, {})
|
add_part(".".join(key_parts[1:]), value, current_dict[key_parts[0]])
|
||||||
subdict[keys[-1]] = value
|
else:
|
||||||
else:
|
current_dict[key] = value
|
||||||
yaml_dict[key] = value
|
|
||||||
|
for key, value in dotdict.items():
|
||||||
|
add_part(key, value, normal_dict)
|
||||||
|
|
||||||
|
return normal_dict
|
||||||
|
|
||||||
|
|
||||||
|
def is_list_type(value):
|
||||||
|
return isinstance(value, list) or isinstance(value, tuple) or isinstance(value, set)
|
||||||
|
|
||||||
|
def is_dict_type(value):
|
||||||
|
return isinstance(value, dict) or isinstance(value, CommentedMap)
|
||||||
|
|
||||||
|
def merge_dicts(dotdict: dict, yaml_dict: CommentedMap) -> CommentedMap:
|
||||||
|
yaml_dict: CommentedMap = deepcopy(yaml_dict)
|
||||||
|
|
||||||
|
# first deal with lists, since 'update' replaces lists from a in b, but we want to extend
|
||||||
|
def update_dict(subdict, yaml_subdict):
|
||||||
|
for key, value in yaml_subdict.items():
|
||||||
|
if not subdict.get(key):
|
||||||
|
continue
|
||||||
|
|
||||||
|
if is_dict_type(value):
|
||||||
|
update_dict(subdict[key], value)
|
||||||
|
elif is_list_type(value):
|
||||||
|
yaml_subdict[key].extend(s for s in subdict[key] if s not in yaml_subdict[key])
|
||||||
|
else:
|
||||||
|
yaml_subdict[key] = subdict[key]
|
||||||
|
|
||||||
|
update_dict(from_dot_notation(dotdict), yaml_dict)
|
||||||
|
|
||||||
process_subdict(dotdict)
|
|
||||||
return yaml_dict
|
return yaml_dict
|
||||||
|
|
||||||
def read_yaml(yaml_filename: str) -> dict:
|
yaml = YAML()
|
||||||
|
|
||||||
|
def read_yaml(yaml_filename: str) -> CommentedMap:
|
||||||
|
config = None
|
||||||
try:
|
try:
|
||||||
with open(yaml_filename, "r", encoding="utf-8") as inf:
|
with open(yaml_filename, "r", encoding="utf-8") as inf:
|
||||||
config = yaml.safe_load(inf)
|
config = yaml.load(inf)
|
||||||
except FileNotFoundError:
|
except FileNotFoundError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
if not config:
|
||||||
config = EMPTY_CONFIG
|
config = EMPTY_CONFIG
|
||||||
|
|
||||||
return config
|
return config
|
||||||
|
|
||||||
def store_yaml(config: dict, yaml_filename: str):
|
def store_yaml(config: CommentedMap, yaml_filename: str):
|
||||||
with open(yaml_filename, "w", encoding="utf-8") as outf:
|
with open(yaml_filename, "w", encoding="utf-8") as outf:
|
||||||
yaml.dump(config, outf, default_flow_style=False)
|
yaml.dump(config, outf)
|
|
@ -91,7 +91,11 @@ def load_module(module: str) -> object: # TODO: change return type to Step
|
||||||
|
|
||||||
logger.info(f"Loading module '{module.display_name}'...")
|
logger.info(f"Loading module '{module.display_name}'...")
|
||||||
loaded_module = __import__(qualname)
|
loaded_module = __import__(qualname)
|
||||||
_LOADED_MODULES[module.name] = getattr(sys.modules[qualname], module.entry_point)()
|
instance = getattr(sys.modules[qualname], module.entry_point)()
|
||||||
|
if not getattr(instance, 'name', None):
|
||||||
|
instance.name = module.name
|
||||||
|
|
||||||
|
_LOADED_MODULES[module.name] = instance
|
||||||
return _LOADED_MODULES[module.name]
|
return _LOADED_MODULES[module.name]
|
||||||
|
|
||||||
|
|
||||||
|
@ -109,7 +113,7 @@ def load_manifest(module_path):
|
||||||
def get_module(module_name):
|
def get_module(module_name):
|
||||||
# get a module by name
|
# get a module by name
|
||||||
try:
|
try:
|
||||||
return available_modules(limit_to_modules=[module_name], with_manifest=True, suppress_warnings=True)[0]
|
return available_modules(limit_to_modules=[module_name], with_manifest=True)[0]
|
||||||
except IndexError:
|
except IndexError:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
@ -149,6 +153,6 @@ def available_modules(with_manifest: bool=False, limit_to_modules: List[str]= []
|
||||||
if not suppress_warnings:
|
if not suppress_warnings:
|
||||||
for module in limit_to_modules:
|
for module in limit_to_modules:
|
||||||
if not any(module == m.name for m in all_modules):
|
if not any(module == m.name for m in all_modules):
|
||||||
logger.warning(f"Module '{module}' not found in available modules. Are you sure it's installed?")
|
logger.warning(f"Module '{module}' not found. Are you sure it's installed?")
|
||||||
|
|
||||||
return all_modules
|
return all_modules
|
|
@ -10,7 +10,7 @@ from urllib.parse import urlparse
|
||||||
from ipaddress import ip_address
|
from ipaddress import ip_address
|
||||||
import argparse
|
import argparse
|
||||||
import os
|
import os
|
||||||
from os.path import join, dirname
|
import sys
|
||||||
|
|
||||||
from rich_argparse import RichHelpFormatter
|
from rich_argparse import RichHelpFormatter
|
||||||
|
|
||||||
|
@ -27,6 +27,14 @@ from loguru import logger
|
||||||
|
|
||||||
DEFAULT_CONFIG_FILE = "orchestration.yaml"
|
DEFAULT_CONFIG_FILE = "orchestration.yaml"
|
||||||
|
|
||||||
|
class UniqueAppendAction(argparse.Action):
|
||||||
|
def __call__(self, parser, namespace, values, option_string=None):
|
||||||
|
if not hasattr(namespace, self.dest):
|
||||||
|
setattr(namespace, self.dest, [])
|
||||||
|
for value in values:
|
||||||
|
if value not in getattr(namespace, self.dest):
|
||||||
|
getattr(namespace, self.dest).append(value)
|
||||||
|
|
||||||
class ArchivingOrchestrator:
|
class ArchivingOrchestrator:
|
||||||
|
|
||||||
# def __init__(self, config: Config) -> None:
|
# def __init__(self, config: Config) -> None:
|
||||||
|
@ -59,20 +67,22 @@ class ArchivingOrchestrator:
|
||||||
parser.add_argument('--mode', action='store', dest='mode', type=str, choices=['simple', 'full'], help='the mode to run the archiver in', default='simple')
|
parser.add_argument('--mode', action='store', dest='mode', type=str, choices=['simple', 'full'], help='the mode to run the archiver in', default='simple')
|
||||||
# override the default 'help' so we can inject all the configs and show those
|
# override the default 'help' so we can inject all the configs and show those
|
||||||
parser.add_argument('-h', '--help', action='store_true', dest='help', help='show this help message and exit')
|
parser.add_argument('-h', '--help', action='store_true', dest='help', help='show this help message and exit')
|
||||||
parser.add_argument('-s', '--store', action='store_true', dest='store', help='Store the created config in the config file')
|
parser.add_argument('-s', '--store', dest='store', default=True, help='Store the created config in the config file', action=argparse.BooleanOptionalAction)
|
||||||
|
|
||||||
self.basic_parser = parser
|
self.basic_parser = parser
|
||||||
|
|
||||||
def setup_complete_parser(self, basic_config: dict, yaml_config: dict, unused_args: list[str]) -> None:
|
def setup_complete_parser(self, basic_config: dict, yaml_config: dict, unused_args: list[str]) -> None:
|
||||||
parser = argparse.ArgumentParser(
|
parser = argparse.ArgumentParser(
|
||||||
parents = [self.basic_parser],
|
|
||||||
add_help=False,
|
add_help=False,
|
||||||
)
|
)
|
||||||
self.add_steps_args(parser)
|
self.add_additional_args(parser)
|
||||||
|
|
||||||
# check what mode we're in
|
# check what mode we're in
|
||||||
# if we have a config file, use that to decide which modules to load
|
# if we have a config file, use that to decide which modules to load
|
||||||
# if simple, we'll load just the modules that has requires_setup = False
|
# if simple, we'll load just the modules that has requires_setup = False
|
||||||
# if full, we'll load all modules
|
# if full, we'll load all modules
|
||||||
|
# TODO: BUG** - basic_config won't have steps in it, since these args aren't added to 'basic_parser'
|
||||||
|
# but should we add them? Or should we just add them to the 'complete' parser?
|
||||||
if yaml_config != EMPTY_CONFIG:
|
if yaml_config != EMPTY_CONFIG:
|
||||||
# only load the modules enabled in config
|
# only load the modules enabled in config
|
||||||
# TODO: if some steps are empty (e.g. 'feeders' is empty), should we default to the 'simple' ones? Or only if they are ALL empty?
|
# TODO: if some steps are empty (e.g. 'feeders' is empty), should we default to the 'simple' ones? Or only if they are ALL empty?
|
||||||
|
@ -85,7 +95,7 @@ class ArchivingOrchestrator:
|
||||||
if modules := getattr(basic_config, f"{module_type}s", []):
|
if modules := getattr(basic_config, f"{module_type}s", []):
|
||||||
enabled_modules.extend(modules)
|
enabled_modules.extend(modules)
|
||||||
|
|
||||||
self.add_module_args(available_modules(with_manifest=True, limit_to_modules=set(enabled_modules)), parser)
|
self.add_module_args(available_modules(with_manifest=True, limit_to_modules=set(enabled_modules), suppress_warnings=True), parser)
|
||||||
elif basic_config.mode == 'simple':
|
elif basic_config.mode == 'simple':
|
||||||
simple_modules = [module for module in available_modules(with_manifest=True) if not module.requires_setup]
|
simple_modules = [module for module in available_modules(with_manifest=True) if not module.requires_setup]
|
||||||
self.add_module_args(simple_modules, parser)
|
self.add_module_args(simple_modules, parser)
|
||||||
|
@ -97,36 +107,45 @@ class ArchivingOrchestrator:
|
||||||
# load all modules, they're not using the 'simple' mode
|
# load all modules, they're not using the 'simple' mode
|
||||||
self.add_module_args(available_modules(with_manifest=True), parser)
|
self.add_module_args(available_modules(with_manifest=True), parser)
|
||||||
|
|
||||||
|
|
||||||
breakpoint()
|
|
||||||
parser.set_defaults(**to_dot_notation(yaml_config))
|
parser.set_defaults(**to_dot_notation(yaml_config))
|
||||||
|
|
||||||
|
breakpoint()
|
||||||
# reload the parser with the new arguments, now that we have them
|
# reload the parser with the new arguments, now that we have them
|
||||||
parsed, unknown = parser.parse_known_args(unused_args)
|
parsed, unknown = parser.parse_known_args(unused_args)
|
||||||
|
|
||||||
|
# merge the new config with the old one
|
||||||
|
self.config = merge_dicts(vars(parsed), yaml_config)
|
||||||
|
# clean out args from the base_parser that we don't want in the config
|
||||||
|
for key in vars(basic_config):
|
||||||
|
self.config.pop(key, None)
|
||||||
|
|
||||||
|
# setup the logging
|
||||||
|
self.setup_logging()
|
||||||
|
|
||||||
if unknown:
|
if unknown:
|
||||||
logger.warning(f"Ignoring unknown/unused arguments: {unknown}\nPerhaps you don't have this module enabled?")
|
logger.warning(f"Ignoring unknown/unused arguments: {unknown}\nPerhaps you don't have this module enabled?")
|
||||||
|
|
||||||
# merge the new config with the old one
|
if (self.config != yaml_config and basic_config.store) or not os.path.isfile(basic_config.config_file):
|
||||||
merged_yaml_config = merge_dicts(vars(parsed), yaml_config)
|
|
||||||
|
|
||||||
if (merged_yaml_config != yaml_config and basic_config.store) or not os.path.isfile(basic_config.config_file):
|
|
||||||
logger.info(f"Storing configuration file to {basic_config.config_file}")
|
logger.info(f"Storing configuration file to {basic_config.config_file}")
|
||||||
store_yaml(yaml_config, basic_config.config_file)
|
store_yaml(self.config, basic_config.config_file)
|
||||||
|
|
||||||
self.config = merged_yaml_config
|
|
||||||
|
|
||||||
return self.config
|
return self.config
|
||||||
|
|
||||||
def add_steps_args(self, parser: argparse.ArgumentParser = None):
|
def add_additional_args(self, parser: argparse.ArgumentParser = None):
|
||||||
if not parser:
|
if not parser:
|
||||||
parser = self.parser
|
parser = self.parser
|
||||||
|
|
||||||
parser.add_argument('--feeders', action='store', dest='steps.feeders', nargs='+', help='the feeders to use')
|
parser.add_argument('--feeders', dest='steps.feeders', nargs='+', help='the feeders to use', action=UniqueAppendAction)
|
||||||
parser.add_argument('--enrichers', action='store', dest='steps.enrichers', nargs='+', help='the enrichers to use')
|
parser.add_argument('--enrichers', dest='steps.enrichers', nargs='+', help='the enrichers to use', action=UniqueAppendAction)
|
||||||
parser.add_argument('--extractors', action='store', dest='steps.extractors', nargs='+', help='the extractors to use')
|
parser.add_argument('--extractors', dest='steps.extractors', nargs='+', help='the extractors to use', action=UniqueAppendAction)
|
||||||
parser.add_argument('--databases', action='store', dest='steps.databases', nargs='+', help='the databases to use')
|
parser.add_argument('--databases', dest='steps.databases', nargs='+', help='the databases to use', action=UniqueAppendAction)
|
||||||
parser.add_argument('--storages', action='store', dest='steps.storages', nargs='+', help='the storages to use')
|
parser.add_argument('--storages', dest='steps.storages', nargs='+', help='the storages to use', action=UniqueAppendAction)
|
||||||
parser.add_argument('--formatters', action='store', dest='steps.formatters', nargs='+', help='the formatter to use')
|
parser.add_argument('--formatters', dest='steps.formatters', nargs='+', help='the formatter to use', action=UniqueAppendAction)
|
||||||
|
|
||||||
|
# logging arguments
|
||||||
|
parser.add_argument('--logging.level', action='store', dest='logging.level', choices=['INFO', 'DEBUG', 'ERROR', 'WARNING'], help='the logging level to use', default='INFO')
|
||||||
|
parser.add_argument('--logging.file', action='store', dest='logging.file', help='the logging file to write to', default=None)
|
||||||
|
parser.add_argument('--logging.rotation', action='store', dest='logging.rotation', help='the logging rotation to use', default=None)
|
||||||
|
|
||||||
def add_module_args(self, modules: list[Module] = None, parser: argparse.ArgumentParser = None):
|
def add_module_args(self, modules: list[Module] = None, parser: argparse.ArgumentParser = None):
|
||||||
|
|
||||||
|
@ -152,20 +171,29 @@ class ArchivingOrchestrator:
|
||||||
# for the help message, we want to load *all* possible modules and show the help
|
# for the help message, we want to load *all* possible modules and show the help
|
||||||
# add configs as arg parser arguments
|
# add configs as arg parser arguments
|
||||||
|
|
||||||
self.add_steps_args(self.basic_parser)
|
self.add_additional_args(self.basic_parser)
|
||||||
self.add_module_args(parser=self.basic_parser)
|
self.add_module_args(parser=self.basic_parser)
|
||||||
|
|
||||||
self.basic_parser.print_help()
|
self.basic_parser.print_help()
|
||||||
exit()
|
exit()
|
||||||
|
|
||||||
|
def setup_logging(self):
|
||||||
|
# setup loguru logging
|
||||||
|
logger.remove() # remove the default logger
|
||||||
|
|
||||||
|
logging_config = self.config['logging']
|
||||||
|
logger.add(sys.stderr, level=logging_config['level'])
|
||||||
|
if log_file := logging_config['file']:
|
||||||
|
logger.add(log_file, rotation=logging_config['logging.rotation'])
|
||||||
|
|
||||||
|
|
||||||
def install_modules(self):
|
def install_modules(self):
|
||||||
"""
|
"""
|
||||||
Swaps out the previous 'strings' in the config with the actual modules
|
Swaps out the previous 'strings' in the config with the actual modules
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
invalid_modules = []
|
||||||
for module_type in MODULE_TYPES:
|
for module_type in MODULE_TYPES:
|
||||||
if module_type == 'enricher':
|
|
||||||
breakpoint()
|
|
||||||
step_items = []
|
step_items = []
|
||||||
modules_to_load = self.config['steps'][f"{module_type}s"]
|
modules_to_load = self.config['steps'][f"{module_type}s"]
|
||||||
|
|
||||||
|
@ -179,7 +207,12 @@ class ArchivingOrchestrator:
|
||||||
exit()
|
exit()
|
||||||
|
|
||||||
for i, module in enumerate(modules_to_load):
|
for i, module in enumerate(modules_to_load):
|
||||||
|
if module in invalid_modules:
|
||||||
|
continue
|
||||||
loaded_module = load_module(module)
|
loaded_module = load_module(module)
|
||||||
|
if not loaded_module:
|
||||||
|
invalid_modules.append(module)
|
||||||
|
continue
|
||||||
if loaded_module:
|
if loaded_module:
|
||||||
step_items.append(loaded_module)
|
step_items.append(loaded_module)
|
||||||
check_steps_ok()
|
check_steps_ok()
|
||||||
|
@ -228,7 +261,6 @@ class ArchivingOrchestrator:
|
||||||
def cleanup(self)->None:
|
def cleanup(self)->None:
|
||||||
logger.info("Cleaning up")
|
logger.info("Cleaning up")
|
||||||
for e in self.config['steps']['extractors']:
|
for e in self.config['steps']['extractors']:
|
||||||
breakpoint()
|
|
||||||
e.cleanup()
|
e.cleanup()
|
||||||
|
|
||||||
def feed(self) -> Generator[Metadata]:
|
def feed(self) -> Generator[Metadata]:
|
||||||
|
|
|
@ -7,5 +7,5 @@ by handling user configuration, validating the steps properties, and implementin
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
class Step:
|
class Step:
|
||||||
# TODO: try and get this name from the manifest, so we don't have to set it twice
|
# Nothing to see here :)
|
||||||
name: str
|
pass
|
|
@ -0,0 +1,99 @@
|
||||||
|
import pytest
|
||||||
|
from auto_archiver.core import config
|
||||||
|
from ruamel.yaml.scanner import ScannerError
|
||||||
|
from ruamel.yaml.comments import CommentedMap
|
||||||
|
|
||||||
|
def test_return_default_config_for_nonexistent_file():
|
||||||
|
assert config.read_yaml("nonexistent_file.yaml") == config.EMPTY_CONFIG
|
||||||
|
|
||||||
|
def test_return_default_config_for_empty_file(tmp_path):
|
||||||
|
empty_file = tmp_path / "empty_file.yaml"
|
||||||
|
empty_file.write_text("")
|
||||||
|
assert config.read_yaml(empty_file) == config.EMPTY_CONFIG
|
||||||
|
|
||||||
|
def test_raise_error_on_invalid_yaml(tmp_path):
|
||||||
|
invalid_yaml = tmp_path / "invalid_yaml.yaml"
|
||||||
|
invalid_yaml.write_text("key: \"value_without_end_quote")
|
||||||
|
# make sure it raises ScannerError
|
||||||
|
with pytest.raises(ScannerError):
|
||||||
|
config.read_yaml(invalid_yaml)
|
||||||
|
|
||||||
|
def test_write_yaml(tmp_path):
|
||||||
|
yaml_file = tmp_path / "write_yaml.yaml"
|
||||||
|
config.store_yaml(config.EMPTY_CONFIG, yaml_file.as_posix())
|
||||||
|
assert "steps:\n" in yaml_file.read_text()
|
||||||
|
|
||||||
|
def test_round_trip_comments(tmp_path):
|
||||||
|
yaml_file = tmp_path / "round_trip_comments.yaml"
|
||||||
|
|
||||||
|
with open(yaml_file, "w") as f:
|
||||||
|
f.write("generic_extractor:\n facebook_cookie: abc # end of line comment\n subtitles: true\n # comments: false\n # livestreams: false\n list_type:\n - value1\n - value2")
|
||||||
|
|
||||||
|
loaded = config.read_yaml(yaml_file)
|
||||||
|
# check the comments are preserved
|
||||||
|
assert loaded['generic_extractor']['facebook_cookie'] == "abc"
|
||||||
|
assert loaded['generic_extractor'].ca.items['facebook_cookie'][2].value == "# end of line comment\n"
|
||||||
|
|
||||||
|
# add some more items to my_settings
|
||||||
|
loaded['generic_extractor']['list_type'].append("bellingcat")
|
||||||
|
config.store_yaml(loaded, yaml_file.as_posix())
|
||||||
|
|
||||||
|
assert "# comments: false" in yaml_file.read_text()
|
||||||
|
assert "facebook_cookie: abc # end of line comment" in yaml_file.read_text()
|
||||||
|
assert "abc # end of line comment" in yaml_file.read_text()
|
||||||
|
assert "- value2\n - bellingcat" in yaml_file.read_text()
|
||||||
|
|
||||||
|
def test_merge_dicts():
|
||||||
|
yaml_dict = config.EMPTY_CONFIG
|
||||||
|
yaml_dict['settings'] = CommentedMap(**{
|
||||||
|
"key1": ["a"],
|
||||||
|
"key2": "old_value",
|
||||||
|
"key3": ["a", "b", "c"],
|
||||||
|
})
|
||||||
|
|
||||||
|
dotdict = {
|
||||||
|
"settings.key1": ["b", "c"],
|
||||||
|
"settings.key2": "new_value",
|
||||||
|
"settings.key3": ["b", "c", "d"],
|
||||||
|
}
|
||||||
|
merged = config.merge_dicts(dotdict, yaml_dict)
|
||||||
|
assert merged["settings"]["key1"] == ["a", "b", "c"]
|
||||||
|
assert merged["settings"]["key2"] == "new_value"
|
||||||
|
assert merged["settings"]["key3"] == ["a", "b", "c", "d"]
|
||||||
|
|
||||||
|
|
||||||
|
def test_check_types():
|
||||||
|
assert config.is_list_type([]) == True
|
||||||
|
assert config.is_list_type(()) == True
|
||||||
|
assert config.is_list_type(set()) == True
|
||||||
|
assert config.is_list_type({}) == False
|
||||||
|
assert config.is_list_type("") == False
|
||||||
|
assert config.is_dict_type({}) == True
|
||||||
|
assert config.is_dict_type(CommentedMap()) == True
|
||||||
|
assert config.is_dict_type([]) == False
|
||||||
|
assert config.is_dict_type("") == False
|
||||||
|
|
||||||
|
def test_from_dot_notation():
|
||||||
|
dotdict = {
|
||||||
|
"settings.key1": ["a", "b", "c"],
|
||||||
|
"settings.key2": "new_value",
|
||||||
|
"settings.key3.key4": "value",
|
||||||
|
}
|
||||||
|
normal_dict = config.from_dot_notation(dotdict)
|
||||||
|
assert normal_dict["settings"]["key1"] == ["a", "b", "c"]
|
||||||
|
assert normal_dict["settings"]["key2"] == "new_value"
|
||||||
|
assert normal_dict["settings"]["key3"]["key4"] == "value"
|
||||||
|
|
||||||
|
def test_to_dot_notation():
|
||||||
|
yaml_dict = config.EMPTY_CONFIG
|
||||||
|
yaml_dict['settings'] = {
|
||||||
|
"key1": ["a", "b", "c"],
|
||||||
|
"key2": "new_value",
|
||||||
|
"key3": {
|
||||||
|
"key4": "value",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
dotdict = config.to_dot_notation(yaml_dict)
|
||||||
|
assert dotdict["settings.key1"] == ["a", "b", "c"]
|
||||||
|
assert dotdict["settings.key2"] == "new_value"
|
||||||
|
assert dotdict["settings.key3.key4"] == "value"
|
Ładowanie…
Reference in New Issue