Merge branch 'main' into feat/unittest

pull/163/head
Patrick Robertson 2025-01-13 13:15:13 +01:00
commit e2bc84ccb9
4 zmienionych plików z 129 dodań i 63 usunięć

Wyświetl plik

@ -2,7 +2,6 @@ import re, requests, mimetypes, json
from typing import Union
from datetime import datetime
from loguru import logger
from snscrape.modules.twitter import TwitterTweetScraper, Video, Gif, Photo
from yt_dlp import YoutubeDL
from yt_dlp.extractor.twitter import TwitterIE
from slugify import slugify
@ -49,7 +48,7 @@ class TwitterArchiver(Archiver):
username, tweet_id = self.get_username_tweet_id(url)
if not username: return False
strategies = [self.download_yt_dlp, self.download_snscrape, self.download_syndication]
strategies = [self.download_yt_dlp, self.download_syndication]
for strategy in strategies:
logger.debug(f"Trying {strategy.__name__} for {url=}")
try:
@ -61,45 +60,6 @@ class TwitterArchiver(Archiver):
logger.warning(f"No free strategy worked for {url}")
return False
def download_snscrape(self, item: Metadata, url: str, tweet_id: str) -> Union[Metadata|bool]:
scr = TwitterTweetScraper(tweet_id)
try:
tweet = next(scr.get_items())
except Exception as ex:
logger.warning(f"SNSCRAPE FAILED, can't get tweet: {type(ex).__name__} occurred. args: {ex.args}")
return False
result = Metadata()
result.set_title(tweet.content).set_content(tweet.json()).set_timestamp(tweet.date)
if tweet.media is None:
logger.debug(f'No media found, archiving tweet text only')
return result
for i, tweet_media in enumerate(tweet.media):
media = Media(filename="")
mimetype = ""
if type(tweet_media) == Video:
variant = max(
[v for v in tweet_media.variants if v.bitrate], key=lambda v: v.bitrate)
media.set("src", variant.url).set("duration", tweet_media.duration)
mimetype = variant.contentType
elif type(tweet_media) == Gif:
variant = tweet_media.variants[0]
media.set("src", variant.url)
mimetype = variant.contentType
elif type(tweet_media) == Photo:
media.set("src", UrlUtil.twitter_best_quality_url(tweet_media.fullUrl))
mimetype = "image/jpeg"
else:
logger.warning(f"Could not get media URL of {tweet_media}")
continue
ext = mimetypes.guess_extension(mimetype)
media.filename = self.download_from_url(media.get("src"), f'{slugify(url)}_{i}{ext}')
result.add_media(media)
return result.success("twitter-snscrape")
def download_syndication(self, item: Metadata, url: str, tweet_id: str) -> Union[Metadata|bool]:
"""
Hack alternative working again.

Wyświetl plik

@ -0,0 +1,22 @@
from auto_archiver.core import Metadata
class TestArchiverBase(object):
archiver_class = None
config = None
def setUp(self):
assert self.archiver_class is not None, "self.archiver_class must be set on the subclass"
assert self.config is not None, "self.config must be a dict set on the subclass"
self.archiver = self.archiver_class(self.config)
def create_item(self, url, **kwargs):
item = Metadata().set_url(url)
for key, value in kwargs.items():
item.set(key, value)
return item
def assertValidResponseMetadata(self, test_response, title, timestamp):
assert test_response.is_success()
assert title == test_response.get_title()
assert timestamp, test_response.get("timestamp")

Wyświetl plik

@ -1,8 +1,9 @@
from auto_archiver.archivers.bluesky_archiver import BlueskyArchiver
from .test_archiver_base import TestArchiverBase
from vcr.unittest import VCRMixin
import unittest
class TestBlueskyArchiver(VCRMixin, unittest.TestCase):
class TestBlueskyArchiver(TestArchiverBase, unittest.TestCase):
"""Tests Bluesky Archiver
Note that these tests will download API responses from the bluesky API, so they may be slow.
@ -10,24 +11,12 @@ class TestBlueskyArchiver(VCRMixin, unittest.TestCase):
and also test the archiver's ability to download media.
"""
# def _download_bsky_embeds(self, post):
# # method to override actual method, and monkey patch requests.get so as to not actually download
# # the media files
# old_requests_get = requests.get
# def mock_requests_get(*args, **kwargs):
# return {"status_code": 200, "json": lambda: {"data": "fake data"}}
# requests.get = mock_requests_get
# media = self.bsky._download_bsky_embeds(post)
# requests.get = old_requests_get
# return media
archiver_class = BlueskyArchiver
config = {}
def setUp(self):
self.bsky = BlueskyArchiver({})
return super().setUp()
def test_download_media_with_images(self):
# url https://bsky.app/profile/colborne.bsky.social/post/3lec2bqjc5s2y
post = self.bsky._get_post_from_uri("https://bsky.app/profile/colborne.bsky.social/post/3lec2bqjc5s2y")
post = self.archiver._get_post_from_uri("https://bsky.app/profile/colborne.bsky.social/post/3lec2bqjc5s2y")
# just make sure bsky haven't changed their format, images should be under "record/embed/media/images"
# there should be 2 images
@ -38,7 +27,7 @@ class TestBlueskyArchiver(VCRMixin, unittest.TestCase):
assert len(post["record"]["embed"]["media"]["images"]) == 2
# try downloading the media files
media = self.bsky._download_bsky_embeds(post)
media = self.archiver._download_bsky_embeds(post)
assert len(media) == 2
# check the IDs
@ -47,7 +36,7 @@ class TestBlueskyArchiver(VCRMixin, unittest.TestCase):
def test_download_post_with_single_image(self):
# url https://bsky.app/profile/bellingcat.com/post/3lcxcpgt6j42l
post = self.bsky._get_post_from_uri("https://bsky.app/profile/bellingcat.com/post/3lcxcpgt6j42l")
post = self.archiver._get_post_from_uri("https://bsky.app/profile/bellingcat.com/post/3lcxcpgt6j42l")
# just make sure bsky haven't changed their format, images should be under "record/embed/images"
# there should be 1 image
@ -56,7 +45,7 @@ class TestBlueskyArchiver(VCRMixin, unittest.TestCase):
assert "images" in post["record"]["embed"]
assert len(post["record"]["embed"]["images"]) == 1
media = self.bsky._download_bsky_embeds(post)
media = self.archiver._download_bsky_embeds(post)
assert len(media) == 1
# check the ID
@ -65,17 +54,17 @@ class TestBlueskyArchiver(VCRMixin, unittest.TestCase):
def test_download_post_with_video(self):
# url https://bsky.app/profile/bellingcat.com/post/3le2l4gsxlk2i
post = self.bsky._get_post_from_uri("https://bsky.app/profile/bellingcat.com/post/3le2l4gsxlk2i")
post = self.archiver._get_post_from_uri("https://bsky.app/profile/bellingcat.com/post/3le2l4gsxlk2i")
# just make sure bsky haven't changed their format, video should be under "record/embed/video"
assert "record" in post
assert "embed" in post["record"]
assert "video" in post["record"]["embed"]
media = self.bsky._download_bsky_embeds(post)
media = self.archiver._download_bsky_embeds(post)
assert len(media) == 1
# check the ID
assert "bafkreiaiskn2nt5cxjnxbgcqqcrnurvkr2ni3unekn6zvhvgr5nrqg6u2q" in media[0].get('src')

Wyświetl plik

@ -0,0 +1,95 @@
import unittest
import datetime
from auto_archiver.archivers.twitter_archiver import TwitterArchiver
from .test_archiver_base import TestArchiverBase
class TestTwitterArchiver(TestArchiverBase, unittest.TestCase):
archiver_class = TwitterArchiver
config = {}
def test_sanitize_url(self):
# should expand t.co URLs
t_co_url = "https://t.co/yl3oOJatFp"
t_co_resolved_url = "https://www.bellingcat.com/category/resources/"
assert t_co_resolved_url == self.archiver.sanitize_url(t_co_url)
# shouldn't alter valid x URLs
x_url = "https://x.com/bellingcat/status/1874097816571961839"
assert x_url == self.archiver.sanitize_url(x_url)
# shouldn't alter valid twitter.com URLs
twitter_url = "https://twitter.com/bellingcat/status/1874097816571961839"
assert twitter_url == self.archiver.sanitize_url(twitter_url)
# should strip tracking params
tracking_url = "https://twitter.com/bellingcat/status/1874097816571961839?s=20&t=3d0g4ZQis7dCbSDg-mE7-w"
assert "https://twitter.com/bellingcat/status/1874097816571961839" == self.archiver.sanitize_url(tracking_url)
# shouldn't alter non-twitter/x URLs
test_url = "https://www.bellingcat.com/category/resources/"
assert test_url == self.archiver.sanitize_url(test_url)
# shouldn't strip params from non-twitter/x URLs
test_url = "https://www.bellingcat.com/category/resources/?s=20&t=3d0g4ZQis7dCbSDg-mE7-w"
assert test_url == self.archiver.sanitize_url(test_url)
def test_get_username_tweet_id_from_url(self):
# test valid twitter URL
url = "https://twitter.com/bellingcat/status/1874097816571961839"
username, tweet_id = self.archiver.get_username_tweet_id(url)
assert "bellingcat" == username
assert "1874097816571961839" == tweet_id
# test valid x URL
url = "https://x.com/bellingcat/status/1874097816571961839"
username, tweet_id = self.archiver.get_username_tweet_id(url)
assert "bellingcat" == username
assert "1874097816571961839" == tweet_id
# test invalid URL
# TODO: should this return None, False or raise an exception? Right now it returns False
url = "https://www.bellingcat.com/category/resources/"
username, tweet_id = self.archiver.get_username_tweet_id(url)
assert not username
assert not tweet_id
def test_youtube_dlp_archiver(self):
url = "https://x.com/bellingcat/status/1874097816571961839"
post = self.archiver.download_yt_dlp(self.create_item(url), url, "1874097816571961839")
assert post
self.assertValidResponseMetadata(
post,
"As 2024 comes to a close, heres some examples of what Bellingcat investigated per month in our 10th year! 🧵",
datetime.datetime(2024, 12, 31, 14, 18, 33, tzinfo=datetime.timezone.utc)
)
breakpoint()
def test_download_media_with_images(self):
# url https://twitter.com/MeCookieMonster/status/1617921633456640001?s=20&t=3d0g4ZQis7dCbSDg-mE7-w
post = self.archiver.download()
# just make sure twitter haven't changed their format, images should be under "record/embed/media/images"
# there should be 2 images
assert "record" in post
assert "embed" in post["record"]
assert "media" in post["record"]["embed"]
assert "images" in post["record"]["embed"]["media"]
assert len(post["record"]["embed"]["media"]["images"]) == 2
# try downloading the media files
media = self.archiver.download(post)
assert len(media) == 2
# check the IDs
assert "bafkreiflrkfihcvwlhka5tb2opw2qog6gfvywsdzdlibveys2acozh75tq" in media[0].get('src')
assert "bafkreibsprmwchf7r6xcstqkdvvuj3ijw7efciw7l3y4crxr4cmynseo7u" in media[1].get('src')