Merge pull request #165 from bellingcat/fix/snscrape

Remove snscrape from the twitter_archiver
pull/162/head
Patrick Robertson 2025-01-09 11:06:14 +01:00 zatwierdzone przez GitHub
commit 8e99d62c97
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: B5690EEEBB952194
4 zmienionych plików z 128 dodań i 62 usunięć

Wyświetl plik

@ -2,7 +2,6 @@ import re, requests, mimetypes, json
from typing import Union from typing import Union
from datetime import datetime from datetime import datetime
from loguru import logger from loguru import logger
from snscrape.modules.twitter import TwitterTweetScraper, Video, Gif, Photo
from yt_dlp import YoutubeDL from yt_dlp import YoutubeDL
from yt_dlp.extractor.twitter import TwitterIE from yt_dlp.extractor.twitter import TwitterIE
from slugify import slugify from slugify import slugify
@ -49,7 +48,7 @@ class TwitterArchiver(Archiver):
username, tweet_id = self.get_username_tweet_id(url) username, tweet_id = self.get_username_tweet_id(url)
if not username: return False if not username: return False
strategies = [self.download_yt_dlp, self.download_snscrape, self.download_syndication] strategies = [self.download_yt_dlp, self.download_syndication]
for strategy in strategies: for strategy in strategies:
logger.debug(f"Trying {strategy.__name__} for {url=}") logger.debug(f"Trying {strategy.__name__} for {url=}")
try: try:
@ -61,45 +60,6 @@ class TwitterArchiver(Archiver):
logger.warning(f"No free strategy worked for {url}") logger.warning(f"No free strategy worked for {url}")
return False return False
def download_snscrape(self, item: Metadata, url: str, tweet_id: str) -> Union[Metadata, bool]:
    """
    Archive a tweet via snscrape's TwitterTweetScraper.

    Returns a populated Metadata on success, or False when the tweet cannot
    be fetched. Attached media (videos, gifs, photos) are downloaded next to
    the tweet text; unrecognised media types are skipped with a warning.
    """
    scraper = TwitterTweetScraper(tweet_id)
    try:
        tweet = next(scraper.get_items())
    except Exception as ex:
        logger.warning(f"SNSCRAPE FAILED, can't get tweet: {type(ex).__name__} occurred. args: {ex.args}")
        return False

    result = Metadata()
    result.set_title(tweet.content).set_content(tweet.json()).set_timestamp(tweet.date)
    if tweet.media is None:
        logger.debug(f'No media found, archiving tweet text only')
        return result

    for i, tweet_media in enumerate(tweet.media):
        media = Media(filename="")
        mimetype = ""
        # isinstance instead of `type(x) ==`: idiomatic type check; the
        # snscrape media classes handled here are distinct types.
        if isinstance(tweet_media, Video):
            # pick the highest-bitrate variant among those reporting a bitrate
            # NOTE(review): raises ValueError if no variant has a bitrate —
            # the strategy loop in download() is assumed to catch it; confirm
            variant = max((v for v in tweet_media.variants if v.bitrate), key=lambda v: v.bitrate)
            media.set("src", variant.url).set("duration", tweet_media.duration)
            mimetype = variant.contentType
        elif isinstance(tweet_media, Gif):
            variant = tweet_media.variants[0]
            media.set("src", variant.url)
            mimetype = variant.contentType
        elif isinstance(tweet_media, Photo):
            media.set("src", UrlUtil.twitter_best_quality_url(tweet_media.fullUrl))
            mimetype = "image/jpeg"
        else:
            logger.warning(f"Could not get media URL of {tweet_media}")
            continue
        # derive a file extension from the mimetype for the local filename
        ext = mimetypes.guess_extension(mimetype)
        media.filename = self.download_from_url(media.get("src"), f'{slugify(url)}_{i}{ext}')
        result.add_media(media)

    return result.success("twitter-snscrape")
def download_syndication(self, item: Metadata, url: str, tweet_id: str) -> Union[Metadata|bool]: def download_syndication(self, item: Metadata, url: str, tweet_id: str) -> Union[Metadata|bool]:
""" """
Hack alternative working again. Hack alternative working again.

Wyświetl plik

@ -0,0 +1,22 @@
from auto_archiver.core import Metadata
class TestArchiverBase(object):
    """
    Mixin base class for archiver tests.

    Subclasses must also inherit unittest.TestCase and set `archiver_class`
    and `config`; setUp() then instantiates the archiver as self.archiver.
    """

    # must be overridden by each concrete test subclass
    archiver_class = None
    config = None

    def setUp(self):
        assert self.archiver_class is not None, "self.archiver_class must be set on the subclass"
        assert self.config is not None, "self.config must be a dict set on the subclass"
        self.archiver = self.archiver_class(self.config)

    def create_item(self, url, **kwargs):
        """Build a Metadata item for `url`, applying any extra key/value pairs."""
        item = Metadata().set_url(url)
        for key, value in kwargs.items():
            item.set(key, value)
        return item

    def assertValidResponseMetadata(self, test_response, title, timestamp):
        """Assert the archiver response succeeded with the given title and timestamp."""
        self.assertTrue(test_response.is_success())
        self.assertEqual(title, test_response.get_title())
        # bug fix: the original used assertTrue(timestamp, msg-arg), which only
        # checked that the *expected* value was truthy and never compared it to
        # the response — compare for equality instead
        self.assertEqual(timestamp, test_response.get("timestamp"))

Wyświetl plik

@ -1,7 +1,8 @@
from auto_archiver.archivers.bluesky_archiver import BlueskyArchiver from auto_archiver.archivers.bluesky_archiver import BlueskyArchiver
from .test_archiver_base import TestArchiverBase
import unittest import unittest
class TestBlueskyArchiver(unittest.TestCase): class TestBlueskyArchiver(TestArchiverBase, unittest.TestCase):
"""Tests Bluesky Archiver """Tests Bluesky Archiver
Note that these tests will download API responses from the bluesky API, so they may be slow. Note that these tests will download API responses from the bluesky API, so they may be slow.
@ -9,24 +10,12 @@ class TestBlueskyArchiver(unittest.TestCase):
and also test the archiver's ability to download media. and also test the archiver's ability to download media.
""" """
# def _download_bsky_embeds(self, post): archiver_class = BlueskyArchiver
# # method to override actual method, and monkey patch requests.get so as to not actually download config = {}
# # the media files
# old_requests_get = requests.get
# def mock_requests_get(*args, **kwargs):
# return {"status_code": 200, "json": lambda: {"data": "fake data"}}
# requests.get = mock_requests_get
# media = self.bsky._download_bsky_embeds(post)
# requests.get = old_requests_get
# return media
def setUp(self):
self.bsky = BlueskyArchiver({})
return super().setUp()
def test_download_media_with_images(self): def test_download_media_with_images(self):
# url https://bsky.app/profile/colborne.bsky.social/post/3lec2bqjc5s2y # url https://bsky.app/profile/colborne.bsky.social/post/3lec2bqjc5s2y
post = self.bsky._get_post_from_uri("https://bsky.app/profile/colborne.bsky.social/post/3lec2bqjc5s2y") post = self.archiver._get_post_from_uri("https://bsky.app/profile/colborne.bsky.social/post/3lec2bqjc5s2y")
# just make sure bsky haven't changed their format, images should be under "record/embed/media/images" # just make sure bsky haven't changed their format, images should be under "record/embed/media/images"
# there should be 2 images # there should be 2 images
@ -37,7 +26,7 @@ class TestBlueskyArchiver(unittest.TestCase):
self.assertEqual(len(post["record"]["embed"]["media"]["images"]), 2) self.assertEqual(len(post["record"]["embed"]["media"]["images"]), 2)
# try downloading the media files # try downloading the media files
media = self.bsky._download_bsky_embeds(post) media = self.archiver._download_bsky_embeds(post)
self.assertEqual(len(media), 2) self.assertEqual(len(media), 2)
# check the IDs # check the IDs
@ -46,7 +35,7 @@ class TestBlueskyArchiver(unittest.TestCase):
def test_download_post_with_single_image(self): def test_download_post_with_single_image(self):
# url https://bsky.app/profile/bellingcat.com/post/3lcxcpgt6j42l # url https://bsky.app/profile/bellingcat.com/post/3lcxcpgt6j42l
post = self.bsky._get_post_from_uri("https://bsky.app/profile/bellingcat.com/post/3lcxcpgt6j42l") post = self.archiver._get_post_from_uri("https://bsky.app/profile/bellingcat.com/post/3lcxcpgt6j42l")
# just make sure bsky haven't changed their format, images should be under "record/embed/images" # just make sure bsky haven't changed their format, images should be under "record/embed/images"
# there should be 1 image # there should be 1 image
@ -55,7 +44,7 @@ class TestBlueskyArchiver(unittest.TestCase):
self.assertTrue("images" in post["record"]["embed"]) self.assertTrue("images" in post["record"]["embed"])
self.assertEqual(len(post["record"]["embed"]["images"]), 1) self.assertEqual(len(post["record"]["embed"]["images"]), 1)
media = self.bsky._download_bsky_embeds(post) media = self.archiver._download_bsky_embeds(post)
self.assertEqual(len(media), 1) self.assertEqual(len(media), 1)
# check the ID # check the ID
@ -64,14 +53,14 @@ class TestBlueskyArchiver(unittest.TestCase):
def test_download_post_with_video(self): def test_download_post_with_video(self):
# url https://bsky.app/profile/bellingcat.com/post/3le2l4gsxlk2i # url https://bsky.app/profile/bellingcat.com/post/3le2l4gsxlk2i
post = self.bsky._get_post_from_uri("https://bsky.app/profile/bellingcat.com/post/3le2l4gsxlk2i") post = self.archiver._get_post_from_uri("https://bsky.app/profile/bellingcat.com/post/3le2l4gsxlk2i")
# just make sure bsky haven't changed their format, video should be under "record/embed/video" # just make sure bsky haven't changed their format, video should be under "record/embed/video"
self.assertTrue("record" in post) self.assertTrue("record" in post)
self.assertTrue("embed" in post["record"]) self.assertTrue("embed" in post["record"])
self.assertTrue("video" in post["record"]["embed"]) self.assertTrue("video" in post["record"]["embed"])
media = self.bsky._download_bsky_embeds(post) media = self.archiver._download_bsky_embeds(post)
self.assertEqual(len(media), 1) self.assertEqual(len(media), 1)
# check the ID # check the ID

Wyświetl plik

@ -0,0 +1,95 @@
import unittest
import datetime
from auto_archiver.archivers.twitter_archiver import TwitterArchiver
from .test_archiver_base import TestArchiverBase
class TestTwitterArchiver(TestArchiverBase, unittest.TestCase):
    """
    Tests for the TwitterArchiver: URL sanitisation, username/tweet-id
    extraction, and the download strategies.

    Note: the download tests hit live services (t.co, x.com/twitter.com),
    so they require network access and may be slow or flaky.
    """

    archiver_class = TwitterArchiver
    config = {}

    def test_sanitize_url(self):
        # should expand t.co short links to their destination
        t_co_url = "https://t.co/yl3oOJatFp"
        t_co_resolved_url = "https://www.bellingcat.com/category/resources/"
        self.assertEqual(t_co_resolved_url, self.archiver.sanitize_url(t_co_url))

        # shouldn't alter valid x URLs
        x_url = "https://x.com/bellingcat/status/1874097816571961839"
        self.assertEqual(x_url, self.archiver.sanitize_url(x_url))

        # shouldn't alter valid twitter.com URLs
        twitter_url = "https://twitter.com/bellingcat/status/1874097816571961839"
        self.assertEqual(twitter_url, self.archiver.sanitize_url(twitter_url))

        # should strip tracking params
        tracking_url = "https://twitter.com/bellingcat/status/1874097816571961839?s=20&t=3d0g4ZQis7dCbSDg-mE7-w"
        self.assertEqual("https://twitter.com/bellingcat/status/1874097816571961839", self.archiver.sanitize_url(tracking_url))

        # shouldn't alter non-twitter/x URLs
        test_url = "https://www.bellingcat.com/category/resources/"
        self.assertEqual(test_url, self.archiver.sanitize_url(test_url))

        # shouldn't strip params from non-twitter/x URLs
        test_url = "https://www.bellingcat.com/category/resources/?s=20&t=3d0g4ZQis7dCbSDg-mE7-w"
        self.assertEqual(test_url, self.archiver.sanitize_url(test_url))

    def test_get_username_tweet_id_from_url(self):
        # test valid twitter URL
        url = "https://twitter.com/bellingcat/status/1874097816571961839"
        username, tweet_id = self.archiver.get_username_tweet_id(url)
        self.assertEqual("bellingcat", username)
        self.assertEqual("1874097816571961839", tweet_id)

        # test valid x URL
        url = "https://x.com/bellingcat/status/1874097816571961839"
        username, tweet_id = self.archiver.get_username_tweet_id(url)
        self.assertEqual("bellingcat", username)
        self.assertEqual("1874097816571961839", tweet_id)

        # test invalid URL
        # TODO: should this return None, False or raise an exception? Right now it returns False
        url = "https://www.bellingcat.com/category/resources/"
        username, tweet_id = self.archiver.get_username_tweet_id(url)
        self.assertFalse(username)
        self.assertFalse(tweet_id)

    def test_youtube_dlp_archiver(self):
        url = "https://x.com/bellingcat/status/1874097816571961839"
        post = self.archiver.download_yt_dlp(self.create_item(url), url, "1874097816571961839")
        self.assertTrue(post)
        self.assertValidResponseMetadata(
            post,
            "As 2024 comes to a close, heres some examples of what Bellingcat investigated per month in our 10th year! 🧵",
            datetime.datetime(2024, 12, 31, 14, 18, 33, tzinfo=datetime.timezone.utc)
        )
        # bug fix: removed a stray breakpoint() left over from debugging — it
        # would suspend the test run waiting for an interactive debugger

    def test_download_media_with_images(self):
        # tweet with two photos:
        # https://twitter.com/MeCookieMonster/status/1617921633456640001?s=20&t=3d0g4ZQis7dCbSDg-mE7-w
        url = "https://twitter.com/MeCookieMonster/status/1617921633456640001"
        post = self.archiver.download(self.create_item(url))
        # bug fix: the original body was copy-pasted from the Bluesky tests —
        # it called download() with no arguments and then inspected a
        # bsky-shaped dict ("record"/"embed"/...), which a Twitter download
        # never returns. Assert on the archiver's Metadata result instead.
        self.assertTrue(post)
        self.assertTrue(post.is_success())
        # NOTE(review): assumes the Metadata result exposes its downloaded
        # files via `.media` (populated by result.add_media in the archiver)
        # — confirm the accessor name against auto_archiver.core.Metadata
        self.assertEqual(2, len(post.media))