diff --git a/src/auto_archiver/archivers/twitter_archiver.py b/src/auto_archiver/archivers/twitter_archiver.py index 6735488..1a356ca 100644 --- a/src/auto_archiver/archivers/twitter_archiver.py +++ b/src/auto_archiver/archivers/twitter_archiver.py @@ -2,7 +2,6 @@ import re, requests, mimetypes, json from typing import Union from datetime import datetime from loguru import logger -from snscrape.modules.twitter import TwitterTweetScraper, Video, Gif, Photo from yt_dlp import YoutubeDL from yt_dlp.extractor.twitter import TwitterIE from slugify import slugify @@ -49,7 +48,7 @@ class TwitterArchiver(Archiver): username, tweet_id = self.get_username_tweet_id(url) if not username: return False - strategies = [self.download_yt_dlp, self.download_snscrape, self.download_syndication] + strategies = [self.download_yt_dlp, self.download_syndication] for strategy in strategies: logger.debug(f"Trying {strategy.__name__} for {url=}") try: @@ -61,45 +60,6 @@ class TwitterArchiver(Archiver): logger.warning(f"No free strategy worked for {url}") return False - - def download_snscrape(self, item: Metadata, url: str, tweet_id: str) -> Union[Metadata|bool]: - scr = TwitterTweetScraper(tweet_id) - try: - tweet = next(scr.get_items()) - except Exception as ex: - logger.warning(f"SNSCRAPE FAILED, can't get tweet: {type(ex).__name__} occurred. args: {ex.args}") - return False - - result = Metadata() - result.set_title(tweet.content).set_content(tweet.json()).set_timestamp(tweet.date) - if tweet.media is None: - logger.debug(f'No media found, archiving tweet text only') - return result - - for i, tweet_media in enumerate(tweet.media): - media = Media(filename="") - mimetype = "" - if type(tweet_media) == Video: - variant = max( - [v for v in tweet_media.variants if v.bitrate], key=lambda v: v.bitrate) - media.set("src", variant.url).set("duration", tweet_media.duration) - mimetype = variant.contentType - elif type(tweet_media) == Gif: - variant = tweet_media.variants[0] - media.set("src", variant.url) - mimetype = variant.contentType - elif type(tweet_media) == Photo: - media.set("src", UrlUtil.twitter_best_quality_url(tweet_media.fullUrl)) - mimetype = "image/jpeg" - else: - logger.warning(f"Could not get media URL of {tweet_media}") - continue - ext = mimetypes.guess_extension(mimetype) - media.filename = self.download_from_url(media.get("src"), f'{slugify(url)}_{i}{ext}') - result.add_media(media) - - return result.success("twitter-snscrape") - def download_syndication(self, item: Metadata, url: str, tweet_id: str) -> Union[Metadata|bool]: """ Hack alternative working again. diff --git a/tests/archivers/test_archiver_base.py b/tests/archivers/test_archiver_base.py new file mode 100644 index 0000000..f99850a --- /dev/null +++ b/tests/archivers/test_archiver_base.py @@ -0,0 +1,22 @@ +from auto_archiver.core import Metadata + +class TestArchiverBase(object): + + archiver_class = None + config = None + + def setUp(self): + assert self.archiver_class is not None, "self.archiver_class must be set on the subclass" + assert self.config is not None, "self.config must be a dict set on the subclass" + self.archiver = self.archiver_class(self.config) + + def create_item(self, url, **kwargs): + item = Metadata().set_url(url) + for key, value in kwargs.items(): + item.set(key, value) + return item + + def assertValidResponseMetadata(self, test_response, title, timestamp): + assert test_response.is_success() + assert title == test_response.get_title() + assert timestamp, test_response.get("timestamp") diff --git a/tests/archivers/test_bluesky_archiver.py b/tests/archivers/test_bluesky_archiver.py index 17719f5..de5074c 100644 --- a/tests/archivers/test_bluesky_archiver.py +++ b/tests/archivers/test_bluesky_archiver.py @@ -1,8 +1,9 @@ from auto_archiver.archivers.bluesky_archiver import BlueskyArchiver +from .test_archiver_base import TestArchiverBase from vcr.unittest import VCRMixin import unittest -class TestBlueskyArchiver(VCRMixin, unittest.TestCase): +class TestBlueskyArchiver(TestArchiverBase, unittest.TestCase): """Tests Bluesky Archiver Note that these tests will download API responses from the bluesky API, so they may be slow. @@ -10,24 +11,12 @@ class TestBlueskyArchiver(VCRMixin, unittest.TestCase): and also test the archiver's ability to download media. """ - # def _download_bsky_embeds(self, post): - # # method to override actual method, and monkey patch requests.get so as to not actually download - # # the media files - # old_requests_get = requests.get - # def mock_requests_get(*args, **kwargs): - # return {"status_code": 200, "json": lambda: {"data": "fake data"}} - # requests.get = mock_requests_get - # media = self.bsky._download_bsky_embeds(post) - # requests.get = old_requests_get - # return media + archiver_class = BlueskyArchiver + config = {} - def setUp(self): - self.bsky = BlueskyArchiver({}) - return super().setUp() - def test_download_media_with_images(self): # url https://bsky.app/profile/colborne.bsky.social/post/3lec2bqjc5s2y - post = self.bsky._get_post_from_uri("https://bsky.app/profile/colborne.bsky.social/post/3lec2bqjc5s2y") + post = self.archiver._get_post_from_uri("https://bsky.app/profile/colborne.bsky.social/post/3lec2bqjc5s2y") # just make sure bsky haven't changed their format, images should be under "record/embed/media/images" # there should be 2 images @@ -38,7 +27,7 @@ class TestBlueskyArchiver(VCRMixin, unittest.TestCase): assert len(post["record"]["embed"]["media"]["images"]) == 2 # try downloading the media files - media = self.bsky._download_bsky_embeds(post) + media = self.archiver._download_bsky_embeds(post) assert len(media) == 2 # check the IDs @@ -47,7 +36,7 @@ class TestBlueskyArchiver(VCRMixin, unittest.TestCase): def test_download_post_with_single_image(self): # url https://bsky.app/profile/bellingcat.com/post/3lcxcpgt6j42l - post = self.bsky._get_post_from_uri("https://bsky.app/profile/bellingcat.com/post/3lcxcpgt6j42l") + post = self.archiver._get_post_from_uri("https://bsky.app/profile/bellingcat.com/post/3lcxcpgt6j42l") # just make sure bsky haven't changed their format, images should be under "record/embed/images" # there should be 1 image @@ -56,7 +45,7 @@ class TestBlueskyArchiver(VCRMixin, unittest.TestCase): assert "images" in post["record"]["embed"] assert len(post["record"]["embed"]["images"]) == 1 - media = self.bsky._download_bsky_embeds(post) + media = self.archiver._download_bsky_embeds(post) assert len(media) == 1 # check the ID @@ -65,17 +54,17 @@ class TestBlueskyArchiver(VCRMixin, unittest.TestCase): def test_download_post_with_video(self): # url https://bsky.app/profile/bellingcat.com/post/3le2l4gsxlk2i - post = self.bsky._get_post_from_uri("https://bsky.app/profile/bellingcat.com/post/3le2l4gsxlk2i") + post = self.archiver._get_post_from_uri("https://bsky.app/profile/bellingcat.com/post/3le2l4gsxlk2i") # just make sure bsky haven't changed their format, video should be under "record/embed/video" assert "record" in post assert "embed" in post["record"] assert "video" in post["record"]["embed"] - media = self.bsky._download_bsky_embeds(post) + media = self.archiver._download_bsky_embeds(post) assert len(media) == 1 # check the ID assert "bafkreiaiskn2nt5cxjnxbgcqqcrnurvkr2ni3unekn6zvhvgr5nrqg6u2q" in media[0].get('src') - \ No newline at end of file + diff --git a/tests/archivers/test_twitter_archiver.py b/tests/archivers/test_twitter_archiver.py new file mode 100644 index 0000000..2329200 --- /dev/null +++ b/tests/archivers/test_twitter_archiver.py @@ -0,0 +1,95 @@ +import unittest +import datetime + +from auto_archiver.archivers.twitter_archiver import TwitterArchiver + +from .test_archiver_base import TestArchiverBase + + +class TestTwitterArchiver(TestArchiverBase, unittest.TestCase): + + archiver_class = TwitterArchiver + config = {} + + def test_sanitize_url(self): + + # should expand t.co URLs + t_co_url = "https://t.co/yl3oOJatFp" + t_co_resolved_url = "https://www.bellingcat.com/category/resources/" + assert t_co_resolved_url == self.archiver.sanitize_url(t_co_url) + + # shouldn't alter valid x URLs + x_url = "https://x.com/bellingcat/status/1874097816571961839" + assert x_url == self.archiver.sanitize_url(x_url) + + # shouldn't alter valid twitter.com URLs + twitter_url = "https://twitter.com/bellingcat/status/1874097816571961839" + assert twitter_url == self.archiver.sanitize_url(twitter_url) + + # should strip tracking params + tracking_url = "https://twitter.com/bellingcat/status/1874097816571961839?s=20&t=3d0g4ZQis7dCbSDg-mE7-w" + assert "https://twitter.com/bellingcat/status/1874097816571961839" == self.archiver.sanitize_url(tracking_url) + + # shouldn't alter non-twitter/x URLs + test_url = "https://www.bellingcat.com/category/resources/" + assert test_url == self.archiver.sanitize_url(test_url) + + # shouldn't strip params from non-twitter/x URLs + test_url = "https://www.bellingcat.com/category/resources/?s=20&t=3d0g4ZQis7dCbSDg-mE7-w" + assert test_url == self.archiver.sanitize_url(test_url) + + + def test_get_username_tweet_id_from_url(self): + + # test valid twitter URL + url = "https://twitter.com/bellingcat/status/1874097816571961839" + username, tweet_id = self.archiver.get_username_tweet_id(url) + assert "bellingcat" == username + assert "1874097816571961839" == tweet_id + + # test valid x URL + url = "https://x.com/bellingcat/status/1874097816571961839" + username, tweet_id = self.archiver.get_username_tweet_id(url) + assert "bellingcat" == username + assert "1874097816571961839" == tweet_id + + # test invalid URL + # TODO: should this return None, False or raise an exception? Right now it returns False + url = "https://www.bellingcat.com/category/resources/" + username, tweet_id = self.archiver.get_username_tweet_id(url) + assert not username + assert not tweet_id + + def test_youtube_dlp_archiver(self): + + url = "https://x.com/bellingcat/status/1874097816571961839" + post = self.archiver.download_yt_dlp(self.create_item(url), url, "1874097816571961839") + assert post + self.assertValidResponseMetadata( + post, + "As 2024 comes to a close, here’s some examples of what Bellingcat investigated per month in our 10th year! 🧵", + datetime.datetime(2024, 12, 31, 14, 18, 33, tzinfo=datetime.timezone.utc) + ) + breakpoint() + + + def test_download_media_with_images(self): + # url https://twitter.com/MeCookieMonster/status/1617921633456640001?s=20&t=3d0g4ZQis7dCbSDg-mE7-w + + post = self.archiver.download() + + # just make sure twitter haven't changed their format, images should be under "record/embed/media/images" + # there should be 2 images + assert "record" in post + assert "embed" in post["record"] + assert "media" in post["record"]["embed"] + assert "images" in post["record"]["embed"]["media"] + assert len(post["record"]["embed"]["media"]["images"]) == 2 + + # try downloading the media files + media = self.archiver.download(post) + assert len(media) == 2 + + # check the IDs + assert "bafkreiflrkfihcvwlhka5tb2opw2qog6gfvywsdzdlibveys2acozh75tq" in media[0].get('src') + assert "bafkreibsprmwchf7r6xcstqkdvvuj3ijw7efciw7l3y4crxr4cmynseo7u" in media[1].get('src') \ No newline at end of file