Mirror of https://github.com/bellingcat/auto-archiver
Linting tests
parent 79f576be1d
commit 753c3c6214
@@ -52,7 +52,7 @@ def generate_module_docs():
     for type in manifest["type"]:
         modules_by_type.setdefault(type, []).append(module)

-    description = "\n".join(l.lstrip() for l in manifest["description"].split("\n"))
+    description = "\n".join(line.lstrip() for line in manifest["description"].split("\n"))
     types = ", ".join(type_color[t] for t in manifest["type"])
     readme_str = f"""
 # {manifest["name"]}
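The `l` → `line` rename looks like a fix for pycodestyle/ruff rule E741 (ambiguous variable name), where a lone `l` is easily misread as `1` or `I`. A minimal before/after sketch:

lines = ["  a", "  b"]
stripped = [l.lstrip() for l in lines]        # flagged as E741: ambiguous name
stripped = [line.lstrip() for line in lines]  # same behaviour, lint-clean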
@@ -51,6 +51,7 @@ The invocations below will run the auto-archiver Docker image using a configurat
 docker run --rm -v $PWD/secrets:/app/secrets -v $PWD/local_archive:/app/local_archive bellingcat/auto-archiver

 # uses the same configuration, but with the `gsheet_feeder`, a header on row 2 and with some different column names
 # Note this expects you to have followed the [Google Sheets setup](how_to/google_sheets.md) and added your service_account.json to the `secrets/` folder
 # notice that columns is a dictionary so you need to pass it as JSON and it will override only the values provided
 docker run --rm -v $PWD/secrets:/app/secrets -v $PWD/local_archive:/app/local_archive bellingcat/auto-archiver --feeders=gsheet_feeder --gsheet_feeder.sheet="use it on another sheets doc" --gsheet_feeder.header=2 --gsheet_feeder.columns='{"url": "link"}'

 # Runs auto-archiver for the first time, but in 'full' mode, enabling all modules to get a full settings file
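The `--gsheet_feeder.columns` flag takes JSON and, per the comment above, overrides only the keys it provides. A sketch of the likely merge semantics; the default column names here are hypothetical stand-ins for whatever the gsheet_feeder module actually defines:

import json

DEFAULT_COLUMNS = {"url": "link", "status": "archive status"}  # hypothetical defaults
overrides = json.loads('{"url": "link"}')     # the value passed on the command line
columns = {**DEFAULT_COLUMNS, **overrides}    # only the provided keys are overridden
assert columns["status"] == "archive status"  # unlisted defaults survive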
@@ -96,7 +96,7 @@ markers = [
 #exclude = ["docs"]
 line-length = 120
 # Remove this for a more detailed lint report
-output-format = "concise"
+#output-format = "concise"


 [tool.ruff.lint]

@@ -104,7 +104,7 @@ output-format = "concise"
 # I : isort
 # UP : upgrade, e.g. use fstrings
 # ANN : annotations
-#extend-select = ["B"]
+extend-select = ["B"]

 # Ignore unused imports as some are currently required for lazy loading
 # This can be removed for a `lint check` run which is manually reviewed
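Commenting out `output-format = "concise"` lets ruff fall back to its default format, presumably to get the more detailed report the comment mentions. A sketch of exercising both formats from a script, assuming ruff is installed (e.g. via the project's dev dependencies):

import subprocess

# one line per finding
subprocess.run(["ruff", "check", "--output-format=concise", "."], check=False)
# detailed report with source snippets
subprocess.run(["ruff", "check", "--output-format=full", "."], check=False)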
@@ -1,5 +1,6 @@
 import os.path
-import click, json
+import click
+import json

 from google.auth.transport.requests import Request
 from google.oauth2.credentials import Credentials
@@ -14,7 +14,7 @@ class TiktokTikwmExtractor(Extractor):
     """
     TIKWM_ENDPOINT = "https://www.tikwm.com/api/?url={url}"

-    def download(self, item: Metadata) -> Metadata:
+    def download(self, item: Metadata) -> bool | Metadata:
         url = item.get_url()

         if not re.match(TikTokIE._VALID_URL, url):
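The widened return annotation uses PEP 604 union syntax (`bool | Metadata`), which requires Python 3.10+ when annotations are evaluated at runtime. An equivalent spelling for older interpreters, sketched with string forward references instead of the real `Metadata` import:

from typing import Union

class TiktokTikwmExtractor:
    # equivalent to `-> bool | Metadata` on Python < 3.10 (or use
    # `from __future__ import annotations` to defer annotation evaluation)
    def download(self, item: "Metadata") -> Union[bool, "Metadata"]: ...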
@@ -1,4 +1,3 @@
-import datetime
 from datetime import datetime, timedelta, timezone

 import pytest
@@ -15,9 +15,9 @@ def mock_selenium_env(mocker):
     mock_which = mocker.patch("shutil.which")
     mock_driver_class = mocker.patch("auto_archiver.utils.webdriver.CookieSettingDriver")
     mock_binary_paths = mocker.patch("selenium.webdriver.common.selenium_manager.SeleniumManager.binary_paths")
-    mock_is_file = mocker.patch("pathlib.Path.is_file", return_value=True)
+    mocker.patch("pathlib.Path.is_file", return_value=True)
     mock_popen = mocker.patch("subprocess.Popen")
-    mock_is_connectable = mocker.patch("selenium.webdriver.common.service.Service.is_connectable", return_value=True)
+    mocker.patch("selenium.webdriver.common.service.Service.is_connectable", return_value=True)
     mock_firefox_options = mocker.patch("selenium.webdriver.FirefoxOptions")

     # Define side effect for `shutil.which`
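The dropped `mock_is_file = …` and `mock_is_connectable = …` bindings are the classic pyflakes/ruff F841 pattern (local variable assigned but never used). With pytest-mock, the patch stays active for the whole test whether or not the return value is bound, as this minimal sketch (requires pytest-mock) shows:

from pathlib import Path

def test_patch_active_without_binding(mocker):
    # the patch applies for the test's duration even though the MagicMock
    # returned by mocker.patch() is never assigned to a name
    mocker.patch("pathlib.Path.is_file", return_value=True)
    assert Path("does_not_exist.txt").is_file() is True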
@@ -157,13 +157,12 @@ def test_pdf_creation(mocker, screenshot_enricher, metadata_with_video, mock_sel
     # Mock the print_page method to return base64-encoded content
     mock_driver.print_page.return_value = base64.b64encode(b"fake_pdf_content").decode("utf-8")
     # Patch functions with mocker
-    mock_os_path_join = mocker.patch("os.path.join", side_effect=lambda *args: f"{args[-1]}")
-    mock_random_str = mocker.patch(
+    mocker.patch("os.path.join", side_effect=lambda *args: f"{args[-1]}")
+    mocker.patch(
         "auto_archiver.modules.screenshot_enricher.screenshot_enricher.random_str",
         return_value="fixed123",
     )
     mock_open = mocker.patch("builtins.open", new_callable=mocker.mock_open)
     mock_log_error = mocker.patch("loguru.logger.error")

     screenshot_enricher.enrich(metadata_with_video)
     # Verify screenshot and PDF creation
@@ -39,7 +39,7 @@ class TestTiktokTikwmExtractor(TestExtractorBase):
         mock_get, mock_logger = self.get_mockers(mocker)
         if valid_url:
             mock_get.return_value.status_code = 404
-        assert self.extractor.download(make_item(url)) == False
+        assert self.extractor.download(make_item(url)) is False
         assert mock_get.call_count == int(valid_url)
         assert mock_logger.error.call_count == int(valid_url)
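The `== False` → `is False` changes here and in the following hunks address pycodestyle/ruff E712 (comparison to `True`/`False` with equality operators). The identity form is also strictly safer for these tests, since equality coerces across types:

result = False
assert result == False   # E712: equality comparison against False
assert result is False   # identity check against the False singleton

zero = 0
assert (zero == False) is True   # 0 compares equal to False...
assert (zero is False) is False  # ...but it is not the False object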
@@ -47,7 +47,7 @@ class TestTiktokTikwmExtractor(TestExtractorBase):
         mock_get, mock_logger = self.get_mockers(mocker)
         mock_get.return_value.status_code = 200
         mock_get.return_value.json.side_effect = ValueError
-        assert self.extractor.download(make_item(self.VALID_EXAMPLE_URL)) == False
+        assert self.extractor.download(make_item(self.VALID_EXAMPLE_URL)) is False
         mock_get.assert_called_once()
         mock_get.return_value.json.assert_called_once()
         mock_logger.error.assert_called_once()
@@ -68,7 +68,7 @@ class TestTiktokTikwmExtractor(TestExtractorBase):
         mock_get, mock_logger = self.get_mockers(mocker)
         mock_get.return_value.status_code = 200
         mock_get.return_value.json.return_value = response
-        assert self.extractor.download(make_item(self.VALID_EXAMPLE_URL)) == False
+        assert self.extractor.download(make_item(self.VALID_EXAMPLE_URL)) is False
         mock_get.assert_called_once()
         mock_get.return_value.json.assert_called_once()
         mock_logger.error.assert_called_once()
@@ -86,7 +86,7 @@ class TestTiktokTikwmExtractor(TestExtractorBase):

         result = self.extractor.download(make_item(self.VALID_EXAMPLE_URL))
         if not has_vid:
-            assert result == False
+            assert result is False
         else:
             assert result.is_success()
             assert len(result.media) == 1
@@ -99,7 +99,7 @@ class TestTiktokTikwmExtractor(TestExtractorBase):
         else:
             mock_logger.error.assert_not_called()

-    def test_correct_extraction(self, mocker, make_item):
+    def test_correct_data_extracted(self, mocker, make_item):
         mock_get, _ = self.get_mockers(mocker)
         mock_get.return_value.status_code = 200
         mock_get.return_value.json.return_value = {"msg": "success", "data": {
@@ -172,10 +172,10 @@ def test_should_process_sheet(setup_module, mocker):
             "block_worksheets": {"Sheet3"},
         },
     )
-    assert gdb.should_process_sheet("TestSheet") == True
-    assert gdb.should_process_sheet("Sheet3") == False
+    assert gdb.should_process_sheet("TestSheet") is True
+    assert gdb.should_process_sheet("Sheet3") is False
     # False if allow_worksheets is set
-    assert gdb.should_process_sheet("AnotherSheet") == False
+    assert gdb.should_process_sheet("AnotherSheet") is False


 @pytest.mark.skip(reason="Requires a real connection")
@@ -61,7 +61,7 @@ class TestS3Storage:
         media = Media("test.txt")
         assert self.storage.is_upload_needed(media) is True
         self.storage.random_no_duplicate = True
-        mock_calc_hash = mocker.patch('auto_archiver.modules.s3_storage.s3_storage.calculate_file_hash', return_value='beepboop123beepboop123beepboop123')
+        mocker.patch('auto_archiver.modules.s3_storage.s3_storage.calculate_file_hash', return_value='beepboop123beepboop123beepboop123')
         mock_file_in_folder = mocker.patch.object(self.storage, 'file_in_folder', return_value='existing_key.txt')
         assert self.storage.is_upload_needed(media) is False
         assert media.key == 'existing_key.txt'

@@ -70,10 +70,10 @@ class TestS3Storage:
     def test_skips_upload_when_duplicate_exists(self, mocker):
         """Test that upload skips when file_in_folder finds existing object"""
         self.storage.random_no_duplicate = True
-        mock_file_in_folder = mocker.patch.object(S3Storage, 'file_in_folder', return_value="existing_folder/existing_file.txt")
+        mocker.patch.object(S3Storage, 'file_in_folder', return_value="existing_folder/existing_file.txt")
         media = Media("test.txt")
         media._key = "original_path.txt"
-        mock_calculate_hash = mocker.patch('auto_archiver.modules.s3_storage.s3_storage.calculate_file_hash', return_value="beepboop123beepboop123beepboop123")
+        mocker.patch('auto_archiver.modules.s3_storage.s3_storage.calculate_file_hash', return_value="beepboop123beepboop123beepboop123")
         assert self.storage.is_upload_needed(media) is False
         assert media.key == "existing_folder/existing_file.txt"
         assert media.get("previously archived") is True

@@ -101,5 +101,5 @@ class TestS3Storage:
         )

     def test_file_in_folder_exists(self, mocker):
-        mock_list_objects = mocker.patch.object(self.storage.s3, 'list_objects', return_value={'Contents': [{'Key': 'path/to/file.txt'}]})
+        mocker.patch.object(self.storage.s3, 'list_objects', return_value={'Contents': [{'Key': 'path/to/file.txt'}]})
         assert self.storage.file_in_folder('path/to/') == 'path/to/file.txt'
@@ -94,7 +94,6 @@ def test_upload_not_uploaded(tmp_path, atlos_storage: AtlosStorage, metadata: Me
     call_args = post_mock.call_args[0]
     assert call_args[0] == expected_endpoint
     call_kwargs = post_mock.call_args[1]
-    expected_headers = {"Authorization": f"Bearer {atlos_storage.api_token}"}
     expected_params = {"title": media.properties}
     assert call_kwargs["params"] == expected_params
     file_tuple = call_kwargs["files"]["file"]
@@ -9,9 +9,8 @@ from tests.storages.test_storage_base import TestStorageBase


 @pytest.fixture
-def gdrive_storage(setup_module, mocker):
+def gdrive_storage(setup_module, mocker) -> GDriveStorage:
     module_name: str = "gdrive_storage"
-    storage: GDriveStorage
     config: dict = {
         "path_generator": "url",
         "filename_generator": "static",
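Annotating the fixture's return type is worthwhile because pytest injects fixtures by parameter name, so the `-> GDriveStorage` annotation is the main machine-readable hint about the injected object's type. A sketch of a consumer (the test name is illustrative):

def test_gdrive_storage_fixture(gdrive_storage: GDriveStorage) -> None:
    # IDEs and type checkers can now resolve attribute access on the fixture
    assert isinstance(gdrive_storage, GDriveStorage)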
@@ -77,15 +77,15 @@ def test_merge_dicts():


 def test_check_types():
-    assert config.is_list_type([]) == True
-    assert config.is_list_type(()) == True
-    assert config.is_list_type(set()) == True
-    assert config.is_list_type({}) == False
-    assert config.is_list_type("") == False
-    assert config.is_dict_type({}) == True
-    assert config.is_dict_type(CommentedMap()) == True
-    assert config.is_dict_type([]) == False
-    assert config.is_dict_type("") == False
+    assert config.is_list_type([]) is True
+    assert config.is_list_type(()) is True
+    assert config.is_list_type(set()) is True
+    assert config.is_list_type({}) is False
+    assert config.is_list_type("") is False
+    assert config.is_dict_type({}) is True
+    assert config.is_dict_type(CommentedMap()) is True
+    assert config.is_dict_type([]) is False
+    assert config.is_dict_type("") is False


 def test_from_dot_notation():
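A sketch of the behaviour these assertions pin down, assuming the helpers dispatch on container types (the real implementation in `config` may differ): list-likes are `list`/`tuple`/`set`, while any `Mapping`, including ruamel's `CommentedMap`, counts as dict-like.

from collections.abc import Mapping

def is_list_type(value) -> bool:
    return isinstance(value, (list, tuple, set))  # str and dict are excluded

def is_dict_type(value) -> bool:
    return isinstance(value, Mapping)  # dict and CommentedMap both match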
@@ -9,10 +9,8 @@ def example_module():
     import auto_archiver

     module_factory = ModuleFactory()

-    previous_path = auto_archiver.modules.__path__
+    # previous_path = auto_archiver.modules.__path__
     auto_archiver.modules.__path__.append("tests/data/test_modules/")

     return module_factory.get_module_lazy("example_module")

@@ -84,6 +82,8 @@ def test_load_modules(module_name):
     # check that default settings are applied
     default_config = module.configs
     assert loaded_module.name in loaded_module.config.keys()
+    defaults = {k: v.get("default") for k, v in default_config.items()}
+    assert loaded_module.config[module_name] == defaults


 @pytest.mark.parametrize("module_name", ["local_storage", "generic_extractor", "html_formatter", "csv_db"])
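A toy run of the defaults comprehension above; the option names are made up, since each module declares its own config schema:

default_config = {"save_logs": {"default": True}, "path": {"default": "./archive"}}
defaults = {k: v.get("default") for k, v in default_config.items()}
assert defaults == {"save_logs": True, "path": "./archive"}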
@@ -67,7 +67,7 @@ def test_version(basic_parser, capsys):

 def test_help(orchestrator, basic_parser, capsys):
     args = basic_parser.parse_args(["--help"])
-    assert args.help == True
+    assert args.help is True

     # test the show_help() on orchestrator
     with pytest.raises(SystemExit) as exit_error:
@@ -116,8 +116,8 @@ def test_check_required_values(orchestrator, caplog, test_args):
     # drop the example_module.required_field from the test_args
     test_args = test_args[:-2]

-    with pytest.raises(SystemExit) as exit_error:
-        config = orchestrator.setup_config(test_args)
+    with pytest.raises(SystemExit):
+        orchestrator.setup_config(test_args)

     assert caplog.records[1].message == "the following arguments are required: --example_module.required_field"
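Dropping the `as exit_error` and `config = …` bindings removes names the tests never read. The bound form remains the right tool when the test does inspect the captured exception, e.g.:

import pytest

with pytest.raises(SystemExit) as exc_info:
    raise SystemExit(2)
assert exc_info.value.code == 2  # inspect the captured exception afterwards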
@@ -212,7 +212,7 @@ def test_multiple_orchestrator(test_args):
     ]
     o1 = ArchivingOrchestrator()

-    with pytest.raises(ValueError) as exit_error:
+    with pytest.raises(ValueError):
         # this should fail because the gsheet_feeder_db requires a sheet_id / sheet
         o1.setup(o1_args)