Further tidy-ups, also adds some ytdlp utils to 'utils'

pull/175/head
Patrick Robertson 2025-01-20 16:17:57 +01:00
parent befc92deb4
commit fd2e7f973b
6 changed files with 123 additions and 58 deletions

View file

@ -18,13 +18,13 @@ def create_metadata(post: dict, archiver: Archiver, url: str) -> Metadata:
if v: result.set(k, v)
# download if embeds present (1 video XOR >=1 images)
for media in _download_bsky_embeds(post):
for media in _download_bsky_embeds(post, archiver):
result.add_media(media)
logger.debug(f"Downloaded {len(result.media)} media files")
return result
def _download_bsky_embeds(post: dict) -> list[Media]:
def _download_bsky_embeds(post: dict, archiver: Archiver) -> list[Media]:
"""
Iterates over image(s) or video in a Bluesky post and downloads them
"""
@ -33,30 +33,17 @@ def _download_bsky_embeds(post: dict) -> list[Media]:
image_medias = embed.get("images", []) + embed.get("media", {}).get("images", [])
video_medias = [e for e in [embed.get("video"), embed.get("media", {}).get("video")] if e]
media_url = "https://bsky.social/xrpc/com.atproto.sync.getBlob?cid={}&did={}"
for image_media in image_medias:
image_media = _download_bsky_file_as_media(image_media["image"]["ref"]["$link"], post["author"]["did"])
url = media_url.format(image_media['image']['ref']['$link'], post['author']['did'])
image_media = archiver.download_from_url(url)
media.append(image_media)
for video_media in video_medias:
video_media = _download_bsky_file_as_media(video_media["ref"]["$link"], post["author"]["did"])
url = media_url.format(video_media['ref']['$link'], post['author']['did'])
video_media = archiver.download_from_url(url)
media.append(video_media)
return media
def _download_bsky_file_as_media(cid: str, did: str) -> Media:
    """
    Fetches a single Bluesky blob (identified by its `cid` and `did`) via the
    com.atproto.sync.getBlob endpoint and stores it in the archiving tmp dir.

    Returns a Media pointing at the downloaded file, with "src" set to the blob URL.
    """
    # TODO: replace with self.download_from_url once that function has been cleaned-up
    file_url = f"https://bsky.social/xrpc/com.atproto.sync.getBlob?cid={cid}&did={did}"
    resp = requests.get(file_url, stream=True)
    resp.raise_for_status()
    # pick a file extension from the server-reported content type
    extension = mimetypes.guess_extension(resp.headers["Content-Type"])
    target = os.path.join(ArchivingContext.get_tmp_dir(), f"{cid}{extension}")
    # stream to disk in chunks so large blobs are not held fully in memory
    with open(target, "wb") as out:
        for block in resp.iter_content(chunk_size=8192):
            out.write(block)
    downloaded = Media(filename=target)
    downloaded.set("src", file_url)
    return downloaded
def _get_post_data(post: dict) -> dict:
"""

Wyświetl plik

@ -4,7 +4,7 @@ from yt_dlp.extractor.common import InfoExtractor
from loguru import logger
from . import bluesky, twitter
from . import bluesky, twitter, truth
from auto_archiver.archivers.archiver import Archiver
from ...core import Metadata, Media, ArchivingContext
@ -91,13 +91,6 @@ class GenericArchiver(Archiver):
# keep both 'title' and 'fulltitle', but prefer 'title', falling back to 'fulltitle' if it doesn't exist
result.set_title(video_data.pop('title', video_data.pop('fulltitle', "")))
# then add the platform specific additional metadata
for key, mapping in self.video_data_metadata_mapping(extractor_key, video_data).items():
if isinstance(mapping, str):
result.set(key, eval(f"video_data{mapping}"))
elif callable(mapping):
result.set(key, mapping(video_data))
result.set_url(url)
# extract comments if enabled
@ -126,13 +119,6 @@ class GenericArchiver(Archiver):
result.set(k, v)
return result
def video_data_metadata_mapping(self, extractor_key: str, video_data: dict) -> dict:
    """
    Returns a key->value mapping used to copy values from the yt-dlp 'video_data'
    into the Metadata object.

    Each value may be a string (a direct index expression into video_data) or a
    function/lambda applied to video_data. The base implementation maps nothing;
    platform-specific subclasses are expected to override this.
    """
    return dict()
def suitable_extractors(self, url: str) -> list[str]:
"""
@ -148,14 +134,20 @@ class GenericArchiver(Archiver):
"""
return any(self.suitable_extractors(url))
def create_metadata_for_post(self, info_extractor: InfoExtractor, post_data: dict, url: str) -> Metadata:
    """
    Standardizes the output of the 'post' data from a ytdlp InfoExtractor to a Metadata object.

    This is only required for platforms that don't have videos, and therefore cannot be converted
    into ytdlp valid 'video_data'. In these instances, we need to use the extractor's
    _extract_post (or similar) method to get the post metadata, and then convert it into a
    Metadata object via a platform-specific function.

    Returns None (implicitly) for extractors with no platform-specific converter.
    """
    # hoist the repeated ie_key() lookup; each branch delegates to its platform module
    ie_key = info_extractor.ie_key()
    if ie_key == 'Bluesky':
        return bluesky.create_metadata(post_data, self, url)
    if ie_key == 'Twitter':
        return twitter.create_metadata(post_data, self, url)
    if ie_key == 'Truth':
        return truth.create_metadata(post_data, self, url)
def get_metatdata_for_post(self, info_extractor: Type[InfoExtractor], url: str, ydl: yt_dlp.YoutubeDL) -> Metadata:
"""
@ -174,23 +166,22 @@ class GenericArchiver(Archiver):
twid = ie_instance._match_valid_url(url).group('id')
# TODO: if ytdlp PR https://github.com/yt-dlp/yt-dlp/pull/12098 is merged, change to _extract_post
post_data = ie_instance._extract_status(twid=twid)
elif info_extractor.ie_key() == 'TikTok':
pass
elif info_extractor.ie_key() == 'Truth':
video_id = ie_instance._match_id(url)
truthsocial_url = f'https://truthsocial.com/api/v1/statuses/{video_id}'
post_data = ie_instance._download_json(truthsocial_url, video_id)
else:
# lame attempt at trying to get data for an unknown extractor
# TODO: test some more video platforms and see if there's any improvement to be made
try:
post_data = ie_instance._extract_post(url)
except (NotImplementedError, AttributeError) as e:
logger.debug(f"Extractor {info_extractor.ie_key()} does not support extracting post info: {e}")
logger.debug(f"Extractor {info_extractor.ie_key()} does not support extracting post info from non-video URLs: {e}")
return False
return self.create_metadata_for_post(ie_instance, post_data, url)
def get_metatdata_for_video(self, info: dict, info_extractor: Type[InfoExtractor], url: str, ydl: yt_dlp.YoutubeDL) -> Metadata:
# this time download
ydl.params['getcomments'] = self.comments
#TODO: for playlist or long lists of videos, how to download one at a time so they can be stored before the next one is downloaded?
@ -250,12 +241,16 @@ class GenericArchiver(Archiver):
# it's a valid video, that the youtubdedl can download out of the box
result = self.get_metatdata_for_video(info, info_extractor, url, ydl)
except yt_dlp.utils.DownloadError as e:
logger.debug(f'No video found, attempting to use extractor directly: {e}')
result = self.get_metatdata_for_post(info_extractor, url, ydl)
except Exception as e:
logger.debug(f'ytdlp exception which is normal for example a facebook page with images only will cause a IndexError: list index out of range. Exception is: \n {e}')
return False
logger.debug(f'Issue using "{info_extractor.IE_NAME}" extractor to download video (error: {repr(e)}), attempting to use extractor to get post data instead')
try:
result = self.get_metatdata_for_post(info_extractor, url, ydl)
except (yt_dlp.utils.DownloadError, yt_dlp.utils.ExtractorError) as post_e:
logger.error(f'Error downloading metadata for post: {post_e}')
return False
except Exception as generic_e:
logger.debug(f'Attempt to extract using ytdlp extractor "{info_extractor.IE_NAME}" failed: \n {repr(generic_e)}', exc_info=True)
return False
if result:
extractor_name = "yt-dlp"

Wyświetl plik

@ -0,0 +1,31 @@
import datetime
from auto_archiver.utils import clean_html, traverse_obj
from auto_archiver.core.metadata import Metadata
from auto_archiver.archivers.archiver import Archiver
def create_metadata(post: dict, archiver: Archiver, url: str) -> Metadata:
    """
    Creates metadata from a Truth Social post.

    Only used for posts that contain no media; the ytdlp TruthIE extractor can handle posts
    with media.

    `post` is the JSON status object returned by the Truth Social API, e.g. (abridged):
    {'id': '109598702184774628', 'created_at': '2022-12-29T19:51:18.161Z',
     'url': 'https://truthsocial.com/@bbcnewa/109598702184774628',
     'content': '<p>Pele, regarded by many as ...</p>',
     'account': {'username': 'bbcnewa', 'display_name': 'BBC News',
                 'followers_count': 1131, 'following_count': 3, 'statuses_count': 9, ...},
     'media_attachments': [], 'replies_count': 1, 'reblogs_count': 0, 'favourites_count': 2, ...}
    """
    result = Metadata()
    result.set_url(url)
    # created_at format is e.g. 2022-12-29T19:51:18.161Z; the trailing 'Z' means UTC,
    # so attach an explicit UTC tzinfo rather than storing a naive datetime
    timestamp = post['created_at']
    result.set_timestamp(
        datetime.datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S.%fZ").replace(tzinfo=datetime.timezone.utc)
    )
    result.set('description', post['content'])
    result.set('author', post['account']['username'])
    # copy secondary metadata; a tuple key is a nested lookup, stored under a space-joined key
    # (duplicate 'replies_count' entry from the original list removed)
    for key in ['replies_count', 'reblogs_count', 'favourites_count',
                ('account', 'followers_count'), ('account', 'following_count'),
                ('account', 'statuses_count'), ('account', 'display_name'),
                'language', 'in_reply_to_account']:
        store_key = " ".join(key) if isinstance(key, tuple) else key
        result.set(store_key, traverse_obj(post, key))
    return result

Wyświetl plik

@ -4,4 +4,7 @@ from .misc import *
from .webdriver import Webdriver
from .gsheet import Gsheets
from .url import UrlUtil
from .atlos import get_atlos_config_options
from .atlos import get_atlos_config_options
# handy utils from ytdlp
from yt_dlp.utils import (clean_html, traverse_obj, strip_or_none)

Wyświetl plik

@ -3,17 +3,17 @@ import pytest
from auto_archiver.core import Metadata
from auto_archiver.core import Step
from auto_archiver.core.metadata import Metadata
from auto_archiver.archivers.archiver import Archiver
class TestArchiverBase(object):
archiver_class = None
config = None
archiver_class: str = None
config: dict = None
@pytest.fixture(autouse=True)
def setup_archiver(self):
    """Instantiate the archiver under test from the subclass-provided class + config.

    The diff-garbled duplicate assignment line is collapsed to the annotated version.
    """
    assert self.archiver_class is not None, "self.archiver_class must be set on the subclass"
    assert self.config is not None, "self.config must be a dict set on the subclass"
    self.archiver: Archiver = self.archiver_class({self.archiver_class.name: self.config})
def assertValidResponseMetadata(self, test_response: Metadata, title: str, timestamp: str, status: str = ""):
assert test_response is not False

Wyświetl plik

@ -46,6 +46,23 @@ class TestGenericArchiver(TestArchiverBase):
result = self.archiver.download(item)
assert result.get_url() == "https://www.tiktok.com/@funnycats0ftiktok/video/7345101300750748970"
@pytest.mark.download
@pytest.mark.parametrize("url", [
    "https://bsky.app/profile/colborne.bsky.social/post/3lcxcpgt6j42l",
    "twitter.com/bellingcat/status/123",
    "https://www.youtube.com/watch?v=1"
])
def test_download_nonexistent_media(self, make_item, url):
    """
    Test to make sure that the extractor doesn't break on non-existent posts/media
    It should return 'False'
    """
    item = make_item(url)
    result = self.archiver.download(item)
    assert not result
@pytest.mark.download
def test_youtube_download(self, make_item):
# url https://www.youtube.com/watch?v=5qap5aO4i9A
@ -60,14 +77,13 @@ class TestGenericArchiver(TestArchiverBase):
@pytest.mark.download
def test_bluesky_download_multiple_images(self, make_item):
    # diff-garbled duplicate item line collapsed; keep the updated (new) post URL
    item = make_item("https://bsky.app/profile/bellingcat.com/post/3lffjoxcu7k2w")
    result = self.archiver.download(item)
    assert result is not False
@pytest.mark.download
def test_bluesky_download_single_image(self, make_item):
    # skip marker dropped and item line deduped per the hunk's -14/+13 line counts;
    # keep the updated (new) post URL
    item = make_item("https://bsky.app/profile/bellingcat.com/post/3lfn3hbcxgc2q")
    result = self.archiver.download(item)
    assert result is not False
@ -82,6 +98,39 @@ class TestGenericArchiver(TestArchiverBase):
item = make_item("https://bsky.app/profile/bellingcat.com/post/3le2l4gsxlk2i")
result = self.archiver.download(item)
assert result is not False
@pytest.mark.download
def test_truthsocial_download_video(self, make_item):
    item = make_item("https://truthsocial.com/@DaynaTrueman/posts/110602446619561579")
    result = self.archiver.download(item)
    # check success before touching .media: a False result would otherwise raise
    # AttributeError on result.media instead of a clear assertion failure
    assert result is not False
    assert len(result.media) == 1
@pytest.mark.download
def test_truthsocial_download_no_media(self, make_item):
    # a text-only Truth Social status; the archiver should still produce a result
    result = self.archiver.download(make_item("https://truthsocial.com/@bbcnewa/posts/109598702184774628"))
    assert result is not False
@pytest.mark.download
def test_truthsocial_download_poll(self, make_item):
    # a Truth Social status containing a poll; must not break the download
    result = self.archiver.download(make_item("https://truthsocial.com/@CNN_US/posts/113724326568555098"))
    assert result is not False
@pytest.mark.download
def test_truthsocial_download_single_image(self, make_item):
    item = make_item("https://truthsocial.com/@mariabartiromo/posts/113861116433335006")
    result = self.archiver.download(item)
    # check success before touching .media: a False result would otherwise raise
    # AttributeError on result.media instead of a clear assertion failure
    assert result is not False
    assert len(result.media) == 1
@pytest.mark.skip("Currently failing, multiple images are not being downloaded - this is due to an issue with ytdlp extractor")
@pytest.mark.download
def test_truthsocial_download_multiple_images(self, make_item):
    # post with 3 attached images; skipped until the upstream ytdlp extractor is fixed
    result = self.archiver.download(make_item("https://truthsocial.com/@trrth/posts/113861302149349135"))
    assert len(result.media) == 3
@pytest.mark.download
def test_twitter_download_nonexistend_tweet(self, make_item):