kopia lustrzana https://github.com/bellingcat/auto-archiver
Fix up unit tests for new structure
rodzic
9635449ac0
commit
7a4871db6b
|
@ -51,7 +51,7 @@ class BaseModule(ABC):
|
|||
for key, val in config.get(self.name, {}).items():
|
||||
setattr(self, key, val)
|
||||
|
||||
def get_module(module_name: str, additional_paths: List[str] = []):
|
||||
def get_module(module_name: str, additional_paths: List[str] = []) -> LazyBaseModule:
|
||||
if module_name in _LAZY_LOADED_MODULES:
|
||||
return _LAZY_LOADED_MODULES[module_name]
|
||||
|
||||
|
@ -119,19 +119,19 @@ class LazyBaseModule:
|
|||
return self._entry_point
|
||||
|
||||
@property
|
||||
def dependencies(self):
|
||||
def dependencies(self) -> dict:
|
||||
return self.manifest['dependencies']
|
||||
|
||||
@property
|
||||
def configs(self):
|
||||
def configs(self) -> dict:
|
||||
return self.manifest['configs']
|
||||
|
||||
@property
|
||||
def requires_setup(self):
|
||||
def requires_setup(self) -> bool:
|
||||
return self.manifest['requires_setup']
|
||||
|
||||
@property
|
||||
def manifest(self):
|
||||
def manifest(self) -> dict:
|
||||
if self._manifest:
|
||||
return self._manifest
|
||||
# print(f"Loading manifest for module {module_path}")
|
||||
|
@ -149,10 +149,11 @@ class LazyBaseModule:
|
|||
self.type = manifest['type']
|
||||
self._entry_point = manifest['entry_point']
|
||||
self.description = manifest['description']
|
||||
self.version = manifest['version']
|
||||
|
||||
return manifest
|
||||
|
||||
def load(self):
|
||||
def load(self) -> BaseModule:
|
||||
|
||||
if self._instance:
|
||||
return self._instance
|
||||
|
|
|
@ -172,7 +172,6 @@ class GenericExtractor(Extractor):
|
|||
return self.add_metadata(data, info_extractor, url, result)
|
||||
|
||||
def dropin_for_name(self, dropin_name: str, additional_paths = [], package=__package__) -> Type[InfoExtractor]:
|
||||
|
||||
dropin_name = dropin_name.lower()
|
||||
|
||||
if dropin_name == "generic":
|
||||
|
|
|
@ -14,21 +14,16 @@ from auto_archiver.core import Metadata,Media
|
|||
class TwitterApiExtractor(Extractor):
|
||||
link_pattern = re.compile(r"(?:twitter|x).com\/(?:\#!\/)?(\w+)\/status(?:es)?\/(\d+)")
|
||||
|
||||
def __init__(self, config: dict) -> None:
|
||||
super().__init__(config)
|
||||
def setup(self, config: dict) -> None:
|
||||
super().setup(config)
|
||||
|
||||
self.api_index = 0
|
||||
self.apis = []
|
||||
if len(self.bearer_tokens):
|
||||
self.apis.extend([Api(bearer_token=bearer_token) for bearer_token in self.bearer_tokens])
|
||||
if self.bearer_token:
|
||||
self.assert_valid_string("bearer_token")
|
||||
self.apis.append(Api(bearer_token=self.bearer_token))
|
||||
if self.consumer_key and self.consumer_secret and self.access_token and self.access_secret:
|
||||
self.assert_valid_string("consumer_key")
|
||||
self.assert_valid_string("consumer_secret")
|
||||
self.assert_valid_string("access_token")
|
||||
self.assert_valid_string("access_secret")
|
||||
self.apis.append(Api(consumer_key=self.consumer_key, consumer_secret=self.consumer_secret,
|
||||
access_token=self.access_token, access_secret=self.access_secret))
|
||||
assert self.api_client is not None, "Missing Twitter API configurations, please provide either AND/OR (consumer_key, consumer_secret, access_token, access_secret) to use this archiver, you can provide both for better rate-limit results."
|
||||
|
|
|
@ -3,9 +3,10 @@ pytest conftest file, for shared fixtures and configuration
|
|||
"""
|
||||
|
||||
from typing import Dict, Tuple
|
||||
|
||||
import hashlib
|
||||
import pytest
|
||||
from auto_archiver.core.metadata import Metadata
|
||||
from auto_archiver.core.module import get_module, _LAZY_LOADED_MODULES
|
||||
|
||||
# Test names inserted into this list will be run last. This is useful for expensive/costly tests
|
||||
# that you only want to run if everything else succeeds (e.g. API calls). The order here is important
|
||||
|
@ -13,6 +14,36 @@ from auto_archiver.core.metadata import Metadata
|
|||
# format is the name of the module (python file) without the .py extension
|
||||
TESTS_TO_RUN_LAST = ['test_twitter_api_archiver']
|
||||
|
||||
@pytest.fixture
|
||||
def setup_module(request):
|
||||
def _setup_module(module_name, config={}):
|
||||
|
||||
if isinstance(module_name, type):
|
||||
# get the module name:
|
||||
# if the class does not have a .name, use the name of the parent folder
|
||||
module_name = module_name.__module__.rsplit(".",2)[-2]
|
||||
|
||||
m = get_module(module_name).load()
|
||||
m.name = module_name
|
||||
m.setup({module_name : config})
|
||||
|
||||
|
||||
def cleanup():
|
||||
_LAZY_LOADED_MODULES.pop(module_name)
|
||||
request.addfinalizer(cleanup)
|
||||
|
||||
return m
|
||||
|
||||
return _setup_module
|
||||
|
||||
@pytest.fixture
|
||||
def check_hash():
|
||||
def _check_hash(filename: str, hash: str):
|
||||
with open(filename, "rb") as f:
|
||||
buf = f.read()
|
||||
assert hash == hashlib.sha256(buf).hexdigest()
|
||||
|
||||
return _check_hash
|
||||
|
||||
@pytest.fixture
|
||||
def make_item():
|
||||
|
|
|
@ -3,13 +3,11 @@ from auto_archiver.modules.csv_db import CSVDb
|
|||
from auto_archiver.core import Metadata
|
||||
|
||||
|
||||
def test_store_item(tmp_path):
|
||||
def test_store_item(tmp_path, setup_module):
|
||||
"""Tests storing an item in the CSV database"""
|
||||
|
||||
temp_db = tmp_path / "temp_db.csv"
|
||||
db = CSVDb({
|
||||
"csv_db": {"csv_file": temp_db.as_posix()}
|
||||
})
|
||||
db = setup_module(CSVDb, {"csv_file": temp_db.as_posix()})
|
||||
|
||||
item = Metadata().set_url("http://example.com").set_title("Example").set_content("Example content").success("my-archiver")
|
||||
|
||||
|
|
|
@ -2,6 +2,7 @@ import pytest
|
|||
|
||||
from auto_archiver.modules.hash_enricher import HashEnricher
|
||||
from auto_archiver.core import Metadata, Media
|
||||
from auto_archiver.core.module import get_module
|
||||
|
||||
@pytest.mark.parametrize("algorithm, filename, expected_hash", [
|
||||
("SHA-256", "tests/data/testfile_1.txt", "1b4f0e9851971998e732078544c96b36c3d01cedf7caa332359d6f1d83567014"),
|
||||
|
@ -9,36 +10,29 @@ from auto_archiver.core import Metadata, Media
|
|||
("SHA3-512", "tests/data/testfile_1.txt", "d2d8cc4f369b340130bd2b29b8b54e918b7c260c3279176da9ccaa37c96eb71735fc97568e892dc6220bf4ae0d748edb46bd75622751556393be3f482e6f794e"),
|
||||
("SHA3-512", "tests/data/testfile_2.txt", "e35970edaa1e0d8af7d948491b2da0450a49fd9cc1e83c5db4c6f175f9550cf341f642f6be8cfb0bfa476e4258e5088c5ad549087bf02811132ac2fa22b734c6")
|
||||
])
|
||||
def test_calculate_hash(algorithm, filename, expected_hash):
|
||||
def test_calculate_hash(algorithm, filename, expected_hash, setup_module):
|
||||
# test SHA-256
|
||||
he = HashEnricher({"algorithm": algorithm, "chunksize": 1})
|
||||
he = setup_module(HashEnricher, {"algorithm": algorithm, "chunksize": 1})
|
||||
assert he.calculate_hash(filename) == expected_hash
|
||||
|
||||
def test_default_config_values():
|
||||
he = HashEnricher(config={})
|
||||
def test_default_config_values(setup_module):
|
||||
he = setup_module(HashEnricher)
|
||||
assert he.algorithm == "SHA-256"
|
||||
assert he.chunksize == 16000000
|
||||
|
||||
def test_invalid_chunksize():
|
||||
with pytest.raises(AssertionError):
|
||||
he = HashEnricher({"chunksize": "-100"})
|
||||
|
||||
def test_invalid_algorithm():
|
||||
with pytest.raises(AssertionError):
|
||||
HashEnricher({"algorithm": "SHA-123"})
|
||||
|
||||
def test_config():
|
||||
# test default config
|
||||
c = HashEnricher.configs()
|
||||
c = get_module('hash_enricher').configs
|
||||
assert c["algorithm"]["default"] == "SHA-256"
|
||||
assert c["chunksize"]["default"] == 16000000
|
||||
assert c["algorithm"]["choices"] == ["SHA-256", "SHA3-512"]
|
||||
assert c["algorithm"]["help"] == "hash algorithm to use"
|
||||
assert c["chunksize"]["help"] == "number of bytes to use when reading files in chunks (if this value is too large you will run out of RAM), default is 16MB"
|
||||
|
||||
def test_hash_media():
|
||||
|
||||
he = HashEnricher({"algorithm": "SHA-256", "chunksize": 1})
|
||||
def test_hash_media(setup_module):
|
||||
|
||||
he = setup_module(HashEnricher, {"algorithm": "SHA-256", "chunksize": 1})
|
||||
|
||||
# generate metadata with two test files
|
||||
m = Metadata().set_url("https://example.com")
|
||||
|
|
|
@ -1,17 +1,18 @@
|
|||
import pytest
|
||||
|
||||
from auto_archiver.core.metadata import Metadata
|
||||
from auto_archiver.base_processors.extractor import Extractor
|
||||
class TestArchiverBase(object):
|
||||
from auto_archiver.core.extractor import Extractor
|
||||
from auto_archiver.core.module import get_module
|
||||
class TestExtractorBase(object):
|
||||
|
||||
archiver_class: str = None
|
||||
extractor_module: str = None
|
||||
config: dict = None
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def setup_archiver(self):
|
||||
assert self.archiver_class is not None, "self.archiver_class must be set on the subclass"
|
||||
def setup_archiver(self, setup_module):
|
||||
assert self.extractor_module is not None, "self.extractor_module must be set on the subclass"
|
||||
assert self.config is not None, "self.config must be a dict set on the subclass"
|
||||
self.archiver: Extractor = self.archiver_class({self.archiver_class.name: self.config})
|
||||
self.extractor: Extractor = setup_module(self.extractor_module, self.config)
|
||||
|
||||
def assertValidResponseMetadata(self, test_response: Metadata, title: str, timestamp: str, status: str = ""):
|
||||
assert test_response is not False
|
|
@ -6,13 +6,15 @@ from os.path import dirname
|
|||
|
||||
import pytest
|
||||
|
||||
from auto_archiver.archivers.generic_extractor.generic_extractor import GenericExtractor
|
||||
from .test_archiver_base import TestArchiverBase
|
||||
from auto_archiver.modules.generic_extractor.generic_extractor import GenericExtractor
|
||||
from .test_extractor_base import TestExtractorBase
|
||||
|
||||
class TestGenericExtractor(TestArchiverBase):
|
||||
"""Tests Base Archiver
|
||||
class TestGenericExtractor(TestExtractorBase):
|
||||
"""Tests Generic Extractor
|
||||
"""
|
||||
archiver_class = GenericExtractor
|
||||
extractor_module = 'generic_extractor'
|
||||
extractor: GenericExtractor
|
||||
|
||||
config = {
|
||||
'subtitles': False,
|
||||
'comments': False,
|
||||
|
@ -28,12 +30,12 @@ class TestGenericExtractor(TestArchiverBase):
|
|||
|
||||
def test_load_dropin(self):
|
||||
# test loading dropins that are in the generic_extractor package
|
||||
package = "auto_archiver.archivers.generic_archiver"
|
||||
assert self.archiver.dropin_for_name("bluesky", package=package)
|
||||
package = "auto_archiver.modules.generic_extractor"
|
||||
assert self.extractor.dropin_for_name("bluesky", package=package)
|
||||
|
||||
# test loading dropins via filepath
|
||||
path = os.path.join(dirname(dirname(__file__)), "data/")
|
||||
assert self.archiver.dropin_for_name("dropin", additional_paths=[path])
|
||||
assert self.extractor.dropin_for_name("dropin", additional_paths=[path])
|
||||
|
||||
|
||||
|
||||
|
@ -51,12 +53,12 @@ class TestGenericExtractor(TestArchiverBase):
|
|||
This behaviour may be changed in the future (e.g. if we want the youtubedl archiver to just handle URLs it has extractors for,
|
||||
and then if and only if all archivers fail, does it fall back to the generic archiver)
|
||||
"""
|
||||
assert self.archiver.suitable(url) == is_suitable
|
||||
assert self.extractor.suitable(url) == is_suitable
|
||||
|
||||
@pytest.mark.download
|
||||
def test_download_tiktok(self, make_item):
|
||||
item = make_item("https://www.tiktok.com/@funnycats0ftiktok/video/7345101300750748970")
|
||||
result = self.archiver.download(item)
|
||||
result = self.extractor.download(item)
|
||||
assert result.get_url() == "https://www.tiktok.com/@funnycats0ftiktok/video/7345101300750748970"
|
||||
|
||||
@pytest.mark.download
|
||||
|
@ -72,7 +74,7 @@ class TestGenericExtractor(TestArchiverBase):
|
|||
It should return 'False'
|
||||
"""
|
||||
item = make_item(url)
|
||||
result = self.archiver.download(item)
|
||||
result = self.extractor.download(item)
|
||||
assert not result
|
||||
|
||||
|
||||
|
@ -80,7 +82,7 @@ class TestGenericExtractor(TestArchiverBase):
|
|||
def test_youtube_download(self, make_item):
|
||||
# url https://www.youtube.com/watch?v=5qap5aO4i9A
|
||||
item = make_item("https://www.youtube.com/watch?v=J---aiyznGQ")
|
||||
result = self.archiver.download(item)
|
||||
result = self.extractor.download(item)
|
||||
assert result.get_url() == "https://www.youtube.com/watch?v=J---aiyznGQ"
|
||||
assert result.get_title() == "Keyboard Cat! - THE ORIGINAL!"
|
||||
assert result.get('description') == "Buy NEW Keyboard Cat Merch! https://keyboardcat.creator-spring.com\n\nxo Keyboard Cat memes make your day better!\nhttp://www.keyboardcatstore.com/\nhttps://www.facebook.com/thekeyboardcat\nhttp://www.charlieschmidt.com/"
|
||||
|
@ -91,78 +93,78 @@ class TestGenericExtractor(TestArchiverBase):
|
|||
@pytest.mark.download
|
||||
def test_bluesky_download_multiple_images(self, make_item):
|
||||
item = make_item("https://bsky.app/profile/bellingcat.com/post/3lffjoxcu7k2w")
|
||||
result = self.archiver.download(item)
|
||||
result = self.extractor.download(item)
|
||||
assert result is not False
|
||||
|
||||
@pytest.mark.download
|
||||
def test_bluesky_download_single_image(self, make_item):
|
||||
item = make_item("https://bsky.app/profile/bellingcat.com/post/3lfn3hbcxgc2q")
|
||||
result = self.archiver.download(item)
|
||||
result = self.extractor.download(item)
|
||||
assert result is not False
|
||||
|
||||
@pytest.mark.download
|
||||
def test_bluesky_download_no_media(self, make_item):
|
||||
item = make_item("https://bsky.app/profile/bellingcat.com/post/3lfphwmcs4c2z")
|
||||
result = self.archiver.download(item)
|
||||
result = self.extractor.download(item)
|
||||
assert result is not False
|
||||
|
||||
@pytest.mark.download
|
||||
def test_bluesky_download_video(self, make_item):
|
||||
item = make_item("https://bsky.app/profile/bellingcat.com/post/3le2l4gsxlk2i")
|
||||
result = self.archiver.download(item)
|
||||
result = self.extractor.download(item)
|
||||
assert result is not False
|
||||
|
||||
@pytest.mark.download
|
||||
def test_truthsocial_download_video(self, make_item):
|
||||
item = make_item("https://truthsocial.com/@DaynaTrueman/posts/110602446619561579")
|
||||
result = self.archiver.download(item)
|
||||
result = self.extractor.download(item)
|
||||
assert len(result.media) == 1
|
||||
assert result is not False
|
||||
|
||||
@pytest.mark.download
|
||||
def test_truthsocial_download_no_media(self, make_item):
|
||||
item = make_item("https://truthsocial.com/@bbcnewa/posts/109598702184774628")
|
||||
result = self.archiver.download(item)
|
||||
result = self.extractor.download(item)
|
||||
assert result is not False
|
||||
|
||||
@pytest.mark.download
|
||||
def test_truthsocial_download_poll(self, make_item):
|
||||
item = make_item("https://truthsocial.com/@CNN_US/posts/113724326568555098")
|
||||
result = self.archiver.download(item)
|
||||
result = self.extractor.download(item)
|
||||
assert result is not False
|
||||
|
||||
@pytest.mark.download
|
||||
def test_truthsocial_download_single_image(self, make_item):
|
||||
item = make_item("https://truthsocial.com/@mariabartiromo/posts/113861116433335006")
|
||||
result = self.archiver.download(item)
|
||||
result = self.extractor.download(item)
|
||||
assert len(result.media) == 1
|
||||
assert result is not False
|
||||
|
||||
@pytest.mark.download
|
||||
def test_truthsocial_download_multiple_images(self, make_item):
|
||||
item = make_item("https://truthsocial.com/@trrth/posts/113861302149349135")
|
||||
result = self.archiver.download(item)
|
||||
result = self.extractor.download(item)
|
||||
assert len(result.media) == 3
|
||||
|
||||
@pytest.mark.download
|
||||
def test_twitter_download_nonexistend_tweet(self, make_item):
|
||||
# this tweet does not exist
|
||||
url = "https://x.com/Bellingcat/status/17197025860711058"
|
||||
response = self.archiver.download(make_item(url))
|
||||
response = self.extractor.download(make_item(url))
|
||||
assert not response
|
||||
|
||||
@pytest.mark.download
|
||||
def test_twitter_download_malformed_tweetid(self, make_item):
|
||||
# this tweet ID is malformed (it contains a non-digit character)
|
||||
url = "https://x.com/Bellingcat/status/1719702a586071100058"
|
||||
response = self.archiver.download(make_item(url))
|
||||
response = self.extractor.download(make_item(url))
|
||||
assert not response
|
||||
|
||||
@pytest.mark.download
|
||||
def test_twitter_download_tweet_no_media(self, make_item):
|
||||
|
||||
item = make_item("https://twitter.com/MeCookieMonster/status/1617921633456640001?s=20&t=3d0g4ZQis7dCbSDg-mE7-w")
|
||||
post = self.archiver.download(item)
|
||||
post = self.extractor.download(item)
|
||||
|
||||
self.assertValidResponseMetadata(
|
||||
post,
|
||||
|
@ -174,7 +176,7 @@ class TestGenericExtractor(TestArchiverBase):
|
|||
@pytest.mark.download
|
||||
def test_twitter_download_video(self, make_item):
|
||||
url = "https://x.com/bellingcat/status/1871552600346415571"
|
||||
post = self.archiver.download(make_item(url))
|
||||
post = self.extractor.download(make_item(url))
|
||||
self.assertValidResponseMetadata(
|
||||
post,
|
||||
"Bellingcat - This month's Bellingchat Premium is with @KolinaKoltai. She reveals how she investigated a platform allowing users to create AI-generated child sexual abuse material and explains why it's crucial to investigate the people behind these services",
|
||||
|
@ -193,7 +195,7 @@ class TestGenericExtractor(TestArchiverBase):
|
|||
|
||||
"""Download tweets with sensitive media"""
|
||||
|
||||
post = self.archiver.download(make_item(url))
|
||||
post = self.extractor.download(make_item(url))
|
||||
self.assertValidResponseMetadata(
|
||||
post,
|
||||
title,
|
|
@ -1,17 +1,18 @@
|
|||
import os
|
||||
import datetime
|
||||
|
||||
import hashlib
|
||||
import pytest
|
||||
|
||||
from pytwitter.models.media import MediaVariant
|
||||
from .test_archiver_base import TestArchiverBase
|
||||
from auto_archiver.archivers import TwitterApiArchiver
|
||||
from .test_extractor_base import TestExtractorBase
|
||||
from auto_archiver.modules.twitter_api_extractor import TwitterApiExtractor
|
||||
|
||||
|
||||
@pytest.mark.incremental
|
||||
class TestTwitterApiArchiver(TestArchiverBase):
|
||||
class TestTwitterApiExtractor(TestExtractorBase):
|
||||
|
||||
extractor_module = 'twitter_api_extractor'
|
||||
|
||||
archiver_class = TwitterApiArchiver
|
||||
config = {
|
||||
"bearer_tokens": [],
|
||||
"bearer_token": os.environ.get("TWITTER_BEARER_TOKEN", "TEST_KEY"),
|
||||
|
@ -30,7 +31,7 @@ class TestTwitterApiArchiver(TestArchiverBase):
|
|||
("https://www.bellingcat.com/category/resources/?s=20&t=3d0g4ZQis7dCbSDg-mE7-w", "https://www.bellingcat.com/category/resources/?s=20&t=3d0g4ZQis7dCbSDg-mE7-w"), # shouldn't strip params from non-twitter/x URLs
|
||||
])
|
||||
def test_sanitize_url(self, url, expected):
|
||||
assert expected == self.archiver.sanitize_url(url)
|
||||
assert expected == self.extractor.sanitize_url(url)
|
||||
|
||||
@pytest.mark.parametrize("url, exptected_username, exptected_tweetid", [
|
||||
("https://twitter.com/bellingcat/status/1874097816571961839", "bellingcat", "1874097816571961839"),
|
||||
|
@ -39,7 +40,7 @@ class TestTwitterApiArchiver(TestArchiverBase):
|
|||
])
|
||||
def test_get_username_tweet_id_from_url(self, url, exptected_username, exptected_tweetid):
|
||||
|
||||
username, tweet_id = self.archiver.get_username_tweet_id(url)
|
||||
username, tweet_id = self.extractor.get_username_tweet_id(url)
|
||||
assert exptected_username == username
|
||||
assert exptected_tweetid == tweet_id
|
||||
|
||||
|
@ -50,7 +51,7 @@ class TestTwitterApiArchiver(TestArchiverBase):
|
|||
MediaVariant(bit_rate=832000, content_type='video/mp4', url='https://video.twimg.com/ext_tw_video/1871551993677852672/pu/vid/avc1/640x360/uiDZDSmZ8MZn9hsi.mp4?tag=12'),
|
||||
MediaVariant(bit_rate=2176000, content_type='video/mp4', url='https://video.twimg.com/ext_tw_video/1871551993677852672/pu/vid/avc1/1280x720/6Y340Esh568WZnRZ.mp4?tag=12')
|
||||
]
|
||||
chosen_variant = self.archiver.choose_variant(variant_list)
|
||||
chosen_variant = self.extractor.choose_variant(variant_list)
|
||||
assert chosen_variant == variant_list[3]
|
||||
|
||||
@pytest.mark.skipif(not os.environ.get("TWITTER_BEARER_TOKEN"), reason="No Twitter bearer token provided")
|
||||
|
@ -58,7 +59,7 @@ class TestTwitterApiArchiver(TestArchiverBase):
|
|||
def test_download_nonexistent_tweet(self, make_item):
|
||||
# this tweet does not exist
|
||||
url = "https://x.com/Bellingcat/status/17197025860711058"
|
||||
response = self.archiver.download(make_item(url))
|
||||
response = self.extractor.download(make_item(url))
|
||||
assert not response
|
||||
|
||||
@pytest.mark.skipif(not os.environ.get("TWITTER_BEARER_TOKEN"), reason="No Twitter bearer token provided")
|
||||
|
@ -66,7 +67,7 @@ class TestTwitterApiArchiver(TestArchiverBase):
|
|||
def test_download_malformed_tweetid(self, make_item):
|
||||
# this tweet does not exist
|
||||
url = "https://x.com/Bellingcat/status/1719702586071100058"
|
||||
response = self.archiver.download(make_item(url))
|
||||
response = self.extractor.download(make_item(url))
|
||||
assert not response
|
||||
|
||||
@pytest.mark.skipif(not os.environ.get("TWITTER_BEARER_TOKEN"), reason="No Twitter bearer token provided")
|
||||
|
@ -74,7 +75,7 @@ class TestTwitterApiArchiver(TestArchiverBase):
|
|||
def test_download_tweet_no_media(self, make_item):
|
||||
|
||||
item = make_item("https://twitter.com/MeCookieMonster/status/1617921633456640001?s=20&t=3d0g4ZQis7dCbSDg-mE7-w")
|
||||
post = self.archiver.download(item)
|
||||
post = self.extractor.download(item)
|
||||
|
||||
self.assertValidResponseMetadata(
|
||||
post,
|
||||
|
@ -87,7 +88,7 @@ class TestTwitterApiArchiver(TestArchiverBase):
|
|||
@pytest.mark.download
|
||||
def test_download_video(self, make_item):
|
||||
url = "https://x.com/bellingcat/status/1871552600346415571"
|
||||
post = self.archiver.download(make_item(url))
|
||||
post = self.extractor.download(make_item(url))
|
||||
self.assertValidResponseMetadata(
|
||||
post,
|
||||
"This month's Bellingchat Premium is with @KolinaKoltai. She reveals how she investigated a platform allowing users to create AI-generated child sexual abuse material and explains why it's crucial to investigate the people behind these services https://t.co/SfBUq0hSD0 https://t.co/rIHx0WlKp8",
|
||||
|
@ -95,22 +96,23 @@ class TestTwitterApiArchiver(TestArchiverBase):
|
|||
)
|
||||
|
||||
@pytest.mark.skipif(not os.environ.get("TWITTER_BEARER_TOKEN"), reason="No Twitter bearer token provided")
|
||||
@pytest.mark.parametrize("url, title, timestamp, image_src", [
|
||||
("https://x.com/SozinhoRamalho/status/1876710769913450647", "ignore tweet, testing sensitivity warning nudity https://t.co/t3u0hQsSB1", datetime.datetime(2024, 12, 31, 14, 18, 33, tzinfo=datetime.timezone.utc), "https://pbs.twimg.com/media/GgtqkomWkAAHUUl.jpg"),
|
||||
("https://x.com/SozinhoRamalho/status/1876710875475681357", "ignore tweet, testing sensitivity warning violence https://t.co/syYDSkpjZD", datetime.datetime(2024, 12, 31, 14, 18, 33, tzinfo=datetime.timezone.utc), "https://pbs.twimg.com/media/GgtqkomWkAAHUUl.jpg"),
|
||||
("https://x.com/SozinhoRamalho/status/1876711053813227618", "ignore tweet, testing sensitivity warning sensitive https://t.co/XE7cRdjzYq", datetime.datetime(2024, 12, 31, 14, 18, 33, tzinfo=datetime.timezone.utc), "https://pbs.twimg.com/media/GgtqkomWkAAHUUl.jpg"),
|
||||
("https://x.com/SozinhoRamalho/status/1876711141314801937", "ignore tweet, testing sensitivity warning nudity, violence, sensitivity https://t.co/YxCFbbhYE3", datetime.datetime(2024, 12, 31, 14, 18, 33, tzinfo=datetime.timezone.utc), "https://pbs.twimg.com/media/GgtqkomWkAAHUUl.jpg"),
|
||||
@pytest.mark.parametrize("url, title, timestamp", [
|
||||
("https://x.com/SozinhoRamalho/status/1876710769913450647", "ignore tweet, testing sensitivity warning nudity https://t.co/t3u0hQsSB1", datetime.datetime(2024, 12, 31, 14, 18, 33, tzinfo=datetime.timezone.utc)),
|
||||
("https://x.com/SozinhoRamalho/status/1876710875475681357", "ignore tweet, testing sensitivity warning violence https://t.co/syYDSkpjZD", datetime.datetime(2024, 12, 31, 14, 18, 33, tzinfo=datetime.timezone.utc)),
|
||||
("https://x.com/SozinhoRamalho/status/1876711053813227618", "ignore tweet, testing sensitivity warning sensitive https://t.co/XE7cRdjzYq", datetime.datetime(2024, 12, 31, 14, 18, 33, tzinfo=datetime.timezone.utc)),
|
||||
("https://x.com/SozinhoRamalho/status/1876711141314801937", "ignore tweet, testing sensitivity warning nudity, violence, sensitivity https://t.co/YxCFbbhYE3", datetime.datetime(2024, 12, 31, 14, 18, 33, tzinfo=datetime.timezone.utc)),
|
||||
])
|
||||
@pytest.mark.download
|
||||
def test_download_sensitive_media(self, url, title, timestamp, image_src, make_item):
|
||||
def test_download_sensitive_media(self, url, title, timestamp, check_hash, make_item):
|
||||
|
||||
"""Download tweets with sensitive media"""
|
||||
|
||||
post = self.archiver.download(make_item(url))
|
||||
post = self.extractor.download(make_item(url))
|
||||
self.assertValidResponseMetadata(
|
||||
post,
|
||||
title,
|
||||
timestamp
|
||||
)
|
||||
assert len(post.media) == 1
|
||||
assert post.media[0].get('src') == image_src
|
||||
# check the SHA-256 hash (quick) of the media, to make sure it's valid
|
||||
check_hash(post.media[0].filename, "3eea9c03b2dcedd1eb9a169d8bfd1cf877996fab4961de019a96eb9d32d2d733")
|
|
@ -2,8 +2,9 @@ from auto_archiver.modules.html_formatter import HtmlFormatter
|
|||
from auto_archiver.core import Metadata, Media
|
||||
|
||||
|
||||
def test_format():
|
||||
formatter = HtmlFormatter({})
|
||||
def test_format(setup_module):
|
||||
formatter = setup_module(HtmlFormatter)
|
||||
|
||||
metadata = Metadata().set("content", "Hello, world!").set_url('https://example.com')
|
||||
|
||||
final_media = formatter.format(metadata)
|
||||
|
|
|
@ -0,0 +1,38 @@
|
|||
import pytest
|
||||
from auto_archiver.core.module import get_module, BaseModule, LazyBaseModule
|
||||
|
||||
@pytest.mark.parametrize("module_name", ["cli_feeder", "local_storage", "generic_extractor", "html_formatter", "csv_db"])
|
||||
def test_load_modules(module_name):
|
||||
# test that specific modules can be loaded
|
||||
module = get_module(module_name)
|
||||
assert module is not None
|
||||
assert isinstance(module, LazyBaseModule)
|
||||
assert module.name == module_name
|
||||
|
||||
loaded_module = module.load()
|
||||
assert isinstance(loaded_module, BaseModule)
|
||||
|
||||
# test module setup
|
||||
loaded_module.setup(config={})
|
||||
|
||||
assert loaded_module.config == {}
|
||||
|
||||
|
||||
@pytest.mark.parametrize("module_name", ["cli_feeder", "local_storage", "generic_extractor", "html_formatter", "csv_db"])
|
||||
def test_lazy_base_module(module_name):
|
||||
lazy_module = get_module(module_name)
|
||||
|
||||
assert lazy_module is not None
|
||||
assert isinstance(lazy_module, LazyBaseModule)
|
||||
assert lazy_module.name == module_name
|
||||
assert len(lazy_module.display_name) > 0
|
||||
assert module_name in lazy_module.path
|
||||
assert isinstance(lazy_module.manifest, dict)
|
||||
|
||||
assert lazy_module.requires_setup == lazy_module.manifest.get("requires_setup", True)
|
||||
assert len(lazy_module.entry_point) > 0
|
||||
assert len(lazy_module.configs) > 0
|
||||
assert len(lazy_module.description) > 0
|
||||
assert len(lazy_module.version) > 0
|
||||
|
||||
|
Ładowanie…
Reference in New Issue