From fd2e7f973b75d7bc5fe8cf8956464da283a34ff3 Mon Sep 17 00:00:00 2001 From: Patrick Robertson Date: Mon, 20 Jan 2025 16:17:57 +0100 Subject: [PATCH] Further tidy-ups, also adds some ytdlp utils to 'utils' --- .../archivers/generic_archiver/bluesky.py | 27 +++------ .../generic_archiver/generic_archiver.py | 55 +++++++++---------- .../archivers/generic_archiver/truth.py | 31 +++++++++++ src/auto_archiver/utils/__init__.py | 5 +- tests/archivers/test_archiver_base.py | 8 +-- tests/archivers/test_generic_archiver.py | 55 ++++++++++++++++++- 6 files changed, 123 insertions(+), 58 deletions(-) create mode 100644 src/auto_archiver/archivers/generic_archiver/truth.py diff --git a/src/auto_archiver/archivers/generic_archiver/bluesky.py b/src/auto_archiver/archivers/generic_archiver/bluesky.py index 176808b..684124b 100644 --- a/src/auto_archiver/archivers/generic_archiver/bluesky.py +++ b/src/auto_archiver/archivers/generic_archiver/bluesky.py @@ -18,13 +18,13 @@ def create_metadata(post: dict, archiver: Archiver, url: str) -> Metadata: if v: result.set(k, v) # download if embeds present (1 video XOR >=1 images) - for media in _download_bsky_embeds(post): + for media in _download_bsky_embeds(post, archiver): result.add_media(media) logger.debug(f"Downloaded {len(result.media)} media files") return result -def _download_bsky_embeds(post: dict) -> list[Media]: +def _download_bsky_embeds(post: dict, archiver: Archiver) -> list[Media]: """ Iterates over image(s) or video in a Bluesky post and downloads them """ @@ -33,30 +33,17 @@ def _download_bsky_embeds(post: dict) -> list[Media]: image_medias = embed.get("images", []) + embed.get("media", {}).get("images", []) video_medias = [e for e in [embed.get("video"), embed.get("media", {}).get("video")] if e] + media_url = "https://bsky.social/xrpc/com.atproto.sync.getBlob?cid={}&did={}" for image_media in image_medias: - image_media = _download_bsky_file_as_media(image_media["image"]["ref"]["$link"], post["author"]["did"]) + url = 
media_url.format(image_media['image']['ref']['$link'], post['author']['did']) + image_media = archiver.download_from_url(url) media.append(image_media) for video_media in video_medias: - video_media = _download_bsky_file_as_media(video_media["ref"]["$link"], post["author"]["did"]) + url = media_url.format(video_media['ref']['$link'], post['author']['did']) + video_media = archiver.download_from_url(url) media.append(video_media) return media -def _download_bsky_file_as_media(cid: str, did: str) -> Media: - """ - Uses the Bluesky API to download a file by its `cid` and `did`. - """ - # TODO: replace with self.download_from_url once that function has been cleaned-up - file_url = f"https://bsky.social/xrpc/com.atproto.sync.getBlob?cid={cid}&did={did}" - response = requests.get(file_url, stream=True) - response.raise_for_status() - ext = mimetypes.guess_extension(response.headers["Content-Type"]) - filename = os.path.join(ArchivingContext.get_tmp_dir(), f"{cid}{ext}") - with open(filename, "wb") as f: - for chunk in response.iter_content(chunk_size=8192): - f.write(chunk) - media = Media(filename=filename) - media.set("src", file_url) - return media def _get_post_data(post: dict) -> dict: """ diff --git a/src/auto_archiver/archivers/generic_archiver/generic_archiver.py b/src/auto_archiver/archivers/generic_archiver/generic_archiver.py index 573f47f..00119f7 100644 --- a/src/auto_archiver/archivers/generic_archiver/generic_archiver.py +++ b/src/auto_archiver/archivers/generic_archiver/generic_archiver.py @@ -4,7 +4,7 @@ from yt_dlp.extractor.common import InfoExtractor from loguru import logger -from . import bluesky, twitter +from . 
import bluesky, twitter, truth from auto_archiver.archivers.archiver import Archiver from ...core import Metadata, Media, ArchivingContext @@ -91,13 +91,6 @@ class GenericArchiver(Archiver): # keep both 'title' and 'fulltitle', but prefer 'title', falling back to 'fulltitle' if it doesn't exist result.set_title(video_data.pop('title', video_data.pop('fulltitle', ""))) - - # then add the platform specific additional metadata - for key, mapping in self.video_data_metadata_mapping(extractor_key, video_data).items(): - if isinstance(mapping, str): - result.set(key, eval(f"video_data{mapping}")) - elif callable(mapping): - result.set(key, mapping(video_data)) result.set_url(url) # extract comments if enabled @@ -126,13 +119,6 @@ class GenericArchiver(Archiver): result.set(k, v) return result - - def video_data_metadata_mapping(self, extractor_key: str, video_data: dict) -> dict: - """ - Returns a key->value mapping to map from the yt-dlp produced 'video_data' to the Metadata object. - Can be either a string for direct mapping, or a function, or a lambda. - """ - return {} def suitable_extractors(self, url: str) -> list[str]: """ @@ -148,14 +134,20 @@ class GenericArchiver(Archiver): """ return any(self.suitable_extractors(url)) - def create_metadata_for_post(self, info_extractor: InfoExtractor, video_data: dict, url: str) -> Metadata: + def create_metadata_for_post(self, info_extractor: InfoExtractor, post_data: dict, url: str) -> Metadata: """ - Standardizes the output of the ytdlp InfoExtractor to a common format + Standardizes the output of the 'post' data from a ytdlp InfoExtractor to Metadata object. + + This is only required for platforms that don't have videos, and therefore cannot be converted into ytdlp valid 'video_data'. + In these instances, we need to use the extractor's _extract_post (or similar) method to get the post metadata, and then convert + it into a Metadata object via a platform-specific function. 
""" if info_extractor.ie_key() == 'Bluesky': - return bluesky.create_metadata(video_data, self, url) + return bluesky.create_metadata(post_data, self, url) if info_extractor.ie_key() == 'Twitter': - return twitter.create_metadata(video_data, self, url) + return twitter.create_metadata(post_data, self, url) + if info_extractor.ie_key() == 'Truth': + return truth.create_metadata(post_data, self, url) def get_metatdata_for_post(self, info_extractor: Type[InfoExtractor], url: str, ydl: yt_dlp.YoutubeDL) -> Metadata: """ @@ -174,23 +166,22 @@ class GenericArchiver(Archiver): twid = ie_instance._match_valid_url(url).group('id') # TODO: if ytdlp PR https://github.com/yt-dlp/yt-dlp/pull/12098 is merged, change to _extract_post post_data = ie_instance._extract_status(twid=twid) - - elif info_extractor.ie_key() == 'TikTok': - pass - + elif info_extractor.ie_key() == 'Truth': + video_id = ie_instance._match_id(url) + truthsocial_url = f'https://truthsocial.com/api/v1/statuses/{video_id}' + post_data = ie_instance._download_json(truthsocial_url, video_id) else: # lame attempt at trying to get data for an unknown extractor # TODO: test some more video platforms and see if there's any improvement to be made try: post_data = ie_instance._extract_post(url) except (NotImplementedError, AttributeError) as e: - logger.debug(f"Extractor {info_extractor.ie_key()} does not support extracting post info: {e}") + logger.debug(f"Extractor {info_extractor.ie_key()} does not support extracting post info from non-video URLs: {e}") return False return self.create_metadata_for_post(ie_instance, post_data, url) def get_metatdata_for_video(self, info: dict, info_extractor: Type[InfoExtractor], url: str, ydl: yt_dlp.YoutubeDL) -> Metadata: - # this time download ydl.params['getcomments'] = self.comments #TODO: for playlist or long lists of videos, how to download one at a time so they can be stored before the next one is downloaded? 
@@ -250,12 +241,16 @@ class GenericArchiver(Archiver): # it's a valid video, that the youtubdedl can download out of the box result = self.get_metatdata_for_video(info, info_extractor, url, ydl) - except yt_dlp.utils.DownloadError as e: - logger.debug(f'No video found, attempting to use extractor directly: {e}') - result = self.get_metatdata_for_post(info_extractor, url, ydl) except Exception as e: - logger.debug(f'ytdlp exception which is normal for example a facebook page with images only will cause a IndexError: list index out of range. Exception is: \n {e}') - return False + logger.debug(f'Issue using "{info_extractor.IE_NAME}" extractor to download video (error: {repr(e)}), attempting to use extractor to get post data instead') + try: + result = self.get_metatdata_for_post(info_extractor, url, ydl) + except (yt_dlp.utils.DownloadError, yt_dlp.utils.ExtractorError) as post_e: + logger.error(f'Error downloading metadata for post: {post_e}') + return False + except Exception as generic_e: + logger.debug(f'Attempt to extract using ytdlp extractor "{info_extractor.IE_NAME}" failed: \n {repr(generic_e)}', exc_info=True) + return False if result: extractor_name = "yt-dlp" diff --git a/src/auto_archiver/archivers/generic_archiver/truth.py b/src/auto_archiver/archivers/generic_archiver/truth.py new file mode 100644 index 0000000..780a56e --- /dev/null +++ b/src/auto_archiver/archivers/generic_archiver/truth.py @@ -0,0 +1,31 @@ +import datetime + +from auto_archiver.utils import clean_html, traverse_obj +from auto_archiver.core.metadata import Metadata +from auto_archiver.archivers.archiver import Archiver + +def create_metadata(post: dict, archiver: Archiver, url: str) -> Metadata: + """ + Creates metadata from a Truth Social post + + Only used for posts that contain no media.
ytdlp.TruthIE extractor can handle posts with media + + Format is: + + {'id': '109598702184774628', 'created_at': '2022-12-29T19:51:18.161Z', 'in_reply_to_id': None, 'quote_id': None, 'in_reply_to_account_id': None, 'sensitive': False, 'spoiler_text': '', 'visibility': 'public', 'language': 'en', 'uri': 'https://truthsocial.com/@bbcnewa/109598702184774628', 'url': 'https://truthsocial.com/@bbcnewa/109598702184774628', 'content': '

Pele, regarded by many as football\'s greatest ever player, has died in Brazil at the age of 82. bbc.com/sport/football/4275151

', 'account': {'id': '107905163010312793', 'username': 'bbcnewa', 'acct': 'bbcnewa', 'display_name': 'BBC News', 'locked': False, 'bot': False, 'discoverable': True, 'group': False, 'created_at': '2022-03-05T17:42:01.159Z', 'note': '

News, features and analysis by the BBC

', 'url': 'https://truthsocial.com/@bbcnewa', 'avatar': 'https://static-assets-1.truthsocial.com/tmtg:prime-ts-assets/accounts/avatars/107/905/163/010/312/793/original/e7c07550dc22c23a.jpeg', 'avatar_static': 'https://static-assets-1.truthsocial.com/tmtg:prime-ts-assets/accounts/avatars/107/905/163/010/312/793/original/e7c07550dc22c23a.jpeg', 'header': 'https://static-assets-1.truthsocial.com/tmtg:prime-ts-assets/accounts/headers/107/905/163/010/312/793/original/a00eeec2b57206c7.jpeg', 'header_static': 'https://static-assets-1.truthsocial.com/tmtg:prime-ts-assets/accounts/headers/107/905/163/010/312/793/original/a00eeec2b57206c7.jpeg', 'followers_count': 1131, 'following_count': 3, 'statuses_count': 9, 'last_status_at': '2024-11-12', 'verified': False, 'location': '', 'website': 'https://www.bbc.com/news', 'unauth_visibility': True, 'chats_onboarded': True, 'feeds_onboarded': True, 'accepting_messages': False, 'show_nonmember_group_statuses': None, 'emojis': [], 'fields': [], 'tv_onboarded': True, 'tv_account': False}, 'media_attachments': [], 'mentions': [], 'tags': [], 'card': None, 'group': None, 'quote': None, 'in_reply_to': None, 'reblog': None, 'sponsored': False, 'replies_count': 1, 'reblogs_count': 0, 'favourites_count': 2, 'favourited': False, 'reblogged': False, 'muted': False, 'pinned': False, 'bookmarked': False, 'poll': None, 'emojis': []} + """ + result = Metadata() + result.set_url(url) + timestamp = post['created_at'] # format is 2022-12-29T19:51:18.161Z + result.set_timestamp(datetime.datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S.%fZ")) + result.set('description', post['content']) + result.set('author', post['account']['username']) + + for key in ['replies_count', 'reblogs_count', 'favourites_count', ('account', 'followers_count'), ('account', 'following_count'), ('account', 'statuses_count'), ('account', 'display_name'), 'language', 'in_reply_to_account', 'replies_count']: + if isinstance(key, tuple): + store_key = u" ".join(key) + else: + 
store_key = key + result.set(store_key, traverse_obj(post, key)) + + return result \ No newline at end of file diff --git a/src/auto_archiver/utils/__init__.py b/src/auto_archiver/utils/__init__.py index fe5cb58..50bddca 100644 --- a/src/auto_archiver/utils/__init__.py +++ b/src/auto_archiver/utils/__init__.py @@ -4,4 +4,7 @@ from .misc import * from .webdriver import Webdriver from .gsheet import Gsheets from .url import UrlUtil -from .atlos import get_atlos_config_options \ No newline at end of file +from .atlos import get_atlos_config_options + +# handy utils from ytdlp +from yt_dlp.utils import (clean_html, traverse_obj, strip_or_none) \ No newline at end of file diff --git a/tests/archivers/test_archiver_base.py b/tests/archivers/test_archiver_base.py index ed77739..d793706 100644 --- a/tests/archivers/test_archiver_base.py +++ b/tests/archivers/test_archiver_base.py @@ -3,17 +3,17 @@ import pytest from auto_archiver.core import Metadata from auto_archiver.core import Step from auto_archiver.core.metadata import Metadata - +from auto_archiver.archivers.archiver import Archiver class TestArchiverBase(object): - archiver_class = None - config = None + archiver_class: str = None + config: dict = None @pytest.fixture(autouse=True) def setup_archiver(self): assert self.archiver_class is not None, "self.archiver_class must be set on the subclass" assert self.config is not None, "self.config must be a dict set on the subclass" - self.archiver = self.archiver_class({self.archiver_class.name: self.config}) + self.archiver: Archiver = self.archiver_class({self.archiver_class.name: self.config}) def assertValidResponseMetadata(self, test_response: Metadata, title: str, timestamp: str, status: str = ""): assert test_response is not False diff --git a/tests/archivers/test_generic_archiver.py b/tests/archivers/test_generic_archiver.py index b6f460e..a35d28d 100644 --- a/tests/archivers/test_generic_archiver.py +++ b/tests/archivers/test_generic_archiver.py @@ -46,6 +46,23 
@@ class TestGenericArchiver(TestArchiverBase): result = self.archiver.download(item) assert result.get_url() == "https://www.tiktok.com/@funnycats0ftiktok/video/7345101300750748970" + @pytest.mark.download + @pytest.mark.parametrize("url", [ + "https://bsky.app/profile/colborne.bsky.social/post/3lcxcpgt6j42l", + "twitter.com/bellingcat/status/123", + "https://www.youtube.com/watch?v=1" + ]) + def test_download_nonexistent_media(self, make_item, url): + """ + Test to make sure that the extractor doesn't break on non-existent posts/media + + It should return 'False' + """ + item = make_item(url) + result = self.archiver.download(item) + assert not result + + @pytest.mark.download def test_youtube_download(self, make_item): # url https://www.youtube.com/watch?v=5qap5aO4i9A @@ -60,14 +77,13 @@ class TestGenericArchiver(TestArchiverBase): @pytest.mark.download def test_bluesky_download_multiple_images(self, make_item): - item = make_item("https://bsky.app/profile/colborne.bsky.social/post/3lec2bqjc5s2y") + item = make_item("https://bsky.app/profile/bellingcat.com/post/3lffjoxcu7k2w") result = self.archiver.download(item) assert result is not False - @pytest.mark.skip("ytdlp supports bluesky, but there's currently no way to extract info from pages without videos") @pytest.mark.download def test_bluesky_download_single_image(self, make_item): - item = make_item("https://bsky.app/profile/colborne.bsky.social/post/3lcxcpgt6j42l") + item = make_item("https://bsky.app/profile/bellingcat.com/post/3lfn3hbcxgc2q") result = self.archiver.download(item) assert result is not False @@ -82,6 +98,39 @@ class TestGenericArchiver(TestArchiverBase): item = make_item("https://bsky.app/profile/bellingcat.com/post/3le2l4gsxlk2i") result = self.archiver.download(item) assert result is not False + + @pytest.mark.download + def test_truthsocial_download_video(self, make_item): + item = make_item("https://truthsocial.com/@DaynaTrueman/posts/110602446619561579") + result = 
self.archiver.download(item) + assert len(result.media) == 1 + assert result is not False + + @pytest.mark.download + def test_truthsocial_download_no_media(self, make_item): + item = make_item("https://truthsocial.com/@bbcnewa/posts/109598702184774628") + result = self.archiver.download(item) + assert result is not False + + @pytest.mark.download + def test_truthsocial_download_poll(self, make_item): + item = make_item("https://truthsocial.com/@CNN_US/posts/113724326568555098") + result = self.archiver.download(item) + assert result is not False + + @pytest.mark.download + def test_truthsocial_download_single_image(self, make_item): + item = make_item("https://truthsocial.com/@mariabartiromo/posts/113861116433335006") + result = self.archiver.download(item) + assert len(result.media) == 1 + assert result is not False + + @pytest.mark.skip("Currently failing, multiple images are not being downloaded - this is due to an issue with ytdlp extractor") + @pytest.mark.download + def test_truthsocial_download_multiple_images(self, make_item): + item = make_item("https://truthsocial.com/@trrth/posts/113861302149349135") + result = self.archiver.download(item) + assert len(result.media) == 3 @pytest.mark.download def test_twitter_download_nonexistend_tweet(self, make_item):